ConcurrentHashMap與 HashMap 同樣,都是鍵值對數據存儲結構,不過 HashMap 是非線程安全,而ConcurrentHashMap是線程安全的,提及線程安全又不得不與HashTable對比一下,HashTable相比ConcurrentHashMap效率低,接下來看看ConcurrentHashMap究竟是如何保證線程安全且比HashTable優秀(JDK1.8)算法
仍是從put()開始數組
public V put(K key, V value) { return putVal(key, value, false); }
put()會再調一個putValue(),由於要傳入 onlyIfAbsent 參數贊成更新元素,與HashMap不同的是 key的散列算法不是在put()完成,而是在putValue()完成安全
/** * key:儲存到HashMap的key * value:儲存到HashMap的key對應的value * onlyIfAbsent:若是包含了該key,則不更新對應的值,衆所周知,put()除了新增以外,還有更新的功能,是由於put()調用putVal()時候傳的都是false */ final V putVal(K key, V value, boolean onlyIfAbsent) { //ConcurrentHashMap的鍵值都不容許爲null if (key == null || value == null) throw new NullPointerException(); //計算key的hash值 int hash = spread(key.hashCode()); // 記錄當前儲存的位置是否須要轉行成紅黑樹結構 int binCount = 0; // 循環ConcurrentHashMap的Node數組,找出對於存儲的位置 for (ConcurrentHashMap.Node<K,V>[] tab = table;;) { // 定義了幾個變量: // f:存放在數組相同下標位置的第一個元素 // n:當前ConcurrentHashMap的Node數組的長度 // i:當前存儲元素須要存放的數組下標 // fh : 存放在數組相同下標位置的第一個元素的hash值 ConcurrentHashMap.Node<K,V> f; int n, i, fh; // 判斷ConcurrentHashMap的Node數組長度,若是等於null或者長度等於0的狀況,則進行擴容操做 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 判斷當前元素須要存儲的數組位置是否爲null,若是是null的狀況下則嘗試存放數據 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 這裏使用了CAS操做(樂觀鎖),嘗試存放元素,若是元素存放成功則直接結束當前put(),不然繼續找位置 if (casTabAt(tab, i, null, new ConcurrentHashMap.Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 這裏判斷當前ConcurrentHashMap是否正在進行擴容操做 // 若是是在進行擴容操做,則當前線程也幫忙擴容,而且存放元素 else if ((fh = f.hash) == MOVED) // 幫忙進行擴容操做,helpTransfer() 這個函數是幫助正在擴容的線程 // 一塊兒進行擴容操做而且存放元素 tab = helpTransfer(tab, f); else { // 定義了一個指向舊值的變量,當本次put()操做是更新操做時,則使用該變量指向舊值 // 而且更新完成以後返回舊值 V oldVal = null; // 上面也說過 f 是每一個相同元素下標位置的首個元素, // 這裏對 f 進行了加鎖,這樣能鎖住全部即將存在 i 位置元素的put()操做 // 而且不影響其餘下標位置的元素插入,相比HashTable的方法鎖大大提升了性能 synchronized (f) { // 若是 f 沒又改變,則繼續操做,不然重複第一個循環 if (tabAt(tab, i) == f) { // 判斷當前節點是否鏈表節點 if (fh >= 0) { // 記錄當前儲存的位置是否須要轉行成紅黑樹結構 binCount = 1; // 遍歷鏈表找出存放的位置 for (ConcurrentHashMap.Node<K,V> e = f;; ++binCount) { K ek; // 若是遍歷到的節點 key一致,則進行更新操做 而且oldVal變量存儲舊值,用於後續返回 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 若是一致沒有相同的key,則在鏈表最後添加新節點 ConcurrentHashMap.Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new ConcurrentHashMap.Node<K,V>(hash, key, value, null); break; } } } // 若是當前數組下標節點等於 TreeBin 的時候,則進行按紅黑樹規則進行添加或修改操做 else if (f instanceof ConcurrentHashMap.TreeBin) { ConcurrentHashMap.Node<K,V> p; binCount = 2; if ((p = ((ConcurrentHashMap.TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // 判斷當前鏈表的長度是否須要轉換成紅黑樹結構 // static final int TREEIFY_THRESHOLD = 8; // 當鏈表長度大於等於 8 的時候,則進行轉換成紅黑樹操做 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 記錄當前ConcurrentHashMap長度而且判斷是否須要進行擴容操做 addCount(1L, binCount); return null; }
put()操做使用了synchronized 與CAS樂觀鎖,當存儲在數組相同位置的元素爲null時候,會使用CAS操做進行嘗試存儲,若是存儲成功則直接走最後的記錄長度和判斷是否須要擴容流程。數組相同位置的元素不爲null,則先使用 synchronized 鎖住該對象,這樣就鎖住了該數組下標的整個數據結構(鏈表或者紅黑樹的管理者TreeBin)再進行與HashMap大體相同的鏈表或者紅黑樹存儲規則數據結構
下面隨便畫了一個 此處 synchronized 鎖的圖多線程
接下來看看初始化容量函數 initTable()併發
private final ConcurrentHashMap.Node<K,V>[] initTable() { ConcurrentHashMap.Node<K,V>[] tab; int sc; // 判斷當前ConcurrentHashMap數組長度是否須要擴容 while ((tab = table) == null || tab.length == 0) { // 判斷是否有其餘線程在進行初始化容量操做 // 若是 sizeCtl < 0 則說明有其餘線程在進行初始化容量操做 if ((sc = sizeCtl) < 0) // 若是有其餘線程在進行初始化容量,則下降當前線程的優先級 Thread.yield(); // lost initialization race; just spin // 若是沒有其餘線程進行初始化容量則使用CAS操做嘗試獲取到初始化容量的資格 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 再判斷一次是否須要初始化容量 // 當首個線程先取得初始化容量資格時候,很大可能已經完成了 // 其餘減低了優先級的線程即便再獲取到容量初始化資格都會被該判斷阻擋 if ((tab = table) == null || tab.length == 0) { // 獲取默認初始化容量 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 初始化ConcurrentHashMap 數組 @SuppressWarnings("unchecked") ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } // 返回初始化的數組 return tab; }
initTable()主要也仍是使用到了CAS操做,使只有一個線程常常初始化,而且table使用了volatile修飾線程共享資源,初始化一旦完成,其餘線程立馬知道了該table已經初始化,在高併發多線程狀況下,有些被下降了優先級的線程後面也會經過CAS獲取到初始化資格,可是下面還有一個判斷table的長度,就無需再進行擴容操做了,因此ConcurrentHashMap的容量初始化是由雙重校驗+CAS+volatile保證了其線程安全。HashMap裏面擴容與初始化容量都是同一個函數,而ConcurrentHashMap則是分開了2個函數,初始化容量一個,擴容一個(transfer())dom
transient volatile Node<K,V>[] table;
下面看看擴容函數 transfer()與協助擴容函數 helpTransfer()ide
/** * tab:當前的table * nextTab:擴容後的table */ private final void transfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V>[] nextTab) { int n = tab.length, stride; // 獲取CPU每一個核的處理量,默認16 // private static final int MIN_TRANSFER_STRIDE = 16; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 判斷擴容後的 table 是否爲null // 若是爲null則在此處進行建立新 table if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } // 記錄新的table nextTable = nextTab; // 記錄舊的table長度 transferIndex = n; } // 記錄新 table的長度 int nextn = nextTab.length; // 建立一個 ForwardingNode 而且說明當前的table正在進行擴容操做 ConcurrentHashMap.ForwardingNode<K,V> fwd = new ConcurrentHashMap.ForwardingNode<K,V>(nextTab); // 記錄當前 f 當前數組下標位置的某個元素數據是否須要計算新table下的新下標 boolean advance = true; // 記錄是否完成數據掃描 boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { ConcurrentHashMap.Node<K,V> f; int fh; // 遍歷舊table中的節點,計算新table中的下標 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } // 當該舊table所有節點轉移到新的table上時,結束擴容 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { // talbe 指向新table nextTable = null; table = nextTab; //擴容閾值設置爲原來容量的1.5倍 sizeCtl = (n << 1) - (n >>> 1); return; } // 使用CAS操做更新這個擴容閾值 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } // 判斷存放節點的數組下標位置是否爲null // 若是爲null則嘗試存放 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 若是不爲null則判斷改位置是否已經轉移完成 // 若是是則再也不轉移當前下標位置的元素 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // 鎖住數組的下標的第一個元素 synchronized (f) { // 判斷 f 的值有沒有被修改 // 若是 f 沒又改變,則繼續操做,不然重複第一個循環 if (tabAt(tab, i) == f) { ConcurrentHashMap.Node<K,V> ln, hn; // 判斷是否鏈表結構 // 若是是,則根據鏈表結構的存儲規則存儲元素 if (fh >= 0) { int runBit = fh & n; ConcurrentHashMap.Node<K,V> lastRun = f; for (ConcurrentHashMap.Node<K,V> p = f.next; p != null; p = p.next) { // 把節點存放新table的原位或者移動 n 位存放 // 其實就是計算該節點是存放在當前下標位置仍是 i+n的下標位置 int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (ConcurrentHashMap.Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, ln); else hn = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, hn); } // 經過上面的計算把ln鏈表放在本來的下標位置 // 把hn鏈表放在 本來下標+n的位置 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); // 在舊table存放 ForwardingNode 表明該數組下標位置的數據已經完成轉移 setTabAt(tab, i, fwd); advance = true; } // 判斷是否紅黑樹結構 // 若是是,則根據紅黑樹結構的存儲規則存儲元素 else if (f instanceof ConcurrentHashMap.TreeBin) { ConcurrentHashMap.TreeBin<K,V> t = (ConcurrentHashMap.TreeBin<K,V>)f; ConcurrentHashMap.TreeNode<K,V> lo = null, loTail = null; ConcurrentHashMap.TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (ConcurrentHashMap.Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V> (h, e.key, e.val, null, null); // 與上面鏈表的操做大體相同 // 計算當前元素存放的位置 if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 若是經過計算分開重組後的紅黑樹長度小於等於6則轉換成鏈表 // static final int UNTREEIFY_THRESHOLD = 6; ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(hi) : t; // 把計算後存放在原來位置的 ln繼續存放在原來的位置 // hn存放在 i+n 的位置 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); // 在舊table存放 ForwardingNode 表明該數組下標位置的數據已經完成轉移 setTabAt(tab, i, fwd); advance = true; } } } } } }
final ConcurrentHashMap.Node<K,V>[] helpTransfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V> f) { ConcurrentHashMap.Node<K,V>[] nextTab; int sc; // 判斷當前 table是否在進行擴容操做 if (tab != null && (f instanceof ConcurrentHashMap.ForwardingNode) && (nextTab = ((ConcurrentHashMap.ForwardingNode<K,V>)f).nextTable) != null) { // 獲取擴容後的table大小 int rs = resizeStamp(tab.length); // 再次判斷擴容操做是否還在進行,而且是同一次擴容操做 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 經過CAS擴容操做嘗試獲取擴容操做資格 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
每次擴容是容量原來的2倍,而且舊table原元素只會存放在新table的 i 或者 i+n 的位置而不是從新計算hash新的 i ,這樣避免了多線程擴容狀況下同時操做 i 引發的各類麻煩,而且put()操做的時候有可能會幫忙一塊兒擴容,分擔了單線程操做的壓力,當舊的 i 位置的節點所有轉移完成使用 ForwardingNode 標記當前位置因此節點已經轉移完成函數
會引發擴容操做的2個函數addCount()與treeifyBin()高併發
private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 使用CAS操做更新baseCount if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; // 多線程修改baseCount時,沒修改爲功的線程會執行fullAddCount(),把x的值添加到counterCell類中 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } // 當check >= 0 的時候證實須要擴容 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) // 其餘線程在初始化,break; break; // 其餘線程正在擴容,協助擴容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 只有前線程在擴容 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { //若是table的長度<64 就擴大一倍 // 就是說當前 table的長度>64 且鏈表節點數量 >= 8時候纔會轉換成紅黑樹 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 鎖住當前數組位置,進行紅黑樹轉換 synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } //在原來下標的位置 用TreeBin替換掉原來的Node對象 setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
addCount()的主要做用是增長當前ConcurrentHashMap的元素數量而且判斷是否須要擴容,當容量已經達到了擴容閾值則進行擴容操做
treeifyBin()則是用於進行樹結構轉換和判斷是否須要擴容,當容量 < 64 時則進行擴容而非轉換成樹結構,只有容量>64且鏈表長度>=8時纔會進行樹結構轉換
與HashMap不一樣的是多了2個內部類:ForwardingNode 與 TreeBin
ForwardingNode:只有擴容操做的時候纔會使用到,標記數組的 i 位置是否已經完成
TreeBin:HashMap是直接使用TreeNode,而ConcurrentHashMap則是經過TreeBin來管理TreeNode來使用
ConcurrentHashMap經過CAS和巧妙地運用synchronized 提供了一個安全和高效並且很是精彩解決方案
ConcurrentHashMap的一些簡單學習記錄到此結束,世界真的很大