HashMap 是非線程安全的,put操做可能致使死循環。其解決方案有 HashTable 和 Collections.synchronizedMap(hashMap) 。這兩種方案都是對讀寫加鎖,獨佔式,效率比較低下。java
HashMap 在併發執行put操做時會引發死循環,由於多線程致使 HashMap 的 Entry 鏈表造成環形數據結構,則 Entry 的 next 節點永遠不爲空,會死循環獲取 Entry。算法
HashTable 使用 synchronized 來保證線程安全,可是在線程競爭激烈的狀況下,效率很是低。其緣由是全部訪問該容器的線程都必須競爭一把鎖。編程
針對上述問題,ConcurrentHashMap 使用鎖分段技術,容器裏有多把鎖,每一把鎖用於其中一部分數據,當多線程訪問不一樣數據段的數據時,線程間就不會存在鎖的競爭。數組
在JDK 1.7中,ConcurrentHashMap 採用了 Segment 數組和 HashEntry 數組的方式進行實現。其中 Segment 是一種可重入鎖(ReentrantLock),扮演鎖的角色。而 HashEntry 則是用於存儲鍵值對的數據。結構以下圖所示:安全
一個 Segment 包含一個 HashEntry 數組,每一個 HashEntry 是一個鏈表結構的元素。每一個 Segment 守護一個 HashEntry 數組的元素。數據結構
初始化時,計算出 Segment 數組的大小 ssize 和每一個 Segment 中 HashEntry 數組的大小 cap,並初始化 Segment 數組的第一個元素。其中 ssize 爲2的冪次方,默認爲16,cap 大小也是2的冪次方,最小值爲2。最終結果根據初始化容量 initialCapacity 計算。多線程
//計算segment數組長度
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//初始化segmentShift和SegmentMask
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
//計算每一個Segment中HashEntry數組大小cap
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 初始化segment數組和segment[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
複製代碼
首先,初始化了 segments 數組,其長度 ssize 是經過 concurrencyLevel 計算得出的。須要保證ssize的長度是2的N次方,segments 數組的長度最大是65536。併發
而後初始化了 segmentShift 和 segmentMask 這兩個全局變量,用於定位 segment 的散列算法。segmentShift 是用於散列運算的位數, segmentMask 是散列運算的掩碼。ssh
以後根據 initialCapaicity 和 loadfactor 這兩個參數來計算每一個 Segment 中 HashEntry 數組的大小 cap。函數
最後根據以上肯定的參數,初始化了 segment 數組以及 segment[0]。
整個 get 操做過程都不須要加鎖,所以很是高效。首先將 key 通過 Hash 以後定位到 Segment,而後再經過一個 Hash 定位到具體元素。不須要加鎖是由於 get 方法將須要使用的共享變量都定義成 volatile 類型,所以能在線程之間保持可見性,在多線程同時讀時能保證不會讀到過時的值。
put 方法須要對共享變量進行寫入操做,爲了線程安全,必須加鎖。 put 方法首先定位到 Segment,而後在 Segment 裏進行插入操做。插入操做首先要判斷是否須要對 Segment 裏的 HashEntry 數組進行擴容,而後定位添加元素的位置,將其放入到 HashEntry 數組。
Segment 的擴容比 HashMap 更恰當,由於後者是插入元素後判斷是否已經到達容量,若是到達了就擴容,可是可能擴容後沒有插入,進行了無效的擴容。Segment 在擴容時,首先建立一個原來容量兩倍的數組,而後將原數組裏的元素進行再散列後插入到新的數組。同時爲了高效, ConcurrentHashMap 不會對整個容器進行擴容,而是隻對某個 segment 進行擴容。
每一個 Segment 都有一個 volatile 修飾的全局變量 count ,求整個 ConcurrentHashMap 的 size 時很明顯就是將全部的 count 累加便可。可是 volatile 修飾的變量卻不能保證多線程的原子性,全部直接累加很容易出現併發問題。可是若是在調用 size 方法時鎖住其他的操做,效率也很低。
ConcurrentHashMap 的作法是先嚐試兩次經過不加鎖的方式進行計算,若是兩次結果相同,說明結果正確。若是計算結果不一樣,則給每一個 Segment 加鎖,進行統計。
在JDK 1.8中,改變了分段鎖的思路,採用了 Node數組 + CAS + Synchronized 來保證併發安全。底層仍然採用了數組+鏈表+紅黑樹的存儲結構。
在JDK 1.8中,使用 Node 替換 HashEntry,二者做用相同。在 Node 中, val 和 next 兩個變量都是 volatile 修飾的,保證了可見性。
使用 table 變量存放 Node 節點數組,默認爲 null, 默認大小爲16,且每次擴容時大小老是2的冪次方。在擴容時,使用 nextTable 存放新生成的數據,數組爲 table 的兩倍。
ForwardingNode 是一個特殊的 Node 節點,hash 值爲-1,存儲了 nextTable 的引用。只有table 發生擴容時,其發生做用,做爲佔位符放在 table 中表示當前節點爲null或者已經被移動。
在 HashMap 中,其核心的數據結構是鏈表。而在 ConcurrentHashMap 中,若是鏈表的數據過長會轉換爲紅黑樹來處理。經過將鏈表的節點包裝成 TreeNode 放在 TreeBin 中,而後經由 TreeBin 完成紅黑樹的轉換。
TreeBin 不負責鍵值對的包裝,用於在鏈表轉換爲紅黑樹時包裝 TreeNode 節點,用來構建紅黑樹。
在構造函數中,ConcurrentHashMap 僅僅設置了一些參數。當首次插入元素時,才經過 initTable() 方法進行了初始化。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//有其餘線程在初始化,掛起當前線程
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//得到了初始化的權利,使用CAS將sizeCtl設置爲-1,表示本線程正在初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//進行初始化
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//下次擴容的大小,至關於0.75*n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
複製代碼
該方法的關鍵爲sizeCtl。
sizeCtl:控制標識符,用來控制table初始化和擴容操做的,在不一樣的地方有不一樣的用途,其值也不一樣,所表明的含義也不一樣:
sizeCtl 默認爲0。若是該值小於0,表示有其餘線程在初始化,須要暫停該線程。若是該線程獲取了初始化的權利,先將其設置爲-1。最後將 sizeCtl 設置爲 0.75*n,表示擴容的閾值。
put操做的核心思想依然是根據 hash 計算節點插入在 table 的位置,若是爲空,直接插入,不然插入到鏈表或樹中。
首先計算hash值,而後進入循環中遍歷table,嘗試插入。
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//詳細代碼接下來分別講述
}
複製代碼
首先判斷 table 是否爲空,若是爲空或者是 null,則先進行初始化操做。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
複製代碼
若是已經初始化過,且插入的位置沒有節點,直接插入該節點。使用CAS嘗試插入該節點。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
複製代碼
若是有線程在擴容,先幫助擴容。
//當前位置的hashcode等於-1,須要擴容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
複製代碼
若是都不知足,使用 synchronized 鎖寫入數據。根據數據結構的不一樣,若是是鏈表則插入尾部;若是是樹節點,使用樹的插入操做。
else {
V oldVal = null;
//對該節點進行加鎖處理(hash值相同的鏈表的頭節點)
synchronized (f) {
if (tabAt(tab, i) == f) {
//fh > 0 表示爲鏈表,將該節點插入到鏈表尾部
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//hash 和 key 都同樣,替換value
if (e.hash == hash &&
((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
//putIfAbsent()
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//鏈表尾部 直接插入
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//樹節點,按照樹的插入操做進行插入
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
複製代碼
在for循環的最後,判斷鏈表的長度是否須要鏈表轉換爲樹結構。
if (binCount != 0) {
// 若是鏈表長度已經達到臨界值8,把鏈表轉換爲樹結構
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
複製代碼
最後,若是是更新節點,前邊已經返回了 oldVal,不然就是插入新的節點。還須要使用 addCount() 方法,爲 size 加一。
總結步驟以下:
參考資料