ConcurrentHashMap實現原理及源碼分析

  ConcurrentHashMap是Java併發包中提供的一個線程安全且高效的HashMap實現(若對HashMap的實現原理還不甚瞭解,可參考個人另外一篇文章HashMap實現原理及源碼分析),ConcurrentHashMap在併發編程的場景中使用頻率很是之高,本文就來分析下ConcurrentHashMap的實現原理,並對其實現原理進行分析(JDK1.7).html

ConcurrentHashMap實現原理

  衆所周知,哈希表是中很是高效,複雜度爲O(1)的數據結構,在Java開發中,咱們最多見到最頻繁使用的就是HashMap和HashTable,可是在線程競爭激烈的併發場景中使用都不夠合理。node

  HashMap :先說HashMap,HashMap是線程不安全的,在併發環境下,可能會造成環狀鏈表(擴容時可能形成,具體緣由自行百度google或查看源碼分析),致使get操做時,cpu空轉,因此,在併發環境中使用HashMap是很是危險的。算法

  HashTable : HashTable和HashMap的實現原理幾乎同樣,差異無非是1.HashTable不容許key和value爲null;2.HashTable是線程安全的。可是HashTable線程安全的策略實現代價卻太大了,簡單粗暴,get/put全部相關操做都是synchronized的,這至關於給整個哈希表加了一把大鎖,多線程訪問時候,只要有一個線程訪問或操做該對象,那其餘線程只能阻塞,至關於將全部的操做串行化,在競爭激烈的併發場景中性能就會很是差。編程

  HashTable性能差主要是因爲全部操做須要競爭同一把鎖,而若是容器中有多把鎖,每一把鎖鎖一段數據,這樣在多線程訪問時不一樣段的數據時,就不會存在鎖競爭了,這樣即可以有效地提升併發效率。這就是ConcurrentHashMap所採用的"分段鎖"思想。數組

  

ConcurrentHashMap源碼分析   

ConcurrentHashMap採用了很是精妙的"分段鎖"策略,ConcurrentHashMap的主幹是個Segment數組。緩存

 final Segment<K,V>[] segments;

  Segment繼承了ReentrantLock,因此它就是一種可重入鎖(ReentrantLock)。在ConcurrentHashMap,一個Segment就是一個子哈希表,Segment裏維護了一個HashEntry數組,併發環境下,對於不一樣Segment的數據進行操做是不用考慮鎖競爭的。(就按默認的ConcurrentLeve爲16來說,理論上就容許16個線程併發執行,有木有很酷)安全

  因此,對於同一個Segment的操做才需考慮線程同步,不一樣的Segment則無需考慮。數據結構

Segment相似於HashMap,一個Segment維護着一個HashEntry數組多線程

 transient volatile HashEntry<K,V>[] table;

HashEntry是目前咱們提到的最小的邏輯處理單元了。一個ConcurrentHashMap維護一個Segment數組,一個Segment維護一個HashEntry數組。併發

 static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; //其餘省略
}    

咱們說Segment相似哈希表,那麼一些屬性就跟咱們以前提到的HashMap差不離,好比負載因子loadFactor,好比閾值threshold等等,看下Segment的構造方法

Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf;//負載因子
            this.threshold = threshold;//閾值
            this.table = tab;//主幹數組即HashEntry數組
        }

咱們來看下ConcurrentHashMap的構造方法

 1  public ConcurrentHashMap(int initialCapacity,  2                                float loadFactor, int concurrencyLevel) {  3           if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)  4               throw new IllegalArgumentException();  5           //MAX_SEGMENTS 爲1<<16=65536,也就是最大併發數爲65536
 6           if (concurrencyLevel > MAX_SEGMENTS)  7               concurrencyLevel = MAX_SEGMENTS;  8           //2的sshif次方等於ssize,例:ssize=16,sshift=4;ssize=32,sshif=5
 9          int sshift = 0; 10          //ssize 爲segments數組長度,根據concurrentLevel計算得出
11          int ssize = 1; 12          while (ssize < concurrencyLevel) { 13              ++sshift; 14              ssize <<= 1; 15  } 16          //segmentShift和segmentMask這兩個變量在定位segment時會用到,後面會詳細講
17          this.segmentShift = 32 - sshift; 18          this.segmentMask = ssize - 1; 19          if (initialCapacity > MAXIMUM_CAPACITY) 20              initialCapacity = MAXIMUM_CAPACITY; 21          //計算cap的大小,即Segment中HashEntry的數組長度,cap也必定爲2的n次方.
22          int c = initialCapacity / ssize; 23          if (c * ssize < initialCapacity) 24              ++c; 25          int cap = MIN_SEGMENT_TABLE_CAPACITY; 26          while (cap < c) 27              cap <<= 1; 28          //建立segments數組並初始化第一個Segment,其他的Segment延遲初始化
29          Segment<K,V> s0 =
30              new Segment<K,V>(loadFactor, (int)(cap * loadFactor), 31                               (HashEntry<K,V>[])new HashEntry[cap]); 32          Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; 33          UNSAFE.putOrderedObject(ss, SBASE, s0); 
34          this.segments = ss; 35      }

  初始化方法有三個參數,若是用戶不指定則會使用默認值,initialCapacity爲16,loadFactor爲0.75(負載因子,擴容時須要參考),concurrentLevel爲16。

  從上面的代碼能夠看出來,Segment數組的大小ssize是由concurrentLevel來決定的,可是卻不必定等於concurrentLevel,ssize必定是大於或等於concurrentLevel的最小的2的次冪。好比:默認狀況下concurrentLevel是16,則ssize爲16;若concurrentLevel爲14,ssize爲16;若concurrentLevel爲17,則ssize爲32。爲何Segment的數組大小必定是2的次冪?其實主要是便於經過按位與的散列算法來定位Segment的index。至於更詳細的緣由,有興趣的話能夠參考個人另外一篇文章HashMap實現原理及源碼分析,其中對於數組長度爲何必定要是2的次冪有較爲詳細的分析。

  接下來,咱們來看看put方法

 public V put(K key, V value) { Segment<K,V> s; //concurrentHashMap不容許key/value爲空
        if (value == null) throw new NullPointerException(); //hash函數對key的hashCode從新散列,避免差勁的不合理的hashcode,保證散列均勻
        int hash = hash(key); //返回的hash值無符號右移segmentShift位與段掩碼進行位運算,定位segment
        int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
            s = ensureSegment(j); return s.put(key, hash, value, false); }

 從源碼看出,put的主要邏輯也就兩步:1.定位segment並確保定位的Segment已初始化 2.調用Segment的put方法

 關於segmentShift和segmentMask

  segmentShift和segmentMask這兩個全局變量的主要做用是用來定位Segment,int j =(hash >>> segmentShift) & segmentMask。

  segmentMask:段掩碼,假如segments數組長度爲16,則段掩碼爲16-1=15;segments長度爲32,段掩碼爲32-1=31。這樣獲得的全部bit位都爲1,能夠更好地保證散列的均勻性

  segmentShift:2的sshift次方等於ssize,segmentShift=32-sshift。若segments長度爲16,segmentShift=32-4=28;若segments長度爲32,segmentShift=32-5=27。而計算得出的hash值最大爲32位,無符號右移segmentShift,則意味着只保留高几位(其他位是沒用的),而後與段掩碼segmentMask位運算來定位Segment。

  get/put方法

  get方法

 public V get(Object key) { Segment<K,V> s; 
        HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
//先定位Segment,再定位HashEntry
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }

  get方法無需加鎖,因爲其中涉及到的共享變量都使用volatile修飾,volatile能夠保證內存可見性,因此不會讀取到過時數據。

  來看下concurrentHashMap代理到Segment上的put方法,Segment中的put方法是要加鎖的。只不過是鎖粒度細了而已。

final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);//tryLock不成功時會遍歷定位到的HashEnry位置的鏈表(遍歷主要是爲了使CPU緩存鏈表),若找不到,則建立HashEntry。tryLock必定次數後(MAX_SCAN_RETRIES變量決定),則lock。若遍歷過程當中,因爲其餘線程的操做致使鏈表頭結點變化,則須要從新遍歷。 V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash;//定位HashEntry,能夠看到,這個hash值在定位Segment時和在Segment中定位HashEntry都會用到,只不過定位Segment時只用到高几位。 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1;
              //若c超出閾值threshold,須要擴容並rehash。擴容後的容量是當前容量的2倍。這樣能夠最大程度避免以前散列好的entry從新散列,具體在另外一篇文章中有詳細分析,不贅述。擴容並rehash的這個過程是比較消耗資源的。
if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }

 總結

  ConcurrentHashMap做爲一種線程安全且高效的哈希表的解決方案,尤爲其中的"分段鎖"的方案,相比HashTable的全表鎖在性能上的提高很是之大。本文對ConcurrentHashMap的實現原理進行了詳細分析,並解讀了部分源碼,但願能幫助到有須要的童鞋。

相關文章
相關標籤/搜索