上篇文章詳細介紹了HashMap的源碼及原理,本文趁熱打鐵繼續分析ConcurrentHashMap的原理。html
首先在看本文以前,但願對HashMap有一個詳細的瞭解。否則看直接看ConcurrentHashMap的源碼仍是有些費勁的。node
相信對HashMap,HashTable有必定了解,應該知道HashMap是不具有線程安全性的,在resize時會丟數據(JDK8),而HashTable雖然保證了線程安全性,可是其是經過給每一個方法加Synchronized關鍵字達到的同步目的。可是都知道Synchronized在競爭激烈的多線程併發環境中,在性能上的表現是很是不如人意的。那在高併發環境中HashMap如何保證線程安全而又不浪費太多性能呢?答案就是Java J.U.C併發包中的ConcurrentHashMap。數組
依然開局一張圖。JDK8中的ConcurrentHashMap數據結構。安全
呃呵,和HashMap的結構是同樣的,沒錯在數據結構層面,ConcurrentHashMap和HashMap是徹底同樣的。有了這個基礎繼續往下看。數據結構
ConcurrentHashMap的歷史版本大體分界線在JDK8。也就是能夠分爲JDK8和JDK8之前版本。多線程
數據結構的區別併發
在JDK8以前HashMap沒有引入紅黑樹,一樣的ConcurrentHashMap也沒有引入紅黑樹。並且ConcurrentHashMap採用的是分段數組的底層數據結構。ide
在JDK7中的數據結構。函數
從上圖咱們不難看出其在數據結構方面的差異。高併發
鎖的區別
JDK7中爲了提升併發性能採用了這種分段的設計。因此在JDK7中ConcurrentHashMap採用的是分段鎖,也就是在每一個Segment上加ReentrantLock實現的線程安全線。關於ReetrantLock後面有時間會介紹,大體來講ReetrantLoack是比Synchronized更細粒度的一種鎖。使用得當的話其性能要比Synchronized表現要好,可是若是實現不得當容易形成死鎖。
這種基於Segment和ReetrantLock的設計相對HashTable來講大大提升了併發性能。也就是說多個線程能夠併發的操做多個Segment,而HashTable是經過給每一個方法加Synchronized即將多線程串行而實現的。因此在必定程度上提升了併發性能。可是這種性能的提高表現相對JDK8來講顯得不值一提。
若是說JDK7 ConcurrentHashMap相對HashTable來講是串行到多個線程併發的改進。而JDK8則是經過比Segment更細粒度的併發控制大大提升了其並發表現。
JDK8中ConcurrentHashMap採用的是CAS+Synchronized鎖而且鎖粒度是每個桶。簡單來講JDK7中鎖的粒度是Segment,JDK8鎖粒度細化到了桶級別。可想而知鎖粒度是大大提到了。輔之以代碼的優化,JDK8中的ConcurrentHashMap在性能上的表現很是優秀。
簡單總結一下,從HashTable到JDK7 ConcurrentHashMap再到JDK8 ConcurrentHashMap。是從同步到併發再到高併發的進步。
//正在擴容,對應fwd類型的節點的hash static final int MOVED = -1; // hash for forwarding nodes //當前數組 transient volatile Node<K,V>[] table; //擴容時用到的,擴容後的數組。 private transient volatile Node<K,V>[] nextTable; //1,大於零,表示size * 0.75。 //2,等於-1,表示正在初始化。 //3,-(n + 1),表示正在執行擴容的線程其只表示基數,而不是真正的數量,須要計算得出的哦 private transient volatile int sizeCtl;
1 @SuppressWarnings("unchecked") //transient volatile Node<K,V>[] table; tab變量確實是volatile 2 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {//獲取table中索引 i 處的元素。 3 return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);//若是tab是volatile變量,則該方法保證其可見性。 4 } 5 6 static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,//經過CAS設置table索引爲 i 處的元素。 7 Node<K,V> c, Node<K,V> v) { 8 return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); 9 } 10 //transient volatile Node<K,V>[] table; tab變量確實是volatile 11 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {//修改table 索引 i 處的元素。 12 U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);//若是tab是volatile變量,則該方法保證其可見性。 13 }
咱們不難看出 以上三個方法都是調用的Unsafe(U)類中的方法,Unsafe類中定義了大量對內存的操做方法,是native的,不建議開發者直接使用。
tabAt和setTabAt最終調用的兩個方法分別是 U.getObjectVolatile()和U.putObjectVolatile 顧名思義其是經過volatile保證的tab的可見性(Volatile只保證可見性不保證原子性哦)。前提是tab變量是Volatile修飾的變量。咱們經過調用棧,最紅能夠看到其實tab就是ConcurrentHashMap中的table。而這個變量是這麼定義的。
transient volatile Node<K,V>[] table;
可見其確實是Volatile修飾的變量。
再看
casTabAt方法,這個就是CAS方法了。
CAS:Compare and Swap三個單詞的縮寫,即:比較交換的意思。CAS在Java中又稱之爲樂觀鎖即咱們總認爲是沒有鎖的。
while(true){ CAS(); }
通常的經過上述用法達到自旋的目的。CAS通常經過自旋達到自旋鎖的目的,即認爲沒有鎖,失敗重試,這種思路。更多內容請自行百度。CAS很重要哦。
1 public V put(K key, V value) { 2 return putVal(key, value, false); 3 } 4 5 /** Implementation for put and putIfAbsent */ 6 final V putVal(K key, V value, boolean onlyIfAbsent) { 7 if (key == null || value == null) throw new NullPointerException(); 8 int hash = spread(key.hashCode());//hash,對hashcode再散列 9 int binCount = 0; 10 for (Node<K,V>[] tab = table;;) {//迭代桶數組,自旋 11 Node<K,V> f; int n, i, fh; 12 if (tab == null || (n = tab.length) == 0)//懶加載。若是爲空,則進行初始化 13 tab = initTable();//初始化桶數組 14 //(n - 1) & hash)計算下標,取值,爲空即無hash碰撞 15 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 16 if (casTabAt(tab, i, null, 17 new Node<K,V>(hash, key, value, null)))//經過cas插入新值 18 break; // no lock when adding to empty bin 19 } 20 //判斷是否正在擴容。若是正在擴容,當前線程幫助進行擴容。 21 //每一個線程只能同時負責一個桶上的數據遷移,而且不影響其它桶的put和get操做。 22 //(很牛逼的思路,能這麼作創建在更細粒度的鎖基礎上) 23 else if ((fh = f.hash) == MOVED) 24 tab = helpTransfer(tab, f); 25 else {//put5,存在hash碰撞 26 V oldVal = null; 27 //此處,f在上面已經被賦值,f爲當前下標桶的首元素。對鏈表來講是鏈表頭對紅黑樹來講是紅黑樹的頭元素。 28 synchronized (f) { 29 //再次檢查當前節點是否有變化,有變化進入下一輪自旋 30 //爲何再次檢查?由於不能保證,當前線程到這裏,有沒有其餘線程對該節點進行修改 31 if (tabAt(tab, i) == f) { 32 if (fh >= 0) {//當前桶爲鏈表 33 binCount = 1; 34 for (Node<K,V> e = f;; ++binCount) {//迭代鏈表節點 35 K ek; 36 if (e.hash == hash &&//key相同,覆蓋(onlyIfAbsent有什麼用?) 37 ((ek = e.key) == key || 38 (ek != null && key.equals(ek)))) { 39 oldVal = e.val; 40 if (!onlyIfAbsent) 41 e.val = value; 42 break; 43 } 44 Node<K,V> pred = e; 45 //找到鏈表尾部,插入新節點。(什麼這裏不用CAS?由於這在同步代碼塊裏面) 46 if ((e = e.next) == null) { 47 pred.next = new Node<K,V>(hash, key, 48 value, null); 49 break; 50 } 51 } 52 } 53 else if (f instanceof TreeBin) {//當前桶爲紅黑樹 54 Node<K,V> p; 55 binCount = 2; 56 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, 57 value)) != null) {//想紅黑樹插入新節點 58 oldVal = p.val; 59 if (!onlyIfAbsent) 60 p.val = value; 61 } 62 } 63 } 64 } 65 if (binCount != 0) { 66 //樹化。binCount > 8,進行樹化,鏈表轉紅黑樹 67 if (binCount >= TREEIFY_THRESHOLD) 68 //若是容量 < 64則直接進行擴容;不轉紅黑樹。 69 //(你想一想,假如容量爲16,你就插入了9個元素,巧了,都在同一個桶裏面, 70 //若是這時進行樹化,時間複雜度會增長,性能降低,不如直接進行擴容,空間換時間) 71 treeifyBin(tab, i); 72 if (oldVal != null) 73 return oldVal; 74 break; 75 } 76 } 77 } 78 addCount(1L, binCount);//擴容。addCount內部會進行判斷要不要擴容 79 return null; 80 }
總結以上過程
1,懶加載,未初始化則初始化table 2,hash,hashcode再散列,並計算下標 3,無碰撞,經過CAS插入 4,有碰撞 4.1、若是正在擴容,協助其它線程去擴容 4.2、若是是鏈表,插入鏈表 4.3、若是是紅黑樹,插入紅黑樹 4.4、若是鏈表長度超過8,樹化 4.5,若是key已經存在,覆蓋舊值 5,須要擴容,則擴容
相比HashMap過程多了一個協助擴容。
以上源碼須要注意的是
1 for (Node<K,V>[] tab = table;;) {//迭代桶數組,自旋 2 3 }
這是一個自旋的過程,若是CAS修改失敗會進入下一輪自旋。好久之前看這段源碼的時候,我老是在想,CAS失敗了不就丟數據了嗎?因此這個自旋,也稱之爲自旋鎖會保證數據必定能插入成功。
說說上面鎖競爭的狀況,以上過程咱們不難發現對table的修改都是經過CAS操做實現的。好比下面這行代碼,若是已經有線程正在操做 i 位置的元素,則意味着本輪自旋將會失敗,繼續自旋,當其餘線程修改完成,本線程再次運行到tabAt覺得是Volatile操做,其餘線程的修改對本線程當即可見(詳見Volatile關鍵字內存語義的內容)。本線程經過tabAt發現該處已經存在元素,即發生碰撞,繼續往下運行。
1 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 2 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))//經過cas插入新值 3 break; // no lock when adding to empty bin 4 }
線程的調度須要操做系統從用戶態轉爲內核態,這是很是重量級的操做。CAS+自旋組成的自旋鎖保證了線程不會進入阻塞態。
而後繼續往下看
synchronized (f) { //再次檢查當前節點是否有變化,有變化進入下一輪自旋 //爲何再次檢查?由於不能保證,當前線程運行到這裏,有沒有其餘線程對該節點進行修改 if (tabAt(tab, i) == f) {
先看這行代碼 synchronized (f) 這個f是一個桶的頭元素。也就是說在JDK8中synchronized鎖僅僅只鎖鏈表頭或者紅黑樹的頭(其實就是鎖一個桶,由於要訪問鏈表或者紅黑樹總要從頭開始訪問吧)
再看 if (tabAt(tab, i) == f) {} 其實就是雙重檢測(參考單例的雙重檢測),爲何要再檢查一遍呢?由於不能保證當前線程運行到這裏,有沒有其餘線程已經對該節點進行了修改。
initTable()
1 private final Node<K,V>[] initTable() { 2 Node<K,V>[] tab; int sc; 3 while ((tab = table) == null || tab.length == 0) { 4 // 賦值sc。並當sizeCtl == -1 即當前有線程正在執行初始化 5 if ((sc = sizeCtl) < 0) 6 //yield()暫停當前正在執行的線程,執行其餘線程 7 //(這是一個通知,可是這是不必定會讓當前線程中止,要取決於線程調度器) 8 //就是我想讓出資源,可是這只是一廂情願的事情,線程調度器會考慮你的方法,可是不必定採納。 9 Thread.yield(); 10 //修改 sizeCtl 的值爲 -1。 SIZECTL 爲 sizeCtl 的內存地址。 11 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 12 try { 13 //執行初始化過程 14 if ((tab = table) == null || tab.length == 0) { 15 //sc在上面已經賦值,=原來 sizeCtl的值。是非討厭JDK源碼這種賦值方式。 16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; 17 @SuppressWarnings("unchecked") 18 //建立一個sc長度的table。 19 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 20 table = tab = nt; 21 sc = n - (n >>> 2); 22 } 23 } finally { 24 //初始化完成, sizeCtl從新賦值爲當前數組的長度。 25 sizeCtl = sc; 26 } 27 break; 28 } 29 } 30 return tab; 31 }
以上過程,一樣是經過CAS實現的初始化控制,保證只有一個線程去執行初始化。
helpTransfer(tab, f);方法咱們後面介紹完擴容再說。
看完以上put過程,咱們能發現,JDK8經過CAS+自旋鎖將鎖的粒度控制在每個桶上,相對於JDK7中Segment鎖,鎖粒度提升了不少。而且CAS+自旋鎖保證了不會出現線程的切花這種重量級的操做。
1 //tab舊桶數組,nextTab新桶數組 2 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { 3 int n = tab.length, stride; 4 //控制併發數,控制CPU的資源 5 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) 6 stride = MIN_TRANSFER_STRIDE; // subdivide range 7 if (nextTab == null) { // initiating//新數組爲空,則初始化新數組 8 try { 9 @SuppressWarnings("unchecked") 10 //擴容爲原來的兩倍 n << 1 11 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; 12 nextTab = nt; 13 } catch (Throwable ex) { // try to cope with OOME 14 sizeCtl = Integer.MAX_VALUE; 15 return; 16 } 17 nextTable = nextTab; 18 transferIndex = n; 19 } 20 int nextn = nextTab.length; 21 //在這裏面進行new Node將node.hash置爲-1。表示該桶正在進行移動。 22 //(這裏很重要的一點是,只鎖表頭,因此只須要將鏈表(或者紅黑樹)頭結點.hash置爲-1便可) 23 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); 24 //advance是控制是否繼續進行移動的條件,當advance == false,表示正在移動一個桶。 25 //true表示能夠繼續進行下一個桶的移動 26 boolean advance = true; 27 boolean finishing = false; // to ensure sweep before committing nextTab 28 for (int i = 0, bound = 0;;) {//自旋 29 Node<K,V> f; int fh; 30 while (advance) {//start 31 int nextIndex, nextBound; 32 //當前桶是否是已經移動完了 33 if (--i >= bound || finishing) 34 advance = false; 35 //兩個中止移動的條件。移動完了。(這個是真正中止的條件。下面那個條件會進行一次檢查) 36 else if ((nextIndex = transferIndex) <= 0) { 37 i = -1; 38 advance = false; 39 } 40 else if (U.compareAndSwapInt 41 (this, TRANSFERINDEX, nextIndex, 42 nextBound = (nextIndex > stride ? 43 nextIndex - stride : 0))) { 44 bound = nextBound; 45 i = nextIndex - 1; 46 advance = false; 47 } 48 } 49 if (i < 0 || i >= n || i + n >= nextn) { 50 int sc; 51 if (finishing) {//結束擴容 52 nextTable = null; 53 table = nextTab; 54 sizeCtl = (n << 1) - (n >>> 1); 55 return; 56 } 57 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { 58 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 59 return; 60 finishing = advance = true; 61 i = n; // recheck before commit 再次檢查一遍,防止有桶中還有數據沒移動。 62 } 63 }//end 從start到end可看可不看就是條件控制,包括結束條件的控制,移動進度的控制等。 64 //該桶沒數據 65 else if ((f = tabAt(tab, i)) == null) 66 //將oldtab中的該桶設置爲fwd節點,hash=-1 67 advance = casTabAt(tab, i, null, fwd); 68 //已經移動過的桶其hash=-1 69 else if ((fh = f.hash) == MOVED) 70 advance = true; // already processed 71 else { 72 synchronized (f) {//上鎖 73 if (tabAt(tab, i) == f) { 74 //ln新鏈表,不須要移動的節點從新組組織成的鏈表。 75 //hn新鏈表,須要移動的節點從新組織成的鏈表 76 Node<K,V> ln, hn; 77 if (fh >= 0) {//鏈表 78 int runBit = fh & n; 79 Node<K,V> lastRun = f; 80 //start 81 //從start,到end之間。不看也行。實在費腦子。其實這段代碼寫的有點讓人費解 82 //主要是不認真看不知道做者的意圖。本意是這樣的。判斷是否是能夠從某個節點n開始 83 //後面的節點是否是都是和節點n同樣,移動的目標桶同樣的。 84 //若是是同樣的,則後面的這些節點就不用移動了,只須要移動n節點便可。 85 //(注意鏈表的引用,next指針就把後面的都帶過去了) 86 //想一個極端狀況,若是在這裏迭代後發現,全部節點,擴容後數據移動的目標桶都是同樣的。 87 //則只須要移動頭結點便可。不用從新拼接鏈表了。 88 for (Node<K,V> p = f.next; p != null; p = p.next) { 89 int b = p.hash & n; 90 if (b != runBit) { 91 runBit = b; 92 lastRun = p; 93 } 94 } 95 if (runBit == 0) {// runBit== 0 表示該節點不須要移動 96 ln = lastRun; 97 hn = null; 98 } 99 else { 100 hn = lastRun; 101 ln = null; 102 }//end 103 for (Node<K,V> p = f; p != lastRun; p = p.next) { 104 int ph = p.hash; K pk = p.key; V pv = p.val; 105 if ((ph & n) == 0) 106 ln = new Node<K,V>(ph, pk, pv, ln); 107 else 108 hn = new Node<K,V>(ph, pk, pv, hn); 109 } 110 setTabAt(nextTab, i, ln); 111 setTabAt(nextTab, i + n, hn); 112 setTabAt(tab, i, fwd); 113 advance = true; 114 } 115 else if (f instanceof TreeBin) {//紅黑樹 116 TreeBin<K,V> t = (TreeBin<K,V>)f; 117 TreeNode<K,V> lo = null, loTail = null; 118 TreeNode<K,V> hi = null, hiTail = null; 119 int lc = 0, hc = 0; 120 for (Node<K,V> e = t.first; e != null; e = e.next) { 121 int h = e.hash; 122 TreeNode<K,V> p = new TreeNode<K,V> 123 (h, e.key, e.val, null, null); 124 if ((h & n) == 0) { 125 if ((p.prev = loTail) == null) 126 lo = p; 127 else 128 loTail.next = p; 129 loTail = p; 130 ++lc; 131 } 132 else { 133 if ((p.prev = hiTail) == null) 134 hi = p; 135 else 136 hiTail.next = p; 137 hiTail = p; 138 ++hc; 139 } 140 } 141 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : 142 (hc != 0) ? new TreeBin<K,V>(lo) : t; 143 hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : 144 (lc != 0) ? new TreeBin<K,V>(hi) : t; 145 setTabAt(nextTab, i, ln); 146 setTabAt(nextTab, i + n, hn); 147 setTabAt(tab, i, fwd); 148 advance = true; 149 } 150 } 151 } 152 } 153 } 154 }
ForwardingNode
1 static final class ForwardingNode<K,V> extends Node<K,V> { 2 final Node<K,V>[] nextTable; 3 ForwardingNode(Node<K,V>[] tab) { 4 super(MOVED, null, null, null); 5 this.nextTable = tab; 6 } 7 }
看一下這個內部類,其實呢其就是一個起到標識做用的節點,該節點看上面代碼可知,該節點最主要的特色就是hash=MOVED=-1。hash=-1的節點在ConcurrentHashMap中表示該桶是被擴容過程遷移過的桶。而後當前線程判斷若是該桶已經被遷移。不管put仍是get都去新的數組中操做。還有一點很重要,還能夠經過ForwardingNode中 nextTable獲取到新的數組。
1 //該桶沒數據 2 else if ((f = tabAt(tab, i)) == null) 3 //將oldtab中的該桶設置爲fwd節點,hash=-1 4 advance = casTabAt(tab, i, null, fwd);
看上面代碼,先判斷該桶還有沒有數據。沒數據不用遷移,等同於已經遷移完了。其餘線程put會直接put到新的數組中。
1 //已經移動過的桶其hash=-1; 2 else if ((fh = f.hash) == MOVED) 3 advance = true; // already processed
若是該桶已經移動則跳過。
到此咱們能看出什麼?主要是已經移動完的設置成fwd節點,其它線程看到該桶已經移動,則會到新的table中操做。若是未移動,還直接操做當前table,由於就算put,待會處理到該桶,同樣移動到新桶,也沒啥影響。若是是正在移動的接下來會看到加了Synchronized鎖,保證只有一個線程能操做當前桶。簡直不要太妙。
畫重點,擴容過程
1 synchronized (f) {//上鎖 2 if (tabAt(tab, i) == f) { 3 //ln新鏈表,不須要移動的節點從新組組織成的鏈表。 4 //hn新鏈表,須要移動的節點從新組織成的鏈表 5 Node<K,V> ln, hn; 6 if (fh >= 0) {//鏈表 7 int runBit = fh & n; 8 Node<K,V> lastRun = f; 9 //start 10 //從start,到end之間。不看也行。實在費腦子。其實這段代碼寫的有點讓人費解 11 //主要是不認真看不知道做者的意圖。本意是這樣的。判斷是否是能夠從某個節點n開始 12 //後面的節點是否是都是和節點n同樣,移動的目標桶同樣的。 13 //若是是同樣的,則後面的這些節點就不用移動了,只須要移動n節點便可。 14 //(注意鏈表的引用,next指針就把後面的都帶過去了) 15 //想一個極端狀況,若是在這裏迭代後發現,全部節點,擴容後數據移動的目標桶都是同樣的。 16 //則只須要移動頭結點便可。不用從新拼接鏈表了。 17 for (Node<K,V> p = f.next; p != null; p = p.next) { 18 int b = p.hash & n; 19 if (b != runBit) { 20 runBit = b; 21 lastRun = p; 22 } 23 } 24 if (runBit == 0) {// runBit== 0 表示該節點不須要移動 25 ln = lastRun; 26 hn = null; 27 } 28 else { 29 hn = lastRun; 30 ln = null; 31 }//end 32 for (Node<K,V> p = f; p != lastRun; p = p.next) { 33 int ph = p.hash; K pk = p.key; V pv = p.val; 34 if ((ph & n) == 0) 35 ln = new Node<K,V>(ph, pk, pv, ln); 36 else 37 hn = new Node<K,V>(ph, pk, pv, hn); 38 } 39 setTabAt(nextTab, i, ln); 40 setTabAt(nextTab, i + n, hn); 41 setTabAt(tab, i, fwd); 42 advance = true; 43 } 44 else if (f instanceof TreeBin) {//紅黑樹 45 //紅黑樹跳過 46 } 47 } 48 }
5.2.一、併發控制
首先擴容過程是在synchronized同步代碼塊中的。而且只鎖了一個表頭。可看到沒有鎖新數組nextTab的桶。想一想,oldTab(tab變量)和nextTab都是多個線程共享的變量,爲何只有隻鎖了oldTab正在操做的桶?若是有多個線程向nextTab同時遷移數據怎麼辦?會不會存在線程安全性問題?
TIPS: 統一術語 tab = oldTab = table(舊數組) newTab = nextTab(擴容後新數組) oldIndex即在oldTab中的索引位 newIndex即在newTab中的位置
在上一篇文章中介紹HashMap的時候詳細介紹了HashMap擴容中,oldTab舊桶遷移向newTab只有兩個目標桶。再簡單回顧一遍。
上面這張圖形象的展現了舊桶在擴容後的兩個去向:1,索引位原地不動,2,索引位爲oldCap+oldIndex。(關於爲何是這兩個去向,在HashMap擴容中已經詳細介紹了)
若是你還沒懂個人疑問,請參考下面這個圖。
前提,ConcurrentHashMap是併發擴容,能夠有多個線程同時擴容,其次若是如上圖紅線那樣,oldTab中有多個桶中的數據遷移到newTab中的同一個桶中,若是出現這種狀況就意味着存在線程安全性問題。
從上圖5-1中,兩個數據遷移的方向可知,擴容前,oldIndex不一樣就表示不在一個桶,擴容後的兩個去向若是oldIndex不同,也必定不在同一個桶。因此不會出現5-2圖中紅線的那種狀況,也就說明在擴容過程當中不須要鎖newTab。佩服+2
5.2.二、數據遷移
//ln新鏈表,不須要移動的節點從新組組織成的鏈表。 //hn新鏈表,須要移動的節點從新組織成的鏈表 Node<K,V> ln, hn;
int runBit = fh & n;
看兩個變量,上面說過擴容後,舊桶中的數據只有兩個遷移的方向。ln正是數據遷移後索引位依然是oldIndex的數據的鏈表,hn是遷移後須要遷移到oldCap + oldIndex索引位的鏈表。
關注一下runBit變量,若是 runBit == 0 成立則說明遷移後桶的索引位依然是oldIndex。詳見HashMap擴容分析。
重點關注一下start到end之間的代碼
關於這段代碼,首選咱們假設一種極端狀況,若是當前正在移動的桶中的數據在rehash以後,數據遷移的目標桶除了第一個節點的目標桶是oldIndex以外,後面的數據的目標桶都是oldIndex + oldCap。咱們還須要處理後面的節點嗎?不須要,由於只須要將第二個節點移動到newTab的oldIndex + oldCap位置便可。第二個元素也就是lastRun變量。相對於HashMap徹底的將數據組織成兩個鏈表,這也算得上是一個性能上的優化吧。
接着往下看
代碼段1:
1 for (Node<K,V> p = f.next; p != null; p = p.next) { 2 int b = p.hash & n; 3 if (b != runBit) {//相同跳過 4 runBit = b; 5 lastRun = p; 6 } 7 }
以上代碼經過對鏈表的一次掃描決定了lastRun。
代碼段2:
1 if (runBit == 0) {// runBit== 0 表示該節點不須要移動 2 ln = lastRun; 3 hn = null; 4 } 5 else { 6 hn = lastRun; 7 ln = null; 8 }//end
根據lastRun指向的節點的runBit決定後續節點在擴容後是oldIndex + oldCap仍是oldIndex。
代碼段3:
1 for (Node<K,V> p = f; p != lastRun; p = p.next) { 2 int ph = p.hash; K pk = p.key; V pv = p.val; 3 if ((ph & n) == 0) 4 ln = new Node<K,V>(ph, pk, pv, ln); 5 else 6 hn = new Node<K,V>(ph, pk, pv, hn); 7 }
上述代碼會從新組織兩個新鏈表。注意這個迭代到lastRun位置結束,由於以上過程已經肯定了lastRun的歸屬。
看一下 ln = new Node<K,V>(ph, pk, pv, ln); 從新組織鏈表的代碼,也就是ln會成爲新new出來的node的下一個節點。
這樣有什麼問題?問題就是節點在舊桶中的相對順序在新桶中將相反。也就是next的指針翻轉一下。能夠看一下node的構造函數就明瞭了。
演示擴容過程
假設當前擴容前oldCap即oldTab的長度爲2,擴容後newCap即newTab的長度爲4。以下圖看擴容過程,橘色的表明遷移後索引位依然是oldIndex,綠色表明擴容後索引位爲oldIndex + oldCap。
上述代碼段1迭代找到了lastRun即指向node(11),代碼段2將lastRun賦值給hn。代碼段3執行過程以下
1,將node(1)拼接到ln 2,將node(3)拼接到hn,此時注意,hn已經lastRun指向的節點node(11),此時hn=3—>11—>15—>19—>null 3,處理node(5)拼接到ln 4,處理...
對比JDK7 HashMap,JDK8 HashMap,JDK8 ConcurrentHashMap在擴容後對節點相對順序的保證方面,JDK7 HashMap是徹底倒序。JDK8 HashMap不改變相對順序。JDK8 ConcurrentHashMap 保證部分節點的相對順序,其他的倒序。
題外話,從代碼風格和死路上,猜想一下ConcurrentHashMap應該是來自JDK7的HashMap。
1 setTabAt(nextTab, i, ln); 2 setTabAt(nextTab, i + n, hn); 3 setTabAt(tab, i, fwd); 4 advance = true;
ln和hn兩個鏈表各回各家各找各媽。
回過頭來再看put方法中的幫助擴容
1 else if ((fh = f.hash) == MOVED) 2 tab = helpTransfer(tab, f);
在put方法中有這樣一行判斷,當f.hash = MOVED即當前HashMap正在擴容中,則當前線程會去嘗試幫助擴容。
1 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { 2 Node<K,V>[] nextTab; int sc; 3 if (tab != null && (f instanceof ForwardingNode) &&//條件判斷 4 (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {//從fwd節點中取出新table 5 int rs = resizeStamp(tab.length); 6 while (nextTab == nextTable && table == tab && 7 (sc = sizeCtl) < 0) { 8 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 9 sc == rs + MAX_RESIZERS || transferIndex <= 0) 10 break; 11 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//修改sizeCtl = sizeCtl + 1,表示多了一個線程參與擴容 12 transfer(tab, nextTab); 13 break; 14 } 15 } 16 return nextTab; 17 } 18 return table; 19 }
在helpTransfer方法中會首先作一系列判斷,經過fwd節點獲取到nextTab即新的數組。經過CAS 實現sizeCtl++操做,表示多了一個線程進行擴容,由於在擴容方法中對擴容線程數量有控制。
最後的最後,擴容的時機
說一下觸發擴容的操做,總的來講就是put操做,可是有兩個時機很重要,其一就是addCount方法中,每次put一個元素,在addCount方法中都會判斷需不須要進行擴容。另外就是treeifyBin方法中,若是桶中數據超過了8個而且數組長度<64則不會進行樹化,而是進行擴容。關於這個在HashMap源碼介紹中也有介紹。你想一想,假如容量爲16,你就插入了9個元素,巧了,都在同一個桶裏面,若是這時進行樹化,樹化自己就是一個耗時的過程。時間複雜度會增長,性能降低,不如直接進行擴容,空間換時間。
終於擴容過程寫完了。很經典,想讀懂也很費勁。
1 public V get(Object key) { 2 Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; 3 int h = spread(key.hashCode());//hash 4 if ((tab = table) != null && (n = tab.length) > 0 && 5 (e = tabAt(tab, (n - 1) & h)) != null) {//取桶 6 if ((eh = e.hash) == h) {//key相同直接返回 7 if ((ek = e.key) == key || (ek != null && key.equals(ek))) 8 return e.val; 9 } 10 else if (eh < 0)//hash < 0 表示正在擴容 11 //在這裏須要很是注意的一點,擴容後的桶會放入fwd節點 12 //該節點hash = MOVED,fwd.nextTable爲擴容後新的數組。 13 return (p = e.find(h, key)) != null ? p.val : null; 14 while ((e = e.next) != null) {//迭代鏈表 15 if (e.hash == h && 16 ((ek = e.key) == key || (ek != null && key.equals(ek)))) 17 return e.val; 18 } 19 } 20 return null; 21 }
get源碼只關注下面這行
return (p = e.find(h, key)) != null ? p.val : null;
當該桶已經被移動,則經過e.find方法去nextTab新數組查找。首先在5章節resize擴容方法中,已經擴容的桶會被塞進去一個ForwardingNode節點 setTabAt(tab, i, fwd); 繼續看resize方法中ForwardingNode的初始化會發現是這樣初始化的 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); ,看它的構造方法
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } }
不難發下其初始化方法接收一個nextTab也就是擴容後的新數組,並將該數組賦值給其內部變量nextTable。也就是說當get發現桶已經擴容後,咱們能夠從fwd節點中找到新的數組。並重新的數組中找到新的目標桶並進行元素查找。
看了以上代碼,回到 e.find(h, key)) ,須要明確的是e就是ForwardingNode節點。看看find方法
1 Node<K,V> find(int h, Object k) { 2 // loop to avoid arbitrarily deep recursion on forwarding nodes 3 outer: for (Node<K,V>[] tab = nextTable;;) {//迭代nextTable 4 Node<K,V> e; int n; 5 if (k == null || tab == null || (n = tab.length) == 0 || 6 (e = tabAt(tab, (n - 1) & h)) == null) 7 return null; 8 for (;;) { 9 int eh; K ek; 10 if ((eh = e.hash) == h && 11 ((ek = e.key) == k || (ek != null && k.equals(ek)))) 12 return e; 13 if (eh < 0) { 14 if (e instanceof ForwardingNode) { 15 tab = ((ForwardingNode<K,V>)e).nextTable; 16 continue outer; 17 } 18 else 19 return e.find(h, k); 20 } 21 if ((e = e.next) == null) 22 return null; 23 } 24 } 25 }
OK,很明確了,確實是從nextTable中查找的。
得出一個結論,ConcurrentHashMap擴容不影響get操做。也就是在擴容過程當中能夠併發讀。
詳細看了ConcurrentHashMap put resize get過程的源碼,本章從總體上看一下ConcurrentHashMap的併發控制。
下面結合圖片咱們看一下ConcurrentHashMap的併發過程。
如上圖,線程1進行put操做,這時發現size > sizeCtl。開始進行擴容
此時線程1已經完成oldTab中索引[2,16)中的擴容。正在進行索引爲1的桶的擴容。接下來線程2執行get。
線程2根據get邏輯和key的hash,可能訪問的三種狀況如上圖所示
狀況一:訪問藍色號桶,即未擴容的桶。該桶還未進行擴容,因此在桶中找到對應元素,返回。
狀況二:訪問綠色桶,即正在擴容的桶。該桶正在擴容,在擴容過程當中,線程1持有Synchronized鎖,線程2只能自旋等待。
狀況三:訪問橘色桶,該桶已擴容的桶。該桶已擴容,oldTab中是fwd節點,hash=-1,因此執行fwd節點的find邏輯,fwd節點持有newTab(nextTable),因此線程2去newTab中查找對應元素,返回。
如上圖4,當線程1進行擴容時,線程3進來執行put,一樣存在三種可能的狀況
狀況一:訪問藍色桶,即未擴容的桶。正常執行put邏輯。
狀況二:訪問綠色桶,即正擴容的桶。由於線層1持有Synchronized鎖,線程3將一直自旋,等待擴容結束。
狀況三:訪問橘色桶,即已擴容的桶。由於已擴容的桶,在oldTab中是fwd節點,hash = -1 = MOVED,因此線程3執行幫助擴容的邏輯。等待擴容完成,線程3繼續完成put邏輯。
OK,以上就是ConcurrentHashMap關於get put resize的併發控制,從以上過程可見,存在鎖競爭的狀況頗有限,即便存在鎖競爭,也是進行自旋,而不會阻塞線程。可見ConcurrentHashMap能作到高效的併發讀。
在put過程當中,由於若是存在線程正在已經擴容,則幫助進行擴容(協助擴容這塊,有一個步長的概念,同時進行擴容的線程和table的長度有關)。若是當前桶正在進行擴容,則被Synchronized鎖拒之門外,自旋等待擴容結束。若是訪問的是未擴容的桶,則執行正常的put邏輯。可見整個過程當中,因爲鎖的粒度很小,put作到了高效的併發寫,也作到了高效的擴容。
總之一句話ConcurrentHashMap的高併發是經過 CAS樂觀鎖 + 自旋鎖 + 細粒度 保證的。
若有錯誤的地方還請留言指正。
原創不易,轉載請註明原文地址:https://www.cnblogs.com/hello-shf/p/12183263.html
原文出處:https://www.cnblogs.com/hello-shf/p/12183263.html