本文將主要講述 JDK1.8 版本 的 ConcurrentHashMap,其內部結構和不少的哈希優化算法,都是和 JDK1.8 版本的 HashMap是同樣的,因此在閱讀本文以前,必定要先了解 HashMap,能夠參考 HashMap 相關;另外 ConcurrentHashMap 中一樣有紅黑樹,這部分能夠先不看不影響總體結構把握,有興趣的能夠查看 紅黑樹;html
CHM 的源碼有 6k 多行,包含的內容多,精巧,不容易理解;建議在查看源碼的時候,能夠首先把握總體結構脈絡,對於一些精巧的優化,哈希技巧能夠先了解目的就能夠了,不用深究;對總體把握比較清楚後,在逐步分析,能夠比較快速的看懂;java
JDK1.8 版本中的 CHM,和 JDK1.7 版本的差異很是大,在查看資料的時候要注意區分,1.7 中主要是使用 Segment 分段鎖 來解決併發問題的;而在 1.8 中則徹底沒有這些稍顯臃腫的結構,其結構基本和 HashMap 是同樣的,都是 數組 + 鏈表 + 紅黑樹,如圖所示:node
其主要區別就在 CHM 支持併發:算法
以上講的這些不太清楚也沒有關係,主要是有一個印象,大體清楚 CHM 的實現方向,具體細節後面還會結合源碼詳細講解;數組
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { private static final int MAXIMUM_CAPACITY = 1 << 30; // 最大容量 private static final int DEFAULT_CAPACITY = 16; // 默認初始化容量 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 併發級別,爲兼容1.7,實際未用 private static final float LOAD_FACTOR = 0.75f; // 固定負載係數,n - (n >>> 2) static final int TREEIFY_THRESHOLD = 8; // 鏈表超過8時,轉爲紅黑樹 static final int UNTREEIFY_THRESHOLD = 6; // 紅黑樹低於6時,轉爲鏈表 static final int MIN_TREEIFY_CAPACITY = 64; // 樹化最小容量,容量小於64時,先擴容 private static final int MIN_TRANSFER_STRIDE = 16; // 擴容時拆分散列表,最小步長 private static int RESIZE_STAMP_BITS = 16; private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 可參與擴容的最大線程 static final int NCPU = Runtime.getRuntime().availableProcessors(); // CPU 數 transient volatile Node<K,V>[] table; // 散列表 private transient volatile Node<K,V>[] nextTable; // 擴容時的過分表 private transient volatile int sizeCtl; // 最重要的狀態變量,下面詳講 private transient volatile int transferIndex; // 擴容進度指示 private transient volatile long baseCount; // 計數器,基礎基數 private transient volatile int cellsBusy; // 計數器,併發標記 private transient volatile CounterCell[] counterCells; // 計數器,併發累計 public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); // 注意這裏不是0.75,後面介紹 this.sizeCtl = cap; } public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); // 注意這裏的初始化 int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; } ... }
上面有幾個重要的地方這裏單獨講:緩存
LOAD_FACTOR:多線程
這裏的負載係數,同 HashMap 等其餘 Map 的係數有明顯區別:併發
一般的係數默認 0.75,能夠由構造函數傳入,當節點數 size 超過 loadFactor * capacity 時擴容;ide
而 CMH 的係數則固定 0.75(使用 n - (n >>> 2)
表示),構造函數傳入的係數隻影響初始化容量,見第5個構造函數;函數
上面第二個構造函數中,initialCapacity + (initialCapacity >>> 1) + 1)
,這裏竟然不是使用的默認0.75,能夠看做bug,也可視做優化,見
sizeCtl:
sizeCtl 是 CHM 中最重要的狀態變量,其中包括不少中狀態,這裏先總體介紹幫助後面源碼理解;
sizeCtl = 0 :初始值,還未指定初始容量;
sizeCtl > 0 :
sizeCtl = -1 :表示正在初始化;
sizeCtl < -1 :表示正在擴容,具體結構如圖所示:
計算代碼以下:
/* * n=64 * Integer.numberOfLeadingZeros(n)=26 * resizeStamp(64) = 0001 1010 | 1000 0000 0000 0000 = 1000 0000 0001 1010 */ static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }
因此 resizeStamp(64) << RESIZE_STAMP_SHIFT) + 2
,表示擴容目標爲 64,有一個線程正在擴容;
static class Node<K,V> implements Map.Entry<K,V> { // 哈希表普通節點 final int hash; final K key; volatile V val; volatile Node<K,V> next; Node<K,V> find(int h, Object k) {} // 主要在擴容時,利用多態查詢已轉移節點 } static final class ForwardingNode<K,V> extends Node<K,V> { // 標識擴容節點 final Node<K,V>[] nextTable; // 指向成員變量 ConcurrentHashMap.nextTable ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); // hash = -1,快速肯定 ForwardingNode 節點 this.nextTable = tab; } Node<K,V> find(int h, Object k) {} } static final class TreeBin<K,V> extends Node<K,V> { // 紅黑樹根節點 TreeBin(TreeNode<K,V> b) { super(TREEBIN, null, null, null); // hash = -2,快速肯定紅黑樹, ... } } static final class TreeNode<K,V> extends Node<K,V> { } // 紅黑樹普通節點,其 hash 同 Node 普通節點 > 0;
static final int MOVED = -1; // hash for forwarding nodes static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash // 讓高位16位,參與哈希桶定位運算的同時,保證 hash 爲正 static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
除此以外還有,
hash % length = hash & (length-1)
;(e.hash & oldCap)
,0 - 位置不變,1 - 原來的位置 + oldCap;以上這些哈希優化的具體原理,都在以前的博客講過了,就不在重複了,HashMap 相關;
咱們都知道一個數組即便聲明爲 volatile
,也只能保證這個數組引用自己的可見性,其內部元素的可見性是沒法保證的,若是每次都加鎖,則效率必然大大下降,在 CHM 中則使用 Unsafe
方法來保證:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // 有其餘線程在初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 設置狀態 -1 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 注意此時的 sizeCtl 表示初始容量,完畢後表示擴容閾值 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); // 同 0.75n } } finally { sizeCtl = sc; // 注意這裏沒有 CAS 更新,這就是狀態變量的高明瞭,由於前面設置了 -1,此時這裏沒有競爭 } break; } } return tab; }
get 方法可能看代碼不是很長,可是他卻能 保證無鎖狀態下的內存一致性 ,他的每一句代碼都要仔細理解,多設想一下若是發生競爭會怎樣,如此纔能有所得;
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); // 計算 hash if ((tab = table) != null && (n = tab.length) > 0 && // 確保 table 已經初始化 // 確保對應的哈希桶不爲空,注意這裏是 Volatile 語義獲取;由於擴容的時候,是徹底拷貝,因此只要不爲空,則鏈表必然完整 (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // hash < 0,則必然在擴容,原來位置的節點可能所有移動到 i + oldCap 位置,因此利用多態到 nextTable 中查找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { // 遍歷鏈表 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
注意 CHM 的 key 和 value 都不能爲空
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); // hash 計算 int binCount = 0; // 狀態變量,主要表示查找鏈表節點數,最後判斷是否轉爲紅黑樹 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); // 初始化 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // cas 獲取哈希桶 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // cas 更新,失敗時繼續循環更新 break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 正在擴容的時候,先幫助擴容 else { V oldVal = null; synchronized (f) { // 注意這裏只鎖定了一個哈希桶,因此比 1.7 中的 Segment 分段鎖 粒度更低 if (tabAt(tab, i) == f) { // 確認該哈希桶是否已經移動 if (fh >= 0) { // hash >=0 則必然是普通節點,直接遍歷鏈表便可 binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if(e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 查找成功 oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { // 查找失敗時,直接在末尾添加新節點 pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 樹根節點 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { // 紅黑樹查找 oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 若是鏈表長度大於8,轉爲紅黑樹 if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); // 計數加一,注意這裏使用的是計數器,普通的 Atomic 變量仍然可能稱爲性能瓶頸; return null; }
其具體流程如圖所示:
擴容操做一直都是比較慢的操做,而 CHM 中巧妙的利用任務劃分,使得多個線程可能同時參與擴容;另外擴容條件也有兩個:
其擴容的過程可描述爲:
圖形化表示以下:
源碼分析以下:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // 根據 CPU 數量計算任務步長 if (nextTab == null) { // 初始化 nextTab try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 擴容一倍 nextTab = nt; } catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; // 發生 OOM 時,再也不擴容 return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 標記空桶,或已經轉移完畢的桶 boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { // 逆向遍歷擴容 Node<K,V> f; int fh; while (advance) { // 向前獲取哈希桶 int nextIndex, nextBound; if (--i >= bound || finishing) // 已經取到哈希桶,或已完成時退出 advance = false; else if ((nextIndex = transferIndex) <= 0) { // 遍歷到達頭節點,已經沒有待遷移的桶,線程準備退出 i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { // 當前任務完成,領取下一批哈希桶 bound = nextBound; i = nextIndex - 1; // 索引指向下一批哈希桶 advance = false; } } // i < 0 :表示擴容結束,已經沒有待移動的哈希桶 // i >= n :擴容結束,再次檢查確認 // i + n >= nextn : 在使用 nextTable 替換 table 時,有線程進入擴容就會出現 if (i < 0 || i >= n || i + n >= nextn) { // 完成擴容準備退出 int sc; if (finishing) { // 兩次檢查,只有最後一個擴容線程退出時,才更新變量 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); // 0.75*2*n return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 擴容線程減一 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 不是最後一個線程,直接退出 finishing = advance = true; // 最後一個線程,再次檢查 i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) // 當前節點爲空,直接標記爲 ForwardingNode,而後繼續獲取下一個桶 advance = casTabAt(tab, i, null, fwd); // 以前的線程已經完成該桶的移動,直接跳過,正常狀況下本身的任務區間,不會出現 ForwardingNode 節點, else if ((fh = f.hash) == MOVED) // 此處爲極端條件下的健壯性檢查 advance = true; // already processed // 開始處理鏈表 else { // 注意在 get 的時候,能夠無鎖獲取,是由於擴容是全拷貝節點,完成後最後在更新哈希桶 // 而在 put 的時候,是直接將節點加入尾部,獲取修改其中的值,此時若是容許 put 操做,最後就會發生髒讀, // 因此 put 和 transfer,須要競爭同一把鎖,也就是對應的哈希桶,以保證內存一致性效果 synchronized (f) { if (tabAt(tab, i) == f) { // 確認鎖定的是同一個桶 Node<K,V> ln, hn; if (fh >= 0) { // 正常節點 int runBit = fh & n; // hash & n,判斷擴容後的索引 Node<K,V> lastRun = f; // 此處找到鏈表最後擴容後處於同一位置的連續節點,這樣最後一節就不用再一次複製了 for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } // 依次將鏈表拆分紅,lo、hi 兩條鏈表,即位置不變的鏈表,和位置 + oldCap 的鏈表 // 注意最後一節鏈表沒有new,而是直接使用原來的節點 // 同時鏈表的順序也被打亂了,lastRun 到最後爲正序,前面一節爲逆序 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); // 插入 lo 鏈表 setTabAt(nextTab, i + n, hn); // 插入 hi 鏈表 setTabAt(tab, i, fwd); // 哈希桶移動完成,標記爲 ForwardingNode 節點 advance = true; // 繼續獲取下一個桶 } else if (f instanceof TreeBin) { // 拆分成黑樹 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; // 爲避免最後在反向遍歷,先留頭結點的引用, TreeNode<K,V> hi = null, hiTail = null; // 由於順序的鏈表,能夠加速紅黑樹構造 int lc = 0, hc = 0; // 一樣記錄 lo,hi 鏈表的長度 for (Node<K,V> e = t.first; e != null; e = e.next) { // 中序遍歷紅黑樹 int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null); // 構造紅黑樹節點 if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 判斷是否須要將其轉化爲紅黑樹,同時若是隻有一條鏈,那麼就能夠不用在構造 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
還有其餘相關方法不是很複雜,就不詳細講了,好比 tryPresize,helpTransfer,addCount
當獲取 Map.size 的時候,若是使用 Atomic 變量,很容易致使過分競爭,產生性能瓶頸,因此 CHM 中使用了,計數器的方式:
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); }
private transient volatile CounterCell[] counterCells; // 計數器 @sun.misc.Contended static final class CounterCell { // @sun.misc.Contended 避免僞緩存 volatile long value; CounterCell(long x) { value = x; } } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { // 累計計數 if ((a = as[i]) != null) sum += a.value; } } return sum; }
具體細節還比較多,以後在單獨開一篇博客詳細講解;