Java集合---ConcurrentHashMap原理分析

1、背景:

線程不安全的HashMap

    由於多線程環境下,使用Hashmap進行put操做會引發死循環,致使CPU利用率接近100%,因此在併發狀況下不能使用HashMap。
 

效率低下的HashTable容器

     HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的狀況下HashTable的效率很是低下。由於當一個線程訪問HashTable的同步方法時,其餘線程訪問HashTable的同步方法時,可能會進入阻塞或輪詢狀態。如線程1使用put進行添加元素,線程2不但不能使用put方法添加元素,而且也不能使用get方法來獲取元素,因此競爭越激烈效率越低。
 

鎖分段技術

    HashTable容器在競爭激烈的併發環境下表現出效率低下的緣由,是由於全部訪問HashTable的線程都必須競爭同一把鎖,那假如容器裏有多把鎖,每一把鎖用於鎖容器其中一部分數據,那麼當多線程訪問容器裏不一樣數據段的數據時,線程間就不會存在鎖競爭,從而能夠有效的提升併發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術,首先將數據分紅一段一段的存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問。有些方法須要跨段,好比size()和containsValue(),它們可能須要鎖定整個表而而不只僅是某個段,這須要按順序鎖定全部段,操做完畢後,又按順序釋放全部段的鎖。這裏「按順序」是很重要的,不然極有可能出現死鎖,在ConcurrentHashMap內部,段數組是final的,而且其成員變量實際上也是final的,可是,僅僅是將數組聲明爲final的並不保證數組成員也是final的,這須要實現上的保證。這能夠確保不會出現死鎖,由於得到鎖的順序是固定的。
ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment是一種可重入鎖ReentrantLock,在ConcurrentHashMap裏扮演鎖的角色,HashEntry則用於存儲鍵值對數據。一個ConcurrentHashMap裏包含一個Segment數組,Segment的結構和HashMap相似,是一種數組和鏈表結構, 一個Segment裏包含一個HashEntry數組,每一個HashEntry是一個鏈表結構的元素, 每一個Segment守護者一個HashEntry數組裏的元素,當對HashEntry數組的數據進行修改時,必須首先得到它對應的Segment鎖。
 

2、應用場景

    當有一個大數組時須要在多個線程共享時就能夠考慮是否把它給分層多個節點了,避免大鎖。並能夠考慮經過hash算法進行一些模塊定位。
其實不止用於線程,當設計數據表的事務時(事務某種意義上也是同步機制的體現),能夠把一個表當作一個須要同步的數組,若是操做的表數據太多時就能夠考慮事務分離了(這也是爲何要避免大表的出現),好比把數據進行字段拆分,水平分表等.
 

3、源碼解讀

    ConcurrentHashMap(1.7及以前)中主要實體類就是三個:ConcurrentHashMap(整個Hash表),Segment(桶),HashEntry(節點),對應上面的圖能夠看出之間的關係
/** 
* The segments, each of which is a specialized hash table 
*/  
final Segment<K,V>[] segments;

不變(Immutable)和易變(Volatile)

    ConcurrentHashMap徹底容許多個讀操做併發進行,讀操做並不須要加鎖。若是使用傳統的技術,如HashMap中的實現,若是容許能夠在hash鏈的中間添加或刪除元素,讀操做不加鎖將獲得不一致的數據。ConcurrentHashMap實現技術是保證HashEntry幾乎是不可變的。HashEntry表明每一個hash鏈中的一個節點,其結構以下所示:
 static final class HashEntry<K,V> {  
 final K key; final int hash; volatile V value; final HashEntry<K,V> next; } 
  能夠看到除了value不是final的,其它值都是final的,這意味着不能從hash鏈的中間或尾部添加或刪除節點,由於這須要修改next 引用值,全部的節點的修改只能從頭部開始。對於put操做,能夠一概添加到Hash鏈的頭部。可是對於remove操做,可能須要從中間刪除一個節點,這就須要將要刪除節點的前面全部節點整個複製一遍,最後一個節點指向要刪除結點的下一個結點。這在講解刪除操做時還會詳述。爲了確保讀操做可以看到最新的值,將value設置成volatile,這避免了加鎖。

其它

  爲了加快定位段以及段中hash槽的速度,每一個段hash槽的的個數都是2^n,這使得經過位運算就能夠定位段和段中hash槽的位置。當併發級別爲默認值16時,也就是段的個數,hash值的高4位決定分配在哪一個段中。可是咱們也不要忘記《算法導論》給咱們的教訓:hash槽的的個數不該該是 2^n,這可能致使hash槽分配不均,這須要對hash值從新再hash一次。(這段彷佛有點多餘了 )
 

定位操做:

 final Segment<K,V> segmentFor(int hash) {  
     return segments[(hash >>> segmentShift) & segmentMask]; }
  既然ConcurrentHashMap使用分段鎖Segment來保護不一樣段的數據,那麼在插入和獲取元素的時候,必須先經過哈希算法定位到Segment。能夠看到ConcurrentHashMap會首先使用Wang/Jenkins hash的變種算法對元素的hashCode進行一次再哈希。
再哈希,其目的是爲了減小哈希衝突,使元素可以均勻的分佈在不一樣的Segment上,從而提升容器的存取效率。假如哈希的質量差到極點,那麼全部的元素都在一個Segment中,不只存取元素緩慢,分段鎖也會失去意義。我作了一個測試,不經過再哈希而直接執行哈希計算。
 
System.out.println(Integer.parseInt("0001111", 2) & 15);
System.out.println(Integer.parseInt("0011111", 2) & 15);
System.out.println(Integer.parseInt("0111111", 2) & 15);
System.out.println(Integer.parseInt("1111111", 2) & 15);
 
計算後輸出的哈希值全是15,經過這個例子能夠發現若是不進行再哈希,哈希衝突會很是嚴重,由於只要低位同樣,不管高位是什麼數,其哈希值老是同樣。咱們再把上面的二進制數據進行再哈希後結果以下,爲了方便閱讀,不足32位的高位補了0,每隔四位用豎線分割下。
 
0100|0111|0110|0111|1101|1010|0100|1110
1111|0111|0100|0011|0000|0001|1011|1000
0111|0111|0110|1001|0100|0110|0011|1110
1000|0011|0000|0000|1100|1000|0001|1010
 
能夠發現每一位的數據都散列開了,經過這種再哈希能讓數字的每一位都能參加到哈希運算當中,從而減小哈希衝突。ConcurrentHashMap經過如下哈希算法定位segment。
默認狀況下segmentShift爲28,segmentMask爲15,再哈希後的數最大是32位二進制數據,向右無符號移動28位,意思是讓高4位參與到hash運算中, (hash >>> segmentShift) & segmentMask的運算結果分別是4,15,7和8,能夠看到hash值沒有發生衝突。
final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask]; }

 

數據結構

 
  全部的成員都是final的,其中segmentMask和segmentShift主要是爲了定位段,參見上面的segmentFor方法。
  關於Hash表的基礎數據結構,這裏不想作過多的探討。Hash表的一個很重要方面就是如何解決hash衝突,ConcurrentHashMap 和HashMap使用相同的方式,都是將hash值相同的節點放在一個hash鏈中。與HashMap不一樣的是,ConcurrentHashMap使用多個子Hash表,也就是段(Segment)。
每一個Segment至關於一個子Hash表,它的數據成員以下:
static final class Segment<K,V> extends ReentrantLock implements Serializable {    
         /** 
          * The number of elements in this segment's region. 
          */
 transient volatileint count; /** * Number of updates that alter the size of the table. This is * used during bulk-read methods to make sure they see a * consistent snapshot: If modCounts change during a traversal * of segments computing size or checking containsValue, then * we might have an inconsistent view of state so (usually) * must retry. */ transient int modCount; /** * The table is rehashed when its size exceeds this threshold. * (The value of this field is always <tt>(int)(capacity * * loadFactor)</tt>.) */ transient int threshold; /** * The per-segment table. */ transient volatile HashEntry<K,V>[] table; /** * The load factor for the hash table. Even though this value * is same for all segments, it is replicated to avoid needing * links to outer object. * @serial */ final float loadFactor; } 
  count用來統計該段數據的個數,它是volatile,它用來協調修改和讀取操做,以保證讀取操做可以讀取到幾乎最新的修改。協調方式是這樣的,每次修改操做作告終構上的改變,如增長/刪除節點(修改節點的值不算結構上的改變),都要寫count值,每次讀取操做開始都要讀取count的值。這利用了 Java 5中對volatile語義的加強,對同一個volatile變量的寫和讀存在happens-before關係。modCount統計段結構改變的次數,主要是爲了檢測對多個段進行遍歷過程當中某個段是否發生改變,在講述跨段操做時會還會詳述。threashold用來表示須要進行rehash的界限值。table數組存儲段中節點,每一個數組元素是個hash鏈,用HashEntry表示。table也是volatile,這使得可以讀取到最新的 table值而不須要同步。loadFactor表示負載因子。

刪除操做remove(key)

public V remove(Object key) {  
   hash = hash(key.hashCode()); return segmentFor(hash).remove(key, hash, null); }

 

   整個操做是先定位到段,而後委託給段的remove操做。當多個刪除操做併發進行時,只要它們所在的段不相同,它們就能夠同時進行。
下面是Segment的remove方法實現:
 V remove(Object key, int hash, Object value) {  
     lock(); try { int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay // in list, but all preceding ones need to be // cloned. ++modCount; HashEntry<K,V> newFirst = e.next; *for (HashEntry<K,V> p = first; p != e; p = p.next) *newFirst = new HashEntry<K,V>(p.key, p.hash,  newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile  }  } return oldValue; } finally {  unlock();  } }
  整個操做是在持有段鎖的狀況下執行的,空白行以前的行主要是定位到要刪除的節點e。接下來,若是不存在這個節點就直接返回null,不然就要將e前面的結點複製一遍,尾結點指向e的下一個結點。e後面的結點不須要複製,它們能夠重用。
中間那個for循環是作什麼用的呢?(*號標記)從代碼來看,就是將定位以後的全部entry克隆並拼回前面去,但有必要嗎?每次刪除一個元素就要將那以前的元素克隆一遍?這點實際上是由entry的不變性來決定的,仔細觀察entry定義,發現除了value,其餘全部屬性都是用final來修飾的,這意味着在第一次設置了next域以後便不能再改變它,取而代之的是將它以前的節點全都克隆一次。至於entry爲何要設置爲不變性,這跟不變性的訪問不須要同步從而節省時間有關
下面是個示意圖
                                         刪除元素以前:
 
 
 
                                        
                                         刪除元素3以後:
 
 
 
  第二個圖其實有點問題,複製的結點中應該是值爲2的結點在前面,值爲1的結點在後面,也就是恰好和原來結點順序相反,還好這不影響咱們的討論。
整個remove實現並不複雜,可是須要注意以下幾點。第一,當要刪除的結點存在時,刪除的最後一步操做要將count的值減一。這必須是最後一步操做,不然讀取操做可能看不到以前對段所作的結構性修改。第二,remove執行的開始就將table賦給一個局部變量tab,這是由於table是 volatile變量,讀寫volatile變量的開銷很大。編譯器也不能對volatile變量的讀寫作任何優化,直接屢次訪問非volatile實例變量沒有多大影響,編譯器會作相應優化。

get操做

ConcurrentHashMap的get操做是直接委託給Segment的get方法,直接看Segment的get方法:
V get(Object key, int hash) {  
     if (count != 0) { // read-volatile 當前桶的數據個數是否爲0 
         HashEntry<K,V> e = getFirst(hash); 獲得頭節點 while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck  } e = e.next;  }  }  returnnull; } 
get操做不須要鎖。
  除非讀到的值是空的纔會加鎖重讀,咱們知道HashTable容器的get方法是須要加鎖的,那麼ConcurrentHashMap的get操做是如何作到不加鎖的呢?緣由是它的get方法裏將要使用的共享變量都定義成volatile
 
  第一步是訪問count變量,這是一個volatile變量,因爲全部的修改操做在進行結構修改時都會在最後一步寫count 變量,經過這種機制保證get操做可以獲得幾乎最新的結構更新。對於非結構更新,也就是結點值的改變,因爲HashEntry的value變量是 volatile的,也能保證讀取到最新的值。
 
  接下來就是根據hash和key對hash鏈進行遍歷找到要獲取的結點,若是沒有找到,直接訪回null。對hash鏈進行遍歷不須要加鎖的緣由在於鏈指針next是final的。可是頭指針卻不是final的,這是經過getFirst(hash)方法返回,也就是存在 table數組中的值。這使得getFirst(hash)可能返回過期的頭結點,例如,當執行get方法時,剛執行完getFirst(hash)以後,另外一個線程執行了刪除操做並更新頭結點,這就致使get方法中返回的頭結點不是最新的。這是能夠容許,經過對count變量的協調機制,get能讀取到幾乎最新的數據,雖然可能不是最新的。要獲得最新的數據,只有採用徹底的同步。
 
  最後,若是找到了所求的結點,判斷它的值若是非空就直接返回,不然在有鎖的狀態下再讀一次。這彷佛有些費解,理論上結點的值不可能爲空,這是由於 put的時候就進行了判斷,若是爲空就要拋NullPointerException。空值的惟一源頭就是HashEntry中的默認值,由於 HashEntry中的value不是final的,非同步讀取有可能讀取到空值。仔細看下put操做的語句:tab[index] = new HashEntry<K,V>(key, hash, first, value),在這條語句中,HashEntry構造函數中對value的賦值以及對tab[index]的賦值可能被從新排序,這就可能致使結點的值爲空。這裏當v爲空時,多是一個線程正在改變節點,而以前的get操做都未進行鎖定,根據bernstein條件,讀後寫或寫後讀都會引發數據的不一致,因此這裏要對這個e從新上鎖再讀一遍,以保證獲得的是正確值。
 V readValueUnderLock(HashEntry<K,V> e) {  
     lock(); try { return e.value; } finally {  unlock();  } }
  如用於統計當前Segement大小的count字段和用於存儲值的HashEntry的value。定義成volatile的變量,可以在線程之間保持可見性,可以被多線程同時讀,而且保證不會讀到過時的值,可是隻能被單線程寫(有一種狀況能夠被多線程寫,就是寫入的值不依賴於原值),在get操做裏只須要讀不須要寫共享變量count和value,因此能夠不用加鎖。之因此不會讀到過時的值,是根據java內存模型的happen before原則,對volatile字段的寫入操做先於讀操做,即便兩個線程同時修改和獲取volatile變量,get操做也能拿到最新的值,這是用volatile替換鎖的經典應用場景

put操做

一樣地put操做也是委託給段的put方法。下面是段的put方法:
 V put(K key, int hash, V value, boolean onlyIfAbsent) {  
     lock(); try { int c = count; if (c++ > threshold) // ensure capacity  rehash(); HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next;  V oldValue; if (e != null) { oldValue = e.value; if (!onlyIfAbsent) e.value = value;  } else { oldValue = null; ++modCount; tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // write-volatile  } return oldValue; } finally {  unlock();  } }
  該方法也是在持有段鎖(鎖定整個segment)的狀況下執行的,這固然是爲了併發的安全,修改數據是不能併發進行的,必須得有個判斷是否超限的語句以確保容量不足時可以rehash。接着是找是否存在一樣一個key的結點,若是存在就直接替換這個結點的值。不然建立一個新的結點並添加到hash鏈的頭部,這時必定要修改modCount和count的值,一樣修改count的值必定要放在最後一步。put方法調用了rehash方法,reash方法實現得也很精巧,主要利用了table的大小爲2^n,這裏就不介紹了。而比較難懂的是這句int index = hash & (tab.length - 1),原來segment裏面纔是真正的hashtable,即每一個segment是一個傳統意義上的hashtable,如上圖,從二者的結構就能夠看出區別,這裏就是找出須要的entry在table的哪個位置,以後獲得的entry就是這個鏈的第一個節點,若是e!=null,說明找到了,這是就要替換節點的值(onlyIfAbsent == false),不然,咱們須要new一個entry,它的後繼是first,而讓tab[index]指向它,什麼意思呢?實際上就是將這個新entry插入到鏈頭,剩下的就很是容易理解了
 
  因爲put方法裏須要對共享變量進行寫入操做,因此爲了線程安全,在操做共享變量時必須得加鎖。Put方法首先定位到Segment,而後在Segment裏進行插入操做。插入操做須要經歷兩個步驟,第一步判斷是否須要對Segment裏的HashEntry數組進行擴容,第二步定位添加元素的位置而後放在HashEntry數組裏。
  • 是否須要擴容。在插入元素前會先判斷Segment裏的HashEntry數組是否超過容量(threshold),若是超過閥值,數組進行擴容。值得一提的是,Segment的擴容判斷比HashMap更恰當,由於HashMap是在插入元素後判斷元素是否已經到達容量的,若是到達了就進行擴容,可是頗有可能擴容以後沒有新元素插入,這時HashMap就進行了一次無效的擴容。
  • 如何擴容。擴容的時候首先會建立一個兩倍於原容量的數組,而後將原數組裏的元素進行再hash後插入到新的數組裏。爲了高效ConcurrentHashMap不會對整個容器進行擴容,而只對某個segment進行擴容。
 
另外一個操做是containsKey,這個實現就要簡單得多了,由於它不須要讀取值:
 
 boolean containsKey(Object key, int hash) {  
     if (count != 0) { // read-volatile  
         HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key))  returntrue; e = e.next;  }  }  returnfalse; } 

size()操做

  若是咱們要統計整個ConcurrentHashMap裏元素的大小,就必須統計全部Segment裏元素的大小後求和。Segment裏的全局變量count是一個volatile變量,那麼在多線程場景下,咱們是否是直接把全部Segment的count相加就能夠獲得整個ConcurrentHashMap大小了呢?不是的,雖然相加時能夠獲取每一個Segment的count的最新值,可是拿到以後可能累加前使用的count發生了變化,那麼統計結果就不許了。因此最安全的作法,是在統計size的時候把全部Segment的put,remove和clean方法所有鎖住,可是這種作法顯然很是低效。
  由於在累加count操做過程當中,以前累加過的count發生變化的概率很是小,因此ConcurrentHashMap的作法是先嚐試2次經過不鎖住Segment的方式來統計各個Segment大小,若是統計的過程當中,容器的count發生了變化,則再採用加鎖的方式來統計全部Segment的大小。
  那麼ConcurrentHashMap是如何判斷在統計的時候容器是否發生了變化呢?使用modCount變量,在put , remove和clean方法裏操做元素前都會將變量modCount進行加1,那麼在統計size先後比較modCount是否發生變化,從而得知容器的大小是否發生變化。
 

參考:

相關文章
相關標籤/搜索