ConcurrentHashMap採用了很是精妙的"分段鎖"策略,ConcurrentHashMap的主幹是個Segment數組。Segment繼承了ReentrantLock,因此它就是一種可重入鎖(ReentrantLock)。在ConcurrentHashMap,一個Segment就是一個子哈希表,Segment裏維護了一個HashEntry數組,併發環境下,對於不一樣Segment的數據進行操做是不用考慮鎖競爭的。c++
當問到咱們有關於ConcurrentHashMap的工做原理以及實現時,能夠從如下幾個方面說:數組
ConcurrentHashMap的優勢,即HashMap和HashTable的缺點。安全
ConcurrentHashMap是Java1.5中引用的一個線程安全的支持高併發的HashMap集合類。多線程
1、線程不安全的HashMap併發
由於多線程環境下,使用Hashmap進行put操做會引發死循環,致使CPU利用率接近100%,因此在併發狀況下不能使用HashMap。ssh
2、效率低下的HashTable函數
HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的狀況下HashTable的效率很是低下。高併發
由於當一個線程訪問HashTable的同步方法時,其餘線程訪問HashTable的同步方法時,可能會進入阻塞或輪詢狀態。this
如線程1使用put進行添加元素,線程2不但不能使用put方法添加元素,而且也不能使用get方法來獲取元素,因此競爭越激烈效率越低。spa
3、鎖分段技術
HashTable容器在競爭激烈的併發環境下表現出效率低下的緣由,是由於全部訪問HashTable的線程都必須競爭同一把鎖,
那假如容器裏有多把鎖,每一把鎖用於鎖容器其中一部分數據,那麼當多線程訪問容器裏不一樣數據段的數據時,線程間就不會存在鎖競爭,
從而能夠有效的提升併發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術。
首先將數據分紅一段一段的存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問。
有些方法須要跨段,好比size()和containsValue(),它們可能須要鎖定整個表而而不只僅是某個段,這須要按順序鎖定全部段,操做完畢後,又按順序釋放全部段的鎖。
這裏「按順序」是很重要的,不然極有可能出現死鎖,在ConcurrentHashMap內部,段數組是final的,而且其成員變量實際上也是final的,
可是,僅僅是將數組聲明爲final的並不保證數組成員也是final的,這須要實現上的保證。這能夠確保不會出現死鎖,由於得到鎖的順序是固定的。
oncurrentHashMap 類中包含兩個靜態內部類 HashEntry 和 Segment。
HashEntry 用來封裝映射表的鍵 / 值對;Segment 用來充當鎖的角色,每一個 Segment 對象守護整個散列映射表的若干個桶。
每一個桶是由若干個 HashEntry 對象連接起來的鏈表。一個 ConcurrentHashMap 實例中包含由若干個 Segment 對象組成的數組。
每一個Segment守護者一個HashEntry數組裏的元素,當對HashEntry數組的數據進行修改時,必須首先得到它對應的Segment鎖。
5、HashEntry類
static final class HashEntry<K,V> { final K key; // 聲明 key 爲 final 型 final int hash; // 聲明 hash 值爲 final 型 volatile V value; // 聲明 value 爲 volatile 型 final HashEntry<K,V> next; // 聲明 next 爲 final 型 HashEntry(K key, int hash, HashEntry<K,V> next, V value) { this.key = key; this.hash = hash; this.next = next; this.value = value; } }
每一個HashEntry表明Hash表中的一個節點,在其定義的結構中能夠看到,除了value值沒有定義final,其他的都定義爲final類型,咱們知道Java中關鍵詞final修飾的域成爲最終域。
用關鍵詞final修飾的變量一旦賦值,就不能改變,也稱爲修飾的標識爲常量。這就意味着咱們刪除或者增長一個節點的時候,就必須從頭開始從新創建Hash鏈,由於next引用值須要改變。
因爲 HashEntry 的 next 域爲 final 型,因此新節點只能在鏈表的表頭處插入。 例如將A,B,C插入空桶中,插入後的結構爲:
6、segment類
static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; /** * 在本 segment 範圍內,包含的 HashEntry 元素的個數 * 該變量被聲明爲 volatile 型,保證每次讀取到最新的數據 */ transient volatile int count; /** *table 被更新的次數 */ transient int modCount; /** * 當 table 中包含的 HashEntry 元素的個數超過本變量值時,觸發 table 的再散列 */ transient int threshold; /** * table 是由 HashEntry 對象組成的數組 * 若是散列時發生碰撞,碰撞的 HashEntry 對象就以鏈表的形式連接成一個鏈表 * table 數組的數組成員表明散列映射表的一個桶 * 每一個 table 守護整個 ConcurrentHashMap 包含桶總數的一部分 * 若是併發級別爲 16,table 則守護 ConcurrentHashMap 包含的桶總數的 1/16 */ transient volatile HashEntry<K,V>[] table; /** * 裝載因子 */ final float loadFactor; }
Segment 類繼承於 ReentrantLock 類,從而使得 Segment 對象能充當鎖的角色。每一個 Segment 對象用來守護其(成員對象 table 中)包含的若干個桶。
table 是一個由 HashEntry 對象組成的數組。table 數組的每個數組成員就是散列映射表的一個桶。
每個 Segment 對象都有一個 count 對象來表示本 Segment 中包含的 HashEntry 對象的總數。
之因此在每一個 Segment 對象中包含一個計數器,而不是在 ConcurrentHashMap 中使用全局的計數器,是爲了不出現「熱點域」而影響 ConcurrentHashMap 的併發性。
下圖是依次插入 ABC 三個 HashEntry 節點後,Segment 的結構示意圖。
7、ConcurrentHashMap 類
默認的狀況下,每一個ConcurrentHashMap 類會建立16個併發的segment,每一個segment裏面包含多個Hash表,每一個Hash鏈都是有HashEntry節點組成的。
若是鍵能均勻散列,每一個 Segment 大約守護整個散列表中桶總數的 1/16。
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { /** * 散列映射表的默認初始容量爲 16,即初始默認爲 16 個桶 * 在構造函數中沒有指定這個參數時,使用本參數 */ static final int DEFAULT_INITIAL_CAPACITY= 16; /** * 散列映射表的默認裝載因子爲 0.75,該值是 table 中包含的 HashEntry 元素的個數與 * table 數組長度的比值 * 當 table 中包含的 HashEntry 元素的個數超過了 table 數組的長度與裝載因子的乘積時, * 將觸發 再散列 * 在構造函數中沒有指定這個參數時,使用本參數 */ static final float DEFAULT_LOAD_FACTOR= 0.75f; /** * 散列表的默認併發級別爲 16。該值表示當前更新線程的估計數 * 在構造函數中沒有指定這個參數時,使用本參數 */ static final int DEFAULT_CONCURRENCY_LEVEL= 16; /** * segments 的掩碼值 * key 的散列碼的高位用來選擇具體的 segment */ final int segmentMask; /** * 偏移量 */ final int segmentShift; /** * 由 Segment 對象組成的數組 */ final Segment<K,V>[] segments; /** * 建立一個帶有指定初始容量、加載因子和併發級別的新的空映射。 */ public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if(!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if(concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 尋找最佳匹配參數(不小於給定參數的最接近的 2 次冪) int sshift = 0; int ssize = 1; while(ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; // 偏移量值 segmentMask = ssize - 1; // 掩碼值 this.segments = Segment.newArray(ssize); // 建立數組 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if(c * ssize < initialCapacity) ++c; int cap = 1; while(cap < c) cap <<= 1; // 依次遍歷每一個數組元素 for(int i = 0; i < this.segments.length; ++i) // 初始化每一個數組元素引用的 Segment 對象 this.segments[i] = new Segment<K,V>(cap, loadFactor); } /** * 建立一個帶有默認初始容量 (16)、默認加載因子 (0.75) 和 默認併發級別 (16) * 的空散列映射表。 */ public ConcurrentHashMap() { // 使用三個默認參數,調用上面重載的構造函數來建立空散列映射表 this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); }
8、 用分離鎖實現多個線程間的併發寫操做
插入數據後的ConcurrentHashMap的存儲形式
(1)Put方法的實現
首先,根據 key 計算出對應的 hash 值:
public V put(K key, V value) { if (value == null) //ConcurrentHashMap 中不容許用 null 做爲映射值 throw new NullPointerException(); int hash = hash(key.hashCode()); // 計算鍵對應的散列碼 // 根據散列碼找到對應的 Segment return segmentFor(hash).put(key, hash, value, false); } 根據 hash 值找到對應的 Segment: /** * 使用 key 的散列碼來獲得 segments 數組中對應的 Segment */ final Segment<K,V> segmentFor(int hash) { // 將散列值右移 segmentShift 個位,並在高位填充 0 // 而後把獲得的值與 segmentMask 相「與」 // 從而獲得 hash 值對應的 segments 數組的下標值 // 最後根據下標值返回散列碼對應的 Segment 對象 return segments[(hash >>> segmentShift) & segmentMask]; }
在這個 Segment 中執行具體的 put 操做:
V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); // 加鎖,這裏是鎖定某個 Segment 對象而非整個 ConcurrentHashMap try { int c = count; if (c++ > threshold) // 若是超過再散列的閾值 rehash(); // 執行再散列,table 數組的長度將擴充一倍 HashEntry<K,V>[] tab = table; // 把散列碼值與 table 數組的長度減 1 的值相「與」 // 獲得該散列碼對應的 table 數組的下標值 int index = hash & (tab.length - 1); // 找到散列碼對應的具體的那個桶 HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { // 若是鍵 / 值對以經存在 oldValue = e.value; if (!onlyIfAbsent) e.value = value; // 設置 value 值 } else { // 鍵 / 值對不存在 oldValue = null; ++modCount; // 要添加新節點到鏈表中,因此 modCont 要加 1 // 建立新節點,並添加到鏈表的頭部 tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // 寫 count 變量 } return oldValue; } finally { unlock(); // 解鎖 } `}
這裏的加鎖操做是針對(鍵的 hash 值對應的)某個具體的 Segment,鎖定的是該 Segment 而不是整個 ConcurrentHashMap。
由於插入鍵 / 值對操做只是在這個 Segment 包含的某個桶中完成,不須要鎖定整個ConcurrentHashMap。
此時,其餘寫線程對另外 15 個Segment 的加鎖並不會由於當前線程對這個 Segment 的加鎖而阻塞。
同時,全部讀線程幾乎不會因本線程的加鎖而阻塞(除非讀線程恰好讀到這個 Segment 中某個 HashEntry 的 value 域的值爲 null,此時須要加鎖後從新讀取該值)。
(2)Get方法的實現
V get(Object key, int hash) { if(count != 0) { // 首先讀 count 變量 HashEntry<K,V> e = getFirst(hash); while(e != null) { if(e.hash == hash && key.equals(e.key)) { V v = e.value; if(v != null) return v; // 若是讀到 value 域爲 null,說明發生了重排序,加鎖後從新讀取 return readValueUnderLock(e); } e = e.next; } } return null; } V readValueUnderLock(HashEntry<K,V> e) { lock(); try { return e.value; } finally { unlock(); } }
ConcurrentHashMap中的讀方法不須要加鎖,全部的修改操做在進行結構修改時都會在最後一步寫count 變量,經過這種機制保證get操做可以獲得幾乎最新的結構更新。
(3)Remove方法的實現
V remove(Object key, int hash, Object value) { lock(); //加鎖 try{ int c = count - 1; HashEntry<K,V>[] tab = table; //根據散列碼找到 table 的下標值 int index = hash & (tab.length - 1); //找到散列碼對應的那個桶 HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while(e != null&& (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if(e != null) { V v = e.value; if(value == null|| value.equals(v)) { //找到要刪除的節點 oldValue = v; ++modCount; //全部處於待刪除節點以後的節點原樣保留在鏈表中 //全部處於待刪除節點以前的節點被克隆到新鏈表中 HashEntry<K,V> newFirst = e.next;// 待刪節點的後繼結點 for(HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); //把桶連接到新的頭結點 //新的頭結點是原鏈表中,刪除節點以前的那個節點 tab[index] = newFirst; count = c; //寫 count 變量 } } return oldValue; } finally{ unlock(); //解鎖 } }
整個操做是在持有段鎖的狀況下執行的,空白行以前的行主要是定位到要刪除的節點e。
若是不存在這個節點就直接返回null,不然就要將e前面的結點複製一遍,尾結點指向e的下一個結點。
e後面的結點不須要複製,它們能夠重用。
中間那個for循環是作什麼用的呢?從代碼來看,就是將定位以後的全部entry克隆並拼回前面去,但有必要嗎?
每次刪除一個元素就要將那以前的元素克隆一遍?這點實際上是由entry的不變性來決定的,仔細觀察entry定義,發現除了value,其餘全部屬性都是用final來修飾的,
這意味着在第一次設置了next域以後便不能再改變它,取而代之的是將它以前的節點全都克隆一次。至於entry爲何要設置爲不變性,這跟不變性的訪問不須要同步從而節省時間有關。
(4)containsKey方法的實現,它不須要讀取值。
boolean containsKey(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) return true; e = e.next; } } return false; }
(5)size()
咱們要統計整個ConcurrentHashMap裏元素的大小,就必須統計全部Segment裏元素的大小後求和。
Segment裏的全局變量count是一個volatile變量,那麼在多線程場景下,咱們是否是直接把全部Segment的count相加就能夠獲得整個ConcurrentHashMap大小了呢?
不是的,雖然相加時能夠獲取每一個Segment的count的最新值,可是拿到以後可能累加前使用的count發生了變化,那麼統計結果就不許了。
因此最安全的作法,是在統計size的時候把全部Segment的put,remove和clean方法所有鎖住,可是這種作法顯然很是低效。
由於在累加count操做過程當中,以前累加過的count發生變化的概率很是小,因此ConcurrentHashMap的作法是先嚐試2次經過不鎖住Segment的方式來統計各個Segment大小,若是統計的過程當中,容器的count發生了變化,則再採用加鎖的方式來統計全部Segment的大小。
那麼ConcurrentHashMap是如何判斷在統計的時候容器是否發生了變化呢?使用modCount變量,在put , remove和clean方法裏操做元素前都會將變量modCount進行加1,那麼在統計size先後比較modCount是否發生變化,從而得知容器的大小是否發生變化。
9、總結
1.在使用鎖來協調多線程間併發訪問的模式下,減少對鎖的競爭能夠有效提升併發性。
有兩種方式能夠減少對鎖的競爭:
減少請求同一個鎖的頻率。
減小持有鎖的時間。
2.ConcurrentHashMap 的高併發性主要來自於三個方面:
用分離鎖實現多個線程間的更深層次的共享訪問。
用 HashEntery 對象的不變性來下降執行讀操做的線程在遍歷鏈表期間對加鎖的需求。
經過對同一個 Volatile 變量的寫 / 讀訪問,協調不一樣線程間讀 / 寫操做的內存可見性。
使用分離鎖,減少了請求同一個鎖的頻率。