谈谈ThreadLocal
ThreadLocal,即线程局部变量,它被设计用来解决变量共享的线程安全问题。线程安全问题发生的根本原因在于多个线程会对同一个临界区资源进行操作。Synchonized和ThreadLocal是两种不同的解决多线程并发访问的方式。Synchonized使用了锁机制,使得同一时间只有一个线程能访问变量,而ThreadLocal是为每一个线程拷贝变量的副本,隔离了多个线程对数据的共享。所以,Synchronized用于线程间的数据共享,ThreadLocal用于线程间的数据隔离。在开发中如果遇到在一个线程中一些数据在不同层次的代码中需要通过参数来回传递,但是不想增加性能损耗的问题时,最好使用ThreadLocal来解决这样的安全问题。从根本上看,其实使用ThreadLocal是一种“空间换时间”的方案。 引用类型为了理解ThreadLocal的实现原理,首先需要了解引用类型的概念。根据引用类型语义的强弱,可以把引用分为四种类型。分别为强引用、软引用、弱引用和虚引用,它们的强弱关系依次递减。
ThreadLocal的使用这个例子简单的示范了ThreadLocal类的使用。 import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class Counter { private static ThreadLocal<Integer> NUM = new ThreadLocal<>(); public static int add(int n) { NUM.set(n); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return NUM.get() + 1; } public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { int n = i; service.execute(() -> { System.out.println(n + " " + Counter.add(n)); }); } } } ThreadLocal实现原理下图是ThreadLocal实现原理图,图中黑色实线箭头为强引用,红色虚线箭头的代表弱引用。可以看出,ThreadLocalMap的Entry使用虚引用存储ThreadLocal。 ThreadLocal一共有三个核心方法, set方法从 public void set(T value) { // 1.获取当前线程的实例对象 Thread t = Thread.currentThread(); // 2.获取ThreadLocalMap对象 ThreadLocalMap map = getMap(t); if (map != null) // 3.如果Map存在,则以当前ThreadLocal实例为key,值为Value进行存储 map.set(this,value); else // 4.如果Map不存在,则新建ThreadLocalMap并存入Value createMap(t,value); } 可以看出,通过 ThreadLocalMap getMap(Thread t) { return t.threadLocals; } ThreadLocalMap的引用定义在Thread中 ThreadLocal.ThreadLocalMap threadLocals = null; 在map为空时,会调用createMap这个方法,该方法会new一个ThreadLocalMap对象,然后将当前ThreadLocal实例作为key,值为value存入map。 void createMap(Thread t,T firstValue) { t.threadLocals = new ThreadLocalMap(this,firstValue); } get方法
public T get() { // 1. 获取当前线程的实例对象 Thread t = Thread.currentThread(); // 2. 获取当前ThreadLocal实例的ThreadLocalMap对象 ThreadLocalMap map = getMap(t); if (map != null) { // 3. 如果map存在,则获取Entry ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { // 4. 如果Entry不为空,那么将value取出并返回 @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } // 5. 如果map不存在或者map存在Entry为空,那么初始化map return setInitialValue(); } 在当前线程的ThreadLocalMap实例不为空时,通过ThreadLocal实例作为key获取entry,拿到value值。如果ThreadLocalMap为空,或者entry为空时,说明还没有设置value值,调用 private T setInitialValue() { // 1. 获取value值 T value = initialValue(); // 2. 获取当前线程的实例对象 Thread t = Thread.currentThread(); // 3. 获取当前ThreadLocal实例的ThreadLocalMap对象 ThreadLocalMap map = getMap(t); if (map != null) // 4.如果map存在,设置key和value map.set(this,value); else // 5. 如果map不存在,新建map并存入value createMap(t,value); // 6. 返回value return value; } 在 protected T initialValue() { return null; } remove方法
public void remove() { // 1. 获取当前线程的ThreadLocalMap对象 ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) // 2. 如果map存在,调用remove方法,传入当前ThreadLocal实例 m.remove(this); } 在ThreadLocalMap实例不为空时,调用它的remove方法传入当前ThreadLocal实例作为参数来删除数据。 ThreadLocalMap分析下面来详细分析一下ThreadLocalMap中的重要方法,ThreadLocalMap一共由三个主要的方法,包括 static class ThreadLocalMap { ... } ThreadLocalMap内部使用Entry数组来存储数据,数组的长度为2的N次方,初始值设为16. // 存储数据的table数组 private Entry[] table; private static final int INITIAL_CAPACITY = 16; static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k,Object v) { // 将k包装为一个WeakReference super(k); value = v; } } set方法private void set(ThreadLocal<?> key,Object value) { // We don't use a fast path as with get() because it is at // least as common to use set() to create new entries as // it is to replace existing ones,in which case,a fast // path would fail more often than not. // 1. 获取Entry数组 Entry[] tab = table; // 2. 获取数组的长度 int len = tab.length; // 3. 寻找数组中数据存入的位置 int i = key.threadLocalHashCode & (len-1); // 4. 当遇到hash冲突时 for (Entry e = tab[i]; e != null; // 5. 线性探测下一位置 e = tab[i = nextIndex(i,len)]) { // 6. 由于Entry继承自WeakReference,调用get方法来得到threadLocal引用 ThreadLocal<?> k = e.get(); // 7. 如果是同一个ThreadLocal实例的话,覆盖旧value值 if (k == key) { e.value = value; return; } // 8. 如果k为空,说明threadLocal实例已经被回收 if (k == null) { // 9. 同时替换掉key和value replaceStaleEntry(key,value,i); return; } } // 5. 如果没有遇到hash冲突,那么直接在位置i新建Entry对象,存入value值 tab[i] = new Entry(key,value); // 6. 增加数据长度值 int sz = ++size; // 7. 清除掉一些key为null的entry实例,如果超过容量,则进行扩容 if (!cleanSomeSlots(i,sz) && sz >= threshold) rehash(); } 需要注意的是,在寻找数据存入数组的位置时,使用了 private final int threadLocalHashCode = nextHashCode(); private static AtomicInteger nextHashCode = new AtomicInteger(); private static final int HASH_INCREMENT = 0x61c88647; private static int nextHashCode() { return nextHashCode.getAndAdd(HASH_INCREMENT); } 从源码总可以看出,threadLocalHashCode是由一个AtomicInteger变量加上 private static int nextIndex(int i,int len) { return ((i + 1 < len) ? i + 1 : 0); }
由于Entry中ThreadLocal实例采用虚引用存储,因此在没有实引用引用它时,这个实例将会被回收,使key为空,当key为空时,value已经没用,这个时候使用 注意ThreadLocal的 private void replaceStaleEntry(ThreadLocal<?> key,Object value,int staleSlot) { // 1. 获取Entry数组 Entry[] tab = table; // 2. 获取数组长度 int len = tab.length; Entry e; // Back up to check for prior stale entry in current run. // We clean out whole runs at a time to avoid continual // incremental rehashing due to garbage collector freeing // up refs in bunches (i.e.,whenever the collector runs). int slotToExpunge = staleSlot; // 3. 从右往左探测,探测到第一个k为null的entry,称之为脏entry for (int i = prevIndex(staleSlot,len); (e = tab[i]) != null; i = prevIndex(i,len)) if (e.get() == null) // 4. 记录第一个k为null的entry下标 slotToExpunge = i; // Find either the key or trailing null slot of run,whichever // occurs first // 5. 从左往右探测,寻找是否有旧数据 for (int i = nextIndex(staleSlot,len); (e = tab[i]) != null; i = nextIndex(i,len)) { ThreadLocal<?> k = e.get(); // If we find key,then we need to swap it // with the stale entry to maintain hash table order. // The newly stale slot,or any other stale slot // encountered above it,can then be sent to expungeStaleEntry // to remove or rehash all of the other entries in run. // 6. 如果找到旧数据 if (k == key) { // 7. 将旧数据覆盖 e.value = value; // 8. 交换这个存入数据的entry和之前的脏entry的位置 tab[i] = tab[staleSlot]; tab[staleSlot] = e; // Start expunge at preceding stale entry if it exists // 9. 如果最左脏entry的位置等于i的时候,设置slotToExpunge的位置 if (slotToExpunge == staleSlot) slotToExpunge = i; // 10. 清理脏entry cleanSomeSlots(expungeStaleEntry(slotToExpunge),len); return; } // If we didn't find stale entry on backward scan,the // first stale entry seen while scanning for key is the // first still present in the run. // 11. 如果在右侧发现脏entry同时左侧没有脏entry,以当前位置为起点,清理脏entry if (k == null && slotToExpunge == staleSlot) slotToExpunge = i; } // If key not found,put new entry in stale slot // 12. 如果没有发现有旧数据,那么直接覆盖脏entry tab[staleSlot].value = null; tab[staleSlot] = new Entry(key,value); // If there are any other stale entries in run,expunge them if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge),len); } 在上面的代码中, 清理脏entry调用了 private boolean cleanSomeSlots(int i,int n) { boolean removed = false; Entry[] tab = table; int len = tab.length; do { i = nextIndex(i,len); Entry e = tab[i]; if (e != null && e.get() == null) { n = len; removed = true; i = expungeStaleEntry(i); } } while ( (n >>>= 1) != 0); return removed; }
private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; // expunge entry at staleSlot // 1. 将value设置为null,使GC能回收 tab[staleSlot].value = null; // 2. 清除当前脏entry tab[staleSlot] = null; // 3. 数据长度减一 size--; // Rehash until we encounter null Entry e; int i; // 4. 向后探测 for (i = nextIndex(staleSlot,len); (e = tab[i]) != null; i = nextIndex(i,len)) { ThreadLocal<?> k = e.get(); // 5. 如果遇到脏entry,将entry清除 if (k == null) { e.value = null; tab[i] = null; size--; } else { // 6. 重新使用hash确定位置 int h = k.threadLocalHashCode & (len - 1); // 7. 如果k是因为有数据占据了位置变化位置的情况下,将entry挪回原来的位置。 if (h != i) { tab[i] = null; // Unlike Knuth 6.4 Algorithm R,we must scan until // null because multiple entries could have been stale. while (tab[h] != null) h = nextIndex(h,len); tab[h] = e; } } } // 8. 如果向后探测时当前位置entry为空,返回当前位置 return i; } private void setThreshold(int len) { threshold = len * 2 / 3; } private void rehash() { expungeStaleEntries(); // Use lower threshold for doubling to avoid hysteresis if (size >= threshold - threshold / 4) resize(); } private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; // Help the GC } else { int h = k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h,newLen); newTab[h] = e; count++; } } } setThreshold(newLen); size = count; table = newTab; } getEntry方法private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key,i,e); } remove方法/** * Remove the entry for key. */ private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i,len)]) { if (e.get() == key) { e.clear(); expungeStaleEntry(i); return; } } } private void remove(ThreadLocal<?> key) { // 1. 获取当前ThreadLocal实例的Entry数组 Entry[] tab = table; // 2. 当前ThreadLocal实例的Entry数组中键值对的个数 int len = tab.length; // 3. 获取当前key的位置 int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i,len)]) { if (e.get() == key) { e.clear(); expungeStaleEntry(i); return; } } } readLocal的副作用脏数据线程池会重用对象,所以与线程绑定的类的静态属性ThreadLocal变量也会被重用。如果在实现的线程 内存泄漏由于ThreadLocalMap中存储的是ThreadLocal的弱引用,所以当引用ThreadLocal的实引用不存在时,ThreadLocal实例会被GC回收,造成Entry的key为空,而value不会被回收,所以会发生内存泄漏。因此,在每次使用完ThreadLocal后,必须要及时的调用 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |