ThreadLocal详解

本文最后更新于:9 个月前

引言

在多线程编程中,确保线程安全是至关重要的。Java提供了多种机制来处理线程间的数据隔离,其中ThreadLocal是一个非常强大的工具,它可以为每个使用该变量的线程提供一个独立的变量副本。这意味着每个线程都可以在不影响其他线程的情况下,修改自己的副本。这种方式特别适用于管理会话信息、用户ID、事务ID等线程敏感的数据。

常用API

  • public ThreadLocal():创建ThreadLocal对象
  • public void set(T value):设置当前线程绑定的局部变量
  • public T get():获取当前线程绑定的局部变量
  • public void remove():移除当前线程绑定的局部变量

解决问题

问题复现

现有Demo01类,该类中有一个普通的String类型的变量content,创建一个实例对象,开启多个线程去调用该对象的setget方法,观察输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Demo01 {

private String content;

private String getContent() {
return this.content;
}

private void setContent(String content) {
this.content = content;
}

private void remove() {
this.content = null;
}

public static void main(String[] args) {
Demo01 demo01 = new Demo01();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
demo01.setContent(Thread.currentThread().getName() + "的数据");
try {
Thread.sleep(200);
} catch (Exception ignored) {
}
System.out.println(Thread.currentThread().getName() + " --> " + demo01.getContent());
demo01.remove();
}).start();
}
}

}

输出如下:

观察到有线程错误地使用了非本线程设置的content

解决方法一

通过synchronized锁定代码块来实现线程之间数据不相互影响

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
Demo02 demo01 = new Demo02();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
synchronized (Demo02.class) {
demo01.setContent(Thread.currentThread().getName() + "的数据");
try {
Thread.sleep(200);
} catch (Exception ignored) {
}
System.out.println(Thread.currentThread().getName() + " --> " + demo01.getContent());
demo01.remove();
}
}).start();
}
}

解决方法二

将变量替换为ThreadLocal <String>类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Demo01 {

ThreadLocal<String> content = new ThreadLocal<>();

private String getContent() {
return content.get();
}

private void setContent(String content) {
this.content.set(content);
}

private void remove() {
this.content.remove();
}

public static void main(String[] args) {
Demo01 demo01 = new Demo01();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
demo01.setContent(Thread.currentThread().getName() + "的数据");
System.out.println(Thread.currentThread().getName() + " --> " + demo01.getContent());
demo01.remove();
}).start();
}
}

}

以上两个方法运行多次,未出现线程之间相互影响的情况

总结

synchronized ThreadLocal
原理 以“时间”换“空间”,多个线程排队访问同一个变量 以“空间”换“时间”,每个线程访问各自的变量
侧重点 多个线程之间访问资源的同步 多个线程中每个线程的数据隔离

内部结构

ThreadLocal

ThreadLocal 的核心是 ThreadLocalMap,这是一个定制的哈希表,用于存储每个线程的局部变量。ThreadLocalMap 不是全局的,而是作为 Thread 类的一个字段存在,每个线程都有自己的 ThreadLocalMap 实例。

ThreadLocalMap

ThreadLocal.ThreadLocalMap 是一个定制的哈希表,其中包含了 ThreadLocalMap.Entry 类型的元素,具体结构包括:

  • Entry:这是 ThreadLocalMap 中的存储单元,每个 Entry 是一个 WeakReference(弱引用)对 ThreadLocal 对象的引用,并直接持有一个对值(用户存储的对象)的引用。这种设计允许在没有外部强引用时,ThreadLocal 对象本身可以被垃圾回收,以避免内存泄漏。
  • 键(Key)ThreadLocal 对象,作为弱引用存储。
  • 值(Value):与 ThreadLocal 相关联的用户数据。

核心方法

ThreadLocal

计算哈希值

1
2
3
4
5
6
7
8
9
private final int threadLocalHashCode = nextHashCode();
// 原子递增计数器,为了线程安全
private static AtomicInteger nextHashCode = new AtomicInteger();
// 黄金分割比的一个近似表达
private static final int HASH_INCREMENT = 0x61c88647;
// 当前值加上增量并更新,然后返回旧值
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}

get

获取线程的局部变量值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public T get() {
// 当前线程
Thread t = Thread.currentThread();
// 从线程中获取ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null) {
// 自身为key获取entry
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
T result = (T)e.value; // 获取entry的值
return result;
}
}
return setInitialValue(); // 值为空则设置默认值(null)并返回
}

set

设置线程的局部变量值

1
2
3
4
5
6
7
8
9
10
11
public void set(T value) {
Thread t = Thread.currentThread();
// 从线程中获取ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null)
// ThreadLocalMap不为空则设置值
map.set(this, value);
else
// ThreadLocalMap为空则创建并设置值
createMap(t, value);
}

remove

移除ThreadLocalMapentry

1
2
3
4
5
6
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
// 将ThreadLocalMap中的entry清除
m.remove(this);
}

ThreadLocalMap

Entry类

1
2
3
4
5
6
7
8
9
10
// 继承弱引用
static class Entry extends WeakReference<ThreadLocal<?>> {
// ThreadLocalMap get set的值或对象
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 初始化一个entry数组,大小为INITIAL_CAPACITY=16
table = new Entry[INITIAL_CAPACITY];
// &运算计算出数组下标
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 设置key和value,其中key为ThreadLocal对象本身
table[i] = new Entry(firstKey, firstValue);
// 更新数组大小
size = 1;
// 设置阈值,保持负载为 2/3 size
setThreshold(INITIAL_CAPACITY);
}

下标计算方法ThreadLocal的哈希值对数组大小进行%运算,但这里为&运算。假设INITIAL_CAPACITY是一个 **2n**,如16(二进制表示为10000),这个数-1=15(二进制为1111),不难发现这种数的二进制的结果是一个全为1的数,这个数跟别的数进行&运算能轻易的算出值,并且这个值就是对 2n 进行模运算的值,这也是为什么ThreadLocalMapEntry[] tablesize为 **2n**。

nextIndex / prevIndex

获取下一个 / 前一个entry,处理数组的环绕行为

1
2
3
4
5
6
7
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}

private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}

getEntry

获取entry,获取为空时会顺便清理无效(不被强引用)的entry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private Entry getEntry(ThreadLocal<?> key) {
// 计算Entry数组下标
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
// 为空时调用的方法
return getEntryAfterMiss(key, i, e);
}

private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;

while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
// Entry对应的ThreadLocal为符合,则返回Entry
return e;
if (k == null)
// Entry对应的ThreadLocal为空,说明这个entry已经过时了,调用删除方法
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}

expungeStaleEntry

清除无效(即已经不再被引用)的entry,并可能重新哈希其他entry以保持哈希表的整洁和效率

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;

// 清除槽位中的value和entry,帮助垃圾收集器清理这些对象
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;

// 从下一索引开始遍历直至遇到一个空槽位
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 如果entry的key已被回收,则清除该entry
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
// 重新进行哈希获取理想的下标值
int h = k.threadLocalHashCode & (len - 1);
// 若当前下标不等于理想下标,则清除entry并将entry设置到理想下标处(槽位被占则往后顺位)
if (h != i) {
tab[i] = null;
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}

set

设置entry的值,可能会进行清理操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void set(ThreadLocal<?> key, Object value) {

Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);

// 从计算出的下标值开始循环直至空entry
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
// 找到了对应的ThreadLocal,则进行更新,方法结束
if (k == key) {
e.value = value;
return;
}
// 槽位对应的key为空,则进行替换
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
// 没有找到匹配的entry且没有空引用,则新建entry
tab[i] = new Entry(key, value);
int sz = ++size;
// 添加新entry后,重新执行清理操作,如果达到阈值则重新进行哈希
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}

replaceStaleEntry

处理已经变为无效entry,并尝试优化哈希表的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
// 开始清理的位置
int slotToExpunge = staleSlot;
// 向前遍历寻找,查找更早的无效entry,确定开始清理的位置,遇到空槽位则停下
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
// 开始往后遍历
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 找到相同key的entry,将staleSlot处的entry设置为该处的entry
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// 如果需要清理的位置为staleSlot,则将其置为i,因为i是一个明确的被新entry占用的插槽处
if (slotToExpunge == staleSlot)
slotToExpunge = i;
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}

// 清理位置更新
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}

// 没有找到entry则设置一个新的entry到staleSlot处
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);

// 已发现的有其它的清理位置,则从那开始清理
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

cleanSomeSlots

i+1处开始清理一些无效条目,范围减半清除(避免一次清理过多数据)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
// 减半清除无效entry
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
n = len;
// 清除了就返回true
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}

rehash

重新进行哈希,遍历清除所有无效entry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void rehash() {
expungeStaleEntries();

// 达到阈值的3/4,进行扩容
if (size >= threshold - threshold / 4)
resize();
}

// 遍历清除所有无效entry
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}

resize

扩容条目数组table,期间也会对无效entry进行清除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
// 新entry数组大小为原来的两倍
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;

for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null; // Help the GC
} else {
// 重新计算哈希值并设置到对应插槽处
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}

// 阈值、大小、条目数组更新
setThreshold(newLen);
size = count;
table = newTab;
}

实际应用场景

在数据库连接管理中,每个线程可能需要独立的数据库连接,或者至少需要确保自己的数据库操作不会受到其他线程的干扰。使用 ThreadLocal 来存储每个线程的数据库连接可以确保,如果能结合连接池,则会是一个更加稳妥且高效的方案,主要体现在以下方面:

  • 线程安全:通过 ThreadLocal 确保每个线程拥有独立的数据库连接。这样,每个线程的数据库操作都是隔离的,避免了多线程间的干扰和数据一致性问题。
  • 连接复用:使用连接池可以有效管理多个数据库连接,提供连接复用的功能。每个线程通过 ThreadLocal 获取连接时,实际上是从连接池中获取,用完后再归还到连接池。这不仅减少了频繁创建和销毁连接的开销,而且增加了连接的利用率。
  • 性能提升:连接池可以预先创建一定数量的数据库连接,并且根据需求动态调整,这可以大大减少因为连接创建所需时间而造成的延迟。同时,由于每个线程都能及时获取到数据库连接,也减少了等待时间,从而提高了整体应用的响应速度和吞吐量。
  • 资源管理:连接池配合 ThreadLocal 使用,还可以更好地进行连接的生命周期管理。例如,在一个应用请求的开始创建连接,在结束时关闭连接。这样的管理确保了连接资源的有效分配,并防止了内存泄漏等问题。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class DatabaseConnectionManager {
private static ThreadLocal<Connection> connectionThreadLocal = new ThreadLocal<>();
private static DataSource dataSource = setupDataSource();

private static DataSource setupDataSource() {
// 设置连接池
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://url");
config.setUsername("user");
config.setPassword("password");
return new HikariDataSource(config);
}

public static Connection getConnection() throws SQLException {
Connection conn = connectionThreadLocal.get();
if (conn == null || conn.isClosed()) {
// 从连接池获取连接
conn = dataSource.getConnection();
connectionThreadLocal.set(conn);
}
return conn;
}

public static void releaseConnection() throws SQLException {
Connection conn = connectionThreadLocal.get();
if (conn != null) {
// 关闭连接,将连接归还到连接池
conn.close();
// 重要:确保清除ThreadLocal存储,避免内存泄漏
connectionThreadLocal.remove();
}
}
}

线性探测设计

nextIndex 方法用于计算环绕数组的下一个索引位置。由于 ThreadLocalMap 的底层是一个循环数组,当到达数组的末尾时,需要从数组的开始处继续查找空槽位或处理冲突。这种循环处理是典型的哈希表的开放地址法中的线性探测策略的一部分。源码如下:

1
2
3
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}

使用场景

  • 插入操作:在向 ThreadLocalMap 添加新的 ThreadLocal 时,如果计算得出的槽位已经被占用,nextIndex 用于找到下一个空闲槽位。
  • 删除操作:在从 ThreadLocalMap 删除条目时,nextIndex 用于帮助重新定位和填补可能留下的空洞,确保哈希表的连续性和减少碰撞。
  • 清理操作:在执行如 expungeStaleEntrycleanSomeSlots 等清理无效条目的操作时,nextIndex 被用来逐个检查条目,直到整个数组被适当地处理。

通过这种方式,nextIndex 方法支持 ThreadLocalMap 的有效运行,确保其性能和稳定性,特别是在处理数组环绕行为时。这是哈希表实现中处理开放寻址冲突解决策略的一个关键工具

弱引用设计

ThreadLocal 设计为弱引用主要是为了确保 ThreadLocal 对象的生命周期不会因为它被 ThreadLocalMap 引用而不合理地延长,从而避免潜在的内存泄漏问题。具体作用可以从以下几个方面来理解:

防止内存泄漏

如果 ThreadLocal 对象被设计为强引用,那么只要线程仍然活着,ThreadLocal 对象以及它所持有的任何资源都不会被垃圾回收,即使这些 ThreadLocal 对象已经不再需要了。这在长时间运行的线程或线程池中尤其成问题,因为这些线程可能不会很快结束,导致长时间占用不必要的内存。

解绑生命周期

通过使用弱引用,ThreadLocal 对象的生命周期就不再依赖于其存储在 ThreadLocalMap 中的状态。也就是说,一旦外部没有其他强引用指向 ThreadLocal 对象,这个对象就可以被垃圾收集器回收,无论其对应的线程是否仍然存在。这样做有效地解决了 ThreadLocal 可能导致的内存泄漏问题,因为 ThreadLocal 本身不会阻止其被回收。

自动清理

ThreadLocalMap 的实现中,每次访问 ThreadLocalMap(例如通过 get()set()remove() 方法)时,都有一个清理过程,用于检查并清除那些已经被垃圾回收的 ThreadLocal 对象的条目。这种自动清理机制有助于保持内存的健康状态,尤其是在长时间运行的线程中。所以代码编写者在使用ThreadLocal时一定要手动调用remove()方法,清理Entry数组中没有被强引用的ThreadLocal

强引用设计

如果 ThreadLocal 以强引用形式存在,那么只要线程存活,ThreadLocal 对象和其关联的值都不会被垃圾回收器回收。这会导致更严重的内存泄漏,因为它不仅保持了值,同时也保持了 ThreadLocal 对象本身。

静态与非静态权衡

ThreadLocal可以定义为静态非静态

静态 ThreadLocal:

  • 用途:通常用于那些需要跨多个线程但每个线程需要保持独立状态的应用场景。
  • 生命周期:随着类的加载而加载,随着类的卸载而卸载,因此在应用运行期间,静态 ThreadLocal 不会被垃圾回收。
  • 例子:上述实际应用中,将 ThreadLocal 设置为静态属性,用于存储数据库连接;用户会话信息存储等。

非静态 ThreadLocal:

  • 用途:适合那些仅在类的实例生命周期内需要保持线程局部状态的情况。
  • 生命周期:与它所属的对象实例的生命周期相关,对象被回收时,非静态 ThreadLocal 也可能随之被回收(如果没有其他强引用)。
  • 例子:在一个对象处理特定任务时,可能需要跟踪任务执行的某些状态,但这些状态只在对象生命周期内有效。

代码编写者确实需要在使用 ThreadLocal 时做出权衡,选择静态或非静态形式应根据具体应用的需求资源管理策略来决定。这种灵活性ThreadLocal 成为处理线程局部存储的强大工具,但也要求开发者必须谨慎地设计和实现以防止资源泄漏和其他潜在问题。

总结

ThreadLocal提供了一种优雅的方式来维护线程间的数据隔离,同时它也帮助我们避免了使用同步方法或同步块,这些同步机制可能会导致性能下降。通过使用ThreadLocal,我们可以提高应用程序在多线程环境下的性能,因为每个线程访问的是自己独立的变量。然而,ThreadLocal也应谨慎使用,不当的使用可能会导致内存泄露。例如,如果ThreadLocal没有被及时清除,那么由于其内部是通过弱引用实现的,可能会在某些情况下造成旧的线程上下文的长时间存留。因此,务必确保在不需要它时,适当地清理每个线程本地内存,以维护系统的健康状态和性能。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!