自學ConcuurentHashMap源碼

自學ConcuurentHashMap源碼

參考:https://my.oschina.net/hosee/blog/675884
http://www.cnblogs.com/ITtangtang/p/3948786.html
本文須要關注的地方html

  1. 利用分段鎖實現多個線程併發寫入、刪除或者修改(默認16);
  2. 利用HashEntry的不變性和 volatile 變量的可見性來保證get讀幾乎不須要加鎖(判斷獲取的entry的value是否爲null,爲null時才使用加鎖的方式再次去獲取,緣由是put的時候的指令重排序)
  3. get的三種狀況分析
  4. 每一個版本的ConcurrentHashMap幾乎都有改動,本文說明的JDK6的源碼。JDK8相比前面的版本改動最大,最後有簡單的說明

1、基礎知識

  併發編程實踐中,ConcurrentHashMap是一個常常被使用的數據結構,相比於Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap可以提供更高的併發度。同步容器將全部對容器狀態的訪問都串行化,以實現它們的線程安全性。這種方法的代價是嚴重下降併發性,當多個線程競爭容器的鎖時,吞吐量將嚴重下降。ConcurrentHashMap關鍵在於使用了分段鎖技術。
  在ConcurrentHashMap的實現中使用了一個包含16個鎖的數組,每一個鎖保護全部散列桶的1/16,其中第N個散列桶由第(N mod 16)個鎖來保護。每個鎖保護的區域稱爲段(Segment),每一個段其實就是一個小的Hashtable,它們有本身的鎖。假設散列函數具備合理的分佈性,而且關鍵字可以均勻分佈,那麼這大約能把對於鎖的請求減小到原來的1/16,正是這項技術使得ConcurrentHashMap可以支持多達16個併發的寫入器。
  ConcurrentHashMap與其餘併發容器一塊兒加強了同步容器類:它們提供的迭代器不會拋出 ConcurrentModificationException,所以不須要在迭代過程當中對容器加鎖。ConcurrentHashMap 返回的迭代器具備弱一致性(Weakly Consistent),而並不是「及時失敗」。弱一致性的迭代器能夠容忍併發的修改,當建立迭代器時會遍歷已有的元素,並能夠(可是不保證)在迭代器被構造後將修改操做反映給容器。
  儘管有這些改進,但仍然有一些須要權衡的因素。對於一些須要在整個Map上進行計算的方法,例如size和isEmpty,這些方法的語義被略微減弱了以反映容器的併發特性。因爲size 返回的結果在計算時可能已通過期了,它實際上只是一個估計值,所以容許size返回一個近似值而不是一個精確值。由於:事實上size和isEmpty這樣的方法在併發環境下的用處很小,由於它們的返回值總在不斷變化。所以,這些操做的需求被弱化了,以換取對其餘更重要操做的性能優化,包括get、put、containsKey和remove等。
  鎖分段的一個劣勢在於:與採用單個鎖來實現獨佔訪問相比,要獲取多個鎖來實現獨佔訪問將更加困難而且開銷更高。一般,在執行一個操做時最多隻需獲取一個鎖,但在某些狀況下須要加鎖整個容器,例如當ConcurrentHashMap須要擴展映射範圍,以及從新計算鍵值的散列值要分佈到更大的桶集合中時,就須要獲取分段鎖集合中的全部鎖。java

1. 實現原理

  ConcurrentHashMap使用分段鎖技術,將數據分紅一段一段的存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問,可以實現真正的併發訪問。以下圖是ConcurrentHashMap的內部結構圖:

它把區間按照併發級別(concurrentLevel),分紅了若干個segment。默認狀況下內部按併發級別爲16來建立。對於每一個segment的容量,默認狀況也是16。固然併發級別(concurrentLevel)和每一個段(segment)的初始容量都是能夠經過構造函數設定的。c++

2. 源碼解讀

  ConcurrentHashMap中主要實體類就是三個:ConcurrentHashMap(整個Hash表),Segment(充當鎖的角色,每一個 Segment 對象守護整個散列表的某一個段的若干個桶),HashEntry(鍵值節點),對應上面的圖能夠看出之間的關係。算法

HashEntry 類

  在 HashEntry 類中,key,hash 和 next 域都被聲明爲 final 型,value 域被聲明爲 volatile 型。編程

static final class HashEntry<K,V> {  
     final K key;  
     final int hash;  
     volatile V value;  
     final HashEntry<K,V> next;  
 }

  在 ConcurrentHashMap 中,在散列時若是產生「衝突」,將採用「鏈地址法」來處理「衝突」:把「衝突」的 HashEntry 對象連接成一個單向鏈表。因爲 HashEntry 的 next 域爲 final 型,因此新節點只能在鏈表的表頭處插入。 下圖是在一個空桶中依次插入 A,B,C 三個 HashEntry 對象後的結構圖:

注意:因爲只能在表頭插入,因此鏈表中節點的順序和插入的順序相反。其實,哪怕next不是final的,也應該從表頭插入,由於從表尾插入的話,首先須要遍歷到表尾,而後才能插入節點,複雜度O(n)。數組

Segment 類

  Segment 類繼承於 ReentrantLock 類,從而每一個segment均可以當作一個鎖。每一個 Segment 對象用來守護其(成員對象 table 中)包含的若干個桶。
  table 是一個由 HashEntry 對象組成的數組。table 數組的每個數組成員就是散列表的一個桶。安全

static final class Segment<K,V> extends ReentrantLock implements Serializable { 
       /** 
        * 在本 segment 範圍內,包含的 HashEntry 元素的個數
        * 該變量被聲明爲 volatile 型
        */ 
       transient volatile int count; 
 
       /** 
        * table 被更新的次數
        */ 
       transient int modCount; 
 
       /** 
        * 當 table 中包含的 HashEntry 元素的個數超過本變量值時,觸發 table 的再散列
        */ 
       transient int threshold; 
 
       /** 
        * table 是由 HashEntry 對象組成的數組
        */ 
       transient volatile HashEntry<K,V>[] table; 
 
       final float loadFactor; 
 
       Segment(int initialCapacity, float lf) { 
           loadFactor = lf; 
           setTable(HashEntry.<K,V>newArray(initialCapacity)); 
       } 
 
       void setTable(HashEntry<K,V>[] newTable) { 
           threshold = (int)(newTable.length * loadFactor); 
           table = newTable; 
       } 
 
       /** 
        * 根據 key 的散列值,找到 table 中對應的那個桶(table 數組的某個數組成員)
        */ 
       HashEntry<K,V> getFirst(int hash) { 
           HashEntry<K,V>[] tab = table; 
           return tab[hash & (tab.length - 1)]; 
       } 
}

ConcurrentHashMap 類

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { 
 
   /** 
    * 散列表的默認初始容量爲 16,即初始默認爲 16 段
    */ 
   static final int DEFAULT_INITIAL_CAPACITY= 16; 
   static final float DEFAULT_LOAD_FACTOR= 0.75f; 
 
   /** 
    * 散列表的默認併發級別爲 16。該值表示當前更新線程的估計數,在構造函數中沒有指定這個參數時,使用本參數
    */ 
   static final int DEFAULT_CONCURRENCY_LEVEL= 16; 
 
   /** 
    * segments 的掩碼值
    * key 的散列碼的高位用來選擇具體的 segment 
    */ 
   final int segmentMask; 
 
   /** 
    * 偏移量
    */ 
   final int segmentShift; 
 
   /** 
    * 由 Segment 對象組成的數組
    */ 
   final Segment<K,V>[] segments; 
 
   /** 
    * 建立一個帶有指定初始容量、負載因子和併發級別的新的空散列表
    */ 
   public ConcurrentHashMap(int initialCapacity,  float loadFactor, int concurrencyLevel) { 
       if(!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) 
           throw new IllegalArgumentException(); 
 
       if(concurrencyLevel > MAX_SEGMENTS) 
           concurrencyLevel = MAX_SEGMENTS; 
 
       // 尋找最佳匹配參數(不小於給定參數的最接近的 2 次冪) 
       int sshift = 0; 
       int ssize = 1; 
       while(ssize < concurrencyLevel) { 
           ++sshift; 
           ssize <<= 1; 
       } 
       segmentShift = 32 - sshift;       // 偏移量值
       segmentMask = ssize - 1;           // 掩碼值 
       this.segments = Segment.newArray(ssize);   // 建立數組
 
       if (initialCapacity > MAXIMUM_CAPACITY) 
           initialCapacity = MAXIMUM_CAPACITY; 
       int c = initialCapacity / ssize; 
       if(c * ssize < initialCapacity) 
           ++c; 
       int cap = 1; 
       while(cap < c) 
           cap <<= 1; 
 
       // 依次遍歷每一個數組元素
       for(int i = 0; i < this.segments.length; ++i) 
           // 初始化每一個數組元素引用的 Segment 對象
          this.segments[i] = new Segment<K,V>(cap, loadFactor); 
   } 
 
   /** 
    * 建立一個帶有默認初始容量 (16)、默認加載因子 (0.75) 和 默認併發級別 (16) 的空散列表。
    */ 
   public ConcurrentHashMap() { 
       this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); 
}

put方法

  首先,根據 key 計算出對應的 hash 值:性能優化

public V put(K key, V value) { 
       if (value == null)          //ConcurrentHashMap 中不容許用 null 做爲映射值
           throw new NullPointerException(); 
       int hash = hash(key.hashCode());        // 計算鍵對應的散列碼
       // 根據散列碼找到對應的 Segment 
       return segmentFor(hash).put(key, hash, value, false); 
}

  而後,根據 hash 值找到對應的Segment 對象:數據結構

/** 
    * 使用 key 的散列碼來獲得 segments 數組中對應的 Segment 
    */ 
final Segment<K,V> segmentFor(int hash) { 
   // 將散列值右移 segmentShift 個位,並在高位填充 0 ,而後把獲得的值與 segmentMask 相「與」,從而獲得 hash 值對應的 segments 數組的下標值,最後根據下標值返回散列碼對應的 Segment 對象
       return segments[(hash >>> segmentShift) & segmentMask]; 
}

  最後,在這個 Segment 中執行具體的 put 操做:多線程

V put(K key, int hash, V value, boolean onlyIfAbsent) { 
           lock();  // 加鎖,這裏是鎖定某個 Segment 對象而非整個 ConcurrentHashMap 
           try { 
               int c = count; 
 
               if (c++ > threshold)     // 若是超過再散列的閾值
                   rehash();              // 執行再散列,table 數組的長度將擴充一倍
 
               HashEntry<K,V>[] tab = table; 
               int index = hash & (tab.length - 1); 
               // 找到散列值對應的具體的那個桶
               HashEntry<K,V> first = tab[index]; 
 
               HashEntry<K,V> e = first; 
               while (e != null && (e.hash != hash || !key.equals(e.key))) 
                   e = e.next; 
 
               V oldValue; 
               if (e != null) {            // 若是鍵值對已經存在
                   oldValue = e.value; 
                   if (!onlyIfAbsent) 
                       e.value = value;    // 替換 value 值
               } 
               else {                        // 鍵值對不存在 
                   oldValue = null; 
                   ++modCount;         // 要添加新節點到鏈表中,因此 modCont 要加 1  
                   // 建立新節點,並添加到鏈表的頭部 
                   tab[index] = new HashEntry<K,V>(key, hash, first, value); 
                   count = c;               // 寫 count 變量
               } 
               return oldValue; 
           } finally { 
               unlock();                     // 解鎖
           } 
       }

  注意:這裏的加鎖操做是針對某個具體的 Segment,鎖定的是該 Segment 而不是整個 ConcurrentHashMap。由於插入鍵 / 值對操做只是在這個 Segment 包含的某個桶中完成,不須要鎖定整個ConcurrentHashMap。此時,其餘寫線程對另外 15 個Segment 的加鎖並不會由於當前線程對這個 Segment 的加鎖而阻塞。同時,全部讀線程幾乎不會因本線程的加鎖而阻塞(除非讀線程恰好讀到這個 Segment 中某個 HashEntry 的 value 域的值爲 null,此時須要加鎖後從新讀取該值)。
  相比較於 HashTable 和由同步包裝器包裝的 HashMap每次只能有一個線程執行讀或寫操做,ConcurrentHashMap 在併發訪問性能上有了質的提升。在理想狀態下,ConcurrentHashMap 能夠支持 16 個線程執行併發寫操做(若是併發級別設置爲 16),及任意數量線程的讀操做。

get方法

public V get(Object key) {
    int hash = hash(key); // throws NullPointerException if key null
    return segmentFor(hash).get(key, hash);
}
V get(Object key, int hash) { 
           if(count != 0) {       // 首先讀 count 變量
               HashEntry<K,V> e = getFirst(hash); 
               while(e != null) { 
                   if(e.hash == hash && key.equals(e.key)) { 
                       V v = e.value; 
                       if(v != null)            
                           return v; 
                       // 若是讀到 value 域爲 null,說明發生了重排序,加鎖後從新讀取
                       return readValueUnderLock(e); 
                   } 
                   e = e.next; 
               } 
           } 
           return null; 
       }

  ConcurrentHashMap徹底容許多個讀操做併發進行,讀操做並不須要加鎖。關鍵是用 HashEntry 對象的不變性來下降讀操做對加鎖的需求。只是判斷獲取的entry的value是否爲null,爲null時才使用加鎖的方式再次去獲取
  在代碼清單「HashEntry 類的定義」中咱們能夠看到,HashEntry 中的 key,hash,next 都聲明爲 final 型。這意味着,不能把節點添加到連接的中間和尾部,也不能在連接的中間和尾部刪除節點。這個特性能夠保證:在訪問某個節點時,這個節點以後的連接不會被改變。這個特性能夠大大下降處理鏈表時的複雜性。
下面分析在get的時候的線程安全性

一、若是get的過程當中另外一個線程剛好新增entry


  HashEntry 類的 value 域被聲明爲 volatile 型,Java 的內存模型能夠保證:某個寫線程對 value 域的寫入立刻能夠被後續的某個讀線程「看」到。在 ConcurrentHashMap 中,不容許用 null 做爲鍵和值,當讀線程讀到某個 HashEntry 的 value 域的值爲 null 時,便知道發生了指令重排序現象(注意:volatile變量重排序規則,同時也是先行發生原則的一部分:對一個volatile變量的寫操做先行發生於後面對這個變量的讀操做,這裏的「後面」一樣是指時間上的前後順序。因此,在tab[index] = new HashEntry<K,V>(key, hash, first, value);中,可能會出現當前線程獲得的newEntry對象是一個沒有徹底構造好的對象引用。),須要加鎖後從新讀入這個 value 值。

二、若是get的過程當中另外一個線程修改了一個entry的value

  因爲對 volatile 變量的可見性,寫線程對鏈表的非結構性修改可以被後續不加鎖的讀線程「看到」。

三、若是get的過程當中另外一個線程刪除了一個entry

  假設咱們的鏈表元素是:e1-> e2 -> e3 -> e4 咱們要刪除 e3這個entry
  由於HashEntry中next的不可變,因此咱們沒法直接把e2的next指向e4,而是將要刪除的節點以前的節點複製一份,造成新的鏈表。它的實現大體以下圖所示:

  注意:最後纔將數組中對應桶位置的鏈表替換爲新鏈表(也就是在最後一步替換以前,tab[i]指向的始終是刪除以前的鏈表,詳細看下面的remove方法)。
  若是咱們get的也恰巧是e3,可能咱們順着鏈表剛找到e1,這時另外一個線程就執行了刪除e3的操做,而咱們線程還會繼續沿着舊的鏈表找到e3返回,這時候可能看到被刪除的數據,可是在高併發環境下,這種影響是很小的。

remove方法

V remove(Object key, int hash, Object value) { 
           lock();         // 加鎖
           try{ 
               int c = count - 1; 
               HashEntry<K,V>[] tab = table; 
               int index = hash & (tab.length - 1); 
               HashEntry<K,V> first = tab[index]; 
               HashEntry<K,V> e = first; 
               while(e != null&& (e.hash != hash || !key.equals(e.key))) 
                   e = e.next; 
 
               V oldValue = null; 
               if(e != null) { 
                   V v = e.value; 
                   if(value == null|| value.equals(v)) { // 找到要刪除的節點
                       oldValue = v; 
                       ++modCount; 
                       // 全部處於待刪除節點以後的節點原樣保留在鏈表中
                       // 全部處於待刪除節點以前的節點被克隆(實際上是把全部值取出來放到一個新的HashEntry對象中)到新鏈表中
                       HashEntry<K,V> newFirst = e.next;// 待刪節點的後繼結點
                       for(HashEntry<K,V> p = first; p != e; p = p.next) 
                           newFirst = new HashEntry<K,V>(p.key, p.hash,  newFirst, p.value); 

                       // 新的頭結點是原鏈表中,刪除節點以前的那個節點
                       tab[index] = newFirst; 
                       count = c;      // 寫 count 變量
                   } 
               } 
               return oldValue; 
           } finally{ 
               unlock();               // 解鎖
           } 
       }

  和 get 操做同樣,首先根據散列碼找到具體的鏈表;而後遍歷這個鏈表找到要刪除的節點;最後把待刪除節點以後的全部節點原樣保留在新鏈表中,把待刪除節點以前的每一個節點克隆(實際上是把全部值取出來放到一個新的HashEntry對象中)到新鏈表中;最後纔將數組中對應桶位置的鏈表替換爲新鏈表(也就是在替換以前,get的始終是刪除以前的鏈表)。
  下面經過圖例來講明 remove 操做。假設寫線程執行 remove 操做,要刪除鏈表的 C 節點,另外一個讀線程同時正在遍歷這個鏈表。

執行刪除以後的新鏈表

B中的next定義爲final,沒法修改將它指向D,所以C以前的全部節點都要從新創建。而且它們在新鏈表中的連接順序被反轉了。

size方法

  在 ConcurrentHashMap中,每個 Segment 對象都有一個 count 對象來表示本 Segment 中包含的 HashEntry 對象的個數。若是咱們要統計整個ConcurrentHashMap裏元素的大小,就必須統計全部Segment裏元素的大小後求和。Segment裏的全局變量count是一個volatile變量,那麼在多線程場景下,咱們是否是直接把全部Segment的count相加就能夠獲得整個ConcurrentHashMap大小了呢?不是的,雖然相加時能夠獲取每一個Segment的count的最新值,可是拿到以後可能累加的過程當中count發生了變化,那麼統計結果就不許了。因此最安全的作法,是在統計size的時候把全部Segment鎖定。
  由於在累加count操做過程當中,以前累加過的count發生變化的概率很是小,因此ConcurrentHashMap的作法是先嚐試2次經過不鎖住Segment的方式來統計各個Segment大小,若是統計的過程當中,容器的count發生了變化,則再採用加鎖的方式來統計全部Segment的大小。
  那麼ConcurrentHashMap是如何判斷在統計的時候容器是否發生了變化呢?使用modCount變量,在put , remove和clean方法裏操做元素前都會將變量modCount進行加1,那麼在統計size先後比較modCount是否發生變化,從而得知容器的大小是否發生變化。

2、比較

  每一個版本的ConcurrentHashMap幾乎都有改動,本文說明的JDK6的源碼。JDK8相比前面的版本改動最大,下面簡要說一下:

  1. 它摒棄了Segment(段鎖)的概念,而是啓用了一種全新的方式實現,利用CAS算法。
  2. 它沿用了與它同時期的HashMap版本的思想,底層依然由「數組」+鏈表+紅黑樹的方式思想,可是爲了作到併發,又增長了不少輔助的類,例如TreeBin,Traverser等對象內部類。

詳細看: http://blog.csdn.net/u010723709/article/details/48007881

與Hashtable比較

  因爲Hashtable不管是讀仍是寫仍是遍歷,都須要得到對象鎖,串行操做,所以在多線程環境下性能比較差。
  可是ConcurrentHashMap不能徹底取代Hashtable:HashTable的迭代器是強一致性的,而ConcurrentHashMap是弱一致的。其實 ConcurrentHashMap的get,clear,iterator 都是弱一致性的。 Doug Lea 也將這個判斷留給用戶本身決定是否使用ConcurrentHashMap。
  弱一致性:不保證數據徹底處於一致性狀態。好比:

get方法:

可能在get的時候得到一個還沒徹底構造好的HashEntry對象,致使得到的entry的value爲null,此時須要加鎖從新讀取。

clear方法

public void clear() {
    for (int i = 0; i < segments.length; ++i)
        segments[i].clear();
}

  由於沒有全局的鎖,在清除完一個segments以後,正在清理下一個segments的時候,已經清理segments可能又被加入了數據,所以clear返回的時候,ConcurrentHashMap中是可能存在數據的。所以,clear方法是弱一致的。

迭代器

   java.util 包中的集合類都返回 fail-fast 迭代器,這意味着它們假設線程在集合內容中進行迭代時,集合不會更改它的內容。若是 fail-fast 迭代器檢測到在迭代過程當中進行了更改操做,那麼它會拋出 ConcurrentModificationException。
   ConcurrentHashMap中的迭代器主要包括entrySet、keySet、values方法。它們大同小異,這裏選擇entrySet解釋。當咱們調用entrySet返回值的iterator方法時,返回的是EntryIterator,在EntryIterator上調用next方法時,最終實際調用到了HashIterator.advance()方法,看下這個方法:

final void advance() {
    if (nextEntry != null && (nextEntry = nextEntry.next) != null)
        return;
 
    while (nextTableIndex >= 0) {
        if ( (nextEntry = currentTable[nextTableIndex--]) != null)
            return;
    }
 
    while (nextSegmentIndex >= 0) {
        Segment<K,V> seg = segments[nextSegmentIndex--];
        if (seg.count != 0) {
            currentTable = seg.table;
            for (int j = currentTable.length - 1; j >= 0; --j) {
                if ( (nextEntry = currentTable[j]) != null) {
                    nextTableIndex = j - 1;
                    return;
                }
            }
        }
    }
}

  在這種迭代方式中,好比咱們刪除了鏈表的某個entry,可是在完成以前,迭代器得到了舊的鏈表指針,那麼就會遍歷舊的鏈表,而且不會報異常。

相關文章
相關標籤/搜索