This post analyzes the implementation of ConcurrentHashMap in JDK 8. The previous post, "Multithreading XI: ConcurrentHashMap 1.7 source analysis", covered how JDK 7 implements a concurrent, thread-safe ConcurrentHashMap: it builds a Segment array sized by the initial concurrency level concurrencyLevel, so up to concurrencyLevel threads can read and write concurrently, a big improvement over Hashtable's approach of guarding every method with the synchronized keyword. The JDK 7 implementation still has a shortcoming, though: the Segment array cannot grow, so the concurrency level is capped by its size and only N threads can proceed in parallel (where N is the fixed Segment array length).
JDK 8 makes the following improvements: the unit of concurrency is no longer the JDK 7 Segment but the Node that stores a key-value pair. A thread operating on the map locks only the bucket holding the Node, so the lock granularity is much finer and concurrency improves further. JDK 8 also introduces red-black trees to fix the lookup degradation caused by many nodes colliding in one hash bucket.
JDK 8 drops the Segment lock entirely and instead uses CAS plus synchronized to guarantee thread safety under concurrency.
ConcurrentHashMap defines many constants:
    //Maximum length of the Node bucket array: 2^30
    private static final int MAXIMUM_CAPACITY = 1 << 30;
    //Default length of the Node array; must be a power of two
    private static final int DEFAULT_CAPACITY = 16;
    //Largest possible array size when converting to an array
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    //Kept for compatibility; unused in Java 8
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //Default load factor
    private static final float LOAD_FACTOR = 0.75f;
    //Treeify threshold: a bucket whose collision list exceeds 8 nodes becomes a red-black tree
    static final int TREEIFY_THRESHOLD = 8;
    //Untreeify threshold: a tree this small is converted back to a list
    static final int UNTREEIFY_THRESHOLD = 6;
    //Minimum table length before a bucket may be treeified
    static final int MIN_TREEIFY_CAPACITY = 64;
    //Minimum stride a thread claims during a resize; its use is explained in the resize section
    private static final int MIN_TRANSFER_STRIDE = 16;
    //Number of bits used to generate a stamp for the table tab
    private static int RESIZE_STAMP_BITS = 16;
    //2^16 - 1, maximum number of threads that can help resize
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
    //Bit shift of the resize stamp within sizeCtl
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
    //Special Node hash values; the code identifies node types by these
    static final int MOVED     = -1; // hash for forwarding nodes
    static final int TREEBIN   = -2; // hash for roots of trees
    static final int RESERVED  = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
    //Number of CPUs; used by transfer during resizing
    static final int NCPU = Runtime.getRuntime().availableProcessors();
sizeCtl is a control field governing table initialization and resizing; its value, and what that value means, changes as the ConcurrentHashMap moves through different states.
    //The Node array holding the key-value bins
    transient volatile Node<K,V>[] table;
    //Points to the new array during a resize; null when not resizing
    private transient volatile Node<K,V>[] nextTable;
    //Base counter for the number of mappings in the table
    private transient volatile long baseCount;
    //Control field: its value and meaning differ while the map is initializing, resizing, etc.
    private transient volatile int sizeCtl;
    //The next index (counting down from the end) at which resizing threads claim old-array data
    private transient volatile int transferIndex;
Note that the constructor only computes the initial bucket array length from its arguments and stores it in sizeCtl; it does not initialize the table array, which is created lazily.
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        //Later used as the initial length of the bucket array table
        this.sizeCtl = cap;
    }
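The sizing arithmetic above can be checked in isolation. The sketch below re-implements tableSizeFor (the next-power-of-two rounding the constructor relies on) as a hypothetical standalone helper; the class name and the omission of the MAXIMUM_CAPACITY clamp are my own simplifications:

```java
public class TableSizeDemo {
    // Hypothetical re-implementation of tableSizeFor: smallest power of two >= c
    static int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        return (n < 0) ? 1 : n + 1;
    }

    public static void main(String[] args) {
        int initialCapacity = 16;
        float loadFactor = 0.75f;
        // Same arithmetic as the constructor: size = 1 + initialCapacity / loadFactor
        long size = (long) (1.0 + (long) initialCapacity / loadFactor);
        System.out.println(size);                      // 22
        System.out.println(tableSizeFor((int) size));  // 32, the value stored in sizeCtl
    }
}
```

So new ConcurrentHashMap(16, 0.75f, 16) starts with sizeCtl = 32, not 16: the requested capacity is divided by the load factor before rounding up to a power of two.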
Node stores a key-value pair; entries that hash to the same bucket are chained into a linked list. Node is the most basic data structure in ConcurrentHashMap.
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

        //get ends up calling find; subclasses override it
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }
ForwardingNode extends Node but stores no key-value pair; it is a marker node whose hash is fixed to MOVED (-1). While the map is resizing, data is copied from the old table array to the new, larger one; each time a bucket is fully copied, a ForwardingNode is placed in that slot of the old array to signal that it has been migrated, and any lookup for a node in that slot is redirected to the new nextTable.
    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;

        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }

        //Find the node with key k and hash h
        Node<K,V> find(int h, Object k) {
            // loop to avoid arbitrarily deep recursion on forwarding nodes
            outer: for (Node<K,V>[] tab = nextTable;;) {
                Node<K,V> e; int n;
                if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                    return null;
                for (;;) {
                    int eh; K ek;
                    if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                    if (eh < 0) {
                        if (e instanceof ForwardingNode) {
                            tab = ((ForwardingNode<K,V>)e).nextTable;
                            continue outer;
                        }
                        else
                            return e.find(h, k);
                    }
                    if ((e = e.next) == null)
                        return null;
                }
            }
        }
    }
When a list is converted to a red-black tree, the table slot does not hold the tree's root directly but a TreeBin node, which marks the bucket as containing a tree. TreeBin stores no key-value data itself; it points to the root of the red-black tree.
    static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
        // values for lockState
        static final int WRITER = 1; // set while holding write lock
        static final int WAITER = 2; // set when waiting for write lock
        static final int READER = 4; // increment value for setting read lock

        /**
         * Creates bin with initial set of nodes headed by b.
         */
        TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, null, null, null);
            this.first = b;
            TreeNode<K,V> r = null;
            for (TreeNode<K,V> x = b, next; x != null; x = next) {
                next = (TreeNode<K,V>)x.next;
                x.left = x.right = null;
                if (r == null) {
                    x.parent = null;
                    x.red = false;
                    r = x;
                }
                else {
                    K k = x.key;
                    int h = x.hash;
                    Class<?> kc = null;
                    for (TreeNode<K,V> p = r;;) {
                        int dir, ph;
                        K pk = p.key;
                        if ((ph = p.hash) > h)
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        else if ((kc == null &&
                                  (kc = comparableClassFor(k)) == null) ||
                                 (dir = compareComparables(kc, k, pk)) == 0)
                            dir = tieBreakOrder(k, pk);
                        TreeNode<K,V> xp = p;
                        if ((p = (dir <= 0) ? p.left : p.right) == null) {
                            x.parent = xp;
                            if (dir <= 0)
                                xp.left = x;
                            else
                                xp.right = x;
                            r = balanceInsertion(r, x);
                            break;
                        }
                    }
                }
            }
            this.root = r;
            assert checkInvariants(root);
        }
    }
TreeNode is the basic building block of the red-black trees.
    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }

        Node<K,V> find(int h, Object k) {
            return findTreeNode(h, k, null);
        }

        final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
            if (k != null) {
                TreeNode<K,V> p = this;
                do {
                    int ph, dir; K pk; TreeNode<K,V> q;
                    TreeNode<K,V> pl = p.left, pr = p.right;
                    if ((ph = p.hash) > h)
                        p = pl;
                    else if (ph < h)
                        p = pr;
                    else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                        return p;
                    else if (pl == null)
                        p = pr;
                    else if (pr == null)
                        p = pl;
                    else if ((kc != null ||
                              (kc = comparableClassFor(k)) != null) &&
                             (dir = compareComparables(kc, k, pk)) != 0)
                        p = (dir < 0) ? pl : pr;
                    else if ((q = pr.findTreeNode(h, k, kc)) != null)
                        return q;
                    else
                        p = pl;
                } while (p != null);
            }
            return null;
        }
    }
    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        //Null keys and values are not allowed
        if (key == null || value == null) throw new NullPointerException();
        //Compute the key's hash
        int hash = spread(key.hashCode());
        //Counts the nodes in the bucket, used to decide whether to treeify
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //If the table has not been initialized yet, initTable creates it
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //The bucket the hash maps to is empty: CAS the new node straight in
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //The bucket holds a ForwardingNode: a resize is in progress,
            //so the current thread helps with it
            else if ((fh = f.hash) == MOVED)
                //helpTransfer is analyzed in detail in the resize section
                tab = helpTransfer(tab, f);
            //Otherwise the bucket already holds nodes: either a list or a red-black tree
            else {
                V oldVal = null;
                //Lock the head node of the bucket
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        //fh >= 0 means this is a list, not a TreeBin
                        if (fh >= 0) {
                            binCount = 1;
                            //Traverse the list
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //A node with an equal key exists: replace its value
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //Otherwise append the new node at the tail of the list
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //fh < 0: the bucket holds a red-black tree
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                                  value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                //Check whether the list should be converted to a red-black tree
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //Update the number of mappings stored in the map
        addCount(1L, binCount);
        return null;
    }
As the constructor source showed, creating a ConcurrentHashMap only records the table length without initializing the array; the lazy initialization is done by initTable.
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            //sizeCtl < 0: another thread is initializing, so yield
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //CAS sizeCtl to -1: the current thread takes over the initialization
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        //Bucket array length: sizeCtl if it was set, else the default
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        //Create the array
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //sizeCtl becomes 0.75n, the threshold that triggers a resize
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
get is comparatively simple: given a key it returns the stored value. It computes the key's hash with spread, locates the bucket at index (n - 1) & h, and then looks the key up within that bucket.
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //Compute the key's hash
        int h = spread(key.hashCode());
        //Check the table is non-empty and locate the key's bucket
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //The first node in the bucket is the key we want
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //The node is a ForwardingNode or TreeBin: delegate to its find
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //Otherwise keep searching the bucket's linked list
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
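A quick usage sketch of the behaviors just described (lock-free reads, onlyIfAbsent, and null rejection); the class name is mine:

```java
import java.util.concurrent.ConcurrentHashMap;

public class GetDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("a", 1);
        map.putIfAbsent("a", 2);            // onlyIfAbsent == true: existing value is kept
        System.out.println(map.get("a"));   // 1
        System.out.println(map.get("b"));   // null
        // Unlike HashMap, null keys are rejected: get calls spread(key.hashCode())
        try {
            map.get(null);
        } catch (NullPointerException e) {
            System.out.println("NPE");
        }
    }
}
```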
size returns the number of key-value mappings stored in the ConcurrentHashMap.
    public int size() {
        //Delegates to sumCount
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
As you can see, sumCount simply adds baseCount and the values in the CounterCell array. Why count this way? putVal ends by calling addCount to update the mapping count, so let's look at addCount.
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
addCount has two parts: updating the mapping count, and checking whether a resize is needed. The count update first tries to CAS baseCount; if the CAS fails, thread contention is high, so the increment is instead applied, with CAS in a retry loop, to a slot of the CounterCell array. The total number of mappings is therefore baseCount plus the sum over all CounterCells.
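The fallback logic can be sketched with a simplified striped counter (the same idea java.util.concurrent.atomic.LongAdder uses). This is a hypothetical illustration, not the JDK code: it uses AtomicLong in place of the padded CounterCell, a fixed cell count, and the thread id instead of ThreadLocalRandom.getProbe():

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical simplified sketch of the baseCount + CounterCell scheme
public class StripedCounter {
    private final AtomicLong baseCount = new AtomicLong();
    private final AtomicLong[] cells = new AtomicLong[4]; // fixed size for the sketch
    { for (int i = 0; i < cells.length; i++) cells[i] = new AtomicLong(); }

    void add(long x) {
        long b = baseCount.get();
        // Cheap path first; on CAS failure (contention) fall back to a per-thread cell
        if (!baseCount.compareAndSet(b, b + x)) {
            int idx = (int) (Thread.currentThread().getId() & (cells.length - 1));
            cells[idx].addAndGet(x);
        }
    }

    long sum() { // same idea as sumCount(): baseCount plus every cell
        long s = baseCount.get();
        for (AtomicLong c : cells) s += c.get();
        return s;
    }

    public static void main(String[] args) {
        StripedCounter c = new StripedCounter();
        for (int i = 0; i < 1000; i++) c.add(1);
        System.out.println(c.sum()); // 1000
    }
}
```

Spreading updates across cells trades an exact instantaneous count for far less CAS contention, which is why size/sumCount can only ever be an estimate under concurrent writes.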
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //Try to CAS baseCount first
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            //Fall back to CASing a CounterCell
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        //Check whether a resize is needed
        ......
    }
Back to size: n is a long while size returns an int, so n is checked against the int range:
return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
If it overflows, Integer.MAX_VALUE is returned. How can the number of mappings exceed the int range at all? The constant MAXIMUM_CAPACITY caps the table length at 2^30, but because of hash collisions a single bucket can hold several mappings, so with enough data the count can exceed the int range. The JDK therefore recommends mappingCount, which works exactly like size but returns a long and is not limited to the int range.
    public long mappingCount() {
        long n = sumCount();
        return (n < 0L) ? 0L : n; // ignore transient negative values
    }
Resizing in ConcurrentHashMap is done by transfer, a method that pushes thread parallelism about as far as it can go. Before analyzing transfer in detail, we should ask what triggers a resize in the first place. The answer is the addCount method analyzed halfway above: when the mapping count satisfies s >= (long)(sc = sizeCtl), i.e. the threshold stored in sizeCtl is reached, a resize is started or joined.
As the analysis shows, sizeCtl is a key state variable during resizing: it encodes both whether a resize has begun and how many threads are participating. Let's trace how sizeCtl changes over a resize:
    private final void addCount(long x, int check) {
        //Update baseCount
        ......
        //Check whether a resize is needed
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            //The number of mappings has reached the threshold sizeCtl
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                //Generate a resize stamp from the current table length
                int rs = resizeStamp(n);
                //sc < 0: a resize is already under way
                if (sc < 0) {
                    //The resize is finished or cannot be helped: bail out
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    //The current thread joins the resize
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                //The current thread is the first to start the resize
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
sizeCtl plays a central role throughout the resize. The first thread to start a resize CASes sizeCtl to (rs << RESIZE_STAMP_SHIFT) + 2, where rs is produced by resizeStamp:
resizeStamp generates a resize stamp from the table length. The length n is a power of two; Integer.numberOfLeadingZeros(n) returns the number of zero bits above n's highest set bit, and the result is OR-ed with 1 shifted left 15 bits. For the default table length of 16 the result is 32795: resizeStamp returns 0000 0000 0000 0000 1000 0000 0001 1011 in binary, which is 32795.
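The computation is easy to reproduce: for n = 16, numberOfLeadingZeros returns 27 (16 is 0b10000 in a 32-bit int), and OR-ing in bit 15 gives 32795 (class name mine):

```java
public class ResizeStampDemo {
    public static void main(String[] args) {
        int n = 16;                                   // default table length
        int zeros = Integer.numberOfLeadingZeros(n);  // 27
        int rs = zeros | (1 << 15);                   // same expression as resizeStamp(16)
        System.out.println(rs);                           // 32795
        System.out.println(Integer.toBinaryString(rs));   // 1000000000011011
    }
}
```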
When the first thread starts the resize, it sets sizeCtl to (rs << RESIZE_STAMP_SHIFT) + 2. At that point sizeCtl is 1000 0000 0001 1011 0000 0000 0000 0010 in binary. sizeCtl splits into two halves: the high 16 bits hold the resizeStamp(n) result, and if n threads have joined the resize the low 16 bits hold n + 1. Because the sign bit is now 1, sizeCtl remains negative for as long as the resize is in progress.
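The encoding can be verified directly; sc below is what the first resizing thread installs for a table of length 16 (the class name is mine, and RESIZE_STAMP_SHIFT is written out as 16):

```java
public class SizeCtlDemo {
    // Same expression as ConcurrentHashMap.resizeStamp with RESIZE_STAMP_BITS == 16
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << 15);
    }

    public static void main(String[] args) {
        int rs = resizeStamp(16);                 // 32795
        int sc = (rs << 16) + 2;                  // value set by the first resizing thread
        System.out.println(sc < 0);               // true: negative for the whole resize
        System.out.println((sc >>> 16) == rs);    // true: high 16 bits recover the stamp
        System.out.println(sc & 0xffff);          // 2: participating threads + 1
        sc = sc + 1;                              // a helper joins (CAS sc -> sc + 1)
        System.out.println(sc & 0xffff);          // 3
    }
}
```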
    static final int resizeStamp(int n) {
        //numberOfLeadingZeros returns the number of zero bits above the highest
        //set bit of n, the sign bit included
        //1 << (RESIZE_STAMP_BITS - 1): 1 shifted left by (RESIZE_STAMP_BITS - 1), i.e. 15, bits
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }
transfer is the core of resizing, and arguably the most ingeniously engineered method in ConcurrentHashMap. It takes two parameters: tab points at the array being resized and nextTab at the new array data is moved into. The first thread to start the resize calls transfer with nextTab == null; threads that join later pass in the nextTable field.
transfer moves the key-value pairs from the old array into the new one, and the work is not single-threaded: the thread that starts the resize is not responsible for moving all of the data. Instead, concurrency master Doug Lea has each participating thread claim a slice of the old array, transfer it, and then claim another slice if the resize is not yet finished. The transfer itself runs concurrently, with up to MAX_RESIZERS (2^16 - 1) threads participating. The rough flow is as follows:
As the figure shows, threads transfer data from the back of the array towards the front; when the transfer reaches index < 0, the resize is finished.
How exactly does ConcurrentHashMap migrate a bucket? Take a linked-list bucket as an example. Suppose bucket 8 holds six list nodes; the resizing thread splits them into two groups: nodes whose hash AND-ed with the old array length is 0 go into the low list, and those for which the result is 1 (i.e. the bit is set) go into the high list. The low list is then placed at index 8 of the new array and the high list at index 24 (i + n).
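The low/high split can be demonstrated standalone. With the old length n == 16, the hashes 8, 24, 40 and 56 all land in bucket 8 (h & 15 == 8); testing h & n decides whether a node stays at index i or moves to i + n (the example values and class name are my own):

```java
public class SplitDemo {
    public static void main(String[] args) {
        int n = 16;                         // old table length
        int i = 8;                          // bucket being migrated
        int[] hashes = {8, 24, 40, 56};     // all satisfy (h & (n - 1)) == 8
        for (int h : hashes) {
            // (h & n) == 0 -> low list stays at index i; otherwise high list at i + n
            int newIndex = ((h & n) == 0) ? i : i + n;
            System.out.println(h + " -> " + newIndex);
        }
        // prints: 8 -> 8, 24 -> 24, 40 -> 8, 56 -> 24
    }
}
```

This works because doubling the table adds exactly one bit to the index mask, so each old bucket splits into precisely two new buckets, i and i + n, with no rehashing of other buckets.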
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //Determine how many buckets each thread claims per round
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        //If the new bucket array nextTable is null, create it
        if (nextTab == null) {            // initiating
            try {
                //The new array is twice the current length, N = 2n
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            //Publish the new array
            nextTable = nextTab;
            //Initialize transferIndex; transfer starts from the end of the old array
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //When a thread finishes a bucket, it places a ForwardingNode pointing at the
        //new array in the old slot, marking that bucket as already migrated
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        //advance == true means the thread may step to the next index (i--);
        //false means the current bucket must be finished before stepping on
        boolean advance = true;
        //Set when the whole resize is complete
        boolean finishing = false; // to ensure sweep before committing nextTab
        //Inside this loop the thread transfers the buckets it has claimed
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //advance is true: step backwards; i is the bucket to process, bound the
            //lower end of the claimed range and transferIndex the upper end;
            //i < bound means the thread has finished its range
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                //transferIndex <= 0: nothing left to claim
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //CAS transferIndex down by one stride, leaving the rest for other threads
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //Check whether the resize, or this thread's share of it, is done
            //i < 0: this thread has no more work
            //i >= tab.length and i + tab.length >= nextTable.length cannot happen
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                //finishing is true: the whole resize is complete
                if (finishing) {
                    //Clear nextTable
                    nextTable = null;
                    //table now points at the resized array
                    table = nextTab;
                    //Set the next resize threshold; n here is the old array length, so
                    //sizeCtl = 2n - 0.5n = 1.5n while the new length is N = 2n,
                    //giving an effective load factor of 1.5n / 2n = 0.75
                    //regardless of the load factor passed to the constructor
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //finishing is false: try sc - 1, meaning this thread is done helping,
                //so the low 16 bits of sizeCtl drop by one
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    //If sc - 2 != the stamp shifted left 16 bits, other threads are still
                    //transferring and only this thread's share is finished
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    //They are equal: this was the last helper, the resize is complete
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            //Start migrating data into the new bucket array
            //If bucket i is null, just install the forwarding node
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //A ForwardingNode hash means this bucket was already processed
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            //Otherwise the bucket holds a list (Node) or a red-black tree (TreeBin)
            else {
                //Lock the head node to block concurrent inserts and removes
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        //ln: low list, hn: high list
                        Node<K,V> ln, hn;
                        //hash >= 0: a linked list of plain Nodes
                        if (fh >= 0) {
                            //runBit is the node hash AND-ed with the old length; since
                            //the length is a power of two, runBit is either 0 or n
                            int runBit = fh & n;
                            //All nodes from lastRun to the tail share the same runBit
                            Node<K,V> lastRun = f;
                            //Walk the list to find lastRun
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            //runBit == 0: lastRun through the tail go into the low list
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            //otherwise into the high list
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            //Walk from the head to lastRun: runBit 0 goes low, else high
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    //head insertion
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //Install both lists in the new array
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            //Mark the old bucket with the forwarding node
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        //A red-black tree bucket; the logic mirrors the list case
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
The resize-check in addCount is not the only trigger of transfer. When the key-value pair being added by putVal maps to a bucket whose node is a ForwardingNode, the thread also joins the resize by calling helpTransfer. Let's focus on the if condition inside it that decides whether the resize has finished:
(sc >>> RESIZE_STAMP_SHIFT) != rs: from the earlier analysis of sizeCtl we know that the first thread to start a resize sets sizeCtl to (rs << RESIZE_STAMP_SHIFT) + 2, and from transfer we know that when the resize ends sizeCtl is set to (n << 1) - (n >>> 1), a positive number again. So when (sc >>> RESIZE_STAMP_SHIFT) != rs, sizeCtl already holds (n << 1) - (n >>> 1) and the resize has finished.

    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        //Check that a resize is actually in progress
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                //The resize is finished or needs no more help
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                //The current thread joins the resize
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
That wraps up ConcurrentHashMap in JDK 8; working through it took me close to five days. As the multithreaded counterpart of HashMap, ConcurrentHashMap demonstrates the techniques for staying thread-safe while raising concurrency: shrink the lock granularity and use lock-free CAS algorithms. The transfer method in particular rewards repeated study.