Java併發編程之ThreadLocal源碼分析

ThreadLocal介紹

ThreadLocal是JDK1.2提供的,做用是給單個線程內的共享變量提供載具。每一個線程之間的ThreadLocal裏的數據是相互隔離的,並隨着線程的消亡而消亡。java

使用

ThreadLocal提供了get(),set(T value),remove()3個對外方法。數組

  • 1.調用get()獲取值
  • 2.調用set(T value)設置值
  • 3.調用remove()刪除值

使用中的坑

ThreadLocal常被用來作登入狀態信息的存儲。可是若是當前線程操做完不對狀態信息作remove()可能會出現坑。咱們拿購買商品舉個例子:jvm

    1. A用戶已經登入,請求購買
    1. ThreadLocal存儲A用戶信息。
    1. 獲取ThreadLocal裏A用戶信息調用去請求購買接口,並返回成功。
    1. A用戶線程未被系統回收,待重複利用。
    1. B用戶也發起請求購買請求,並重用了A用戶使用過的線程,此時B用戶並未登入,因此跳過ThreadLocal存儲B用戶信息的邏輯。
    1. 正常狀況下B用戶會返回須要登入的提示,但此時ThreadLocal存儲A用戶信息並未被清除,獲取A用戶信息並調用去請求購買接口,並返回成功。

能夠看到B用戶使用了A用戶的信息去購買了商品,正確的作法應該是每一個線程使用結束後去remove()。源碼分析

ThreadLocal原理

ThreadLocal的UML圖以下this

調用set方法真正的數據是存在ThreadLocalMap裏的,而ThreadLocalMap是線程Thread的成員變量,因此說線程Thread被jvm回收後ThreadLocalMap也會被回收。ThreadLocalMap的實現是採用順序存儲結構哈希表,它跟HashMap不一樣,每一個hash地址只能存一個數據。它key存的是ThreadLocal自己並且它的Entry繼承至WeakReference,因此它的key若是沒被強引用會在GC的觸發的時候回收掉。線程

set(T value)源碼分析

public void set(T value) {
    	  //獲取當前線程
        Thread t = Thread.currentThread();
        //根據當前線程獲取ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        //若是map不爲空設置值
        if (map != null)
            map.set(this, value);
        //若是map爲空說明線程中成員變量ThreadLocalMap還沒被建立,則建立map
        else
            createMap(t, value);
    }

這個方法主要根據當前線程獲取ThreadLocalMap,若是還沒初始化則調用createMap(t, value)初始化,反之調用map.set(this, value)設置值。設計

下面看下getMap(t)的實現code

ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }

很簡單就是獲取Thread的成員變量threadLocalsblog

先來看下map爲空調用createMap(t, value)去建立ThreadLocalMap的狀況:繼承

void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }

createMap(t, value)直接是new的ThreadLocalMap,ThreadLocalMap構造方法以下:

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
	//1.建立和初始化table容量
    table = new Entry[INITIAL_CAPACITY];
    //2.快速hash獲取下標地址
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    //3.建立Entry,存放第一個數據
    table[i] = new Entry(firstKey, firstValue);
    //4.設置存儲個數
    size = 1;
    //5.設置擴容閥值
    setThreshold(INITIAL_CAPACITY);
}
  • 1.先建立和初始化table容量,table就是存放數據的容器,容器初始容量爲16。
  • 2.使用快速hash獲取下標地址,這裏看下獲取firstKey.threadLocalHashCode的代碼:
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode =
    new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;

private static int nextHashCode() {
    return nextHashCode.getAndAdd(HASH_INCREMENT);
}

nextHashCode()的方法每次建立ThreadLocal都會加HASH_INCREMENT從新計算threadLocalHashCode的值,HASH_INCREMENT這個魔數的選取與斐波那契散列有關爲了讓哈希碼能均勻的分佈在2的N次方的數組裏,這裏指table數組。

  • 3.建立Entry,存放第一個數據,這裏以ThreadLocal本身自己key,Entry繼承至WeakReference,代碼以下:
static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

因爲key是弱引用,因此它的key若是沒被強引用會在GC的觸發的時候回收掉。

  • 4.設置長度爲1。
  • 5.設置擴容閥值,看下實現
private void setThreshold(int len) {
       threshold = len * 2 / 3;
}

擴容閥值的計算是容量大小的2/3是,這裏結果是10。

下面看下map.set(this, value)實現

private void set(ThreadLocal<?> key, Object value) {
	//調用set以前已經作過判斷,因此table已經初始化了
    Entry[] tab = table;
   //獲取tab的長度
    int len = tab.length;
   //1.快速hash獲取下標地址
    int i = key.threadLocalHashCode & (len-1);
	//2.用線性探測法解決衝突
    for (Entry e = tab[i];
         e != null;
         //取下個下標值
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
        //3.若是這個key已經存在,從新設置值
        if (k == key) {
            e.value = value;
            return;
        }
		//4.若是key已通過期,則替換這個髒槽
        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }
    //5.建立Entry
    tab[i] = new Entry(key, value);
    //6.存儲個數加1
    int sz = ++size;
    //7.清理key已通過期清理的髒槽,若是沒髒槽而且存儲個數已經大於擴容閥值,則擴容
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}
  • 1.快速hash獲取下標地址。
  • 2.用線性探測法解決衝突,調用nextIndex(i, len)遍歷table。咱們看下nextIndex(i, len)的源碼
private static int nextIndex(int i, int len) {
    return ((i + 1 < len) ? i + 1 : 0);
}

遍歷的實現其實設計了一個環,從i開始遍歷到達len長度後又開始從0開始。實際上這裏用線性探測法解決衝突不會到達len長度,由於在到達以前已經進行了擴容。

  • 3.若是找到的這個key已經存在,從新設置值。
  • 4.若是找到的key已通過期,則替換這個髒槽。
  • 5.建立Entry。
  • 6.存儲個數加1。
  • 7.清理key已通過期清理的髒槽,若是未清理到髒槽而且存儲個數已經大於擴容閥值,則調用rehash()重hash。

下面來看下replaceStaleEntry(key, value, i)的源碼

private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                               int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    Entry e;
    int slotToExpunge = staleSlot;
    //1.向前查找,找到第一個key過時的髒槽
    for (int i = prevIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = prevIndex(i, len))
        if (e.get() == null)
            slotToExpunge = i;
    //2.從staleSlot位置開始向後查找,若是找到key,交換至staleSlot位置的髒槽
    for (int i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
        //找到key
        if (k == key) {
            //交換至staleSlot位置的髒槽
            e.value = value;
            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;
            //若是slotToExpunge == staleSlot,說明前面沒有髒槽,直接從i位置開始清理
            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            //清理髒槽
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }
        //若是slotToExpunge == staleSlot,說明前面沒有髒槽,直接從i位置開始清理
        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    }
    //3.若是沒有找到key,則建立一個新的Entry放至staleSlot位置的髒槽
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);
    //4.若是運行過程當中有找到髒槽,清理之
    if (slotToExpunge != staleSlot)
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
  • 1.從staleSlot位置向前查找,找到第一個key過時的髒槽
  • 2.從staleSlot位置開始向後查找,若是找到key,交換至staleSlot位置的髒槽
  • 3.若是沒有找到key,則建立一個新的Entry放至staleSlot位置的髒槽
  • 4.若是運行過程當中有找到髒槽,清理之,這裏slotToExpunge != staleSlot 成立說明slotToExpunge已經改變說明找到了髒槽。

cleanSomeSlots(expungeStaleEntry(slotToExpunge), len)的做用主要是清理髒槽expungeStaleEntry(slotToExpunge)方法做用的從slotToExpunge位置(包括slotToExpunge)開始清理臨近的髒槽。

下面來看下expungeStaleEntry(slotToExpunge)的源碼

private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    // 1.清理槽
    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    //存儲個數減一
    size--;

    // 2.重hash或清理staleSlot以後的槽,直到空值
    Entry e;
    int i;
    for (i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
        //2.1若是輪詢到的k爲空,則清理之
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        } else {
        		//2.2重hash,從新設置hash已經改變的Entry的位置
            int h = k.threadLocalHashCode & (len - 1);
            if (h != i) {
                tab[i] = null;

                while (tab[h] != null)
                    h = nextIndex(h, len);
                tab[h] = e;
            }
        }
    }
    //3.返回清理遍歷的最後位置i
    return i;
}
  • 1.清理staleSlot位置的槽。這裏清理的邏輯就是將value設置爲null並將整個Entry設置爲null,以便後續新Entry覆蓋使用。
  • 2.重hash或清理staleSlot以後的槽,直到空值。
    • 2.1 若是輪詢到的k爲空,則清理之。
    • 2.2 重hash,從新設置hash已經改變的Entry的位置。這裏i地址有多是通過線性探測解決的衝突的方式找到的地址,由於前面的槽已經被清理過因此線性探測解決的衝突方法找到的地址可能已經不是i,因此這邊須要從新用線性探測解決的衝突方法查找新地址。
  • 3 .返回清理遍歷的最後位置i。 總結下expungeStaleEntry(slotToExpunge)邏輯其實不單單清理傳的slotToExpunge地址的槽也會清理它臨近的槽。

拿到清理遍歷的最後位置i後會調用cleanSomeSlots(int i, int n)繼續從i開始清理髒槽下面來看下的源碼:

private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
    	//環形遍歷
        i = nextIndex(i, len);
        Entry e = tab[i];
        //若是是髒槽,則清理之
        if (e != null && e.get() == null) {
            n = len;
            removed = true;
            //最終調用expungeStaleEntry(i)去清理
            i = expungeStaleEntry(i);
        }
    //log2(n)清理次數
    } while ( (n >>>= 1) != 0);
    return removed;
}

cleanSomeSlots(int i, int n)主要功能就是從i位置開始遍歷log2(n)次去清理槽,爲何是log2(n)次官方給的緣由是簡單,快速。因此這個方法可能不是清理全部的髒槽,而是簡單快速的清理幾個髒槽。

下面來看下rehash()方法

private void rehash() {
	//1.清理全部的髒槽
    expungeStaleEntries();

    //2.若是清理事後存儲個數仍是大於擴容閥值的3/4,則擴容
    if (size >= threshold - threshold / 4)
        resize();
}
  • 1.調用expungeStaleEntries()清理全部的髒槽。
  • 2.若是清理事後存儲個數仍是大於擴容閥值的3/4,則擴容。

下面看下expungeStaleEntries()方法源碼

private void expungeStaleEntries() {
    Entry[] tab = table;
    int len = tab.length;
    for (int j = 0; j < len; j++) {
        Entry e = tab[j];
        if (e != null && e.get() == null)
            expungeStaleEntry(j);
    }
}

代碼很簡單就是遍歷全部的table並清理髒槽。

下面看下resize()方法源碼

private void resize() {
            Entry[] oldTab = table;
            //獲取老table容量
            int oldLen = oldTab.length;
            //新table容量擴大2倍
            int newLen = oldLen * 2;
            Entry[] newTab = new Entry[newLen];
            int count = 0;
            //遍歷老的table,對全部Entry重hash定位
            for (int j = 0; j < oldLen; ++j) {
                Entry e = oldTab[j];
                if (e != null) {
                    ThreadLocal<?> k = e.get();
                    //若是遇到髒槽,清理之幫助GC
                    if (k == null) {
                        e.value = null; // Help the GC
                    } else {
                    	 //重hash
                        int h = k.threadLocalHashCode & (newLen - 1);
                        //線性探測法解決衝突
                        while (newTab[h] != null)
                            h = nextIndex(h, newLen);
                        newTab[h] = e;
                        count++;
                    }
                }
            }
			  //從新計算擴容閥值
            setThreshold(newLen);
            //從新設置存儲個數
            size = count;
            //從新設置table
            table = newTab;
        }

resize()實現也比較簡單,先建立比原來大2倍的Entry數組,並遍歷老的table,對全部Entry重hash定位,若是衝突就是採用線性探測法解決衝突。

get()源碼分析

看下get()的源碼

public T get() {
	 //獲取當前線程
    Thread t = Thread.currentThread();
    //根據當前線程獲取ThreadLocalMap
    ThreadLocalMap map = getMap(t);
    //若是不爲空獲取Entry
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    //爲空獲取初始化的值
    return setInitialValue();
}

這個方法主要根據當前線程獲取ThreadLocalMap,若是還沒初始化則調用setInitialValue()初始化並返回值,反之調用map.getEntry(this)獲取值。

先來看下map不爲空調用map.getEntry(this)的源碼:

private Entry getEntry(ThreadLocal<?> key) {
	 //1.快速hash獲取hash地址
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    //2.若是找到Entry,則返回
    if (e != null && e.get() == key)
        return e;
    //3.若是未快速找到,則去遍歷查找
    else
        return getEntryAfterMiss(key, i, e);
}
  • 1.快速hash獲取hash地址
  • 2.若是找到Entry,則返回
  • 3.若是未快速找到,則調用getEntryAfterMiss(key, i, e)去遍歷查找,因爲用線性探測法解決衝突,

來看下getEntryAfterMiss(key, i, e)的源碼:

private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;
    //從i位置開始遍歷table
    while (e != null) {
        ThreadLocal<?> k = e.get();
        //若是找到直接返回
        if (k == key)
            return e;
        //若是是髒槽清理之
        if (k == null)
            expungeStaleEntry(i);
        //獲取下個地址
        else
            i = nextIndex(i, len);
        e = tab[i];
    }
    return null;
}

實現很簡單就是從i位置開始遍歷table,找到就返回Entry,遍歷過程當中順便清理髒槽。

再來看下setInitialValue()的源碼:

private T setInitialValue() {
	 //1.獲取默認初始化值
    T value = initialValue();
    Thread t = Thread.currentThread();
    //2.根據當前線程獲取ThreadLocalMap
    ThreadLocalMap map = getMap(t);
    //3.不爲空,設置值
    if (map != null)
        map.set(this, value);
    //4.反之初始化map
    else
        createMap(t, value);
    return value;
}
  • 1.獲取默認初始化值,這裏initialValue()是默認返回null的,源碼以下:
protected T initialValue() {
    return null;
}

這個能夠本身實現覆蓋原來的方法。

  • 2.根據當前線程獲取ThreadLocalMap。
  • 3.不爲空,則調用map.set(this, value)設置值。
  • 4.反之則調用createMap(t, value)初始化map。

remove()源碼分析

直接看下remove()源碼

public void remove() {
 //1.根據當前線程獲取ThreadLocalMap
 ThreadLocalMap m = getMap(Thread.currentThread());
 //2.若是map已經存在則調用m.remove(this)刪除值
 if (m != null)
     m.remove(this);
}
  • 1.根據當前線程獲取ThreadLocalMap
  • 2.若是map已經存在則調用m.remove(this)刪除key爲自己的Entry

下面來看下m.remove(this)的源碼:

private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;
    //快速hash到地址
    int i = key.threadLocalHashCode & (len-1);
    //向後查找
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        //若是找到
        if (e.get() == key) {
        	  //清理key
            e.clear();
            //清理髒槽
            expungeStaleEntry(i);
            return;
        }
    }
}

實現很簡單,先快速hash到地址i,而後從這個地址i日後查找key(包括地址i)直到槽爲空,若是找到則清理之。

相關文章
相關標籤/搜索