併發系列(2)之 ThreadLocal 詳解

本文將主要結合源碼講述 ThreadLocal 的使用場景和內部結構,以及 ThreadLocalMap 的內部結構;另外在閱讀文本以前只好先了解一下引用和 HashMap 的相關知識,能夠參考 Reference 框架概覽Reference 徹底解讀HashMap 相關html

1、使用場景

一般狀況下避免多線程問題有三種方法:java

  • 不使用共享狀態變量;
  • 狀態變量爲不可變的;
  • 訪問共享變量時使用同步;

而 ThreadLocal 則是經過每一個線程獨享狀態變量的方式,即不使用共享狀態變量,來消除多線程問題的,例如:數組

@Slf4j public class TestThreadlocal {
  private static ThreadLocal<String> local = ThreadLocal.withInitial(() -> "init");

  public static void main(String[] args) throws InterruptedException {
    Runnable r = new TT();
    new Thread(r, "thread1").start();
    Thread.sleep(2000);
    new Thread(r, "thread2").start();
    log.info("exit");
  }

  private static class TT implements Runnable {
    @Override
    public void run() {
      log.info(local.get());
      local.set(Thread.currentThread().getName());
      log.info("set local name and get: {}", local.get());
    }
  }
}

// 打印:數據結構

[14 19:27:39,818 INFO ] [thread1] TestThreadlocal - init
[14 19:27:39,819 INFO ] [thread1] TestThreadlocal - set local name and get: thread1
[14 19:27:41,818 INFO ] [main]    TestThreadlocal - exit
[14 19:27:41,819 INFO ] [thread2] TestThreadlocal - init
[14 19:27:41,819 INFO ] [thread2] TestThreadlocal - set local name and get: thread2

能夠看到線程1和線程2雖然使用的是同一個 ThreadLocal 變量,可是他們之間卻沒有互相影響;其緣由就是每一個使用 ThreadLocal 變量的線程都會在各自的線程中保存一份 獨立 的副本,因此各個線程之間沒有相互影響;多線程

2、ThreadLocal 結構概述

ThreadLocal 的大致結構如圖所示:框架

threadlocal

如圖所示:ide

  • 在使用 ThreadLocal 的時候,是首先得到當前線程;
  • 而後取到線程的成員變量 ThreadLocalMap(暫時能夠理解爲和WeakHashMap類似,後面會詳細講到);
  • 而後以當前的 ThreadLocal 變量做爲 Key,取到 Entry;
  • 最後返回 Entry 中的 value;

其源代碼以下:this

public T get() {
  Thread t = Thread.currentThread();
  ThreadLocalMap map = getMap(t);
  if (map != null) {
    ThreadLocalMap.Entry e = map.getEntry(this);
    if (e != null) {
      @SuppressWarnings("unchecked")
      T result = (T)e.value;
      return result;
    }
  }
  return setInitialValue();
}

ThreadLocalMap getMap(Thread t) {
  return t.threadLocals;
}


ThreadLocalMap.Entry:線程

另外還須要注意這裏的 Entry,code

static class ThreadLocalMap {
  static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
      super(k);
      value = v;
    }
  }
  
  ...
}

Reference(T referent) {
  this(referent, null);
}

能夠看到 Entry 繼承了 WeakReference,而且沒有傳入 ReferenceQueue;關於 Reference 的部分下面我簡單介紹,具體的能夠參考我上面提到了兩個博客;

reference

WeakReference 表示當傳入的 referent(這裏就是 ThreadLocal 自身),變成弱引用的時候(即沒有強引用指向他的時候);下一次 GC 將自動回收弱引用;這裏沒有傳入 ReferenceQueue,也就表明不能集中監測回收已棄用的 Entry,而須要再次訪問到對應的位置時才能檢測到,具體內容下面還有講到,注意這也是和 WeakHashMap 最大的兩個區別之一;

注意若是沒有手動移除 ThreadLocal,而他有一直以強引用狀態存活,就會致使 value 沒法回收,至最終 OOM;因此在使用 ThreadLocal 的時候,最後必定要手動移除;


3、ThreadLocalMap 結構概述

1. set 方法

ThreadLocalMap 看名字大體能夠知道是相似於 HashMap的數據結構;可是有一個重要的區別是,HashMap 使用拉鍊法解決哈希衝突,而 ThreadLocalMap 是使用線性探測法解決哈希衝突;具體結構如圖所示:

reference

如圖所示,ThreadLocalMap 裏面沒有鏈表的結構,當使用 threadLocalHashCode & (len - 1); 定位到哈希槽時,若是該位置爲空則直接插入,若是不爲空則檢查下一個位置,直到遇到空的哈希槽;

另外它和咱們一般見到的線性探測有點區別,在插入或刪除的時候,會有哈希槽的移動;

reference

源碼以下:

public void set(T value) {
  Thread t = Thread.currentThread();
  ThreadLocalMap map = getMap(t);
  if (map != null)
    map.set(this, value);
  else
    createMap(t, value);    // 延遲初始化
}

private void set(ThreadLocal<?> key, Object value) {
  Entry[] tab = table;
  int len = tab.length;
  int i = key.threadLocalHashCode & (len-1);    // 定位哈希槽
  
  // 若是本來的位置不爲空,則依次向後查找
  for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
    ThreadLocal<?> k = e.get();
  
    // 若是 threadLocal 已經存在,則直接用新值替代舊值
    if (k == key) {
      e.value = value;
      return;
    }
  
    // 若是向後找到一個已經棄用的哈希槽,則將其替換
    if (k == null) {
      replaceStaleEntry(key, value, i);
      return;
    }
  }
  
  // 若是定位的哈希槽爲空,則直接插入新值
  tab[i] = new Entry(key, value);
  int sz = ++size;
  
  // 最後掃描其餘棄用的哈希槽,若是最終超過閾值則擴容
  if (!cleanSomeSlots(i, sz) && sz >= threshold)
    rehash();
  }
}

private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
  Entry[] tab = table;
  int len = tab.length;
  Entry e;

  int slotToExpunge = staleSlot;
  
  // 以 staleSlot 爲基礎,向前查找到最前面一個棄用的哈希槽,並確立清除開始位置
  for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len))
    if (e.get() == null) slotToExpunge = i;

  // 以 staleSlot 爲基礎,向後查找已經存在的 ThreadLocal
  for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
    ThreadLocal<?> k = e.get();
    
    // 若是向後還有目標 ThreadLocal,則交換位置
    if (k == key) {
      e.value = value;

      tab[i] = tab[staleSlot];
      tab[staleSlot] = e;

      // 剛交換的位置若是等於清除開始位置,則將其指向目標位置以後
      if (slotToExpunge == staleSlot) slotToExpunge = i;
      
      // 從開始清除位置開始掃描全表,並清除
      cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
      return;
    }
    
        // 若是在目標位置後面未找到目標 ThreadLocal,則 staleSlot 仍然是目標位置,並將開始清除位置指向後面
    if (k == null && slotToExpunge == staleSlot)
      slotToExpunge = i;
  }
  
  // 在目標位置替換
  tab[staleSlot].value = null;
  tab[staleSlot] = new Entry(key, value);

  // 若是開始清除的位置,不是目標位置,則掃描全表並清除
  if (slotToExpunge != staleSlot)
    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

其中整體思路是:

  • 若是目標位置爲空,則直接插入;
  • 若是不爲空,則向後查詢,看是否有目標key存在,若是存在則交換位置,並插入;
  • 另外還須要肯定一個跳躍掃描全表的起始位置,必須是棄用的哈希槽,若是目標位置前面有就找最前面的,若是沒有就用後面的;


2. get 方法

public T get() {
  Thread t = Thread.currentThread();
  ThreadLocalMap map = getMap(t);
  if (map != null) {
    ThreadLocalMap.Entry e = map.getEntry(this);
    if (e != null) {
      @SuppressWarnings("unchecked")
      T result = (T)e.value;
      return result;
    }
  }
  return setInitialValue();
}

private Entry getEntry(ThreadLocal<?> key) {
  int i = key.threadLocalHashCode & (table.length - 1);
  Entry e = table[i];
  if (e != null && e.get() == key)
    return e;
  else
    return getEntryAfterMiss(key, i, e);
}

private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
  Entry[] tab = table;
  int len = tab.length;

  while (e != null) {
    ThreadLocal<?> k = e.get();
    if (k == key)
      return e;
    if (k == null)
      expungeStaleEntry(i);
    else
      i = nextIndex(i, len);
    e = tab[i];
  }
  return null;
}

從源碼裏面也能夠看到上面講的邏輯:

  • 首先獲取 ThreadLocalMap,若是 map 爲空則初始化;也可使用 Thread.withInitial(Supplier<? extends S> supplier);工廠方法建立以初始值的 ThreadLocal,或則直接覆蓋 Thread.initialValue()方法;
  • 而後用哈希定位哈希槽,若是命中則返回,未命中則向後一次查詢;
  • 若是最終未找到,則用 Thread.initialValue() 方法返回初始值;


3. remove 方法

public void remove() {
  ThreadLocalMap m = getMap(Thread.currentThread());
  if (m != null) m.remove(this);
}

private void remove(ThreadLocal<?> key) {
  Entry[] tab = table;
  int len = tab.length;
  int i = key.threadLocalHashCode & (len-1);
  for (Entry e = tab[i];
    e != null;
    e = tab[i = nextIndex(i, len)]) {
    if (e.get() == key) {
      e.clear();
      expungeStaleEntry(i);
      return;
    }
  }
}

public void clear() {
  this.referent = null;
}

移除的邏輯也可 HashMap 相似:

  • 首先查找到目標哈希槽,而後清除;
  • 注意這裏的清除並不是直接將 Entry 置爲 null,而是先將 WeakReference 的 referent置爲空,在掃描全表;實際上是在模擬了 WeakReference 清除的過程,若是 ThreadLocal 變成弱引用,在訪問一次 ThreadLocalMap,其清除的過程是同樣的;
  • 另外注意這裏清除後和 HashMap 同樣,容量是不會縮小的;


4. ThreadLocal 哈希計算

int index = key.threadLocalHashCode & (len-1);

private final int threadLocalHashCode = nextHashCode();
private static int nextHashCode() { return nextHashCode.getAndAdd(HASH_INCREMENT); }
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;

這裏哈希槽的定位仍然是使用的除留餘數法,當容量是2的冪時,hash % length = hash & (length-1);可是 ThreadLocalMap 和 HashMap 有點區別的是,ThreadLocalMap 的 key 都是 ThreadLocal,若是這裏使用一般意義的哈希計算方法,那確定每一個 key 都會發生哈希碰撞;因此須要用一種方法將相同的 key 區分開,並均勻的分佈到 2的冪的數組中;

因此就看到了上面的計算方法,ThreadLocal 的哈希值每次增長 0x61c88647;具體緣由你們能夠參見源碼註釋,其目的就是能使 key 均勻的分佈到 2的冪的數組中;


5. 清除方法

cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);

private boolean cleanSomeSlots(int i, int n) {
  boolean removed = false;
  Entry[] tab = table;
  int len = tab.length;
  do {
    i = nextIndex(i, len);
    Entry e = tab[i];
    if (e != null && e.get() == null) {
      n = len;
      removed = true;
      i = expungeStaleEntry(i);
    }
  } while ( (n >>>= 1) != 0);
  return removed;
}

private int expungeStaleEntry(int staleSlot) {
  Entry[] tab = table;
  int len = tab.length;

  // expunge entry at staleSlot
  tab[staleSlot].value = null;
  tab[staleSlot] = null;
  size--;

  // Rehash until we encounter null
  Entry e;
  int i;
  for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
    ThreadLocal<?> k = e.get();
    if (k == null) {
      e.value = null;
      tab[i] = null;
      size--;
    } else {
      int h = k.threadLocalHashCode & (len - 1);
      if (h != i) {
        tab[i] = null;

        // Unlike Knuth 6.4 Algorithm R, we must scan until
        // null because multiple entries could have been stale.
        while (tab[h] != null)
          h = nextIndex(h, len);
        tab[h] = e;
      }
    }
  }
  return i;
}

expungeStaleEntry:

  • 首先清除目標位置;
  • 而後向後依次掃描,直到遇到空的哈希槽;
  • 若是遇到已棄用的哈希槽則清除,若是遇到因哈希衝突後移的 ThreadLocal,則前移;

cleanSomeSlots 則是向後偏移調用 expungeStaleEntry 方法 log(n) 次,cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); 連用就能夠掃描全表清除已棄用的哈希槽;


6. 擴容方法

private void rehash() {
  expungeStaleEntries();

  // Use lower threshold for doubling to avoid hysteresis
  if (size >= threshold - threshold / 4) resize();
}

private void expungeStaleEntries() {
  Entry[] tab = table;
  int len = tab.length;
  for (int j = 0; j < len; j++) {
    Entry e = tab[j];
    if (e != null && e.get() == null) expungeStaleEntry(j);
  }
}

private void resize() {
  Entry[] oldTab = table;
  int oldLen = oldTab.length;
  int newLen = oldLen * 2;
  Entry[] newTab = new Entry[newLen];
  int count = 0;

  for (int j = 0; j < oldLen; ++j) {
    Entry e = oldTab[j];
    if (e != null) {
      ThreadLocal<?> k = e.get();
      if (k == null) {
        e.value = null; // Help the GC
      } else {
        int h = k.threadLocalHashCode & (newLen - 1);
        while (newTab[h] != null)
          h = nextIndex(h, newLen);
        newTab[h] = e;
        count++;
      }
    }
  }

  setThreshold(newLen);
  size = count;
  table = newTab;
}

擴容時:

  • 首先掃描全表清除已棄用的哈希槽;
  • 若是清除後仍然超過閾值,則擴容;
  • 擴容時,容量增長 1 倍(初始容量爲 16,因此容量一直是2的冪),而後將舊錶中的值,依次插到新表;


4、InheritableThreadLocal

InheritableThreadLocal 是能夠被繼承的 ThreaLocal;在 Thread 中有成員變量用來繼承父類的 ThreadLocalMap ;ThreadLocal.ThreadLocalMap inheritableThreadLocals;好比:

@Slf4j public class TestThreadlocal {
  private static InheritableThreadLocal<String> local = new InheritableThreadLocal();

  public static void main(String[] args) throws InterruptedException {
    Runnable r = new TT();

    local.set("parent");
    log.info("get: {}", local.get());
    Thread.sleep(1000);
    new Thread(r, "child").start();
    log.info("exit");

  }

  private static class TT implements Runnable {
    @Override
    public void run() {
      log.info(local.get());
      local.set(Thread.currentThread().getName());
      log.info("set local name and get: {}", local.get());
    }
  }
}

// 打印:

[15 10:58:29,878 INFO ] [main]  TestThreadlocal - get: parent
[15 10:58:30,878 INFO ] [main]  TestThreadlocal - exit
[15 10:58:30,878 INFO ] [child] TestThreadlocal - parent
[15 10:58:30,878 INFO ] [child] TestThreadlocal - set local name and get: child

總結

  • ThreadLocal 經過線程獨佔的方式,也就是隔離的方式,避免了多線程問題;
  • 在使用 ThreadLocal 的時候必定要手動移除,以免內存泄漏;
相關文章
相關標籤/搜索