ConcurrentHashMap源碼解析(1.8)

1、簡介

上篇文章詳細介紹了HashMap的源碼及原理,本文趁熱打鐵繼續分析ConcurrentHashMap的原理。html

首先在看本文以前,但願對HashMap有一個詳細的瞭解。否則看直接看ConcurrentHashMap的源碼仍是有些費勁的。node

相信對HashMap,HashTable有必定了解,應該知道HashMap是不具有線程安全性的,在resize時會丟數據(JDK8),而HashTable雖然保證了線程安全性,可是其是經過給每一個方法加Synchronized關鍵字達到的同步目的。可是都知道Synchronized在競爭激烈的多線程併發環境中,在性能上的表現是很是不如人意的。那在高併發環境中HashMap如何保證線程安全而又不浪費太多性能呢?答案就是Java J.U.C併發包中的ConcurrentHashMap。數組

依然開局一張圖。JDK8中的ConcurrentHashMap數據結構。安全

 

 

 呃呵,和HashMap的結構是同樣的,沒錯在數據結構層面,ConcurrentHashMap和HashMap是徹底同樣的。有了這個基礎繼續往下看。數據結構

2、歷史版本

ConcurrentHashMap的歷史版本大體分界線在JDK8。也就是能夠分爲JDK8和JDK8之前版本。多線程

數據結構的區別併發

在JDK8以前HashMap沒有引入紅黑樹,一樣的ConcurrentHashMap也沒有引入紅黑樹。並且ConcurrentHashMap採用的是分段數組的底層數據結構。ide

在JDK7中的數據結構。函數

從上圖咱們不難看出其在數據結構方面的差異。高併發

鎖的區別

JDK7中爲了提升併發性能採用了這種分段的設計。因此在JDK7中ConcurrentHashMap採用的是分段鎖,也就是在每一個Segment上加ReentrantLock實現的線程安全線。關於ReetrantLock後面有時間會介紹,大體來講ReetrantLoack是比Synchronized更細粒度的一種鎖。使用得當的話其性能要比Synchronized表現要好,可是若是實現不得當容易形成死鎖。

這種基於Segment和ReetrantLock的設計相對HashTable來講大大提升了併發性能。也就是說多個線程能夠併發的操做多個Segment,而HashTable是經過給每一個方法加Synchronized即將多線程串行而實現的。因此在必定程度上提升了併發性能。可是這種性能的提高表現相對JDK8來講顯得不值一提。

若是說JDK7 ConcurrentHashMap相對HashTable來講是串行到多個線程併發的改進。而JDK8則是經過比Segment更細粒度的併發控制大大提升了其並發表現。

JDK8中ConcurrentHashMap採用的是CAS+Synchronized鎖而且鎖粒度是每個桶。簡單來講JDK7中鎖的粒度是Segment,JDK8鎖粒度細化到了桶級別。可想而知鎖粒度是大大提到了。輔之以代碼的優化,JDK8中的ConcurrentHashMap在性能上的表現很是優秀。

簡單總結一下,從HashTable到JDK7 ConcurrentHashMap再到JDK8 ConcurrentHashMap。是從同步到併發再到高併發的進步。

3、基礎知識

3.一、常量

//正在擴容,對應fwd類型的節點的hash
static final int MOVED     = -1; // hash for forwarding nodes //當前數組
transient volatile Node<K,V>[] table; //擴容時用到的,擴容後的數組。
private transient volatile Node<K,V>[] nextTable; //1,大於零,表示size * 0.75。 //2,等於-1,表示正在初始化。 //3,-(n + 1),表示正在執行擴容的線程其只表示基數,而不是真正的數量,須要計算得出的哦
private transient volatile int sizeCtl;

3.二、Unsafe類方法

 1     @SuppressWarnings("unchecked")  //transient volatile Node<K,V>[] table; tab變量確實是volatile
 2     static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {//獲取table中索引 i 處的元素。
 3         return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);//若是tab是volatile變量,則該方法保證其可見性。
 4  }  5 
 6     static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,//經過CAS設置table索引爲 i 處的元素。
 7                                         Node<K,V> c, Node<K,V> v) {  8         return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);  9  } 10             //transient volatile Node<K,V>[] table; tab變量確實是volatile
11     static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {//修改table 索引 i 處的元素。
12         U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);//若是tab是volatile變量,則該方法保證其可見性。
13     }

 

咱們不難看出 以上三個方法都是調用的Unsafe(U)類中的方法,Unsafe類中定義了大量對內存的操做方法,是native的,不建議開發者直接使用。

tabAt和setTabAt最終調用的兩個方法分別是 U.getObjectVolatile()和U.putObjectVolatile 顧名思義其是經過volatile保證的tab的可見性(Volatile只保證可見性不保證原子性哦)。前提是tab變量是Volatile修飾的變量。咱們經過調用棧,最紅能夠看到其實tab就是ConcurrentHashMap中的table。而這個變量是這麼定義的。

transient volatile Node<K,V>[] table;

 

可見其確實是Volatile修飾的變量。

再看

casTabAt方法,這個就是CAS方法了。

CAS:Compare and Swap三個單詞的縮寫,即:比較交換的意思。CAS在Java中又稱之爲樂觀鎖即咱們總認爲是沒有鎖的。

while(true){ CAS(); }

通常的經過上述用法達到自旋的目的。CAS通常經過自旋達到自旋鎖的目的,即認爲沒有鎖,失敗重試,這種思路。更多內容請自行百度。CAS很重要哦。

4、put過程源碼

 1 public V put(K key, V value) {  2     return putVal(key, value, false);  3 }  4 
 5 /** Implementation for put and putIfAbsent */
 6 final V putVal(K key, V value, boolean onlyIfAbsent) {  7     if (key == null || value == null) throw new NullPointerException();  8     int hash = spread(key.hashCode());//hash,對hashcode再散列
 9     int binCount = 0; 10     for (Node<K,V>[] tab = table;;) {//迭代桶數組,自旋
11         Node<K,V> f; int n, i, fh; 12         if (tab == null || (n = tab.length) == 0)//懶加載。若是爲空,則進行初始化
13             tab = initTable();//初始化桶數組 14         //(n - 1) & hash)計算下標,取值,爲空即無hash碰撞
15         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 16             if (casTabAt(tab, i, null, 17                          new Node<K,V>(hash, key, value, null)))//經過cas插入新值
18                 break;                   // no lock when adding to empty bin
19  } 20         //判斷是否正在擴容。若是正在擴容,當前線程幫助進行擴容。 21         //每一個線程只能同時負責一個桶上的數據遷移,而且不影響其它桶的put和get操做。 22         //(很牛逼的思路,能這麼作創建在更細粒度的鎖基礎上)
23         else if ((fh = f.hash) == MOVED) 24             tab = helpTransfer(tab, f); 25         else {//put5,存在hash碰撞
26             V oldVal = null; 27             //此處,f在上面已經被賦值,f爲當前下標桶的首元素。對鏈表來講是鏈表頭對紅黑樹來講是紅黑樹的頭元素。
28             synchronized (f) { 29                 //再次檢查當前節點是否有變化,有變化進入下一輪自旋 30                 //爲何再次檢查?由於不能保證,當前線程到這裏,有沒有其餘線程對該節點進行修改
31                 if (tabAt(tab, i) == f) { 32                     if (fh >= 0) {//當前桶爲鏈表
33                         binCount = 1; 34                         for (Node<K,V> e = f;; ++binCount) {//迭代鏈表節點
35  K ek; 36                             if (e.hash == hash &&//key相同,覆蓋(onlyIfAbsent有什麼用?)
37                                 ((ek = e.key) == key ||
38                                  (ek != null && key.equals(ek)))) { 39                                 oldVal = e.val; 40                                 if (!onlyIfAbsent) 41                                     e.val = value; 42                                 break; 43  } 44                             Node<K,V> pred = e; 45                             //找到鏈表尾部,插入新節點。(什麼這裏不用CAS?由於這在同步代碼塊裏面)
46                             if ((e = e.next) == null) { 47                                 pred.next = new Node<K,V>(hash, key, 48                                                           value, null); 49                                 break; 50  } 51  } 52  } 53                     else if (f instanceof TreeBin) {//當前桶爲紅黑樹
54                         Node<K,V> p; 55                         binCount = 2; 56                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, 57                                                        value)) != null) {//想紅黑樹插入新節點
58                             oldVal = p.val; 59                             if (!onlyIfAbsent) 60                                 p.val = value; 61  } 62  } 63  } 64  } 65             if (binCount != 0) { 66                 //樹化。binCount > 8,進行樹化,鏈表轉紅黑樹
67                 if (binCount >= TREEIFY_THRESHOLD) 68                     //若是容量 < 64則直接進行擴容;不轉紅黑樹。 69                     //(你想一想,假如容量爲16,你就插入了9個元素,巧了,都在同一個桶裏面, 70                     //若是這時進行樹化,時間複雜度會增長,性能降低,不如直接進行擴容,空間換時間)
71  treeifyBin(tab, i); 72                 if (oldVal != null) 73                     return oldVal; 74                 break; 75  } 76  } 77  } 78     addCount(1L, binCount);//擴容。addCount內部會進行判斷要不要擴容
79     return null; 80 }

總結以上過程

1,懶加載,未初始化則初始化table 2,hash,hashcode再散列,並計算下標 3,無碰撞,經過CAS插入 4,有碰撞   4.1、若是正在擴容,協助其它線程去擴容   4.2、若是是鏈表,插入鏈表   4.3、若是是紅黑樹,插入紅黑樹   4.4、若是鏈表長度超過8,樹化   4.5,若是key已經存在,覆蓋舊值 5,須要擴容,則擴容

相比HashMap過程多了一個協助擴容。

以上源碼須要注意的是

1 for (Node<K,V>[] tab = table;;) {//迭代桶數組,自旋
2     
3 }

這是一個自旋的過程,若是CAS修改失敗會進入下一輪自旋。好久之前看這段源碼的時候,我老是在想,CAS失敗了不就丟數據了嗎?因此這個自旋,也稱之爲自旋鎖會保證數據必定能插入成功。

說說上面鎖競爭的狀況,以上過程咱們不難發現對table的修改都是經過CAS操做實現的。好比下面這行代碼,若是已經有線程正在操做 i 位置的元素,則意味着本輪自旋將會失敗,繼續自旋,當其餘線程修改完成,本線程再次運行到tabAt覺得是Volatile操做,其餘線程的修改對本線程當即可見(詳見Volatile關鍵字內存語義的內容)。本線程經過tabAt發現該處已經存在元素,即發生碰撞,繼續往下運行。

 

1 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 2     if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))//經過cas插入新值
3      break;                   // no lock when adding to empty bin
4 }

線程的調度須要操做系統從用戶態轉爲內核態,這是很是重量級的操做。CAS+自旋組成的自旋鎖保證了線程不會進入阻塞態。

而後繼續往下看

 

synchronized (f) { //再次檢查當前節點是否有變化,有變化進入下一輪自旋 //爲何再次檢查?由於不能保證,當前線程運行到這裏,有沒有其餘線程對該節點進行修改
    if (tabAt(tab, i) == f) {

先看這行代碼 synchronized (f) 這個f是一個桶的頭元素。也就是說在JDK8中synchronized鎖僅僅只鎖鏈表頭或者紅黑樹的頭(其實就是鎖一個桶,由於要訪問鏈表或者紅黑樹總要從頭開始訪問吧)

再看 if (tabAt(tab, i) == f) {} 其實就是雙重檢測(參考單例的雙重檢測),爲何要再檢查一遍呢?由於不能保證當前線程運行到這裏,有沒有其餘線程已經對該節點進行了修改。

initTable()

 1 private final Node<K,V>[] initTable() {  2     Node<K,V>[] tab; int sc;  3     while ((tab = table) == null || tab.length == 0) {  4         // 賦值sc。並當sizeCtl == -1 即當前有線程正在執行初始化
 5         if ((sc = sizeCtl) < 0)  6             //yield()暫停當前正在執行的線程,執行其餘線程  7             //(這是一個通知,可是這是不必定會讓當前線程中止,要取決於線程調度器)  8             //就是我想讓出資源,可是這只是一廂情願的事情,線程調度器會考慮你的方法,可是不必定採納。
 9  Thread.yield(); 10         //修改 sizeCtl 的值爲 -1。 SIZECTL 爲 sizeCtl 的內存地址。
11         else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 12             try { 13                 //執行初始化過程
14                 if ((tab = table) == null || tab.length == 0) { 15                     //sc在上面已經賦值,=原來 sizeCtl的值。是非討厭JDK源碼這種賦值方式。
16                     int n = (sc > 0) ? sc : DEFAULT_CAPACITY; 17                     @SuppressWarnings("unchecked") 18                     //建立一個sc長度的table。
19                     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 20                     table = tab = nt; 21                     sc = n - (n >>> 2); 22  } 23             } finally { 24                 //初始化完成, sizeCtl從新賦值爲當前數組的長度。
25                 sizeCtl = sc; 26  } 27             break; 28  } 29  } 30     return tab; 31 }

以上過程,一樣是經過CAS實現的初始化控制,保證只有一個線程去執行初始化。

helpTransfer(tab, f);方法咱們後面介紹完擴容再說。

看完以上put過程,咱們能發現,JDK8經過CAS+自旋鎖將鎖的粒度控制在每個桶上,相對於JDK7中Segment鎖,鎖粒度提升了不少。而且CAS+自旋鎖保證了不會出現線程的切花這種重量級的操做。

5、擴容

 1 //tab舊桶數組,nextTab新桶數組
 2 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {  3     int n = tab.length, stride;  4     //控制併發數,控制CPU的資源
 5     if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)  6         stride = MIN_TRANSFER_STRIDE; // subdivide range
 7     if (nextTab == null) {            // initiating//新數組爲空,則初始化新數組
 8         try {  9             @SuppressWarnings("unchecked")  10             //擴容爲原來的兩倍 n << 1
 11             Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];  12             nextTab = nt;  13         } catch (Throwable ex) {      // try to cope with OOME
 14             sizeCtl = Integer.MAX_VALUE;  15             return;  16  }  17         nextTable = nextTab;  18         transferIndex = n;  19  }  20     int nextn = nextTab.length;  21     //在這裏面進行new Node將node.hash置爲-1。表示該桶正在進行移動。  22     //(這裏很重要的一點是,只鎖表頭,因此只須要將鏈表(或者紅黑樹)頭結點.hash置爲-1便可)
 23     ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);  24     //advance是控制是否繼續進行移動的條件,當advance == false,表示正在移動一個桶。  25     //true表示能夠繼續進行下一個桶的移動
 26     boolean advance = true;  27     boolean finishing = false; // to ensure sweep before committing nextTab
 28     for (int i = 0, bound = 0;;) {//自旋
 29         Node<K,V> f; int fh;  30         while (advance) {//start
 31             int nextIndex, nextBound;  32             //當前桶是否是已經移動完了
 33             if (--i >= bound || finishing)  34                 advance = false;  35             //兩個中止移動的條件。移動完了。(這個是真正中止的條件。下面那個條件會進行一次檢查)
 36             else if ((nextIndex = transferIndex) <= 0) {  37                 i = -1;  38                 advance = false;  39  }  40             else if (U.compareAndSwapInt  41                      (this, TRANSFERINDEX, nextIndex,  42                       nextBound = (nextIndex > stride ?
 43                                    nextIndex - stride : 0))) {  44                 bound = nextBound;  45                 i = nextIndex - 1;  46                 advance = false;  47  }  48  }  49         if (i < 0 || i >= n || i + n >= nextn) {  50             int sc;  51             if (finishing) {//結束擴容
 52                 nextTable = null;  53                 table = nextTab;  54                 sizeCtl = (n << 1) - (n >>> 1);  55                 return;  56  }  57             if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {  58                 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)  59                     return;  60                 finishing = advance = true;  61                 i = n; // recheck before commit 再次檢查一遍,防止有桶中還有數據沒移動。
 62  }  63         }//end 從start到end可看可不看就是條件控制,包括結束條件的控制,移動進度的控制等。  64         //該桶沒數據
 65         else if ((f = tabAt(tab, i)) == null)  66             //將oldtab中的該桶設置爲fwd節點,hash=-1
 67             advance = casTabAt(tab, i, null, fwd);  68         //已經移動過的桶其hash=-1
 69         else if ((fh = f.hash) == MOVED)  70             advance = true; // already processed
 71         else {  72             synchronized (f) {//上鎖
 73                 if (tabAt(tab, i) == f) {  74                     //ln新鏈表,不須要移動的節點從新組組織成的鏈表。  75                     //hn新鏈表,須要移動的節點從新組織成的鏈表
 76                     Node<K,V> ln, hn;  77                     if (fh >= 0) {//鏈表
 78                         int runBit = fh & n;  79                         Node<K,V> lastRun = f;  80                         //start  81                         //從start,到end之間。不看也行。實在費腦子。其實這段代碼寫的有點讓人費解  82                         //主要是不認真看不知道做者的意圖。本意是這樣的。判斷是否是能夠從某個節點n開始  83                         //後面的節點是否是都是和節點n同樣,移動的目標桶同樣的。  84                         //若是是同樣的,則後面的這些節點就不用移動了,只須要移動n節點便可。  85                         //(注意鏈表的引用,next指針就把後面的都帶過去了)  86                         //想一個極端狀況,若是在這裏迭代後發現,全部節點,擴容後數據移動的目標桶都是同樣的。  87                         //則只須要移動頭結點便可。不用從新拼接鏈表了。
 88                         for (Node<K,V> p = f.next; p != null; p = p.next) {  89                             int b = p.hash & n;  90                             if (b != runBit) {  91                                 runBit = b;  92                                 lastRun = p;  93  }  94  }  95                         if (runBit == 0) {// runBit== 0 表示該節點不須要移動
 96                             ln = lastRun;  97                             hn = null;  98  }  99                         else { 100                             hn = lastRun; 101                             ln = null; 102                         }//end
103                         for (Node<K,V> p = f; p != lastRun; p = p.next) { 104                             int ph = p.hash; K pk = p.key; V pv = p.val; 105                             if ((ph & n) == 0) 106                                 ln = new Node<K,V>(ph, pk, pv, ln); 107                             else
108                                 hn = new Node<K,V>(ph, pk, pv, hn); 109  } 110  setTabAt(nextTab, i, ln); 111                         setTabAt(nextTab, i + n, hn); 112  setTabAt(tab, i, fwd); 113                         advance = true; 114  } 115                     else if (f instanceof TreeBin) {//紅黑樹
116                         TreeBin<K,V> t = (TreeBin<K,V>)f; 117                         TreeNode<K,V> lo = null, loTail = null; 118                         TreeNode<K,V> hi = null, hiTail = null; 119                         int lc = 0, hc = 0; 120                         for (Node<K,V> e = t.first; e != null; e = e.next) { 121                             int h = e.hash; 122                             TreeNode<K,V> p = new TreeNode<K,V>
123                                 (h, e.key, e.val, null, null); 124                             if ((h & n) == 0) { 125                                 if ((p.prev = loTail) == null) 126                                     lo = p; 127                                 else
128                                     loTail.next = p; 129                                 loTail = p; 130                                 ++lc; 131  } 132                             else { 133                                 if ((p.prev = hiTail) == null) 134                                     hi = p; 135                                 else
136                                     hiTail.next = p; 137                                 hiTail = p; 138                                 ++hc; 139  } 140  } 141                         ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : 142                             (hc != 0) ? new TreeBin<K,V>(lo) : t; 143                         hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : 144                             (lc != 0) ? new TreeBin<K,V>(hi) : t; 145  setTabAt(nextTab, i, ln); 146                         setTabAt(nextTab, i + n, hn); 147  setTabAt(tab, i, fwd); 148                         advance = true; 149  } 150  } 151  } 152  } 153  } 154 }

5.一、擴容前準備階段

ForwardingNode

1 static final class ForwardingNode<K,V> extends Node<K,V> { 2     final Node<K,V>[] nextTable; 3     ForwardingNode(Node<K,V>[] tab) { 4         super(MOVED, null, null, null); 5         this.nextTable = tab; 6  } 7 }

看一下這個內部類,其實呢其就是一個起到標識做用的節點,該節點看上面代碼可知,該節點最主要的特色就是hash=MOVED=-1。hash=-1的節點在ConcurrentHashMap中表示該桶是被擴容過程遷移過的桶。而後當前線程判斷若是該桶已經被遷移。不管put仍是get都去新的數組中操做。還有一點很重要,還能夠經過ForwardingNode中 nextTable獲取到新的數組。

 

1 //該桶沒數據
2 else if ((f = tabAt(tab, i)) == null) 3     //將oldtab中的該桶設置爲fwd節點,hash=-1
4     advance = casTabAt(tab, i, null, fwd);

看上面代碼,先判斷該桶還有沒有數據。沒數據不用遷移,等同於已經遷移完了。其餘線程put會直接put到新的數組中。

 

1 //已經移動過的桶其hash=-1;
2 else if ((fh = f.hash) == MOVED) 3     advance = true; // already processed

若是該桶已經移動則跳過。

到此咱們能看出什麼?主要是已經移動完的設置成fwd節點,其它線程看到該桶已經移動,則會到新的table中操做。若是未移動,還直接操做當前table,由於就算put,待會處理到該桶,同樣移動到新桶,也沒啥影響。若是是正在移動的接下來會看到加了Synchronized鎖,保證只有一個線程能操做當前桶。簡直不要太妙。

5.二、擴容過程

畫重點,擴容過程

 1 synchronized (f) {//上鎖
 2     if (tabAt(tab, i) == f) {  3         //ln新鏈表,不須要移動的節點從新組組織成的鏈表。  4         //hn新鏈表,須要移動的節點從新組織成的鏈表
 5         Node<K,V> ln, hn;  6         if (fh >= 0) {//鏈表
 7             int runBit = fh & n;  8             Node<K,V> lastRun = f;  9             //start 10             //從start,到end之間。不看也行。實在費腦子。其實這段代碼寫的有點讓人費解 11             //主要是不認真看不知道做者的意圖。本意是這樣的。判斷是否是能夠從某個節點n開始 12             //後面的節點是否是都是和節點n同樣,移動的目標桶同樣的。 13             //若是是同樣的,則後面的這些節點就不用移動了,只須要移動n節點便可。 14             //(注意鏈表的引用,next指針就把後面的都帶過去了) 15             //想一個極端狀況,若是在這裏迭代後發現,全部節點,擴容後數據移動的目標桶都是同樣的。 16             //則只須要移動頭結點便可。不用從新拼接鏈表了。
17             for (Node<K,V> p = f.next; p != null; p = p.next) { 18                 int b = p.hash & n; 19                 if (b != runBit) { 20                     runBit = b; 21                     lastRun = p; 22  } 23  } 24             if (runBit == 0) {// runBit== 0 表示該節點不須要移動
25                 ln = lastRun; 26                 hn = null; 27  } 28             else { 29                 hn = lastRun; 30                 ln = null; 31             }//end
32             for (Node<K,V> p = f; p != lastRun; p = p.next) { 33                 int ph = p.hash; K pk = p.key; V pv = p.val; 34                 if ((ph & n) == 0) 35                     ln = new Node<K,V>(ph, pk, pv, ln); 36                 else
37                     hn = new Node<K,V>(ph, pk, pv, hn); 38  } 39  setTabAt(nextTab, i, ln); 40             setTabAt(nextTab, i + n, hn); 41  setTabAt(tab, i, fwd); 42             advance = true; 43  } 44         else if (f instanceof TreeBin) {//紅黑樹 45             //紅黑樹跳過
46  } 47  } 48 }

 

 5.2.一、併發控制

首先擴容過程是在synchronized同步代碼塊中的。而且只鎖了一個表頭。可看到沒有鎖新數組nextTab的桶。想一想,oldTab(tab變量)和nextTab都是多個線程共享的變量,爲何只有隻鎖了oldTab正在操做的桶?若是有多個線程向nextTab同時遷移數據怎麼辦?會不會存在線程安全性問題?

TIPS: 統一術語 tab = oldTab = table(舊數組) newTab = nextTab(擴容後新數組) oldIndex即在oldTab中的索引位 newIndex即在newTab中的位置

上一篇文章中介紹HashMap的時候詳細介紹了HashMap擴容中,oldTab舊桶遷移向newTab只有兩個目標桶。再簡單回顧一遍。

上面這張圖形象的展現了舊桶在擴容後的兩個去向:1,索引位原地不動,2,索引位爲oldCap+oldIndex。(關於爲何是這兩個去向,在HashMap擴容中已經詳細介紹了)

若是你還沒懂個人疑問,請參考下面這個圖。

前提,ConcurrentHashMap是併發擴容,能夠有多個線程同時擴容,其次若是如上圖紅線那樣,oldTab中有多個桶中的數據遷移到newTab中的同一個桶中,若是出現這種狀況就意味着存在線程安全性問題。

從上圖5-1中,兩個數據遷移的方向可知,擴容前,oldIndex不一樣就表示不在一個桶,擴容後的兩個去向若是oldIndex不同,也必定不在同一個桶。因此不會出現5-2圖中紅線的那種狀況,也就說明在擴容過程當中不須要鎖newTab。佩服+2

 

5.2.二、數據遷移

//ln新鏈表,不須要移動的節點從新組組織成的鏈表。 //hn新鏈表,須要移動的節點從新組織成的鏈表
Node<K,V> ln, hn;
int runBit = fh & n;

看兩個變量,上面說過擴容後,舊桶中的數據只有兩個遷移的方向。ln正是數據遷移後索引位依然是oldIndex的數據的鏈表,hn是遷移後須要遷移到oldCap + oldIndex索引位的鏈表。

關注一下runBit變量,若是 runBit == 0 成立則說明遷移後桶的索引位依然是oldIndex。詳見HashMap擴容分析。

重點關注一下start到end之間的代碼

關於這段代碼,首選咱們假設一種極端狀況,若是當前正在移動的桶中的數據在rehash以後,數據遷移的目標桶除了第一個節點的目標桶是oldIndex以外,後面的數據的目標桶都是oldIndex + oldCap。咱們還須要處理後面的節點嗎?不須要,由於只須要將第二個節點移動到newTab的oldIndex + oldCap位置便可。第二個元素也就是lastRun變量。相對於HashMap徹底的將數據組織成兩個鏈表,這也算得上是一個性能上的優化吧。

接着往下看

代碼段1:

1 for (Node<K,V> p = f.next; p != null; p = p.next) { 2     int b = p.hash & n; 3     if (b != runBit) {//相同跳過 4         runBit = b; 5         lastRun = p; 6  } 7 }

以上代碼經過對鏈表的一次掃描決定了lastRun。

代碼段2:

1 if (runBit == 0) {// runBit== 0 表示該節點不須要移動
2     ln = lastRun; 3     hn = null; 4 } 5 else { 6     hn = lastRun; 7     ln = null; 8 }//end

 

根據lastRun指向的節點的runBit決定後續節點在擴容後是oldIndex + oldCap仍是oldIndex。

代碼段3:

1 for (Node<K,V> p = f; p != lastRun; p = p.next) { 2     int ph = p.hash; K pk = p.key; V pv = p.val; 3     if ((ph & n) == 0) 4         ln = new Node<K,V>(ph, pk, pv, ln); 5     else
6         hn = new Node<K,V>(ph, pk, pv, hn); 7 }

 

上述代碼會從新組織兩個新鏈表。注意這個迭代到lastRun位置結束,由於以上過程已經肯定了lastRun的歸屬。

看一下 ln = new Node<K,V>(ph, pk, pv, ln); 從新組織鏈表的代碼,也就是ln會成爲新new出來的node的下一個節點。

這樣有什麼問題?問題就是節點在舊桶中的相對順序在新桶中將相反。也就是next的指針翻轉一下。能夠看一下node的構造函數就明瞭了。

 

演示擴容過程

假設當前擴容前oldCap即oldTab的長度爲2,擴容後newCap即newTab的長度爲4。以下圖看擴容過程,橘色的表明遷移後索引位依然是oldIndex,綠色表明擴容後索引位爲oldIndex + oldCap。

 

 

 上述代碼段1迭代找到了lastRun即指向node(11),代碼段2將lastRun賦值給hn。代碼段3執行過程以下

1,將node(1)拼接到ln 2,將node(3)拼接到hn,此時注意,hn已經lastRun指向的節點node(11),此時hn=3—>11—>15—>19—>null
3,處理node(5)拼接到ln 4,處理...

 

對比JDK7 HashMap,JDK8 HashMap,JDK8 ConcurrentHashMap在擴容後對節點相對順序的保證方面,JDK7 HashMap是徹底倒序。JDK8 HashMap不改變相對順序。JDK8 ConcurrentHashMap 保證部分節點的相對順序,其他的倒序。

題外話,從代碼風格和死路上,猜想一下ConcurrentHashMap應該是來自JDK7的HashMap。

 

1 setTabAt(nextTab, i, ln); 2 setTabAt(nextTab, i + n, hn); 3 setTabAt(tab, i, fwd); 4 advance = true;

ln和hn兩個鏈表各回各家各找各媽。

 

回過頭來再看put方法中的幫助擴容

1 else if ((fh = f.hash) == MOVED) 2     tab = helpTransfer(tab, f);

 

在put方法中有這樣一行判斷,當f.hash = MOVED即當前HashMap正在擴容中,則當前線程會去嘗試幫助擴容。

 1 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {  2     Node<K,V>[] nextTab; int sc;  3     if (tab != null && (f instanceof ForwardingNode) &&//條件判斷
 4         (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {//從fwd節點中取出新table
 5         int rs = resizeStamp(tab.length);  6         while (nextTab == nextTable && table == tab &&
 7                (sc = sizeCtl) < 0) {  8             if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
 9                 sc == rs + MAX_RESIZERS || transferIndex <= 0) 10                 break; 11             if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//修改sizeCtl = sizeCtl + 1,表示多了一個線程參與擴容
12  transfer(tab, nextTab); 13                 break; 14  } 15  } 16         return nextTab; 17  } 18     return table; 19 }

 

在helpTransfer方法中會首先作一系列判斷,經過fwd節點獲取到nextTab即新的數組。經過CAS 實現sizeCtl++操做,表示多了一個線程進行擴容,由於在擴容方法中對擴容線程數量有控制。

 

最後的最後,擴容的時機

說一下觸發擴容的操做,總的來講就是put操做,可是有兩個時機很重要,其一就是addCount方法中,每次put一個元素,在addCount方法中都會判斷需不須要進行擴容。另外就是treeifyBin方法中,若是桶中數據超過了8個而且數組長度<64則不會進行樹化,而是進行擴容。關於這個在HashMap源碼介紹中也有介紹。你想一想,假如容量爲16,你就插入了9個元素,巧了,都在同一個桶裏面,若是這時進行樹化,樹化自己就是一個耗時的過程。時間複雜度會增長,性能降低,不如直接進行擴容,空間換時間。

終於擴容過程寫完了。很經典,想讀懂也很費勁。

6、get過程源碼

 1 public V get(Object key) {  2     Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;  3     int h = spread(key.hashCode());//hash
 4     if ((tab = table) != null && (n = tab.length) > 0 &&
 5         (e = tabAt(tab, (n - 1) & h)) != null) {//取桶
 6         if ((eh = e.hash) == h) {//key相同直接返回
 7             if ((ek = e.key) == key || (ek != null && key.equals(ek)))  8                 return e.val;  9  } 10         else if (eh < 0)//hash < 0 表示正在擴容 11             //在這裏須要很是注意的一點,擴容後的桶會放入fwd節點 12             //該節點hash = MOVED,fwd.nextTable爲擴容後新的數組。
13             return (p = e.find(h, key)) != null ? p.val : null; 14         while ((e = e.next) != null) {//迭代鏈表
15             if (e.hash == h &&
16                 ((ek = e.key) == key || (ek != null && key.equals(ek)))) 17                 return e.val; 18  } 19  } 20     return null; 21 }

 

get源碼只關注下面這行

return (p = e.find(h, key)) != null ? p.val : null;

 

當該桶已經被移動,則經過e.find方法去nextTab新數組查找。首先在5章節resize擴容方法中,已經擴容的桶會被塞進去一個ForwardingNode節點 setTabAt(tab, i, fwd); 繼續看resize方法中ForwardingNode的初始化會發現是這樣初始化的 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); ,看它的構造方法

static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } }

 

不難發下其初始化方法接收一個nextTab也就是擴容後的新數組,並將該數組賦值給其內部變量nextTable。也就是說當get發現桶已經擴容後,咱們能夠從fwd節點中找到新的數組。並重新的數組中找到新的目標桶並進行元素查找。

看了以上代碼,回到 e.find(h, key)) ,須要明確的是e就是ForwardingNode節點。看看find方法

 1 Node<K,V> find(int h, Object k) {  2     // loop to avoid arbitrarily deep recursion on forwarding nodes
 3     outer: for (Node<K,V>[] tab = nextTable;;) {//迭代nextTable
 4         Node<K,V> e; int n;  5         if (k == null || tab == null || (n = tab.length) == 0 ||
 6             (e = tabAt(tab, (n - 1) & h)) == null)  7             return null;  8         for (;;) {  9             int eh; K ek; 10             if ((eh = e.hash) == h &&
11                 ((ek = e.key) == k || (ek != null && k.equals(ek)))) 12                 return e; 13             if (eh < 0) { 14                 if (e instanceof ForwardingNode) { 15                     tab = ((ForwardingNode<K,V>)e).nextTable; 16                     continue outer; 17  } 18                 else
19                     return e.find(h, k); 20  } 21             if ((e = e.next) == null) 22                 return null; 23  } 24  } 25 }

OK,很明確了,確實是從nextTable中查找的。

得出一個結論,ConcurrentHashMap擴容不影響get操做。也就是在擴容過程當中能夠併發讀。

7、ConcurrentHashMap併發控制

詳細看了ConcurrentHashMap put resize get過程的源碼,本章從總體上看一下ConcurrentHashMap的併發控制。

下面結合圖片咱們看一下ConcurrentHashMap的併發過程。

 

 

 

 

 如上圖,線程1進行put操做,這時發現size > sizeCtl。開始進行擴容

 

 

 

 

此時線程1已經完成oldTab中索引[2,16)中的擴容。正在進行索引爲1的桶的擴容。接下來線程2執行get。

 

 

 

 

線程2根據get邏輯和key的hash,可能訪問的三種狀況如上圖所示

狀況一:訪問藍色號桶,即未擴容的桶。該桶還未進行擴容,因此在桶中找到對應元素,返回。

狀況二:訪問綠色桶,即正在擴容的桶。該桶正在擴容,在擴容過程當中,線程1持有Synchronized鎖,線程2只能自旋等待。

狀況三:訪問橘色桶,該桶已擴容的桶。該桶已擴容,oldTab中是fwd節點,hash=-1,因此執行fwd節點的find邏輯,fwd節點持有newTab(nextTable),因此線程2去newTab中查找對應元素,返回。

 

 如上圖4,當線程1進行擴容時,線程3進來執行put,一樣存在三種可能的狀況

狀況一:訪問藍色桶,即未擴容的桶。正常執行put邏輯。

狀況二:訪問綠色桶,即正擴容的桶。由於線層1持有Synchronized鎖,線程3將一直自旋,等待擴容結束。

狀況三:訪問橘色桶,即已擴容的桶。由於已擴容的桶,在oldTab中是fwd節點,hash = -1 = MOVED,因此線程3執行幫助擴容的邏輯。等待擴容完成,線程3繼續完成put邏輯。

 

OK,以上就是ConcurrentHashMap關於get put resize的併發控制,從以上過程可見,存在鎖競爭的狀況頗有限,即便存在鎖競爭,也是進行自旋,而不會阻塞線程。可見ConcurrentHashMap能作到高效的併發讀。

在put過程當中,由於若是存在線程正在已經擴容,則幫助進行擴容(協助擴容這塊,有一個步長的概念,同時進行擴容的線程和table的長度有關)。若是當前桶正在進行擴容,則被Synchronized鎖拒之門外,自旋等待擴容結束。若是訪問的是未擴容的桶,則執行正常的put邏輯。可見整個過程當中,因爲鎖的粒度很小,put作到了高效的併發寫,也作到了高效的擴容。

總之一句話ConcurrentHashMap的高併發是經過 CAS樂觀鎖 + 自旋鎖 + 細粒度 保證的。

 

  若有錯誤的地方還請留言指正。
  原創不易,轉載請註明原文地址:https://www.cnblogs.com/hello-shf/p/12183263.html

原文出處:https://www.cnblogs.com/hello-shf/p/12183263.html

相關文章
相關標籤/搜索