參考:https://my.oschina.net/hosee/blog/675884
http://www.cnblogs.com/ITtangtang/p/3948786.html
本文須要關注的地方。html
併發編程實踐中,ConcurrentHashMap是一個常常被使用的數據結構,相比於Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap可以提供更高的併發度。同步容器將全部對容器狀態的訪問都串行化,以實現它們的線程安全性。這種方法的代價是嚴重下降併發性,當多個線程競爭容器的鎖時,吞吐量將嚴重下降。ConcurrentHashMap關鍵在於使用了分段鎖技術。
在ConcurrentHashMap的實現中使用了一個包含16個鎖的數組,每一個鎖保護全部散列桶的1/16,其中第N個散列桶由第(N mod 16)個鎖來保護。每個鎖保護的區域稱爲段(Segment),每一個段其實就是一個小的Hashtable,它們有本身的鎖。假設散列函數具備合理的分佈性,而且關鍵字可以均勻分佈,那麼這大約能把對於鎖的請求減小到原來的1/16,正是這項技術使得ConcurrentHashMap可以支持多達16個併發的寫入器。
ConcurrentHashMap與其餘併發容器一塊兒加強了同步容器類:它們提供的迭代器不會拋出 ConcurrentModificationException,所以不須要在迭代過程當中對容器加鎖。ConcurrentHashMap 返回的迭代器具備弱一致性(Weakly Consistent),而並不是「及時失敗」。弱一致性的迭代器能夠容忍併發的修改,當建立迭代器時會遍歷已有的元素,並能夠(可是不保證)在迭代器被構造後將修改操做反映給容器。
儘管有這些改進,但仍然有一些須要權衡的因素。對於一些須要在整個Map上進行計算的方法,例如size和isEmpty,這些方法的語義被略微減弱了以反映容器的併發特性。因爲size 返回的結果在計算時可能已通過期了,它實際上只是一個估計值,所以容許size返回一個近似值而不是一個精確值。由於:事實上size和isEmpty這樣的方法在併發環境下的用處很小,由於它們的返回值總在不斷變化。所以,這些操做的需求被弱化了,以換取對其餘更重要操做的性能優化,包括get、put、containsKey和remove等。
鎖分段的一個劣勢在於:與採用單個鎖來實現獨佔訪問相比,要獲取多個鎖來實現獨佔訪問將更加困難而且開銷更高。一般,在執行一個操做時最多隻需獲取一個鎖,但在某些狀況下須要加鎖整個容器,例如當ConcurrentHashMap須要擴展映射範圍,以及從新計算鍵值的散列值要分佈到更大的桶集合中時,就須要獲取分段鎖集合中的全部鎖。java
ConcurrentHashMap使用分段鎖技術,將數據分紅一段一段的存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問,可以實現真正的併發訪問。以下圖是ConcurrentHashMap的內部結構圖:
它把區間按照併發級別(concurrentLevel),分紅了若干個segment。默認狀況下內部按併發級別爲16來建立。對於每一個segment的容量,默認狀況也是16。固然併發級別(concurrentLevel)和每一個段(segment)的初始容量都是能夠經過構造函數設定的。c++
ConcurrentHashMap中主要實體類就是三個:ConcurrentHashMap(整個Hash表),Segment(充當鎖的角色,每一個 Segment 對象守護整個散列表的某一個段的若干個桶),HashEntry(鍵值節點),對應上面的圖能夠看出之間的關係。算法
在 HashEntry 類中,key,hash 和 next 域都被聲明爲 final 型,value 域被聲明爲 volatile 型。編程
static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; }
在 ConcurrentHashMap 中,在散列時若是產生「衝突」,將採用「鏈地址法」來處理「衝突」:把「衝突」的 HashEntry 對象連接成一個單向鏈表。因爲 HashEntry 的 next 域爲 final 型,因此新節點只能在鏈表的表頭處插入。 下圖是在一個空桶中依次插入 A,B,C 三個 HashEntry 對象後的結構圖:
注意:因爲只能在表頭插入,因此鏈表中節點的順序和插入的順序相反。其實,哪怕next不是final的,也應該從表頭插入,由於從表尾插入的話,首先須要遍歷到表尾,而後才能插入節點,複雜度O(n)。數組
Segment 類繼承於 ReentrantLock 類,從而每一個segment均可以當作一個鎖。每一個 Segment 對象用來守護其(成員對象 table 中)包含的若干個桶。
table 是一個由 HashEntry 對象組成的數組。table 數組的每個數組成員就是散列表的一個桶。安全
static final class Segment<K,V> extends ReentrantLock implements Serializable { /** * 在本 segment 範圍內,包含的 HashEntry 元素的個數 * 該變量被聲明爲 volatile 型 */ transient volatile int count; /** * table 被更新的次數 */ transient int modCount; /** * 當 table 中包含的 HashEntry 元素的個數超過本變量值時,觸發 table 的再散列 */ transient int threshold; /** * table 是由 HashEntry 對象組成的數組 */ transient volatile HashEntry<K,V>[] table; final float loadFactor; Segment(int initialCapacity, float lf) { loadFactor = lf; setTable(HashEntry.<K,V>newArray(initialCapacity)); } void setTable(HashEntry<K,V>[] newTable) { threshold = (int)(newTable.length * loadFactor); table = newTable; } /** * 根據 key 的散列值,找到 table 中對應的那個桶(table 數組的某個數組成員) */ HashEntry<K,V> getFirst(int hash) { HashEntry<K,V>[] tab = table; return tab[hash & (tab.length - 1)]; } }
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { /** * 散列表的默認初始容量爲 16,即初始默認爲 16 段 */ static final int DEFAULT_INITIAL_CAPACITY= 16; static final float DEFAULT_LOAD_FACTOR= 0.75f; /** * 散列表的默認併發級別爲 16。該值表示當前更新線程的估計數,在構造函數中沒有指定這個參數時,使用本參數 */ static final int DEFAULT_CONCURRENCY_LEVEL= 16; /** * segments 的掩碼值 * key 的散列碼的高位用來選擇具體的 segment */ final int segmentMask; /** * 偏移量 */ final int segmentShift; /** * 由 Segment 對象組成的數組 */ final Segment<K,V>[] segments; /** * 建立一個帶有指定初始容量、負載因子和併發級別的新的空散列表 */ public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if(!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if(concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 尋找最佳匹配參數(不小於給定參數的最接近的 2 次冪) int sshift = 0; int ssize = 1; while(ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; // 偏移量值 segmentMask = ssize - 1; // 掩碼值 this.segments = Segment.newArray(ssize); // 建立數組 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if(c * ssize < initialCapacity) ++c; int cap = 1; while(cap < c) cap <<= 1; // 依次遍歷每一個數組元素 for(int i = 0; i < this.segments.length; ++i) // 初始化每一個數組元素引用的 Segment 對象 this.segments[i] = new Segment<K,V>(cap, loadFactor); } /** * 建立一個帶有默認初始容量 (16)、默認加載因子 (0.75) 和 默認併發級別 (16) 的空散列表。 */ public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); }
首先,根據 key 計算出對應的 hash 值:性能優化
public V put(K key, V value) { if (value == null) //ConcurrentHashMap 中不容許用 null 做爲映射值 throw new NullPointerException(); int hash = hash(key.hashCode()); // 計算鍵對應的散列碼 // 根據散列碼找到對應的 Segment return segmentFor(hash).put(key, hash, value, false); }
而後,根據 hash 值找到對應的Segment 對象:數據結構
/** * 使用 key 的散列碼來獲得 segments 數組中對應的 Segment */ final Segment<K,V> segmentFor(int hash) { // 將散列值右移 segmentShift 個位,並在高位填充 0 ,而後把獲得的值與 segmentMask 相「與」,從而獲得 hash 值對應的 segments 數組的下標值,最後根據下標值返回散列碼對應的 Segment 對象 return segments[(hash >>> segmentShift) & segmentMask]; }
最後,在這個 Segment 中執行具體的 put 操做:多線程
V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); // 加鎖,這裏是鎖定某個 Segment 對象而非整個 ConcurrentHashMap try { int c = count; if (c++ > threshold) // 若是超過再散列的閾值 rehash(); // 執行再散列,table 數組的長度將擴充一倍 HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); // 找到散列值對應的具體的那個桶 HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { // 若是鍵值對已經存在 oldValue = e.value; if (!onlyIfAbsent) e.value = value; // 替換 value 值 } else { // 鍵值對不存在 oldValue = null; ++modCount; // 要添加新節點到鏈表中,因此 modCont 要加 1 // 建立新節點,並添加到鏈表的頭部 tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // 寫 count 變量 } return oldValue; } finally { unlock(); // 解鎖 } }
注意:這裏的加鎖操做是針對某個具體的 Segment,鎖定的是該 Segment 而不是整個 ConcurrentHashMap。由於插入鍵 / 值對操做只是在這個 Segment 包含的某個桶中完成,不須要鎖定整個ConcurrentHashMap。此時,其餘寫線程對另外 15 個Segment 的加鎖並不會由於當前線程對這個 Segment 的加鎖而阻塞。同時,全部讀線程幾乎不會因本線程的加鎖而阻塞(除非讀線程恰好讀到這個 Segment 中某個 HashEntry 的 value 域的值爲 null,此時須要加鎖後從新讀取該值)。
相比較於 HashTable 和由同步包裝器包裝的 HashMap每次只能有一個線程執行讀或寫操做,ConcurrentHashMap 在併發訪問性能上有了質的提升。在理想狀態下,ConcurrentHashMap 能夠支持 16 個線程執行併發寫操做(若是併發級別設置爲 16),及任意數量線程的讀操做。
public V get(Object key) { int hash = hash(key); // throws NullPointerException if key null return segmentFor(hash).get(key, hash); }
V get(Object key, int hash) { if(count != 0) { // 首先讀 count 變量 HashEntry<K,V> e = getFirst(hash); while(e != null) { if(e.hash == hash && key.equals(e.key)) { V v = e.value; if(v != null) return v; // 若是讀到 value 域爲 null,說明發生了重排序,加鎖後從新讀取 return readValueUnderLock(e); } e = e.next; } } return null; }
ConcurrentHashMap徹底容許多個讀操做併發進行,讀操做並不須要加鎖。關鍵是用 HashEntry 對象的不變性來下降讀操做對加鎖的需求。只是判斷獲取的entry的value是否爲null,爲null時才使用加鎖的方式再次去獲取。
在代碼清單「HashEntry 類的定義」中咱們能夠看到,HashEntry 中的 key,hash,next 都聲明爲 final 型。這意味着,不能把節點添加到連接的中間和尾部,也不能在連接的中間和尾部刪除節點。這個特性能夠保證:在訪問某個節點時,這個節點以後的連接不會被改變。這個特性能夠大大下降處理鏈表時的複雜性。
下面分析在get的時候的線程安全性
HashEntry 類的 value 域被聲明爲 volatile 型,Java 的內存模型能夠保證:某個寫線程對 value 域的寫入立刻能夠被後續的某個讀線程「看」到。在 ConcurrentHashMap 中,不容許用 null 做爲鍵和值,當讀線程讀到某個 HashEntry 的 value 域的值爲 null 時,便知道發生了指令重排序現象(注意:volatile變量重排序規則,同時也是先行發生原則的一部分:對一個volatile變量的寫操做先行發生於後面對這個變量的讀操做,這裏的「後面」一樣是指時間上的前後順序。因此,在tab[index] = new HashEntry<K,V>(key, hash, first, value);
中,可能會出現當前線程獲得的newEntry對象是一個沒有徹底構造好的對象引用。),須要加鎖後從新讀入這個 value 值。
因爲對 volatile 變量的可見性,寫線程對鏈表的非結構性修改可以被後續不加鎖的讀線程「看到」。
假設咱們的鏈表元素是:e1-> e2 -> e3 -> e4 咱們要刪除 e3這個entry
由於HashEntry中next的不可變,因此咱們沒法直接把e2的next指向e4,而是將要刪除的節點以前的節點複製一份,造成新的鏈表。它的實現大體以下圖所示:
注意:最後纔將數組中對應桶位置的鏈表替換爲新鏈表(也就是在最後一步替換以前,tab[i]指向的始終是刪除以前的鏈表,詳細看下面的remove方法)。
若是咱們get的也恰巧是e3,可能咱們順着鏈表剛找到e1,這時另外一個線程就執行了刪除e3的操做,而咱們線程還會繼續沿着舊的鏈表找到e3返回,這時候可能看到被刪除的數據,可是在高併發環境下,這種影響是很小的。
V remove(Object key, int hash, Object value) { lock(); // 加鎖 try{ int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while(e != null&& (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if(e != null) { V v = e.value; if(value == null|| value.equals(v)) { // 找到要刪除的節點 oldValue = v; ++modCount; // 全部處於待刪除節點以後的節點原樣保留在鏈表中 // 全部處於待刪除節點以前的節點被克隆(實際上是把全部值取出來放到一個新的HashEntry對象中)到新鏈表中 HashEntry<K,V> newFirst = e.next;// 待刪節點的後繼結點 for(HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); // 新的頭結點是原鏈表中,刪除節點以前的那個節點 tab[index] = newFirst; count = c; // 寫 count 變量 } } return oldValue; } finally{ unlock(); // 解鎖 } }
和 get 操做同樣,首先根據散列碼找到具體的鏈表;而後遍歷這個鏈表找到要刪除的節點;最後把待刪除節點以後的全部節點原樣保留在新鏈表中,把待刪除節點以前的每一個節點克隆(實際上是把全部值取出來放到一個新的HashEntry對象中)到新鏈表中;最後纔將數組中對應桶位置的鏈表替換爲新鏈表(也就是在替換以前,get的始終是刪除以前的鏈表)。
下面經過圖例來講明 remove 操做。假設寫線程執行 remove 操做,要刪除鏈表的 C 節點,另外一個讀線程同時正在遍歷這個鏈表。
執行刪除以後的新鏈表
B中的next定義爲final,沒法修改將它指向D,所以C以前的全部節點都要從新創建。而且它們在新鏈表中的連接順序被反轉了。
在 ConcurrentHashMap中,每個 Segment 對象都有一個 count 對象來表示本 Segment 中包含的 HashEntry 對象的個數。若是咱們要統計整個ConcurrentHashMap裏元素的大小,就必須統計全部Segment裏元素的大小後求和。Segment裏的全局變量count是一個volatile變量,那麼在多線程場景下,咱們是否是直接把全部Segment的count相加就能夠獲得整個ConcurrentHashMap大小了呢?不是的,雖然相加時能夠獲取每一個Segment的count的最新值,可是拿到以後可能累加的過程當中count發生了變化,那麼統計結果就不許了。因此最安全的作法,是在統計size的時候把全部Segment鎖定。
由於在累加count操做過程當中,以前累加過的count發生變化的概率很是小,因此ConcurrentHashMap的作法是先嚐試2次經過不鎖住Segment的方式來統計各個Segment大小,若是統計的過程當中,容器的count發生了變化,則再採用加鎖的方式來統計全部Segment的大小。
那麼ConcurrentHashMap是如何判斷在統計的時候容器是否發生了變化呢?使用modCount變量,在put , remove和clean方法裏操做元素前都會將變量modCount進行加1,那麼在統計size先後比較modCount是否發生變化,從而得知容器的大小是否發生變化。
每一個版本的ConcurrentHashMap幾乎都有改動,本文說明的JDK6的源碼。JDK8相比前面的版本改動最大,下面簡要說一下:
詳細看: http://blog.csdn.net/u010723709/article/details/48007881
因爲Hashtable不管是讀仍是寫仍是遍歷,都須要得到對象鎖,串行操做,所以在多線程環境下性能比較差。
可是ConcurrentHashMap不能徹底取代Hashtable:HashTable的迭代器是強一致性的,而ConcurrentHashMap是弱一致的。其實 ConcurrentHashMap的get,clear,iterator 都是弱一致性的。 Doug Lea 也將這個判斷留給用戶本身決定是否使用ConcurrentHashMap。
弱一致性:不保證數據徹底處於一致性狀態。好比:
可能在get的時候得到一個還沒徹底構造好的HashEntry對象,致使得到的entry的value爲null,此時須要加鎖從新讀取。
public void clear() { for (int i = 0; i < segments.length; ++i) segments[i].clear(); }
由於沒有全局的鎖,在清除完一個segments以後,正在清理下一個segments的時候,已經清理segments可能又被加入了數據,所以clear返回的時候,ConcurrentHashMap中是可能存在數據的。所以,clear方法是弱一致的。
java.util 包中的集合類都返回 fail-fast 迭代器,這意味着它們假設線程在集合內容中進行迭代時,集合不會更改它的內容。若是 fail-fast 迭代器檢測到在迭代過程當中進行了更改操做,那麼它會拋出 ConcurrentModificationException。
ConcurrentHashMap中的迭代器主要包括entrySet、keySet、values方法。它們大同小異,這裏選擇entrySet解釋。當咱們調用entrySet返回值的iterator方法時,返回的是EntryIterator,在EntryIterator上調用next方法時,最終實際調用到了HashIterator.advance()方法,看下這個方法:
final void advance() { if (nextEntry != null && (nextEntry = nextEntry.next) != null) return; while (nextTableIndex >= 0) { if ( (nextEntry = currentTable[nextTableIndex--]) != null) return; } while (nextSegmentIndex >= 0) { Segment<K,V> seg = segments[nextSegmentIndex--]; if (seg.count != 0) { currentTable = seg.table; for (int j = currentTable.length - 1; j >= 0; --j) { if ( (nextEntry = currentTable[j]) != null) { nextTableIndex = j - 1; return; } } } } }
在這種迭代方式中,好比咱們刪除了鏈表的某個entry,可是在完成以前,迭代器得到了舊的鏈表指針,那麼就會遍歷舊的鏈表,而且不會報異常。