在上一篇中介紹了HashMap的原理,這一節是ConcurrentMap的最後一節,因此會完整的介紹ConcurrentHashMap的實現。html
ConcurrentHashMap原理java
在讀寫鎖章節部分介紹過一種是用讀寫鎖實現Map的方法。此種方法看起來能夠實現Map響應的功能,並且吞吐量也應該不錯。可是經過前面對讀寫鎖原理的分析後知道,讀寫鎖的適合場景是讀操做>>寫操做,也就是讀操做應該佔據大部分操做,另外讀寫鎖存在一個很嚴重的問題是讀寫操做不能同時發生。要想解決讀寫同時進行問題(至少不一樣元素的讀寫分離),那麼就只能將鎖拆分,不一樣的元素擁有不一樣的鎖,這種技術就是「鎖分離」技術。node
默認狀況下ConcurrentHashMap是用了16個相似HashMap 的結構,其中每個HashMap擁有一個獨佔鎖。也就是說最終的效果就是經過某種Hash算法,將任何一個元素均勻的映射到某個HashMap的Map.Entry上面,而對某個一個元素的操做就集中在其分佈的HashMap上,與其它HashMap無關。這樣就支持最多16個併發的寫操做。c++
上圖就是ConcurrentHashMap的類圖。參考上面的說明和HashMap的原理分析,能夠看到ConcurrentHashMap將整個對象列表分爲segmentMask+1個片斷(Segment)。其中每個片斷是一個相似於HashMap的結構,它有一個HashEntry的數組,數組的每一項又是一個鏈表,經過HashEntry的next引用串聯起來。算法
這個類圖上面的數據結構的定義很是有學問,接下來會一個個有針對性的分析。數組
首先如何從ConcurrentHashMap定位到HashEntry。在HashMap的原理分析部分說過,對於一個Hash的數據結構來講,爲了減小浪費的空間和快速定位數據,那麼就須要數據在Hash上的分佈比較均勻。對於一次Map的查找來講,首先就須要定位到Segment,而後從過Segment定位到HashEntry鏈表,最後纔是經過遍歷鏈表獲得須要的元素。數據結構
在不討論併發的前提下先來討論如何定位到HashEntry的。在ConcurrentHashMap中是經過hash(key.hashCode())和segmentFor(hash)來獲得Segment的。清單1 描述瞭如何定位Segment的過程。其中hash(int)是將key的hashCode進行二次編碼,使之可以在segmentMask+1個Segment上均勻分佈(默認是16個)。能夠看到的是這裏和HashMap仍是有點不一樣的,這裏採用的算法叫Wang/Jenkins hash,有興趣的能夠參考資料1和參考資料2。總之它的目的就是使元素可以均勻的分佈在不一樣的Segment上,這樣纔可以支持最多segmentMask+1個併發,這裏segmentMask+1是segments的大小。多線程
清單1 定位Segment併發
private static int hash(int h) {
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}app
顯然在不可以對Segment擴容的狀況下,segments的大小就應該是固定的。因此在ConcurrentHashMap中segments/segmentMask/segmentShift都是常量,一旦初始化後就不能被再次修改,其中segmentShift是查找Segment的一個常量偏移量。
有了Segment之後再定位HashEntry就和HashMap中定位HashEntry同樣了,先將hash值與Segment中HashEntry的大小減1進行與操做定位到HashEntry鏈表,而後遍歷鏈表就能夠完成相應的操做了。
可以定位元素之後ConcurrentHashMap就已經具備了HashMap的功能了,如今要解決的就是如何併發的問題。要解決併發問題,加鎖是必不可免的。再回頭看Segment的類圖,能夠看到Segment除了有一個volatile類型的元素大小count外,Segment仍是集成自ReentrantLock的。另外在前面的原子操做和鎖機制中介紹過,要想最大限度的支持併發,那麼可以利用的思路就是儘可能讀操做不加鎖,寫操做不加鎖。若是是讀操做不加鎖,寫操做加鎖,對於競爭資源來講就須要定義爲volatile類型的。volatile類型可以保證happens-before法則,因此volatile可以近似保證正確性的狀況下最大程度的下降加鎖帶來的影響,同時還與寫操做的鎖不產生衝突。
同時爲了防止在遍歷HashEntry的時候被破壞,那麼對於HashEntry的數據結構來講,除了value以外其餘屬性就應該是常量,不然不可避免的會獲得ConcurrentModificationException。這就是爲何HashEntry數據結構中key,hash,next是常量的緣由(final類型)。
有了上面的分析和條件後再來看Segment的get/put/remove就容易多了。
get操做
清單2 Segment定位元素
V get(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
HashEntry<K,V> getFirst(int hash) {
HashEntry<K,V>[] tab = table;
return tab[hash & (tab.length - 1)];
}V readValueUnderLock(HashEntry<K,V> e) {
lock();
try {
return e.value;
} finally {
unlock();
}
}
清單2 描述的是Segment如何定位元素。首先判斷Segment的大小count>0,Segment的大小描述的是HashEntry不爲空(key不爲空)的個數。若是Segment中存在元素那麼就經過getFirst定位到指定的HashEntry鏈表的頭節點上,而後遍歷此節點,一旦找到key對應的元素後就返回其對應的值。可是在清單2 中能夠看到拿到HashEntry的value後還進行了一次判斷操做,若是爲空還須要加鎖再讀取一次(readValueUnderLock)。爲何會有這樣的操做?儘管ConcurrentHashMap不容許將value爲null的值加入,但如今仍然可以讀到一個爲空的value就意味着此值對當前線程還不可見(這是由於HashEntry尚未徹底構造完成就賦值致使的,後面還會談到此機制)。
put操做
清單3 描述的是Segment的put操做。首先就須要加鎖了,修改一個競爭資源確定是要加鎖的,這個毫無疑問。須要說明的是Segment集成的是ReentrantLock,因此這裏加的鎖也就是獨佔鎖,也就是說同一個Segment在同一時刻只有能一個put操做。
接下來來就是檢查是否須要擴容,這和HashMap同樣,若是須要的話就擴大一倍,同時進行rehash操做。
查找元素就和get操做是同樣的,獲得元素就直接修改其值就行了。這裏onlyIfAbsent只是爲了實現ConcurrentMap的putIfAbsent操做而已。須要說明如下幾點:
清單3 Segment的put操做
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash();
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;V oldValue;
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
remove 操做
清單4 描述了Segment刪除一個元素的過程。同put同樣,remove也須要加鎖,這是由於對table可能會有變動。因爲HashEntry的next節點是final類型的,因此一旦刪除鏈表中間一個元素,就須要將刪除以前或者以後的元素從新加入新的鏈表。而Segment採用的是將刪除元素以前的元素一個個從新加入刪除以後的元素以前(也就是鏈表頭結點)來完成新鏈表的構造。
清單4 Segment的remove操做
V remove(Object key, int hash, Object value) {
lock();
try {
int c = count - 1;
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;V oldValue = null;
if (e != null) {
V v = e.value;
if (value == null || value.equals(v)) {
oldValue = v;
// All entries following removed node can stay
// in list, but all preceding ones need to be
// cloned.
++modCount;
HashEntry<K,V> newFirst = e.next;
for (HashEntry<K,V> p = first; p != e; p = p.next)
newFirst = new HashEntry<K,V>(p.key, p.hash,
newFirst, p.value);
tab[index] = newFirst;
count = c; // write-volatile
}
}
return oldValue;
} finally {
unlock();
}
}
下面的示意圖描述瞭如何刪除一個已經存在的元素的。假設咱們要刪除B3元素。首先定位到B3所在的Segment,而後再定位到Segment的table中的B1元素,也就是Bx所在的鏈表。而後遍歷鏈表找到B3,找到以後就從頭結點B1開始構建新的節點B1(藍色)加到B4的前面,繼續B1後面的節點B2構造B2(藍色),加到由藍色的B1和B4構成的新的鏈表。繼續下去,直到遇到B3後終止,這樣就構造出來一個新的鏈表B2(藍色)->B1(藍色)->B4->B5,而後將此鏈表的頭結點B2(藍色)設置到Segment的table中。這樣就完成了元素B3的刪除操做。須要說明的是,儘管就的鏈表仍然存在(B1->B2->B3->B4->B5),可是因爲沒有引用指向此鏈表,因此此鏈表中無引用的(B1->B2->B3)最終會被GC回收掉。這樣作的一個好處是,若是某個讀操做在刪除時已經定位到了舊的鏈表上,那麼此操做仍然將能讀到數據,只不過讀取到的是舊數據而已,這在多線程裏面是沒有問題的。
除了對單個元素操做外,還有對所有的Segment的操做,好比size()操做等。
size操做
size操做涉及到統計全部Segment的大小,這樣就會遍歷全部的Segment,若是每次加鎖就會致使整個Map都被鎖住了,任何須要鎖的操做都將沒法進行。這裏用到了一個比較巧妙的方案解決此問題。
在Segment中有一個變量modCount,用來記錄Segment結構變動的次數,結構變動包括增長元素和刪除元素,每增長一個元素操做就+1,每進行一次刪除操做+1,每進行一次清空操做(clear)就+1。也就是說每次涉及到元素個數變動的操做modCount都會+1,並且一直是增大的,不會減少。
遍歷兩次ConcurrentHashMap中的segments,每次遍歷是記錄每個Segment的modCount,比較兩次遍歷的modCount值的和是否相同,若是相同就返回在遍歷過程當中獲取的Segment的count的和,也就是全部元素的個數。若是不相同就重複再作一次。重複一次還不相同就將全部Segment鎖住,一個一個的獲取其大小(count),最後將這些count加起來獲得總的大小。固然了最後須要將鎖一一釋放。清單5 描述了這個過程。
這裏有一個比較高級的話題是爲何在讀取modCount的時候老是先要讀取count一下。爲何不是先讀取modCount而後再讀取count的呢?也就是說下面的兩條語句可否交換下順序?
sum += segments[i].count;
mcsum += mc[i] = segments[i].modCount;
答案是不能!爲何?這是由於modCount老是在加鎖的狀況下才發生變化,因此不會發生多線程同時修改的狀況,也就是不必時volatile類型。另外老是在count修改的狀況下修改modCount,而count是一個volatile變量。因而這裏就充分利用了volatile的特性。
根據happens-before法則,第(3)條:對volatile字段的寫入操做happens-before於每個後續的同一個字段的讀操做。也就是說一個操做C在volatile字段的寫操做以後,那麼volatile寫操做以前的全部操做都對此操做C可見。因此修改modCount老是在修改count以前,也就是說若是讀取到了一個count的值,那麼在count變化以前的modCount也就可以讀取到,換句話說就是若是看到了count值的變化,那麼就必定看到了modCount值的變化。而若是上面兩條語句交換下順序就沒法保證這個結果必定存在了。
在ConcurrentHashMap.containsValue中,能夠看到每次遍歷segments時都會執行int c = segments[i].count;,可是接下來的語句中又不用此變量c,儘管如此JVM仍然不能將此語句優化掉,由於這是一個volatile字段的讀取操做,它保證了一些列操做的happens-before順序,因此是相當重要的。在這裏能夠看到:
ConcurrentHashMap將volatile發揮到了極致!
另外isEmpty操做於size操做相似,再也不累述。
清單5 ConcurrentHashMap的size操做
public int size() {
final Segment<K,V>[] segments = this.segments;
long sum = 0;
long check = 0;
int[] mc = new int[segments.length];
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
check = 0;
sum = 0;
int mcsum = 0;
for (int i = 0; i < segments.length; ++i) {
sum += segments[i].count;
mcsum += mc[i] = segments[i].modCount;
}
if (mcsum != 0) {
for (int i = 0; i < segments.length; ++i) {
check += segments[i].count;
if (mc[i] != segments[i].modCount) {
check = -1; // force retry
break;
}
}
}
if (check == sum)
break;
}
if (check != sum) { // Resort to locking all segments
sum = 0;
for (int i = 0; i < segments.length; ++i)
segments[i].lock();
for (int i = 0; i < segments.length; ++i)
sum += segments[i].count;
for (int i = 0; i < segments.length; ++i)
segments[i].unlock();
}
if (sum > Integer.MAX_VALUE)
return Integer.MAX_VALUE;
else
return (int)sum;
}
ConcurrentSkipListMap/Set
原本打算介紹下ConcurrentSkipListMap的,結果打開源碼一看,完全放棄了。那裏面的數據結構和算法我估計研究一週也未必可以徹底弄懂。好久之前我看TreeMap的時候就頭大,想一想那些複雜的「紅黑二叉樹」我頭都大了。這些都歸咎於從前沒有好好學習《數據結構和算法》,如今再回頭看這些複雜的算法感受很是頭疼,爲了減小腦細胞的死亡,暫且仍是不要惹這些「玩意兒」。有興趣的能夠看看參考資料4 中對TreeMap的介紹。
參考資料: