在多線程環境下,使用HashMap
進行put
操做時存在丟失數據的狀況,爲了不這種bug的隱患,強烈建議使用ConcurrentHashMap
代替HashMap
,爲了對更深刻的瞭解,本文將對JDK1.7和1.8的不一樣實現進行分析。數組
jdk1.7中採用Segment
+ HashEntry
的方式進行實現,結構以下:安全
ConcurrentHashMap
初始化時,計算出Segment
數組的大小ssize
和每一個Segment
中HashEntry
數組的大小cap
,並初始化Segment
數組的第一個元素;其中ssize
大小爲2的冪次方,默認爲16,cap
大小也是2的冪次方,最小值爲2,最終結果根據根據初始化容量initialCapacity
進行計算,計算過程以下:數據結構
if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1;
其中Segment
在實現上繼承了ReentrantLock
,這樣就自帶了鎖的功能。多線程
當執行put
方法插入數據時,根據key的hash值,在Segment
數組中找到相應的位置,若是相應位置的Segment
還未初始化,則經過CAS進行賦值,接着執行Segment
對象的put
方法經過加鎖機制插入數據,實現以下:併發
場景:線程A和線程B同時執行相同Segment
對象的put
方法dom
一、線程A執行tryLock()
方法成功獲取鎖,則把HashEntry
對象插入到相應的位置;
二、線程B獲取鎖失敗,則執行scanAndLockForPut()
方法,在scanAndLockForPut
方法中,會經過重複執行tryLock()
方法嘗試獲取鎖,在多處理器環境下,重複次數爲64,單處理器重複次數爲1,當執行tryLock()
方法的次數超過上限時,則執行lock()
方法掛起線程B;
三、當線程A執行完插入操做時,會經過unlock()
方法釋放鎖,接着喚醒線程B繼續執行;this
由於ConcurrentHashMap
是能夠併發插入數據的,因此在準確計算元素時存在必定的難度,通常的思路是統計每一個Segment
對象中的元素個數,而後進行累加,可是這種方式計算出來的結果並不同的準確的,由於在計算後面幾個Segment
的元素個數時,已經計算過的Segment
同時可能有數據的插入或則刪除,在1.7的實現中,採用了以下方式:spa
try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } }
先採用不加鎖的方式,連續計算元素的個數,最多計算3次:
一、若是先後兩次計算結果相同,則說明計算出來的元素個數是準確的;
二、若是先後兩次計算結果都不一樣,則給每一個Segment
進行加鎖,再計算一次元素的個數;線程
1.8中放棄了Segment
臃腫的設計,取而代之的是採用Node
+ CAS
+ Synchronized
來保證併發安全進行實現,結構以下:設計
只有在執行第一次put
方法時纔會調用initTable()
初始化Node
數組,實現以下:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
當執行put
方法插入數據時,根據key的hash值,在Node
數組中找到相應的位置,實現以下:
一、若是相應位置的Node
還未初始化,則經過CAS插入相應的數據;
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }
二、若是相應位置的Node
不爲空,且當前該節點不處於移動狀態,則對該節點加synchronized
鎖,若是該節點的hash
不小於0,則遍歷鏈表更新節點或插入新節點;
if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } }
三、若是該節點是TreeBin
類型的節點,說明是紅黑樹結構,則經過putTreeVal
方法往紅黑樹中插入節點;
else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } }
四、若是binCount
不爲0,說明put
操做對數據產生了影響,若是當前鏈表的個數達到8個,則經過treeifyBin
方法轉化爲紅黑樹,若是oldVal
不爲空,說明是一次更新操做,沒有對元素個數產生影響,則直接返回舊值;
if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; }
五、若是插入的是一個新節點,則執行addCount()
方法嘗試更新元素個數baseCount
;
1.8中使用一個volatile
類型的變量baseCount
記錄元素的個數,當插入新數據或則刪除數據時,會經過addCount()
方法更新baseCount
,實現以下:
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); }
一、初始化時counterCells
爲空,在併發量很高時,若是存在兩個線程同時執行CAS
修改baseCount
值,則失敗的線程會繼續執行方法體中的邏輯,使用CounterCell
記錄元素個數的變化;
二、若是CounterCell
數組counterCells
爲空,調用fullAddCount()
方法進行初始化,並插入對應的記錄數,經過CAS
設置cellsBusy字段,只有設置成功的線程才能初始化CounterCell
數組,實現以下:
else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; }
三、若是經過CAS
設置cellsBusy字段失敗的話,則繼續嘗試經過CAS
修改baseCount
字段,若是修改baseCount
字段成功的話,就退出循環,不然繼續循環插入CounterCell
對象;
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break;
因此在1.8中的size
實現比1.7簡單多,由於元素個數保存baseCount
中,部分元素的變化個數保存在CounterCell
數組中,實現以下:
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
經過累加baseCount
和CounterCell
數組中的數量,便可獲得元素的總個數;