ConcurrentHashMap 是併發中的重中之重,也是最經常使用的數據結果,以前的文章中,咱們介紹了 putVal 方法。併發編程之 ConcurrentHashMap(JDK 1.8) putVal 源碼分析。其中分析了 initTable 方法和 putVal 方法,但也留下了一句話:java
這篇文章僅僅是 ConcurrentHashMap 的開頭,關於 ConcurrentHashMap 裏面的精華太多,值得咱們好好學習。node
說道精華,他的擴容方法絕對是精華,要知道,ConcurrentHashMap 擴容是高度併發的。算法
今天來逐行分析源碼。編程
首先說結論。源碼加註釋我會放在後面。該方法的執行邏輯以下:數組
經過計算 CPU 核心數和 Map 數組的長度獲得每一個線程(CPU)要幫助處理多少個桶,而且這裏每一個線程處理都是平均的。默認每一個線程處理 16 個桶。所以,若是長度是 16 的時候,擴容的時候只會有一個線程擴容。多線程
初始化臨時變量 nextTable。將其在原有基礎上擴容兩倍。併發
死循環開始轉移。多線程併發轉移就是在這個死循環中,根據一個 finishing 變量來判斷,該變量爲 true 表示擴容結束,不然繼續擴容。ide
3.1 進入一個 while 循環,分配數組中一個桶的區間給線程,默認是 16. 從大到小進行分配。當拿到分配值後,進行 i-- 遞減。這個 i 就是數組下標。(其中有一個 bound 參數,這個參數指的是該線程這次能夠處理的區間的最小下標,超過這個下標,就須要從新領取區間或者結束擴容,還有一個 advance 參數,該參數指的是是否繼續遞減轉移下一個桶,若是爲 true,表示能夠繼續向後推動,反之,說明尚未處理好當前桶,不能推動
) 3.2 出 while 循環,進 if 判斷,判斷擴容是否結束,若是擴容結束,清空臨死變量,更新 table 變量,更新庫容閾值。若是沒完成,但已經沒法領取區間(沒了),該線程退出該方法,並將 sizeCtl 減一,表示擴容的線程少一個了。若是減完這個數之後,sizeCtl 迴歸了初始狀態,表示沒有線程再擴容了,該方法全部的線程擴容結束了。(這裏主要是判斷擴容任務是否結束,若是結束了就讓線程退出該方法,並更新相關變量
)。而後檢查全部的桶,防止遺漏。 3.3 若是沒有完成任務,且 i 對應的槽位是空,嘗試 CAS 插入佔位符,讓 putVal 方法的線程感知。 3.4 若是 i 對應的槽位不是空,且有了佔位符,那麼該線程跳過這個槽位,處理下一個槽位。 3.5 若是以上都是否是,說明這個槽位有一個實際的值。開始同步處理這個桶。 3.6 到這裏,都尚未對桶內數據進行轉移,只是計算了下標和處理區間,而後一些完成狀態判斷。同時,若是對應下標內沒有數據或已經被佔位了,就跳過了。源碼分析
處理每一個桶的行爲都是同步的。防止 putVal 的時候向鏈表插入數據。 4.1 若是這個桶是鏈表,那麼就將這個鏈表根據 length 取於拆成兩份,取於結果是 0 的放在新表的低位,取於結果是 1 放在新表的高位。 4.2 若是這個桶是紅黑數,那麼也拆成 2 份,方式和鏈表的方式同樣,而後,判斷拆分過的樹的節點數量,若是數量小於等於 6,改形成鏈表。反之,繼續使用紅黑樹結構。 4.3 到這裏,就完成了一個桶從舊錶轉移到新表的過程。性能
好,以上,就是 transfer 方法的整體邏輯。仍是挺複雜的。再進行精簡,分紅 3 步驟:
大致就是上的的 3 個步驟。
再來看看源碼和註釋。
源碼加註釋:
/** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. * * transferIndex 表示轉移時的下標,初始爲擴容前的 length。 * * 咱們假設長度是 32 */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 將 length / 8 而後除以 CPU核心數。若是獲得的結果小於 16,那麼就使用 16。
// 這裏的目的是讓每一個 CPU 處理的桶同樣多,避免出現轉移任務不均勻的現象,若是桶較少的話,默認一個 CPU(一個線程)處理 16 個桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range 細分範圍 stridea:TODO
// 新的 table 還沒有初始化
if (nextTab == null) { // initiating
try {
// 擴容 2 倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
// 更新
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 擴容失敗, sizeCtl 使用 int 最大值。
sizeCtl = Integer.MAX_VALUE;
return;// 結束
}
// 更新成員變量
nextTable = nextTab;
// 更新轉移下標,就是 老的 tab 的 length
transferIndex = n;
}
// 新 tab 的 length
int nextn = nextTab.length;
// 建立一個 fwd 節點,用於佔位。當別的線程發現這個槽位中是 fwd 類型的節點,則跳過這個節點。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 首次推動爲 true,若是等於 true,說明須要再次推動一個下標(i--),反之,若是是 false,那麼就不能推動下標,須要將當前的下標處理完畢才能繼續推動
boolean advance = true;
// 完成狀態,若是是 true,就結束此方法。
boolean finishing = false; // to ensure sweep before committing nextTab
// 死循環,i 表示下標,bound 表示當前線程能夠處理的當前桶區間最小下標
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 若是當前線程能夠向後推動;這個循環就是控制 i 遞減。同時,每一個線程都會進入這裏取得本身須要轉移的桶的區間
while (advance) {
int nextIndex, nextBound;
// 對 i 減一,判斷是否大於等於 bound (正常狀況下,若是大於 bound 不成立,說明該線程上次領取的任務已經完成了。那麼,須要在下面繼續領取任務)
// 若是對 i 減一大於等於 bound(還須要繼續作任務),或者完成了,修改推動狀態爲 false,不能推動了。任務成功後修改推動狀態爲 true。
// 一般,第一次進入循環,i-- 這個判斷會沒法經過,從而走下面的 nextIndex 賦值操做(獲取最新的轉移下標)。其他狀況都是:若是能夠推動,將 i 減一,而後修改爲不可推動。若是 i 對應的桶處理成功了,改爲能夠推動。
if (--i >= bound || finishing)
advance = false;// 這裏設置 false,是爲了防止在沒有成功處理一個桶的狀況下卻進行了推動
// 這裏的目的是:1. 當一個線程進入時,會選取最新的轉移下標。2. 當一個線程處理完本身的區間時,若是還有剩餘區間的沒有別的線程處理。再次獲取區間。
else if ((nextIndex = transferIndex) <= 0) {
// 若是小於等於0,說明沒有區間了 ,i 改爲 -1,推動狀態變成 false,再也不推動,表示,擴容結束了,當前線程能夠退出了
// 這個 -1 會在下面的 if 塊裏判斷,從而進入完成狀態判斷
i = -1;
advance = false;// 這裏設置 false,是爲了防止在沒有成功處理一個桶的狀況下卻進行了推動
}// CAS 修改 transferIndex,即 length - 區間值,留下剩餘的區間值供後面的線程使用
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;// 這個值就是當前線程能夠處理的最小當前區間最小下標
i = nextIndex - 1; // 初次對i 賦值,這個就是當前線程能夠處理的當前區間的最大下標
advance = false; // 這裏設置 false,是爲了防止在沒有成功處理一個桶的狀況下卻進行了推動,這樣對致使漏掉某個桶。下面的 if (tabAt(tab, i) == f) 判斷會出現這樣的狀況。
}
}// 若是 i 小於0 (不在 tab 下標內,按照上面的判斷,領取最後一段區間的線程擴容結束)
// 若是 i >= tab.length(不知道爲何這麼判斷)
// 若是 i + tab.length >= nextTable.length (不知道爲何這麼判斷)
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // 若是完成了擴容
nextTable = null;// 刪除成員變量
table = nextTab;// 更新 table
sizeCtl = (n << 1) - (n >>> 1); // 更新閾值
return;// 結束方法。
}// 若是沒完成
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 嘗試將 sc -1. 表示這個線程結束幫助擴容了,將 sc 的低 16 位減一。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)// 若是 sc - 2 不等於標識符左移 16 位。若是他們相等了,說明沒有線程在幫助他們擴容了。也就是說,擴容結束了。
return;// 不相等,說明沒結束,當前線程結束方法。
finishing = advance = true;// 若是相等,擴容結束了,更新 finising 變量
i = n; // 再次循環檢查一下整張表
}
}
else if ((f = tabAt(tab, i)) == null) // 獲取老 tab i 下標位置的變量,若是是 null,就使用 fwd 佔位。
advance = casTabAt(tab, i, null, fwd);// 若是成功寫入 fwd 佔位,再次推動一個下標
else if ((fh = f.hash) == MOVED)// 若是不是 null 且 hash 值是 MOVED。
advance = true; // already processed // 說明別的線程已經處理過了,再次推動一個下標
else {// 到這裏,說明這個位置有實際值了,且不是佔位符。對這個節點上鎖。爲何上鎖,防止 putVal 的時候向鏈表插入數據
synchronized (f) {
// 判斷 i 下標處的桶節點是否和 f 相同
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;// low, height 高位桶,低位桶
// 若是 f 的 hash 值大於 0 。TreeBin 的 hash 是 -2
if (fh >= 0) {
// 對老長度進行與運算(第一個操做數的的第n位於第二個操做數的第n位若是都是1,那麼結果的第n爲也爲1,不然爲0)
// 因爲 Map 的長度都是 2 的次方(000001000 這類的數字),那麼取於 length 只有 2 種結果,一種是 0,一種是1
// 若是是結果是0 ,Doug Lea 將其放在低位,反之放在高位,目的是將鏈表從新 hash,放到對應的位置上,讓新的取於算法可以擊中他。
int runBit = fh & n;
Node<K,V> lastRun = f; // 尾節點,且和頭節點的 hash 值取於不相等
// 遍歷這個桶
for (Node<K,V> p = f.next; p != null; p = p.next) {
// 取於桶中每一個節點的 hash 值
int b = p.hash & n;
// 若是節點的 hash 值和首節點的 hash 值取於結果不一樣
if (b != runBit) {
runBit = b; // 更新 runBit,用於下面判斷 lastRun 該賦值給 ln 仍是 hn。
lastRun = p; // 這個 lastRun 保證後面的節點與本身的取於值相同,避免後面沒有必要的循環
}
}
if (runBit == 0) {// 若是最後更新的 runBit 是 0 ,設置低位節點
ln = lastRun;
hn = null;
}
else {
hn = lastRun; // 若是最後更新的 runBit 是 1, 設置高位節點
ln = null;
}// 再次循環,生成兩個鏈表,lastRun 做爲中止條件,這樣就是避免無謂的循環(lastRun 後面都是相同的取於結果)
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 若是與運算結果是 0,那麼就還在低位
if ((ph & n) == 0) // 若是是0 ,那麼建立低位節點
ln = new Node<K,V>(ph, pk, pv, ln);
else // 1 則建立高位
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 其實這裏相似 hashMap
// 設置低位鏈表放在新鏈表的 i
setTabAt(nextTab, i, ln);
// 設置高位鏈表,在原有長度上加 n
setTabAt(nextTab, i + n, hn);
// 將舊的鏈表設置成佔位符
setTabAt(tab, i, fwd);
// 繼續向後推動
advance = true;
}// 若是是紅黑樹
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// 遍歷
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 和鏈表相同的判斷,與運算 == 0 的放在低位
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
} // 不是 0 的放在高位
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 若是樹的節點數小於等於 6,那麼轉成鏈表,反之,建立一個新的樹
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 低位樹
setTabAt(nextTab, i, ln);
// 高位數
setTabAt(nextTab, i + n, hn);
// 舊的設置成佔位符
setTabAt(tab, i, fwd);
// 繼續向後推動
advance = true;
}
}
}
}
}
}
複製代碼
代碼加註釋比較長,有興趣能夠逐行對照,有 2 個判斷樓主看不懂爲何這麼判斷,知道的同窗能夠提醒一下。
而後,說說精華的部分。
假設總長度是 64 ,每一個線程能夠分到 16 個桶,各自處理,不會互相影響。
擴容前的狀態。
當對 4 號桶或者 10 號桶進行轉移的時候,會將鏈表拆成兩份,規則是根據節點的 hash 值取於 length,若是結果是 0,放在低位,不然放在高位。
所以,10 號桶的數據,黑色節點會放在新表的 10 號位置,白色節點會放在新桶的 26 號位置。
下圖是循環處理桶中數據的邏輯:
處理完以後,新桶的數據是這樣的:
transfer 方法能夠說很牛逼,很精華,內部多線程擴容性能很高,
經過給每一個線程分配桶區間,避免線程間的爭用,經過爲每一個桶節點加鎖,避免 putVal 方法致使數據不一致。同時,在擴容的時候,也會將鏈表拆成兩份,這點和 HashMap 的 resize 方法相似。
而若是有新的線程想 put 數據時,也會幫助其擴容。鬼斧神工,使人讚歎。