ConcurrentHashMap原理分析(1.7與1.8)

前言

之前寫過介紹HashMap的文章,文中提到過HashMap在put的時候,插入的元素超過了容量(由負載因子決定)的範圍就會觸發擴容操做,就是rehash,這個會從新將原數組的內容從新hash到新的擴容數組中,在多線程的環境下,存在同時其餘的元素也在進行put操做,若是hash值相同,可能出現同時在同一數組下用鏈表表示,形成閉環,致使在get時會出現死循環,因此HashMap是線程不安全的。java

咱們來了解另外一個鍵值存儲集合HashTable,它是線程安全的,它在全部涉及到多線程操做的都加上了synchronized關鍵字來鎖住整個table,這就意味着全部的線程都在競爭一把鎖,在多線程的環境下,它是安全的,可是無疑是效率低下的。node

其實HashTable有不少的優化空間,鎖住整個table這麼粗暴的方法能夠變相的柔和點,好比在多線程的環境下,對不一樣的數據集進行操做時其實根本就不須要去競爭一個鎖,由於他們不一樣hash值,不會由於rehash形成線程不安全,因此互不影響,這就是鎖分離技術,將鎖的粒度下降,利用多個鎖來控制多個小的table,這就是這篇文章的主角ConcurrentHashMap JDK1.7版本的核心思想數組

ConcurrentHashMap

JDK1.7的實現

在JDK1.7版本中,ConcurrentHashMap的數據結構是由一個Segment數組和多個HashEntry組成,以下圖所示:安全

 

Segment數組的意義就是將一個大的table分割成多個小的table來進行加鎖,也就是上面的提到的鎖分離技術,而每個Segment元素存儲的是HashEntry數組+鏈表,這個和HashMap的數據存儲結構同樣數據結構

初始化

ConcurrentHashMap的初始化是會經過位與運算來初始化Segment的大小,用ssize來表示,以下所示多線程

1
2
3
4
5
6
int  sshift =  0 ;
int  ssize =  1 ;
while  (ssize < concurrencyLevel) {
     ++sshift;
     ssize <<=  1 ;
}

如上所示,由於ssize用位於運算來計算(ssize <<=1),因此Segment的大小取值都是以2的N次方,無關concurrencyLevel的取值,固然concurrencyLevel最大隻能用16位的二進制來表示,即65536,換句話說,Segment的大小最多65536個,沒有指定concurrencyLevel元素初始化,Segment的大小ssize默認爲16併發

每個Segment元素下的HashEntry的初始化也是按照位於運算來計算,用cap來表示,以下所示dom

1
2
3
int  cap =  1 ;
while  (cap < c)
     cap <<=  1 ;

如上所示,HashEntry大小的計算也是2的N次方(cap <<=1), cap的初始值爲1,因此HashEntry最小的容量爲2ssh

put操做

對於ConcurrentHashMap的數據插入,這裏要進行兩次Hash去定位數據的存儲位置ide

1
static  class  Segment<K,V>  extends  ReentrantLock  implements  Serializable {

從上Segment的繼承體系能夠看出,Segment實現了ReentrantLock,也就帶有鎖的功能,當執行put操做時,會進行第一次key的hash來定位Segment的位置,若是該Segment尚未初始化,即經過CAS操做進行賦值,而後進行第二次hash操做,找到相應的HashEntry的位置,這裏會利用繼承過來的鎖的特性,在將數據插入指定的HashEntry位置時(鏈表的尾端),會經過繼承ReentrantLock的tryLock()方法嘗試去獲取鎖,若是獲取成功就直接插入相應的位置,若是已經有線程獲取該Segment的鎖,那當前線程會以自旋的方式去繼續的調用tryLock()方法去獲取鎖,超過指定次數就掛起,等待喚醒

get操做

ConcurrentHashMap的get操做跟HashMap相似,只是ConcurrentHashMap第一次須要通過一次hash定位到Segment的位置,而後再hash定位到指定的HashEntry,遍歷該HashEntry下的鏈表進行對比,成功就返回,不成功就返回null

size操做

計算ConcurrentHashMap的元素大小是一個有趣的問題,由於他是併發操做的,就是在你計算size的時候,他還在併發的插入數據,可能會致使你計算出來的size和你實際的size有相差(在你return size的時候,插入了多個數據),要解決這個問題,JDK1.7版本用兩種方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
try  {
     for  (;;) {
         if  (retries++ == RETRIES_BEFORE_LOCK) {
             for  ( int  j =  0 ; j < segments.length; ++j) ensureSegment(j).lock();  // force creation
         }
         sum = 0L;
         size =  0 ;
         overflow =  false ;
         for  ( int  j =  0 ; j < segments.length; ++j) {
             Segment<K,V> seg = segmentAt(segments, j);
             if  (seg !=  null ) { sum += seg.modCount;  int  c = seg.count;  if  (c <  0  || (size += c) <  0 )
                overflow =  true ;
             } }
         if  (sum == last)  break ;
         last = sum; } }
finally  {
     if  (retries > RETRIES_BEFORE_LOCK) {
         for  ( int  j =  0 ; j < segments.length; ++j)
             segmentAt(segments, j).unlock();
     }
}
  1. 第一種方案他會使用不加鎖的模式去嘗試屢次計算ConcurrentHashMap的size,最多三次,比較先後兩次計算的結果,結果一致就認爲當前沒有元素加入,計算的結果是準確的
  2. 第二種方案是若是第一種方案不符合,他就會給每一個Segment加上鎖,而後計算ConcurrentHashMap的size返回

JDK1.8的實現

JDK1.8的實現已經摒棄了Segment的概念,而是直接用Node數組+鏈表+紅黑樹的數據結構來實現,併發控制使用Synchronized和CAS來操做,整個看起來就像是優化過且線程安全的HashMap,雖然在JDK1.8中還能看到Segment的數據結構,可是已經簡化了屬性,只是爲了兼容舊版本

在深刻JDK1.8的put和get實現以前要知道一些常量設計和數據結構,這些是構成ConcurrentHashMap實現結構的基礎,下面看一下基本屬性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// node數組最大容量:2^30=1073741824
private  static  final  int  MAXIMUM_CAPACITY =  1  <<  30 ;
// 默認初始值,必須是2的幕數
private  static  final  int  DEFAULT_CAPACITY =  16 ;
//數組可能最大值,須要與toArray()相關方法關聯
static  final  int  MAX_ARRAY_SIZE = Integer.MAX_VALUE -  8 ;
//併發級別,遺留下來的,爲兼容之前的版本
private  static  final  int  DEFAULT_CONCURRENCY_LEVEL =  16 ;
// 負載因子
private  static  final  float  LOAD_FACTOR =  0 .75f;
// 鏈表轉紅黑樹閥值,> 8 鏈表轉換爲紅黑樹
static  final  int  TREEIFY_THRESHOLD =  8 ;
//樹轉鏈表閥值,小於等於6(tranfer時,lc、hc=0兩個計數器分別++記錄原bin、新binTreeNode數量,<=UNTREEIFY_THRESHOLD 則untreeify(lo))
static  final  int  UNTREEIFY_THRESHOLD =  6 ;
static  final  int  MIN_TREEIFY_CAPACITY =  64 ;
private  static  final  int  MIN_TRANSFER_STRIDE =  16 ;
private  static  int  RESIZE_STAMP_BITS =  16 ;
// 2^15-1,help resize的最大線程數
private  static  final  int  MAX_RESIZERS = ( 1  << ( 32  - RESIZE_STAMP_BITS)) -  1 ;
// 32-16=16,sizeCtl中記錄size大小的偏移量
private  static  final  int  RESIZE_STAMP_SHIFT =  32  - RESIZE_STAMP_BITS;
// forwarding nodes的hash值
static  final  int  MOVED     = - 1 ;
// 樹根節點的hash值
static  final  int  TREEBIN   = - 2 ;
// ReservationNode的hash值
static  final  int  RESERVED  = - 3 ;
// 可用處理器數量
static  final  int  NCPU = Runtime.getRuntime().availableProcessors();
//存放node的數組
transient  volatile  Node<K,V>[] table;
/*控制標識符,用來控制table的初始化和擴容的操做,不一樣的值有不一樣的含義
  *當爲負數時:- 1 表明正在初始化,-N表明有N- 1 個線程正在 進行擴容
  *當爲 0 時:表明當時的table尚未被初始化
  *當爲正數時:表示初始化或者下一次進行擴容的大小
private  transient  volatile  int  sizeCtl;

基本屬性定義了ConcurrentHashMap的一些邊界以及操做時的一些控制,下面看一些內部的一些結構組成,這些是整個ConcurrentHashMap整個數據結構的核心

Node

Node是ConcurrentHashMap存儲結構的基本單元,繼承於HashMap中的Entry,用於存儲數據,源代碼以下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static  class  Node<K,V>  implements  Map.Entry<K,V> {
     //鏈表的數據結構
     final  int  hash;
     final  K key;
     //val和next都會在擴容時發生變化,因此加上volatile來保持可見性和禁止重排序
     volatile  V val;
     volatile  Node<K,V> next;
     Node( int  hash, K key, V val, Node<K,V> next) {
         this .hash = hash;
         this .key = key;
         this .val = val;
         this .next = next;
     }
     public  final  K getKey()       {  return  key; }
     public  final  V getValue()     {  return  val; }
     public  final  int  hashCode()   {  return  key.hashCode() ^ val.hashCode(); }
     public  final  String toString(){  return  key +  "="  + val; }
     //不容許更新value 
     public  final  V setValue(V value) {
         throw  new  UnsupportedOperationException();
     }
     public  final  boolean  equals(Object o) {
         Object k, v, u; Map.Entry<?,?> e;
         return  ((o  instanceof  Map.Entry) &&
                 (k = (e = (Map.Entry<?,?>)o).getKey()) !=  null  &&
                 (v = e.getValue()) !=  null  &&
                 (k == key || k.equals(key)) &&
                 (v == (u = val) || v.equals(u)));
     }
     //用於map中的get()方法,子類重寫
     Node<K,V> find( int  h, Object k) {
         Node<K,V> e =  this ;
         if  (k !=  null ) {
             do  {
                 K ek;
                 if  (e.hash == h &&
                     ((ek = e.key) == k || (ek !=  null  && k.equals(ek))))
                     return  e;
             while  ((e = e.next) !=  null );
         }
         return  null ;
     }
}

Node數據結構很簡單,從上可知,就是一個鏈表,可是隻容許對數據進行查找,不容許進行修改

TreeNode

TreeNode繼承與Node,可是數據結構換成了二叉樹結構,它是紅黑樹的數據的存儲結構,用於紅黑樹中存儲數據,當鏈表的節點數大於8時會轉換成紅黑樹的結構,他就是經過TreeNode做爲存儲結構代替Node來轉換成黑紅樹源代碼以下

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static  final  class  TreeNode<K,V>  extends  Node<K,V> {
     //樹形結構的屬性定義
     TreeNode<K,V> parent;   // red-black tree links
     TreeNode<K,V> left;
     TreeNode<K,V> right;
     TreeNode<K,V> prev;     // needed to unlink next upon deletion
     boolean  red;  //標誌紅黑樹的紅節點
     TreeNode( int  hash, K key, V val, Node<K,V> next,
              TreeNode<K,V> parent) {
         super (hash, key, val, next);
         this .parent = parent;
     }
     Node<K,V> find( int  h, Object k) {
         return  findTreeNode(h, k,  null );
     }
     //根據key查找 從根節點開始找出相應的TreeNode,
     final  TreeNode<K,V> findTreeNode( int  h, Object k, Class<?> kc) {
         if  (k !=  null ) {
             TreeNode<K,V> p =  this ;
             do   {
                 int  ph, dir; K pk; TreeNode<K,V> q;
                 TreeNode<K,V> pl = p.left, pr = p.right;
                 if  ((ph = p.hash) > h)
                     p = pl;
                 else  if  (ph < h)
                     p = pr;
                 else  if  ((pk = p.key) == k || (pk !=  null  && k.equals(pk)))
                     return  p;
                 else  if  (pl ==  null )
                     p = pr;
                 else  if  (pr ==  null )
                     p = pl;
                 else  if  ((kc !=  null  ||
                           (kc = comparableClassFor(k)) !=  null ) &&
                          (dir = compareComparables(kc, k, pk)) !=  0 )
                     p = (dir <  0 ) ? pl : pr;
                 else  if  ((q = pr.findTreeNode(h, k, kc)) !=  null )
                     return  q;
                 else
                     p = pl;
             while  (p !=  null );
         }
         return  null ;
     }
}

TreeBin

TreeBin從字面含義中能夠理解爲存儲樹形結構的容器,而樹形結構就是指TreeNode,因此TreeBin就是封裝TreeNode的容器,它提供轉換黑紅樹的一些條件和鎖的控制,部分源碼結構以下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
static  final  class  TreeBin<K,V>  extends  Node<K,V> {
     //指向TreeNode列表和根節點
     TreeNode<K,V> root;
     volatile  TreeNode<K,V> first;
     volatile  Thread waiter;
     volatile  int  lockState;
     // 讀寫鎖狀態
     static  final  int  WRITER =  1 // 獲取寫鎖的狀態
     static  final  int  WAITER =  2 // 等待寫鎖的狀態
     static  final  int  READER =  4 // 增長數據時讀鎖的狀態
     /**
      * 初始化紅黑樹
      */
     TreeBin(TreeNode<K,V> b) {
         super (TREEBIN,  null null null );
         this .first = b;
         TreeNode<K,V> r =  null ;
         for  (TreeNode<K,V> x = b, next; x !=  null ; x = next) {
             next = (TreeNode<K,V>)x.next;
             x.left = x.right =  null ;
             if  (r ==  null ) {
                 x.parent =  null ;
                 x.red =  false ;
                 r = x;
             }
             else  {
                 K k = x.key;
                 int  h = x.hash;
                 Class<?> kc =  null ;
                 for  (TreeNode<K,V> p = r;;) {
                     int  dir, ph;
                     K pk = p.key;
                     if  ((ph = p.hash) > h)
                         dir = - 1 ;
                     else  if  (ph < h)
                         dir =  1 ;
                     else  if  ((kc ==  null  &&
                               (kc = comparableClassFor(k)) ==  null ) ||
                              (dir = compareComparables(kc, k, pk)) ==  0 )
                         dir = tieBreakOrder(k, pk);
                         TreeNode<K,V> xp = p;
                     if  ((p = (dir <=  0 ) ? p.left : p.right) ==  null ) {
                         x.parent = xp;
                         if  (dir <=  0 )
                             xp.left = x;
                         else
                             xp.right = x;
                         r = balanceInsertion(r, x);
                         break ;
                     }
                 }
             }
         }
         this .root = r;
         assert  checkInvariants(root);
     }
     ......
}

介紹了ConcurrentHashMap主要的屬性與內部的數據結構,如今經過一個簡單的例子以debug的視角看看ConcurrentHashMap的具體操做細節

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public  class  TestConcurrentHashMap{   
     public  static  void  main(String[] args){
         ConcurrentHashMap<String,String> map =  new  ConcurrentHashMap();  //初始化ConcurrentHashMap
         //新增我的信息
         map.put( "id" , "1" );
         map.put( "name" , "andy" );
         map.put( "sex" , "男" );
         //獲取姓名
         String name = map.get( "name" );
         Assert.assertEquals(name, "andy" );
         //計算大小
         int  size = map.size();
         Assert.assertEquals(size, 3 );
     }
}

咱們先經過new ConcurrentHashMap()來進行初始化  

1
2
public  ConcurrentHashMap() {
}

由上你會發現ConcurrentHashMap的初始化實際上是一個空實現,並無作任何事,這裏後面會講到,這也是和其餘的集合類有區別的地方,初始化操做並非在構造函數實現的,而是在put操做中實現,固然ConcurrentHashMap還提供了其餘的構造函數,有指定容量大小或者指定負載因子,跟HashMap同樣,這裏就不作介紹了

put操做

在上面的例子中咱們新增我的信息會調用put方法,咱們來看下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public  V put(K key, V value) {
     return  putVal(key, value,  false );
}
/** Implementation for put and putIfAbsent */
final  V putVal(K key, V value,  boolean  onlyIfAbsent) {
     if  (key ==  null  || value ==  null throw  new  NullPointerException();
     int  hash = spread(key.hashCode());  //兩次hash,減小hash衝突,能夠均勻分佈
     int  binCount =  0 ;
     for  (Node<K,V>[] tab = table;;) {  //對這個table進行迭代
         Node<K,V> f;  int  n, i, fh;
         //這裏就是上面構造方法沒有進行初始化,在這裏進行判斷,爲null就調用initTable進行初始化,屬於懶漢模式初始化
         if  (tab ==  null  || (n = tab.length) ==  0 )
             tab = initTable();
         else  if  ((f = tabAt(tab, i = (n -  1 ) & hash)) ==  null ) { //若是i位置沒有數據,就直接無鎖插入
             if  (casTabAt(tab, i,  null ,
                          new  Node<K,V>(hash, key, value,  null )))
                 break ;                    // no lock when adding to empty bin
         }
         else  if  ((fh = f.hash) == MOVED) //若是在進行擴容,則先進行擴容操做
             tab = helpTransfer(tab, f);
         else  {
             V oldVal =  null ;
             //若是以上條件都不知足,那就要進行加鎖操做,也就是存在hash衝突,鎖住鏈表或者紅黑樹的頭結點
             synchronized  (f) {
                 if  (tabAt(tab, i) == f) {
                     if  (fh >=  0 ) {  //表示該節點是鏈表結構
                         binCount =  1 ;
                         for  (Node<K,V> e = f;; ++binCount) {
                             K ek;
                             //這裏涉及到相同的key進行put就會覆蓋原先的value
                             if  (e.hash == hash &&
                                 ((ek = e.key) == key ||
                                  (ek !=  null  && key.equals(ek)))) {
                                 oldVal = e.val;
                                 if  (!onlyIfAbsent)
                                     e.val = value;
                                 break ;
                             }
                             Node<K,V> pred = e;
                             if  ((e = e.next) ==  null ) {   //插入鏈表尾部
                                 pred.next =  new  Node<K,V>(hash, key,
                                                           value,  null );
                                 break ;
                             }
                         }
                     }
                     else  if  (f  instanceof  TreeBin) { //紅黑樹結構
                         Node<K,V> p;
                         binCount =  2 ;
                         //紅黑樹結構旋轉插入
                         if  ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                        value)) !=  null ) {
                             oldVal = p.val;
                             if  (!onlyIfAbsent)
                                 p.val = value;
                         }
                     }
                 }
             }
             if  (binCount !=  0 ) {  //若是鏈表的長度大於8時就會進行紅黑樹的轉換
                 if  (binCount >= TREEIFY_THRESHOLD)
                     treeifyBin(tab, i);
                 if  (oldVal !=  null )
                     return  oldVal;
                 break ;
             }
         }
     }
     addCount(1L, binCount); //統計size,而且檢查是否須要擴容
     return  null ;
}

這個put的過程很清晰,對當前的table進行無條件自循環直到put成功,能夠分紅如下六步流程來概述

  1. 若是沒有初始化就先調用initTable()方法來進行初始化過程
  2. 若是沒有hash衝突就直接CAS插入
  3. 若是還在進行擴容操做就先進行擴容
  4. 若是存在hash衝突,就加鎖來保證線程安全,這裏有兩種狀況,一種是鏈表形式就直接遍歷到尾端插入,一種是紅黑樹就按照紅黑樹結構插入,
  5. 最後一個若是該鏈表的數量大於閾值8,就要先轉換成黑紅樹的結構,break再一次進入循環
  6. 若是添加成功就調用addCount()方法統計size,而且檢查是否須要擴容

如今咱們來對每一步的細節進行源碼分析,在第一步中,符合條件會進行初始化操做,咱們來看看initTable()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
  * Initializes table, using the size recorded in sizeCtl.
  */
private  final  Node<K,V>[] initTable() {
     Node<K,V>[] tab;  int  sc;
     while  ((tab = table) ==  null  || tab.length ==  0 ) { //空的table才能進入初始化操做
         if  ((sc = sizeCtl) <  0 //sizeCtl<0表示其餘線程已經在初始化了或者擴容了,掛起當前線程
             Thread.yield();  // lost initialization race; just spin
         else  if  (U.compareAndSwapInt( this , SIZECTL, sc, - 1 )) { //CAS操做SIZECTL爲-1,表示初始化狀態
             try  {
                 if  ((tab = table) ==  null  || tab.length ==  0 ) {
                     int  n = (sc >  0 ) ? sc : DEFAULT_CAPACITY;
                     @SuppressWarnings ( "unchecked" )
                     Node<K,V>[] nt = (Node<K,V>[]) new  Node<?,?>[n]; //初始化
                     table = tab = nt;
                     sc = n - (n >>>  2 ); //記錄下次擴容的大小
                 }
             finally  {
                 sizeCtl = sc;
             }
             break ;
         }
     }
     return  tab;
}

在第二步中沒有hash衝突就直接調用Unsafe的方法CAS插入該元素,進入第三步若是容器正在擴容,則會調用helpTransfer()方法幫助擴容,如今咱們跟進helpTransfer()方法看看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
  *幫助從舊的table的元素複製到新的table中
  */
final  Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
     Node<K,V>[] nextTab;  int  sc;
     if  (tab !=  null  && (f  instanceof  ForwardingNode) &&
         (nextTab = ((ForwardingNode<K,V>)f).nextTable) !=  null ) {  //新的table nextTba已經存在前提下才能幫助擴容
         int  rs = resizeStamp(tab.length);
         while  (nextTab == nextTable && table == tab &&
                (sc = sizeCtl) <  0 ) {
             if  ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs +  1  ||
                 sc == rs + MAX_RESIZERS || transferIndex <=  0 )
                 break ;
             if  (U.compareAndSwapInt( this , SIZECTL, sc, sc +  1 )) {
                 transfer(tab, nextTab); //調用擴容方法
                 break ;
             }
         }
         return  nextTab;
     }
     return  table;
}

其實helpTransfer()方法的目的就是調用多個工做線程一塊兒幫助進行擴容,這樣的效率就會更高,而不是隻有檢查到要擴容的那個線程進行擴容操做,其餘線程就要等待擴容操做完成才能工做
既然這裏涉及到擴容的操做,咱們也一塊兒來看看擴容方法transfer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
private  final  void  transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
         int  n = tab.length, stride;
         // 每核處理的量小於16,則強制賦值16
         if  ((stride = (NCPU >  1 ) ? (n >>>  3 ) / NCPU : n) < MIN_TRANSFER_STRIDE)
             stride = MIN_TRANSFER_STRIDE;  // subdivide range
         if  (nextTab ==  null ) {             // initiating
             try  {
                 @SuppressWarnings ( "unchecked" )
                 Node<K,V>[] nt = (Node<K,V>[]) new  Node<?,?>[n <<  1 ];         //構建一個nextTable對象,其容量爲原來容量的兩倍
                 nextTab = nt;
             catch  (Throwable ex) {       // try to cope with OOME
                 sizeCtl = Integer.MAX_VALUE;
                 return ;
             }
             nextTable = nextTab;
             transferIndex = n;
         }
         int  nextn = nextTab.length;
         // 鏈接點指針,用於標誌位(fwd的hash值爲-1,fwd.nextTable=nextTab)
         ForwardingNode<K,V> fwd =  new  ForwardingNode<K,V>(nextTab);
         // 當advance == true時,代表該節點已經處理過了
         boolean  advance =  true ;
         boolean  finishing =  false // to ensure sweep before committing nextTab
         for  ( int  i =  0 , bound =  0 ;;) {
             Node<K,V> f;  int  fh;
             // 控制 --i ,遍歷原hash表中的節點
             while  (advance) {
                 int  nextIndex, nextBound;
                 if  (--i >= bound || finishing)
                     advance =  false ;
                 else  if  ((nextIndex = transferIndex) <=  0 ) {
                     i = - 1 ;
                     advance =  false ;
                 }
                 // 用CAS計算獲得的transferIndex
                 else  if  (U.compareAndSwapInt
                         ( this , TRANSFERINDEX, nextIndex,
                                 nextBound = (nextIndex > stride ?
                                         nextIndex - stride :  0 ))) {
                     bound = nextBound;
                     i = nextIndex -  1 ;
                     advance =  false ;
                 }
             }
             if  (i <  0  || i >= n || i + n >= nextn) {
                 int  sc;
                 // 已經完成全部節點複製了
                 if  (finishing) {
                     nextTable =  null ;
                     table = nextTab;         // table 指向nextTable
                     sizeCtl = (n <<  1 ) - (n >>>  1 );      // sizeCtl閾值爲原來的1.5倍
                     return ;      // 跳出死循環,
                 }
                 // CAS 更擴容閾值,在這裏面sizectl值減一,說明新加入一個線程參與到擴容操做
                 if  (U.compareAndSwapInt( this , SIZECTL, sc = sizeCtl, sc -  1 )) {
                     if  ((sc -  2 ) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                         return ;
                     finishing = advance =  true ;
                     i = n;  // recheck before commit
                 }
             }
             // 遍歷的節點爲null,則放入到ForwardingNode 指針節點
             else  if  ((f = tabAt(tab, i)) ==  null )
                 advance = casTabAt(tab, i,  null , fwd);
             // f.hash == -1 表示遍歷到了ForwardingNode節點,意味着該節點已經處理過了
             // 這裏是控制併發擴容的核心
             else  if  ((fh = f.hash) == MOVED)
                 advance =  true // already processed
             else  {
                 // 節點加鎖
                 synchronized  (f) {
                     // 節點複製工做
                     if  (tabAt(tab, i) == f) {
                         Node<K,V> ln, hn;
                         // fh >= 0 ,表示爲鏈表節點
                         if  (fh >=  0 ) {
                             // 構造兩個鏈表  一個是原鏈表  另外一個是原鏈表的反序排列
                             int  runBit = fh & n;
                             Node<K,V> lastRun = f;
                             for  (Node<K,V> p = f.next; p !=  null ; p = p.next) {
                                 int  b = p.hash & n;
                                 if  (b != runBit) {
                                     runBit = b;
                                     lastRun = p;
                                 }
                             }
                             if  (runBit ==  0 ) {
                                 ln = lastRun;
                                 hn =  null ;
                             }
                             else  {
                                 hn = lastRun;
                                 ln =  null ;
                             }
                             for  (Node<K,V> p = f; p != lastRun; p = p.next) {
                                 int  ph = p.hash; K pk = p.key; V pv = p.val;
                                 if  ((ph & n) ==  0 )
                                     ln =  new  Node<K,V>(ph, pk, pv, ln);
                                 else
                                     hn =  new  Node<K,V>(ph, pk, pv, hn);
                             }
                             // 在nextTable i 位置處插上鍊表
                             setTabAt(nextTab, i, ln);
                             // 在nextTable i + n 位置處插上鍊表
                             setTabAt(nextTab, i + n, hn);
                             // 在table i 位置處插上ForwardingNode 表示該節點已經處理過了
                             setTabAt(tab, i, fwd);
                             // advance = true 能夠執行--i動做,遍歷節點
                             advance =  true ;
                         }
                         // 若是是TreeBin,則按照紅黑樹進行處理,處理邏輯與上面一致
                         else  if  (f  instanceof  TreeBin) {
                             TreeBin<K,V> t = (TreeBin<K,V>)f;
                             TreeNode<K,V> lo =  null , loTail =  null ;
                             TreeNode<K,V> hi =  null , hiTail =  null ;
                             int  lc =  0 , hc =  0 ;
                             for  (Node<K,V> e = t.first; e !=  null ; e = e.next) {
                                 int  h = e.hash;
                                 TreeNode<K,V> p =  new  TreeNode<K,V>
                                         (h, e.key, e.val,  null null );
                                 if  ((h & n) ==  0 ) {
                                     if  ((p.prev = loTail) ==  null )
                                         lo = p;
                                     else
                                         loTail.next = p;
                                     loTail = p;
                                     ++lc;
                                 }
                                 else  {
                                     if  ((p.prev = hiTail) ==  null )
                                         hi = p;
                                     else
                                         hiTail.next = p;
                                     hiTail = p;
                                     ++hc;
                                 }
                             }
                             // 擴容後樹節點個數若<=6,將樹轉鏈表
                             ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                     (hc !=  0 ) ?  new  TreeBin<K,V>(lo) : t;
                             hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                     (lc !=  0 ) ?  new  TreeBin<K,V>(hi) : t;
                             setTabAt(nextTab, i, ln);
                             setTabAt(nextTab, i + n, hn);
                             setTabAt(tab, i, fwd);
                             advance =  true ;
                         }
                     }
                 }
             }
         }
     }

其實helpTransfer()方法的目的就是調用多個工做線程一塊兒幫助進行擴容,這樣的效率就會更高,而不是隻有檢查到要擴容的那個線程進行擴容操做,其餘線程就要等待擴容操做完成才能工做
既然這裏涉及到擴容的操做,咱們也一塊兒來看看擴容方法transfer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
private  final  void  transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
         int  n = tab.length, stride;
         // 每核處理的量小於16,則強制賦值16
         if  ((stride = (NCPU >  1 ) ? (n >>>  3 ) / NCPU : n) < MIN_TRANSFER_STRIDE)
             stride = MIN_TRANSFER_STRIDE;  // subdivide range
         if  (nextTab ==  null ) {             // initiating
             try  {
                 @SuppressWarnings ( "unchecked" )
                 Node<K,V>[] nt = (Node<K,V>[]) new  Node<?,?>[n <<  1 ];         //構建一個nextTable對象,其容量爲原來容量的兩倍
                 nextTab = nt;
             catch  (Throwable ex) {       // try to cope with OOME
                 sizeCtl = Integer.MAX_VALUE;
                 return ;
             }
             nextTable = nextTab;
             transferIndex = n;
         }
         int  nextn = nextTab.length;
         // 鏈接點指針,用於標誌位(fwd的hash值爲-1,fwd.nextTable=nextTab)
         ForwardingNode<K,V> fwd =  new  ForwardingNode<K,V>(nextTab);
         // 當advance == true時,代表該節點已經處理過了
         boolean  advance =  true ;
         boolean  finishing =  false // to ensure sweep before committing nextTab
         for  ( int  i =  0 , bound =  0 ;;) {
             Node<K,V> f;  int  fh;
             // 控制 --i ,遍歷原hash表中的節點
             while  (advance) {
                 int  nextIndex, nextBound;
                 if  (--i >= bound || finishing)
                     advance =  false ;
                 else  if  ((nextIndex = transferIndex) <=  0 ) {
                     i = - 1 ;
                     advance =  false ;
                 }
                 // 用CAS計算獲得的transferIndex
                 else  if  (U.compareAndSwapInt
                         ( this , TRANSFERINDEX, nextIndex,
                                 nextBound = (nextIndex > stride ?
                                         nextIndex - stride :  0 ))) {
                     bound = nextBound;
                     i = nextIndex -  1 ;
                     advance =  false ;
                 }
             }
             if  (i <  0  || i >= n || i + n >= nextn) {
                 int  sc;
                 // 已經完成全部節點複製了
                 if  (finishing) {
                     nextTable =  null ;
                     table = nextTab;         // table 指向nextTable
                     sizeCtl = (n <<  1 ) - (n >>>  1 );      // sizeCtl閾值爲原來的1.5倍
                     return ;      // 跳出死循環,
                 }
                 // CAS 更擴容閾值,在這裏面sizectl值減一,說明新加入一個線程參與到擴容操做
                 if  (U.compareAndSwapInt( this , SIZECTL, sc = sizeCtl, sc -  1 )) {
                     if  ((sc -  2 ) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                         return ;
                     finishing = advance =  true ;
                     i = n;  // recheck before commit
                 }
             }
             // 遍歷的節點爲null,則放入到ForwardingNode 指針節點
             else  if  ((f = tabAt(tab, i)) ==  null )
                 advance = casTabAt(tab, i,  null , fwd);
             // f.hash == -1 表示遍歷到了ForwardingNode節點,意味着該節點已經處理過了
             // 這裏是控制併發擴容的核心
             else  if  ((fh = f.hash) == MOVED)
                 advance =  true // already processed
             else  {
                 // 節點加鎖
                 synchronized  (f) {
                     // 節點複製工做
                     if  (tabAt(tab, i) == f) {
                         Node<K,V> ln, hn;
                         // fh >= 0 ,表示爲鏈表節點
                         if  (fh >=  0 ) {
                             // 構造兩個鏈表  一個是原鏈表  另外一個是原鏈表的反序排列
                             int  runBit = fh & n;
                             Node<K,V> lastRun = f;
                             for  (Node<K,V> p = f.next; p !=  null ; p = p.next) {
                                 int  b = p.hash & n;
                                 if  (b != runBit) {
                                     runBit = b;
                                     lastRun = p;
                                 }
                             }
                             if  (runBit ==  0 ) {
                                 ln = lastRun;
                                 hn =  null ;
                             }
                             else  {
                                 hn = lastRun;
                                 ln =  null ;
                             }
                             for  (Node<K,V> p = f; p != lastRun; p = p.next) {
                                 int  ph = p.hash; K pk = p.key; V pv = p.val;
                                 if  ((ph & n) ==  0 )
                                     ln =  new  Node<K,V>(ph, pk, pv, ln);
                                 else
                                     hn =  new  Node<K,V>(ph, pk, pv, hn);
                             }
                             // 在nextTable i 位置處插上鍊表
                             setTabAt(nextTab, i, ln);
                             // 在nextTable i + n 位置處插上鍊表
                             setTabAt(nextTab, i + n, hn);
                             // 在table i 位置處插上ForwardingNode 表示該節點已經處理過了
                             setTabAt(tab, i, fwd);
                             // advance = true 能夠執行--i動做,遍歷節點
                             advance =  true ;
                         }
                         // 若是是TreeBin,則按照紅黑樹進行處理,處理邏輯與上面一致
                         else  if  (f  instanceof  TreeBin) {
                             TreeBin<K,V> t = (TreeBin<K,V>)f;
                             TreeNode<K,V> lo =  null , loTail =  null ;
                             TreeNode<K,V> hi =  null , hiTail =  null ;
                             int  lc =  0 , hc =  0 ;
                             for  (Node<K,V> e = t.first; e !=  null ; e = e.next) {
                                 int  h = e.hash;
                                 TreeNode<K,V> p =  new  TreeNode<K,V>
                                         (h, e.key, e.val,  null null );
                                 if  ((h & n) ==  0 ) {
                                     if  ((p.prev = loTail) ==  null )
                                         lo = p;
                                     else
                                         loTail.next = p;
                                     loTail = p;
                                     ++lc;
                                 }
                                 else  {
                                     if  ((p.prev = hiTail) ==  null )
                                         hi = p;
                                     else
                                         hiTail.next = p;
                                     hiTail = p;
                                     ++hc;
                                 }
                             }
                             // 擴容後樹節點個數若<=6,將樹轉鏈表
                             ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                     (hc !=  0 ) ?  new  TreeBin<K,V>(lo) : t;
                             hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                     (lc !=  0 ) ?  new  TreeBin<K,V>(hi) : t;
                             setTabAt(nextTab, i, ln);
                             setTabAt(nextTab, i + n, hn);
                             setTabAt(tab, i, fwd);
                             advance =  true ;
                         }
                     }
                 }
             }
         }
     }

擴容過程有點複雜,這裏主要涉及到多線程併發擴容,ForwardingNode的做用就是支持擴容操做,將已處理的節點和空節點置爲ForwardingNode,併發處理時多個線程通過ForwardingNode就表示已經遍歷了,就日後遍歷,下圖是多線程合做擴容的過程:

介紹完擴容過程,咱們再次回到put流程,在第四步中是向鏈表或者紅黑樹里加節點,到第五步,會調用treeifyBin()方法進行鏈表轉紅黑樹的過程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private  final  void  treeifyBin(Node<K,V>[] tab,  int  index) {
     Node<K,V> b;  int  n, sc;
     if  (tab !=  null ) {
         //若是整個table的數量小於64,就擴容至原來的一倍,不轉紅黑樹了
         //由於這個閾值擴容能夠減小hash衝突,沒必要要去轉紅黑樹
         if  ((n = tab.length) < MIN_TREEIFY_CAPACITY)
             tryPresize(n <<  1 );
         else  if  ((b = tabAt(tab, index)) !=  null  && b.hash >=  0 ) {
             synchronized  (b) {
                 if  (tabAt(tab, index) == b) {
                     TreeNode<K,V> hd =  null , tl =  null ;
                     for  (Node<K,V> e = b; e !=  null ; e = e.next) {
                         //封裝成TreeNode
                         TreeNode<K,V> p =
                             new  TreeNode<K,V>(e.hash, e.key, e.val,
                                               null null );
                         if  ((p.prev = tl) ==  null )
                             hd = p;
                         else
                             tl.next = p;
                         tl = p;
                     }
                     //經過TreeBin對象對TreeNode轉換成紅黑樹
                     setTabAt(tab, index,  new  TreeBin<K,V>(hd));
                 }
             }
         }
     }
}

到第六步表示已經數據加入成功了,如今調用addCount()方法計算ConcurrentHashMap的size,在原來的基礎上加一,如今來看看addCount()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private  final  void  addCount( long  x,  int  check) {
     CounterCell[] as;  long  b, s;
     //更新baseCount,table的數量,counterCells表示元素個數的變化
     if  ((as = counterCells) !=  null  ||
         !U.compareAndSwapLong( this , BASECOUNT, b = baseCount, s = b + x)) {
         CounterCell a;  long  v;  int  m;
         boolean  uncontended =  true ;
         //若是多個線程都在執行,則CAS失敗,執行fullAddCount,所有加入count
         if  (as ==  null  || (m = as.length -  1 ) <  0  ||
             (a = as[ThreadLocalRandom.getProbe() & m]) ==  null  ||
             !(uncontended =
               U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
             fullAddCount(x, uncontended);
             return ;
         }
         if  (check <=  1 )
             return ;
         s = sumCount();
     }
      //check>=0表示須要進行擴容操做
     if  (check >=  0 ) {
         Node<K,V>[] tab, nt;  int  n, sc;
         while  (s >= ( long )(sc = sizeCtl) && (tab = table) !=  null  &&
                (n = tab.length) < MAXIMUM_CAPACITY) {
             int  rs = resizeStamp(n);
             if  (sc <  0 ) {
                 if  ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs +  1  ||
                     sc == rs + MAX_RESIZERS || (nt = nextTable) ==  null  ||
                     transferIndex <=  0 )
                     break ;
                 if  (U.compareAndSwapInt( this , SIZECTL, sc, sc +  1 ))
                     transfer(tab, nt);
             }
             //當前線程發起庫哦哦讓操做,nextTable=null
             else  if  (U.compareAndSwapInt( this , SIZECTL, sc,
                                          (rs << RESIZE_STAMP_SHIFT) +  2 ))
                 transfer(tab,  null );
             s = sumCount();
         }
     }
}

put的流程如今已經分析完了,你能夠從中發現,他在併發處理中使用的是樂觀鎖,當有衝突的時候才進行併發處理,並且流程步驟很清晰,可是細節設計的很複雜,畢竟多線程的場景也複雜

get操做

咱們如今要回到開始的例子中,咱們對我的信息進行了新增以後,咱們要獲取所新增的信息,使用String name = map.get(「name」)獲取新增的name信息,如今咱們依舊用debug的方式來分析下ConcurrentHashMap的獲取方法get()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public  V get(Object key) {
     Node<K,V>[] tab; Node<K,V> e, p;  int  n, eh; K ek;
     int  h = spread(key.hashCode());  //計算兩次hash
     if  ((tab = table) !=  null  && (n = tab.length) >  0  &&
         (e = tabAt(tab, (n -  1 ) & h)) !=  null ) { //讀取首節點的Node元素
         if  ((eh = e.hash) == h) {  //若是該節點就是首節點就返回
             if  ((ek = e.key) == key || (ek !=  null  && key.equals(ek)))
                 return  e.val;
         }
         //hash值爲負值表示正在擴容,這個時候查的是ForwardingNode的find方法來定位到nextTable來
         //查找,查找到就返回
         else  if  (eh <  0 )
             return  (p = e.find(h, key)) !=  null  ? p.val :  null ;
         while  ((e = e.next) !=  null ) { //既不是首節點也不是ForwardingNode,那就往下遍歷
             if  (e.hash == h &&
                 ((ek = e.key) == key || (ek !=  null  && key.equals(ek))))
                 return  e.val;
         }
     }
     return  null ;
}

ConcurrentHashMap的get操做的流程很簡單,也很清晰,能夠分爲三個步驟來描述

  1. 計算hash值,定位到該table索引位置,若是是首節點符合就返回
  2. 若是遇到擴容的時候,會調用標誌正在擴容節點ForwardingNode的find方法,查找該節點,匹配就返回
  3. 以上都不符合的話,就往下遍歷節點,匹配就返回,不然最後就返回null

size操做

最後咱們來看下例子中最後獲取size的方式int size = map.size();,如今讓咱們看下size()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public  int  size() {
     long  n = sumCount();
     return  ((n < 0L) ?  0  :
             (n > ( long )Integer.MAX_VALUE) ? Integer.MAX_VALUE :
             ( int )n);
}
final  long  sumCount() {
     CounterCell[] as = counterCells; CounterCell a;  //變化的數量
     long  sum = baseCount;
     if  (as !=  null ) {
         for  ( int  i =  0 ; i < as.length; ++i) {
             if  ((a = as[i]) !=  null )
                 sum += a.value;
         }
     }
     return  sum;
}

在JDK1.8版本中,對於size的計算,在擴容和addCount()方法就已經有處理了,JDK1.7是在調用size()方法纔去計算,其實在併發集合中去計算size是沒有多大的意義的,由於size是實時在變的,只能計算某一刻的大小,可是某一刻太快了,人的感知是一個時間段,因此並非很精確

總結與思考

其實能夠看出JDK1.8版本的ConcurrentHashMap的數據結構已經接近HashMap,相對而言,ConcurrentHashMap只是增長了同步的操做來控制併發,從JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+紅黑樹,相對而言,總結以下思考

    1. JDK1.8的實現下降鎖的粒度,JDK1.7版本鎖的粒度是基於Segment的,包含多個HashEntry,而JDK1.8鎖的粒度就是HashEntry(首節點)
    2. JDK1.8版本的數據結構變得更加簡單,使得操做也更加清晰流暢,由於已經使用synchronized來進行同步,因此不須要分段鎖的概念,也就不須要Segment這種數據結構了,因爲粒度的下降,實現的複雜度也增長了
    3. JDK1.8使用紅黑樹來優化鏈表,基於長度很長的鏈表的遍歷是一個很漫長的過程,而紅黑樹的遍歷效率是很快的,代替必定閾值的鏈表,這樣造成一個最佳拍檔
    4. JDK1.8爲何使用內置鎖synchronized來代替重入鎖ReentrantLock,我以爲有如下幾點
      1. 由於粒度下降了,在相對而言的低粒度加鎖方式,synchronized並不比ReentrantLock差,在粗粒度加鎖中ReentrantLock可能經過Condition來控制各個低粒度的邊界,更加的靈活,而在低粒度中,Condition的優點就沒有了
      2. JVM的開發團隊歷來都沒有放棄synchronized,並且基於JVM的synchronized優化空間更大,使用內嵌的關鍵字比使用API更加天然
      3. 在大量的數據操做下,對於JVM的內存壓力,基於API的ReentrantLock會開銷更多的內存,雖然不是瓶頸,可是也是一個選擇依據
相關文章
相關標籤/搜索