ConcurrentHashMap之實現細節(轉)

ConcurrentHashMap是Java 5中支持高併發、高吞吐量的線程安全HashMap實現。在這以前我對ConcurrentHashMap只有一些膚淺的理解,僅知道它採用了多個鎖,大概也足夠了。可是在通過一次慘痛的面試經歷以後,我以爲必須深刻研究它的實現。面試中被問到讀是否要加鎖,由於讀寫會發生衝突,我說必需要加鎖,我和麪試官也所以發生了衝突,結果可想而知。仍是閒話少說,經過仔細閱讀源代碼,如今總算理解ConcurrentHashMap實現機制了,其實現之精巧,使人歎服,與你們共享之。java

 

 

實現原理 
node

 

鎖分離 (Lock Stripping)c++

 

ConcurrentHashMap容許多個修改操做併發進行,其關鍵在於使用了鎖分離技術。它使用了多個鎖來控制對hash表的不一樣部分進行的修改。ConcurrentHashMap內部使用段(Segment)來表示這些不一樣的部分,每一個段其實就是一個小的hash table,它們有本身的鎖。只要多個修改操做發生在不一樣的段上,它們就能夠併發進行。面試

 

有些方法須要跨段,好比size()和containsValue(),它們可能須要鎖定整個表而而不只僅是某個段,這須要按順序鎖定全部段,操做完畢後,又按順序釋放全部段的鎖。這裏「按順序」是很重要的,不然極有可能出現死鎖,在ConcurrentHashMap內部,段數組是final的,而且其成員變量實際上也是final的,可是,僅僅是將數組聲明爲final的並不保證數組成員也是final的,這須要實現上的保證。這能夠確保不會出現死鎖,由於得到鎖的順序是固定的。不變性是多線程編程佔有很重要的地位,下面還要談到。算法

 

Java代碼   收藏代碼
  1. /** 
  2.  * The segments, each of which is a specialized hash table 
  3.  */  
  4. final Segment<K,V>[] segments;  

 

 

不變(Immutable)和易變(Volatile)編程

 

ConcurrentHashMap徹底容許多個讀操做併發進行,讀操做並不須要加鎖。若是使用傳統的技術,如HashMap中的實現,若是容許能夠在hash鏈的中間添加或刪除元素,讀操做不加鎖將獲得不一致的數據。ConcurrentHashMap實現技術是保證HashEntry幾乎是不可變的。HashEntry表明每一個hash鏈中的一個節點,其結構以下所示:數組

 

Java代碼   收藏代碼
  1. static final class HashEntry<K,V> {  
  2.     final K key;  
  3.     final int hash;  
  4.     volatile V value;  
  5.     final HashEntry<K,V> next;  
  6. }  

能夠看到除了value不是final的,其它值都是final的,這意味着不能從hash鏈的中間或尾部添加或刪除節點,由於這須要修改next引用值,全部的節點的修改只能從頭部開始。對於put操做,能夠一概添加到Hash鏈的頭部。可是對於remove操做,可能須要從中間刪除一個節點,這就須要將要刪除節點的前面全部節點整個複製一遍,最後一個節點指向要刪除結點的下一個結點。這在講解刪除操做時還會詳述。爲了確保讀操做可以看到最新的值,將value設置成volatile,這避免了加鎖。安全

 


其它數據結構

 

爲了加快定位段以及段中hash槽的速度,每一個段hash槽的的個數都是2^n,這使得經過位運算就能夠定位段和段中hash槽的位置。當併發級別爲默認值16時,也就是段的個數,hash值的高4位決定分配在哪一個段中。可是咱們也不要忘記《算法導論》給咱們的教訓:hash槽的的個數不該該是2^n,這可能致使hash槽分配不均,這須要對hash值從新再hash一次。(這段彷佛有點多餘了 )多線程

 

這是從新hash的算法,還比較複雜,我也懶得去理解了。

Java代碼   收藏代碼
  1. private static int hash(int h) {  
  2.     // Spread bits to regularize both segment and index locations,  
  3.     // using variant of single-word Wang/Jenkins hash.  
  4.     h += (h <<  15) ^ 0xffffcd7d;  
  5.     h ^= (h >>> 10);  
  6.     h += (h <<   3);  
  7.     h ^= (h >>>  6);  
  8.     h += (h <<   2) + (h << 14);  
  9.     return h ^ (h >>> 16);  
  10. }  

 

這是定位段的方法:

Java代碼   收藏代碼
  1. final Segment<K,V> segmentFor(int hash) {  
  2.     return segments[(hash >>> segmentShift) & segmentMask];  
  3. }  

 

 

 

數據結構

 

關於Hash表的基礎數據結構,這裏不想作過多的探討。Hash表的一個很重要方面就是如何解決hash衝突,ConcurrentHashMap和HashMap使用相同的方式,都是將hash值相同的節點放在一個hash鏈中。與HashMap不一樣的是,ConcurrentHashMap使用多個子Hash表,也就是段(Segment)。下面是ConcurrentHashMap的數據成員:

 

Java代碼   收藏代碼
  1. public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>  
  2.         implements ConcurrentMap<K, V>, Serializable {  
  3.     /** 
  4.      * Mask value for indexing into segments. The upper bits of a 
  5.      * key's hash code are used to choose the segment. 
  6.      */  
  7.     final int segmentMask;  
  8.   
  9.     /** 
  10.      * Shift value for indexing within segments. 
  11.      */  
  12.     final int segmentShift;  
  13.   
  14.     /** 
  15.      * The segments, each of which is a specialized hash table 
  16.      */  
  17.     final Segment<K,V>[] segments;  
  18. }  

 

全部的成員都是final的,其中segmentMask和segmentShift主要是爲了定位段,參見上面的segmentFor方法。

 

每一個Segment至關於一個子Hash表,它的數據成員以下:

 

Java代碼   收藏代碼
  1.     static final class Segment<K,V> extends ReentrantLock implements Serializable {  
  2. private static final long serialVersionUID = 2249069246763182397L;  
  3.         /** 
  4.          * The number of elements in this segment's region. 
  5.          */  
  6.         transient volatile int count;  
  7.   
  8.         /** 
  9.          * Number of updates that alter the size of the table. This is 
  10.          * used during bulk-read methods to make sure they see a 
  11.          * consistent snapshot: If modCounts change during a traversal 
  12.          * of segments computing size or checking containsValue, then 
  13.          * we might have an inconsistent view of state so (usually) 
  14.          * must retry. 
  15.          */  
  16.         transient int modCount;  
  17.   
  18.         /** 
  19.          * The table is rehashed when its size exceeds this threshold. 
  20.          * (The value of this field is always <tt>(int)(capacity * 
  21.          * loadFactor)</tt>.) 
  22.          */  
  23.         transient int threshold;  
  24.   
  25.         /** 
  26.          * The per-segment table. 
  27.          */  
  28.         transient volatile HashEntry<K,V>[] table;  
  29.   
  30.         /** 
  31.          * The load factor for the hash table.  Even though this value 
  32.          * is same for all segments, it is replicated to avoid needing 
  33.          * links to outer object. 
  34.          * @serial 
  35.          */  
  36.         final float loadFactor;  
  37. }  

count用來統計該段數據的個數,它是volatile,它用來協調修改和讀取操做,以保證讀取操做可以讀取到幾乎最新的修改。協調方式是這樣的,每次修改操做作告終構上的改變,如增長/刪除節點(修改節點的值不算結構上的改變),都要寫count值,每次讀取操做開始都要讀取count的值。這利用了Java 5中對volatile語義的加強,對同一個volatile變量的寫和讀存在happens-before關係。modCount統計段結構改變的次數,主要是爲了檢測對多個段進行遍歷過程當中某個段是否發生改變,在講述跨段操做時會還會詳述。threashold用來表示須要進行rehash的界限值。table數組存儲段中節點,每一個數組元素是個hash鏈,用HashEntry表示。table也是volatile,這使得可以讀取到最新的table值而不須要同步。loadFactor表示負載因子。

 

 

實現細節

 

修改操做

 

先來看下刪除操做remove(key)。

Java代碼   收藏代碼
  1. public V remove(Object key) {  
  2.  hash = hash(key.hashCode());  
  3.     return segmentFor(hash).remove(key, hash, null);  
  4. }  

整個操做是先定位到段,而後委託給段的remove操做。當多個刪除操做併發進行時,只要它們所在的段不相同,它們就能夠同時進行。下面是Segment的remove方法實現:

Java代碼   收藏代碼
  1. V remove(Object key, int hash, Object value) {  
  2.     lock();  
  3.     try {  
  4.         int c = count - 1;  
  5.         HashEntry<K,V>[] tab = table;  
  6.         int index = hash & (tab.length - 1);  
  7.         HashEntry<K,V> first = tab[index];  
  8.         HashEntry<K,V> e = first;  
  9.         while (e != null && (e.hash != hash || !key.equals(e.key)))  
  10.             e = e.next;  
  11.   
  12.         V oldValue = null;  
  13.         if (e != null) {  
  14.             V v = e.value;  
  15.             if (value == null || value.equals(v)) {  
  16.                 oldValue = v;  
  17.                 // All entries following removed node can stay  
  18.                 // in list, but all preceding ones need to be  
  19.                 // cloned.  
  20.                 ++modCount;  
  21.                 HashEntry<K,V> newFirst = e.next;  
  22.                 for (HashEntry<K,V> p = first; p != e; p = p.next)  
  23.                     newFirst = new HashEntry<K,V>(p.key, p.hash,  
  24.                                                   newFirst, p.value);  
  25.                 tab[index] = newFirst;  
  26.                 count = c; // write-volatile  
  27.             }  
  28.         }  
  29.         return oldValue;  
  30.     } finally {  
  31.         unlock();  
  32.     }  
  33. }  

 整個操做是在持有段鎖的狀況下執行的,空白行以前的行主要是定位到要刪除的節點e。接下來,若是不存在這個節點就直接返回null,不然就要將e前面的結點複製一遍,尾結點指向e的下一個結點。e後面的結點不須要複製,它們能夠重用。下面是個示意圖,我直接從這個網站 上覆制的(畫這樣的圖實在是太麻煩了,若是哪位有好的畫圖工具,能夠推薦一下)。

 

 

刪除元素以前:

 

a hash chain before an element is removed

 

 

刪除元素3以後:

the chain with element 3 removed

 

第二個圖其實有點問題,複製的結點中應該是值爲2的結點在前面,值爲1的結點在後面,也就是恰好和原來結點順序相反,還好這不影響咱們的討論。

 

整個remove實現並不複雜,可是須要注意以下幾點。第一,當要刪除的結點存在時,刪除的最後一步操做要將count的值減一。這必須是最後一步操做,不然讀取操做可能看不到以前對段所作的結構性修改。第二,remove執行的開始就將table賦給一個局部變量tab,這是由於table是volatile變量,讀寫volatile變量的開銷很大。編譯器也不能對volatile變量的讀寫作任何優化,直接屢次訪問非volatile實例變量沒有多大影響,編譯器會作相應優化。

 

 

接下來看put操做,一樣地put操做也是委託給段的put方法。下面是段的put方法:

Java代碼   收藏代碼
  1. V put(K key, int hash, V value, boolean onlyIfAbsent) {  
  2.     lock();  
  3.     try {  
  4.         int c = count;  
  5.         if (c++ > threshold) // ensure capacity  
  6.             rehash();  
  7.         HashEntry<K,V>[] tab = table;  
  8.         int index = hash & (tab.length - 1);  
  9.         HashEntry<K,V> first = tab[index];  
  10.         HashEntry<K,V> e = first;  
  11.         while (e != null && (e.hash != hash || !key.equals(e.key)))  
  12.             e = e.next;  
  13.   
  14.         V oldValue;  
  15.         if (e != null) {  
  16.             oldValue = e.value;  
  17.             if (!onlyIfAbsent)  
  18.                 e.value = value;  
  19.         }  
  20.         else {  
  21.             oldValue = null;  
  22.             ++modCount;  
  23.             tab[index] = new HashEntry<K,V>(key, hash, first, value);  
  24.             count = c; // write-volatile  
  25.         }  
  26.         return oldValue;  
  27.     } finally {  
  28.         unlock();  
  29.     }  
  30. }  

該方法也是在持有段鎖的狀況下執行的,首先判斷是否須要rehash,須要就先rehash。接着是找是否存在一樣一個key的結點,若是存在就直接替換這個結點的值。不然建立一個新的結點並添加到hash鏈的頭部,這時必定要修改modCount和count的值,一樣修改count的值必定要放在最後一步。put方法調用了rehash方法,reash方法實現得也很精巧,主要利用了table的大小爲2^n,這裏就不介紹了。

 

修改操做還有putAll和replace。putAll就是屢次調用put方法,沒什麼好說的。replace甚至不用作結構上的更改,實現要比put和delete要簡單得多,理解了put和delete,理解replace就不在話下了,這裏也不介紹了。

 

 

獲取操做

 

首先看下get操做,一樣ConcurrentHashMap的get操做是直接委託給Segment的get方法,直接看Segment的get方法:

 

Java代碼   收藏代碼
  1. V get(Object key, int hash) {  
  2.     if (count != 0) { // read-volatile  
  3.         HashEntry<K,V> e = getFirst(hash);  
  4.         while (e != null) {  
  5.             if (e.hash == hash && key.equals(e.key)) {  
  6.                 V v = e.value;  
  7.                 if (v != null)  
  8.                     return v;  
  9.                 return readValueUnderLock(e); // recheck  
  10.             }  
  11.             e = e.next;  
  12.         }  
  13.     }  
  14.     return null;  
  15. }  

 

get操做不須要鎖。第一步是訪問count變量,這是一個volatile變量,因爲全部的修改操做在進行結構修改時都會在最後一步寫count變量,經過這種機制保證get操做可以獲得幾乎最新的結構更新。對於非結構更新,也就是結點值的改變,因爲HashEntry的value變量是volatile的,也能保證讀取到最新的值。接下來就是對hash鏈進行遍歷找到要獲取的結點,若是沒有找到,直接訪回null。對hash鏈進行遍歷不須要加鎖的緣由在於鏈指針next是final的。可是頭指針卻不是final的,這是經過getFirst(hash)方法返回,也就是存在table數組中的值。這使得getFirst(hash)可能返回過期的頭結點,例如,當執行get方法時,剛執行完getFirst(hash)以後,另外一個線程執行了刪除操做並更新頭結點,這就致使get方法中返回的頭結點不是最新的。這是能夠容許,經過對count變量的協調機制,get能讀取到幾乎最新的數據,雖然可能不是最新的。要獲得最新的數據,只有採用徹底的同步。

 

最後,若是找到了所求的結點,判斷它的值若是非空就直接返回,不然在有鎖的狀態下再讀一次。這彷佛有些費解,理論上結點的值不可能爲空,這是由於put的時候就進行了判斷,若是爲空就要拋NullPointerException。空值的惟一源頭就是HashEntry中的默認值,由於HashEntry中的value不是final的,非同步讀取有可能讀取到空值。仔細看下put操做的語句:tab[index] = new HashEntry<K,V>(key, hash, first, value),在這條語句中,HashEntry構造函數中對value的賦值以及對tab[index]的賦值可能被從新排序,這就可能致使結點的值爲空。這種狀況應當很罕見,一旦發生這種狀況,ConcurrentHashMap採起的方式是在持有鎖的狀況下再讀一遍,這可以保證讀到最新的值,而且必定不會爲空值。

 

Java代碼   收藏代碼
  1. V readValueUnderLock(HashEntry<K,V> e) {  
  2.     lock();  
  3.     try {  
  4.         return e.value;  
  5.     } finally {  
  6.         unlock();  
  7.     }  
  8. }  

 

 

另外一個操做是containsKey,這個實現就要簡單得多了,由於它不須要讀取值:

Java代碼   收藏代碼
  1. boolean containsKey(Object key, int hash) {  
  2.     if (count != 0) { // read-volatile  
  3.         HashEntry<K,V> e = getFirst(hash);  
  4.         while (e != null) {  
  5.             if (e.hash == hash && key.equals(e.key))  
  6.                 return true;  
  7.             e = e.next;  
  8.         }  
  9.     }  
  10.     return false;  
  11. }  

 

 

跨段操做 

 

有些操做須要涉及到多個段,好比說size(), containsValaue()。先來看下size()方法:

 

Java代碼   收藏代碼
  1. public int size() {  
  2.     final Segment<K,V>[] segments = this.segments;  
  3.     long sum = 0;  
  4.     long check = 0;  
  5.     int[] mc = new int[segments.length];  
  6.     // Try a few times to get accurate count. On failure due to  
  7.     // continuous async changes in table, resort to locking.  
  8.     for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {  
  9.         check = 0;  
  10.         sum = 0;  
  11.         int mcsum = 0;  
  12.         for (int i = 0; i < segments.length; ++i) {  
  13.             sum += segments[i].count;  
  14.             mcsum += mc[i] = segments[i].modCount;  
  15.         }  
  16.         if (mcsum != 0) {  
  17.             for (int i = 0; i < segments.length; ++i) {  
  18.                 check += segments[i].count;  
  19.                 if (mc[i] != segments[i].modCount) {  
  20.                     check = -1; // force retry  
  21.                     break;  
  22.                 }  
  23.             }  
  24.         }  
  25.         if (check == sum)  
  26.             break;  
  27.     }  
  28.     if (check != sum) { // Resort to locking all segments  
  29.         sum = 0;  
  30.         for (int i = 0; i < segments.length; ++i)  
  31.             segments[i].lock();  
  32.         for (int i = 0; i < segments.length; ++i)  
  33.             sum += segments[i].count;  
  34.         for (int i = 0; i < segments.length; ++i)  
  35.             segments[i].unlock();  
  36.     }  
  37.     if (sum > Integer.MAX_VALUE)  
  38.         return Integer.MAX_VALUE;  
  39.     else  
  40.         return (int)sum;  
  41. }  

 

size方法主要思路是先在沒有鎖的狀況下對全部段大小求和,若是不能成功(這是由於遍歷過程當中可能有其它線程正在對已經遍歷過的段進行結構性更新),最多執行RETRIES_BEFORE_LOCK次,若是還不成功就在持有全部段鎖的狀況下再對全部段大小求和。在沒有鎖的狀況下主要是利用Segment中的modCount進行檢測,在遍歷過程當中保存每一個Segment的modCount,遍歷完成以後再檢測每一個Segment的modCount有沒有改變,若是有改變表示有其它線程正在對Segment進行結構性併發更新,須要從新計算。

 

 

其實這種方式是存在問題的,在第一個內層for循環中,在這兩條語句sum += segments[i].count; mcsum += mc[i] = segments[i].modCount;之間,其它線程可能正在對Segment進行結構性的修改,致使segments[i].count和segments[i].modCount讀取的數據並不一致。這可能使size()方法返回任什麼時候候都未曾存在的大小,很奇怪javadoc竟然沒有明確標出這一點,多是由於這個時間窗口過小了吧。size()的實現還有一點須要注意,必需要先segments[i].count,才能segments[i].modCount,這是由於segment[i].count是對volatile變量的訪問,接下來segments[i].modCount才能獲得幾乎最新的值(前面我已經說了爲何只是「幾乎」了)。這點在containsValue方法中獲得了淋漓盡致的展示:

 

 

Java代碼   收藏代碼
  1. public boolean containsValue(Object value) {  
  2.     if (value == null)  
  3.         throw new NullPointerException();  
  4.   
  5.     // See explanation of modCount use above  
  6.   
  7.     final Segment<K,V>[] segments = this.segments;  
  8.     int[] mc = new int[segments.length];  
  9.   
  10.     // Try a few times without locking  
  11.     for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {  
  12.         int sum = 0;  
  13.         int mcsum = 0;  
  14.         for (int i = 0; i < segments.length; ++i) {  
  15.             int c = segments[i].count;  
  16.             mcsum += mc[i] = segments[i].modCount;  
  17.             if (segments[i].containsValue(value))  
  18.                 return true;  
  19.         }  
  20.         boolean cleanSweep = true;  
  21.         if (mcsum != 0) {  
  22.             for (int i = 0; i < segments.length; ++i) {  
  23.                 int c = segments[i].count;  
  24.                 if (mc[i] != segments[i].modCount) {  
  25.                     cleanSweep = false;  
  26.                     break;  
  27.                 }  
  28.             }  
  29.         }  
  30.         if (cleanSweep)  
  31.             return false;  
  32.     }  
  33.     // Resort to locking all segments  
  34.     for (int i = 0; i < segments.length; ++i)  
  35.         segments[i].lock();  
  36.     boolean found = false;  
  37.     try {  
  38.         for (int i = 0; i < segments.length; ++i) {  
  39.             if (segments[i].containsValue(value)) {  
  40.                 found = true;  
  41.                 break;  
  42.             }  
  43.         }  
  44.     } finally {  
  45.         for (int i = 0; i < segments.length; ++i)  
  46.             segments[i].unlock();  
  47.     }  
  48.     return found;  
  49. }  

 

一樣注意內層的第一個for循環,裏面有語句int c = segments[i].count; 可是c卻歷來沒有被使用過,即便如此,編譯器也不能作優化將這條語句去掉,由於存在對volatile變量count的讀取,這條語句存在的惟一目的就是保證segments[i].modCount讀取到幾乎最新的值。關於containsValue方法的其它部分就不分析了,它和size方法差很少。

 

 

跨段方法中還有一個isEmpty()方法,其實現比size()方法還要簡單,也不介紹了。最後簡單地介紹下迭代方法,如keySet(), values(), entrySet()方法,這些方法都返回相應的迭代器,全部迭代器都繼承於Hash_Iterator類(提交時竟然提醒我不能包含sh It,只得加了下劃線),裏實現了主要的方法。其結構是:

 

Java代碼   收藏代碼
  1. abstract class Hash_Iterator{  
  2.     int nextSegmentIndex;  
  3.     int nextTableIndex;  
  4.     HashEntry<K,V>[] currentTable;  
  5.     HashEntry<K, V> nextEntry;  
  6.     HashEntry<K, V> lastReturned;  
  7. }  

 

 nextSegmentIndex是段的索引,nextTableIndex是nextSegmentIndex對應段中中hash鏈的索引,currentTable是nextSegmentIndex對應段的table。調用next方法時主要是調用了advance方法:

 

 

Java代碼   收藏代碼
  1. final void advance() {  
  2.     if (nextEntry != null && (nextEntry = nextEntry.next) != null)  
  3.         return;  
  4.   
  5.     while (nextTableIndex >= 0) {  
  6.         if ( (nextEntry = currentTable[nextTableIndex--]) != null)  
  7.             return;  
  8.     }  
  9.   
  10.     while (nextSegmentIndex >= 0) {  
  11.         Segment<K,V> seg = segments[nextSegmentIndex--];  
  12.         if (seg.count != 0) {  
  13.             currentTable = seg.table;  
  14.             for (int j = currentTable.length - 1; j >= 0; --j) {  
  15.                 if ( (nextEntry = currentTable[j]) != null) {  
  16.                     nextTableIndex = j - 1;  
  17.                     return;  
  18.                 }  
  19.             }  
  20.         }  
  21.     }  
  22. }  

不想再多介紹了,惟一須要注意的是跳到下一個段時,必定要先讀取下一個段的count變量。 

 

這種迭代方式的主要效果是不會拋出ConcurrentModificationException。一旦獲取到下一個段的table,也就意味着這個段的頭結點在迭代過程當中就肯定了,在迭代過程當中就不能反映對這個段節點併發的刪除和添加,對於節點的更新是可以反映的,由於節點的值是一個volatile變量。

 


結束語

 

ConcurrentHashMap是一個支持高併發的高性能的HashMap實現,它支持徹底併發的讀以及必定程度併發的寫。ConcurrentHashMap的實現也是很精巧,充分利用了最新的JMM規範,值得學習,卻不值得模仿。最後因爲本人水平有限,對大師的做品不免有誤解,若是存在,還望大牛們不吝指出。

相關文章
相關標籤/搜索