線程封閉之ThreadLocal源碼詳解

掘金江溢Jonny,轉載請註明原創出處,謝謝!java

本文內容將基於JDK1.7的源碼進行討論,而且在文章的結尾,筆者將會給出一些經驗之談,但願能給學習者帶來些幫助。 算法

1、線程封閉

在《Java併發編程實戰》一書中提到,「當訪問共享的可變數據時,一般須要使用同步。一種避免使用同步的方式就是不共享數據」。所以提出了「線程封閉」的概念,一種常用線程封閉的應用場景就是JDBC的Connection,經過線程封閉技術,能夠把連接對象封閉在某個線程內部,從而避免出現多個線程共享同一個連接的狀況。而線程封閉總共有三種類型的呈現形式:編程

1)Ad-hoc線程封閉。維護線程封閉性的職責由程序實現來承擔,然而這種實現方式是脆弱的;數組

2)棧封閉。實際上經過儘可能使用局部變量的方式,避免其餘線程獲取數據;安全

3)ThreadLocal類。經過JDK提供的ThreadLocal類,能夠保證某個對象僅在線程內部被訪問,而該類正是本篇文章將要討論的內容。bash

2、誤區

網上不少人會想固然的認爲,ThreadLocal的實現就是一個相似Map<Thread, T>的對象,其中對象中保存了特定某個線程的值,然而實際上的實現並不是如此,筆者在這裏將就着JDK 1.7的源碼對ThreadLocal的實現進行解讀,若是有不對的或者不理解的地方,歡迎留言斧正。微信

3、舉個栗子

SimpleDateFormat是JDK提供的,一類用於處理時間格式的工具,可是由於早期的實現,致使這個類並不是是一個線程安全的實現,所以,在使用的時候咱們會須要使用線程封閉技術來保證使用該類過程當中的線程安全,在這裏,咱們使用了ThreadLocal,下面的實現是使用SimpleDateFormat格式化當前時間並輸出:數據結構

private static ThreadLocal<SimpleDateFormat> localFormatter =
                     new ThreadLocal<SimpleDateFormat>();
static {
    localFormatter.set(new SimpleDateFormat("yyyyMMdd"));
}
 
public static void main(String[] args) {
    Date now = new Date();
    System.out.println(localFormatter.get().format(now));
}
複製代碼

4、系統設計

在JDK 1.7中,ThreadLocal是一個以下圖所示的設計: 併發

ThreadLocal設計
能夠在圖裏看到,每一個線程內部都持有一個ThreadLocal.ThreadLocalMap類型的對象,可是該對象只能被ThreadLocal類處理。那麼讀者暫時能夠理解成,每一個線程的內部都持有了一個相似Map<ThreadLocal, T>結構的表(實際上,Map的維護的鍵值對,是一個WeakReference的弱引用結構,這個比SoftReference還要弱一點)。

爲何這樣設計?

看到這裏,有的讀者會產生這樣的提問,爲何是這樣的設計?好問題,按照不少的人的想法裏,應該有兩種設計方式:函數

1)全局ConcurrentMap<Thread,T>結構。該設計在對應的ThreadLocal對象內維持一個本地變量表,以當前線程(使用Thread.currentThread()方法)做爲key,查找對應的的本地變量(value值),那麼這麼設計存在什麼問題呢?

第一,全局的ConcurrentMap<Thread, T>表,這類數據結構雖然是一類分段式且線程安全的容器,可是這類容器仍然會有線程同步的的額外開銷;

第二,隨着線程的銷燬,原有的ConcurrentMap<Thread, T>沒有被回收,所以致使了內存泄露;

2)局部HashMap<ThreadLocal, T>的結構。在該設計下,每一個線程對象維護一個Map<ThreadLocal, T>,能夠這樣仍然會存在一些問題:

好比某個線程執行時間很是長,然而在此過程當中,某個對象已經不可達(理論上能夠被GC),可是因爲HashMap<ThreadLocal, T>數據結構的存在,仍然有對象被當前線程強引用,從而致使了該對象不能被GC,所以一樣也會致使內存泄露。

5、源碼實現

在闡述完ThreadLocal設計之後,咱們一塊兒來看看JDK1.7 是怎麼實現ThreadLocal的。

ThreadLocal類的自己實現比較簡單,其代碼的核心和精髓實際都在它的內部靜態類ThreadLocalMap中,所以這裏咱們再也不贅述ThreadLocal類的各類接口方法,直接進入主題,一塊兒來研究ThreadLocalMap類相關的源碼。

首先咱們翻閱Thread類的源碼,能夠看到這麼一句:

public
class Thread implements Runnable {
...
ThreadLocal.ThreadLocalMap threadLocals = null; // 注意這裏
...
}
複製代碼

能夠看到在每一個Thread類的內部,都耦合了一個ThreadLocalMap類型的引用,因爲ThreadLocalMap類是ThreadLocal類的私有內嵌類,所以ThreadLocalMap類型的對象只能由ThreadLocal類打理:

public class ThreadLocal<T> {
    ...
    // 內部私有靜態類
    static class ThreadLocalMap {
        ...
    }
    ...
}
複製代碼

關於ThreadLocalMap類實現,咱們也能夠把它理解成是一類哈希表,那麼做爲哈希表,就要包含:數據結構尋址方式哈希表擴容(Rehash),除了哈希表的部分外,ThreadLocalMap還包含了「垃圾回收」的過程。所以,咱們將按以上模塊分別介紹ThreadLocalMap類的實現。

1. 數據結構

那麼接下來咱們看看ThreadLocalMap中數據結構的定義:

static class ThreadLocalMap {
    static class Entry extends WeakReference<ThreadLocal> {
        Object value; // 實際保存的值
 
        Entry(ThreadLocal k, Object v) {
            super(k);
            value = v;
        }
    }
 
    /** * 哈希表初始大小,可是這個值不管怎麼變化都必須是2的N次方 */
    private static final int INITIAL_CAPACITY = 16;
 
    /** * 哈希表中實際存放對象的容器,該容器的大小也必須是2的冪數倍 */
    private Entry[] table;
 
    /** * 表中Entry元素的數量 */
    private int size = 0;
 
    /** * 哈希表的擴容閾值 */
    private int threshold; // 默認值爲0
 
    private void setThreshold(int len) {
        threshold = len * 2 / 3;
    }
  
    ...
    /** * 並非Thread被建立後就必定會建立一個新的ThreadLocalMap, * 除非當前Thread真的用了ThreadLocal * 而且賦值到ThreadLocal後纔會建立一個ThreadLocalMap */
    ThreadLocalMap(ThreadLocal firstKey, Object firstValue) {
        table = new Entry[INITIAL_CAPACITY];
        int i = firstKey.threadLocalHashCode 
              & (INITIAL_CAPACITY - 1);
        table[i] = new Entry(firstKey, firstValue);
        size = 1;
        setThreshold(INITIAL_CAPACITY);
    }
複製代碼

能夠從上面看到這些信息:

1)存放對象信息的表是一個數組。這類方式和HashMap有點像;

2)數組元素是一個**WeakReference(弱引用)**的實現。弱引用是一類比軟引用更加脆弱的類型(按照強弱程度分別爲 強引用>軟引用 > 弱引用 > 虛引用),至於爲何使用弱引用,這是由於線程的執行時間可能很長,可是對應的ThreadLocal對象生成時間未必有線程的執行壽命那般長,在對應ThreadLocal對象由該線程做爲根節點出發,邏輯上不可達時,就應該能夠被GC,若是使用了強引用,該對象沒法被成功GC,所以會帶來內存泄露的問題;

3)哈希表的大小必須是2的N次方。至於這部分,在後面會提到,實際上這個長度的設計和位運算有關;

4)閾值threshold。這個概念一樣和HashMap內部實現的閾值相似,當數組長度到了某個閾值時,爲了減小散列函數的碰撞,不得不擴展容量大小;

結構如圖所示,虛線部分表示的是一個弱引用

Entry引用

二、尋址方式

首先咱們根據getEntry()方法一塊兒來觀察一下根據哈希算法尋址某個元素的過程,能夠看到,這是一類「直接尋址法」的實現:

private Entry getEntry(ThreadLocal key) {
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    if (e != null && e.get() == key)
        return e;
    else
        // 尋址失敗,須要繼續探察
        return getEntryAfterMiss(key, i, e);
}
複製代碼

在這裏咱們注意到一個「key.threadLocalHashCode」對象,該對象的生成方式以下:

public class ThreadLocal<T> {
    private final int threadLocalHashCode = 
                                    nextHashCode();
 
    /** * 計算哈希值相關的魔數 */
    private static final int HASH_INCREMENT = 0x61c88647;
 
    /** * 返回遞增後的哈希值 */
    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }
複製代碼

根據一個固定的值0x61c88647(爲何是這個數字,咱們稍後再提),在每次生成新的ThreadLocal對象時遞增這個哈希值

以前已經提到了,table的length必須知足2的N次方,所以按照位運算"key.threadLocalHashCode & (table.length - 1)"得到是哈希值的的末N位,根據這一哈希算法計算的結果取到哈希表中對應的元素。但是這個時候,又會遇到哈希算法的經典問題——哈希碰撞

針對哈希碰撞,咱們一般有三種手段:

1)拉鍊法。這類哈希碰撞的解決方法將全部關鍵字爲同義詞的記錄存儲在同一線性鏈表中。JDK1.7已經在HashMap類中實現了,感興趣的能夠去看看;

2)再哈希法。當發生衝突時,使用第二個、第三個、哈希函數計算地址,直到無衝突時。缺點:計算時間增長。好比第一次按照姓首字母進行哈希,若是產生衝突能夠按照姓字母首字母第二位進行哈希,再衝突,第三位,直到不衝突爲止;

3)開放地址法(ThreadLocalMap使用的正是這類方法)。所謂的開放定址法就是一旦發生了衝突,就去尋找下一個空的散列地址,只要散列表足夠大,空的散列地址總能找到,並將記錄存入。

那麼咱們一塊兒來看看ThreadLocalMap的實現,咱們經過getEntry()方法按照哈希函數取得哈希表中的值,在該方法內部,咱們將用到一個getEntryAfterMiss()方法:

/** * 若是在getEntry方法中不能立刻找到對應的Entry,將調用該方法 * * @param e table[i]對應的entry值 */
private Entry getEntryAfterMiss( ThreadLocal key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;
 
    while (e != null) {
        ThreadLocal k = e.get();
        if (k == key)
            return e;
        if (k == null)
            // 對從該位置開始的的對象進行清理(開發者主動GC)
            expungeStaleEntry(i); 
        else
            // 查找下一個對象
            i = nextIndex(i, len); 
        e = tab[i];
    }
    return null;
}
複製代碼

在該方法中能夠看到,當根據哈希函數直接查找對應的位置失敗後,就會從當前的位置日後開始尋找,直到找到對應的key值,另外,若是發現有key值已經被GC了,那麼相應的,也應該啓動expungeStaleEntry()方法,清理掉無效的Entry。

相似的,ThreadLocalMap類的set方法,也是按照 「根據哈希函數查找位置→ 若是查找不成功就沿着當前位置查找 → 若是發現垃圾數據及時清理」 的路徑進行着:

private void set(ThreadLocal key, Object value) {
 
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
 
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal k = e.get();
 
        if (k == key) {
            e.value = value;
            return;
        }
 
        if (k == null) {
            // 清理無效數據
            replaceStaleEntry(key, value, i); 
            return;
        }
    }
 
    tab[i] = new Entry(key, value);
    int sz = ++size;
    // 清理無效數據後判斷是否仍需擴容
    if (!cleanSomeSlots(i, sz) && sz >= threshold) 
        rehash(); // 擴容
}
複製代碼

該函數在「尋址方式」上和getEntry()方法相似,所以就不展開闡述了。

爲何是0x61c88647

這個魔數的選取與斐波那契散列有關,0x61c88647對應的十進制爲1640531527。斐波那契散列的乘數能夠用(long) ((1L << 31) * (Math.sqrt(5) - 1))能夠獲得2654435769,若是把這個值給轉爲帶符號的int,則會獲得-1640531527(也就是0x61c88647)。經過理論與實踐,當咱們用0x61c88647做爲魔數累加爲每一個ThreadLocal分配各自的ID也就是threadLocalHashCode再與2的冪取模,獲得的結果分佈很均勻。ThreadLocalMap使用的是線性探測法,均勻分佈的好處在於很快就能探測到下一個臨近的可用slot,從而保證效率。

三、哈希表擴容(Rehash)

咱們一塊兒來回憶一下,table對象的起始容量是能夠容納16個對象,在set()方法的尾部能夠看到如下內容:

// 清理無效數據後判斷是否仍需擴容
if (!cleanSomeSlots(i, sz) && sz >= threshold) 
        rehash(); // 擴容
複製代碼

若是當前容量大小大於閾值(threshold)後,將會發起一次擴容(rehash)操做。

private void rehash() {
    expungeStaleEntries();
 
    if (size >= threshold - threshold / 4)
        resize();
}
複製代碼

在該方法中,首先嚐試完全清理表中的無效元素(失效的弱引用),而後判斷當前是否仍然大於threshold值的3/4。

而threshold值,在文章開始的時候就已經提起過,是當前容量大小的2/3:

/** * 在當前容量大小超過table大小的2/3時可能會觸發一次rehash操做 */
private void setThreshold(int len) {
    threshold = len * 2 / 3;
}
複製代碼

那麼咱們一塊兒看看resize()方法:

/** * 成倍擴容table */
private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2; // 直接倍增
    Entry[] newTab = new Entry[newLen];
    int count = 0;
 
    for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        if (e != null) {
            ThreadLocal k = e.get();
            if (k == null) {
                e.value = null; // 釋放無效的對象
            } else {
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null)
                    h = nextIndex(h, newLen);
                newTab[h] = e;
                count++;
            }
        }
    }
 
    setThreshold(newLen);
    size = count;
    table = newTab;
}
複製代碼

在該方法內部,首先建立一個新的表,表的大小是原來表大小的兩倍,而後再逐個複製原表內容到新表中,若是發現有無效對象,則把Entry對象中對應的value引用置爲NULL,方便後面垃圾收集器對該對象的回收。

四、垃圾回收

此時筆者再次貼出引用的圖示:

Entry引用
能夠看到Entry對象到ThreadLocal對象是一個弱引用的關係,而指向Object對象仍然是一個強引用的關係,所以,雖然因爲弱引用的ThreadLocal對象隨着ROOT路徑不可達而被垃圾收集器清理後,可是仍然殘留有Object對象,不及時清理會存在「內存泄露」的問題。

那麼咱們看看和垃圾收集有關的方法:

/** * 該方法將在set方法中被調用,在set某個值時,經過散列函數指向某個位置,然而 * 此時該位置上存在一個垃圾Entry,將會嘗試使用此方法用新值覆蓋舊值,不過該方 * 法還承擔了「主動垃圾回收」的功能。 * * @param key 以ThreadLocal類對象做爲key * @param value 經過ThreadLocal類對象找到對應的值 */
private void replaceStaleEntry( ThreadLocal key, Object value,int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    Entry e;
 
    // 向前掃描,查找最前的一個無效slot
    int slotToExpunge = staleSlot;
    for (int i = prevIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = prevIndex(i, len))
        if (e.get() == null)
            slotToExpunge = i;
 
 
    // 向後遍歷table,直到當前表中所指的位置是一個空值或
    // 者已經找到了和ThreadLocal對象匹配的值
    for (int i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal k = e.get();
 
        // 以前設置新值時,若是當前哈希位存在衝突,
        // 那麼就要順延到後面空的slot中存放。
        // 既然當前哈希位原來對應的ThreadLocal對象已經
        // 被回收了,那麼被順延放置的ThreadLocal對象
        // 天然就要被向前調整到當前位置中去
        if (k == key) {
            e.value = value;
 
            tab[i] = tab[staleSlot];
            tab[staleSlot] = e; // swap操做
 
            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            // 清理一波無效slot
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return; // 找到了就直接返回
        }
 
        // 若是當前的slot已經無效,而且向前掃描過程當中沒有無效slot,
        // 則更新slotToExpunge爲當前位置
        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    }
 
    // key沒找到就原地建立一個新的
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);
 
    // 在探測過程當中若是發現任何無效slot,
    // 則作一次清理(連續段清理+啓發式清理)
    if (slotToExpunge != staleSlot)
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
 
/** * 這個函數作了兩件事情: * 1)清理當前無效slot(由staleSlot指定位置) * 2)從staleSlot開始,一直到null位,清理掉中間全部的無效slot */
private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
 
    // 清理當前無效slot(由staleSlot指定位置)
    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    size--;
 
    // 從staleSlot開始,一直到null位,清理掉中間全部的無效slot
    Entry e;
    int i;
    for (i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal k = e.get();
        if (k == null) {
            // 清理掉無效slot
            e.value = null;
            tab[i] = null;
            size--;
        } else {
            int h = k.threadLocalHashCode & (len - 1);
            // 當前ThreadLocal不在它計算出來的哈希位上,
            // 說明以前在插入的時候被順延到哈希位後面放置了,
            // 所以此時須要向前調整位置
            if (h != i) {
                tab[i] = null;
 
                // 從計算出來的哈希位開始日後查找,找到一個適合它的空位
                while (tab[h] != null)
                    h = nextIndex(h, len);
                tab[h] = e;
            }
        }
    }
    return i;
}
 
/** * 啓發式地清理slot, * n是用於控制控制掃描次數的 * 正常狀況下若是log2(n)次掃描沒有發現無效slot,函數就結束了 * 可是若是發現了無效的slot,將n置爲table的長度len,作一次連續段的清理 * 再從下一個空的slot開始繼續掃描 */
private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
        i = nextIndex(i, len);
        Entry e = tab[i];
        if (e != null && e.get() == null) {
            n = len;
            removed = true;
            i = expungeStaleEntry(i);
        }
    } while ( (n >>>= 1) != 0);
    return removed;
}
複製代碼

下面我來圖解一下expungeStaleEntry方法的流程:

expungeStaleEntry方法

以上是ThreadLocal源碼介紹的所有內容。下面筆者將補充一些在實際開發過程當中遇到的問題,做爲補充信息一併分享。

6、經驗之談

一、謹慎在ThreadExecutorPool中使用ThreadLocal

在ThreadExecutorPool中,Thread是複用的,所以每一個Thread對應的ThreadLocal空間也是被複用的,若是開發者不但願ThreadExecutorPool中的下一個Task能讀取到上一個Task在ThreadLocal中存入的信息,那就不該該使用ThreadLocal。

舉個例子:

final ThreadLocal<String> threadLocal = 
       new ThreadLocal<String>();
// 線程池大小爲1
ThreadPoolExecutor threadPoolExecutor = 
      new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, 
                    new LinkedBlockingDeque<Runnable>());
// 任務1
threadPoolExecutor.execute(new Runnable() {
    public void run() {
        threadLocal.set("first insert"); 
    }
});
// 任務2
threadPoolExecutor.execute(new Runnable() {
    public void run() {
        System.out.println(threadLocal.get());
    }
});
複製代碼

像這樣,第二個任務能讀取到第一個任務插入的數據。可是若是此時線程池中任務一拋出一個異常出來:

final ThreadLocal<String> threadLocal = 
                      new ThreadLocal<String>();
// 線程池大小爲1
ThreadPoolExecutor threadPoolExecutor = 
  new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, 
             new LinkedBlockingDeque<Runnable>());
// 任務1
threadPoolExecutor.execute(new Runnable() {
    public void run() {
        threadLocal.set("first insert");
        // 拋一個異常
        throw new RuntimeException("throw a exception"); 
 
    }
});
// 任務2
threadPoolExecutor.execute(new Runnable() {
    public void run() {
        System.out.println(threadLocal.get());
    }
});

複製代碼

那麼此時,第二個任務沒法讀取到第一個任務插入的數據(由於第一個線程由於拋異常已經死了,任務二用的是新線程執行)

二、不要濫用ThreadLocal

不少開發者爲了可以在類和類直接傳輸數據,而不想把方法裏的參數表寫得過於龐大,那麼可能會帶來類於類直接重度耦合的問題,這樣不利於後面的開發。

三、要先set才能get

繼續舉個例子:

public class TestMain {
    public ThreadLocal<Integer> intThreadLocal = 
                          new ThreadLocal<Integer>();
 
    public int getCount() {
        return intThreadLocal.get();
    }
 
    public static void main(String[] args) {
        System.out.println(new TestMain().getCount());
    }
}
複製代碼

在這裏,沒有先set就直接get,將會拋出一個NullPointerException,緣由咱們一塊兒來回顧一下ThreadLocal的代碼:

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null)
            return (T)e.value;
    }
    return setInitialValue(); // 返回了NULL致使NPE
}
 
private T setInitialValue() {
    T value = initialValue(); // 這裏返回了NULL
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
    return value;
}
複製代碼

以上就是「線程封閉之ThreadLocal源碼詳解」的所有內容了,若是還想進一步的交流,歡迎關注個人微信公衆號「Jonny的日知錄」~:-D

相關文章
相關標籤/搜索