上篇文章介紹了 HashMap 在多線程併發狀況下是不安全的,多線程併發推薦使用 ConcurrentHashMap ,那麼 ConcurrentHashMap 是什麼?它的設計思想是什麼,源碼是怎麼實現的?node
Concurrent翻譯過來是併發的意思,字面理解它的做用是處理併發狀況的 HashMap,在介紹它以前先回顧下以前的知識。經過前面兩篇學習,咱們知道多線程併發下 HashMap 是不安全的(如死循環),更廣泛的是多線程併發下,因爲堆內存對於各個線程是共享的,而 HashMap 的 put 方法不是原子操做,假設Thread1先 put 值,而後 sleep 2秒(也能夠是系統時間片切換失去執行權),在這2秒內值被Thread2改了,Thread1「醒來」再 get 的時候發現已經不是原來的值了,這就容易出問題。數組
那麼如何避免這種多線程「奧迪變奧拓」的狀況呢?常規思路就是給 HashMap 的 put 方法加鎖(synchronized),保證同一個時刻只容許一個線程擁有對 hashmap 有寫的操做權限便可。然而假如線程1中操做耗時,佔着茅坑半天不出來,其餘須要操做該 hashmap 的線程就須要在門口排隊半天,嚴重影響用戶體驗(HashTable 就是這麼幹的)。舉個生活中的例子,不少銀行除了存取錢,還支持存取貴重物品,貴重物品都放在保險箱裏,把 HashMap 和 HashTable 比做銀行,結構:安全
把線程比做人,對應的狀況以下:bash
多線程下用 HashMap 不肯定性過高,有破產的風險,不能選;用 HashTable 不會破產,可是用戶體驗不太好,那麼怎樣才能作到多人存取既不影響他人存值,又不用排隊呢?有人提議搞個「銀行者聯盟」,多開幾個像HashTable 這種「帶鎖」的銀行就行了,有多少人辦理業務,就開多少個銀行,一對一服務,這個區都是大老闆,開銀行的成本都是小錢,因而「銀行者聯盟」成立了。多線程
接下來的狀況是這樣的:好比蓋倫和亞索一塊兒去銀行存他們的大寶劍,這個「銀行者聯盟」一頓操做,而後對蓋倫說,1號銀行如今沒人,你能夠去那存,不用排隊,而後蓋倫就去1號銀行存他的大寶劍,1號銀行把蓋倫接進門,立刻拉閘,一頓操做,而後把蓋倫的大寶劍放在第x行第x個保險箱,等蓋倫辦妥離開後,再開閘;一樣「銀行者聯盟」對亞索說,2號銀行如今沒人,你能夠去那存,不用排隊,而後亞索去2號銀行存他的大寶劍,2號銀行把亞索接進門,立刻拉閘,一頓操做把亞索的大寶劍放在第x行第x號保險箱,等亞索離開後再開閘,此時無論蓋倫和亞索在各自銀行裏面待多久都不會影響到彼此,不用擔憂本身的大寶劍被人偷換了。這就是ConcurrentHashMap的設計思路,用一個圖來理解併發
從上圖能夠看出,此時鎖的是對應的單個銀行,而不是整個「銀行者聯盟」。分析下這種設計的特色:ssh
由這幾點基本思想能夠引起一些思考,好比:函數
1.成立「銀行者聯盟」時初識銀行數是多少?怎麼設計合理?源碼分析
上面這張圖沒有給出是否須要排隊的結論,這是由於須要結合實際狀況分析,好比初識化有16個銀行,只有兩我的來辦理業務,那天然不須要排隊;若是如今16個銀行都有人在辦理業務,這時候來了第17我的,那麼他仍是須要排隊的。因爲「銀行者聯盟」事先沒法得知會有多少人來辦理業務,因此在它創立的時候須要制定一個「標準」,即初始銀行數量,人多的狀況「銀行者聯盟」應該多開幾家銀行,避免別人排隊;人少的狀況應該少開,避免浪費錢(什麼,你說不差錢?那也不行)性能
2.當有人來辦理業務的時候,「銀行者聯盟」怎麼肯定此人去哪一個銀行?
正常狀況下,若是全部銀行都是未上鎖狀態,那麼有人來辦理業務去哪都不用排隊,當其中有些銀行已經上鎖,那麼後續「銀行者聯盟」給人推薦的時候就不能把客戶往上鎖的銀行引了,不然分分鐘給人錘成麻瓜。所以「銀行者聯盟」須要時刻保持清醒的頭腦,對本身的銀行空閒狀況瞭如指掌,每次給用戶推薦都應該是最好的選擇。
3.「銀行者聯盟」怎麼保證同一時間不會有兩我的在同一個銀行擁有存權限?
經過對指定銀行加鎖/解鎖的方式實現。
經過 Java7 的源碼分析下代碼實現,先看下一些重要的成員
//默認的數組大小16(HashMap裏的那個數組)
static final int DEFAULT_INITIAL_CAPACITY = 16;
//擴容因子0.75
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//ConcurrentHashMap中的數組
final Segment<K,V>[] segments
//默認併發標準16
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//Segment是ReentrantLock子類,所以擁有鎖的操做
static final class Segment<K,V> extends ReentrantLock implements Serializable {
//HashMap的那一套,分別是數組、鍵值對數量、閾值、負載因子
transient volatile HashEntry<K,V>[] table;
transient int count;
transient int threshold;
final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
}
//換了馬甲仍是認識你!!!HashEntry對象,存key、value、hash值以及下一個節點
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
//segment中HashEntry[]數組最小長度
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//用於定位在segments數組中的位置,下面介紹
final int segmentMask;
final int segmentShift;
複製代碼
上面這些一下出來有點接受不了不要緊,下面都會介紹到。
接下來從最簡單的初識化開始分析
ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
複製代碼
默認構造函數會調用帶三個參數的構造函數
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
//步驟① start
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
//步驟① end
//步驟② start
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
//步驟② end
// create segments and segments[0]
//步驟③ start
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
//步驟③ end
}
複製代碼
上面定義了許多臨時變量,註釋寫的又少,第一次看名字根本不知道這鬼東西表明什麼意思,不過咱們能夠把已知的數據代進去,算出這些變量的值,再分析能不能找出一些貓膩。假設這是第一次默認建立:
例子1 | 例子2 |
---|---|
ssize = 1,concurrencyLevel = 10 | ssize = 1,concurrencyLevel = 8 |
ssize <<= 1 —> 2<10 知足 | ssize <<= 1 —> 2<10 知足 |
ssize <<= 1 —> 4<10 知足 | ssize <<= 1 —> 4<10 知足 |
ssize <<= 1 —> 8<10 知足 | ssize <<= 1 —> 8<10 不知足 ssize = 8 |
ssize <<= 1 —> 16<10 不知足 ssize = 16 |
因此咱們傳 concurrencyLevel 不必定就是最後數組的長度,長度的計算公式:
長度 = 2的n次方(2的n次方 >= concurrencyLevel)
到這裏只是建立了一個長度爲16的Segment 數組,並初始化數組0號位置,segmentShift和segmentMask還沒派上用場,畫圖存檔:
接着看 put 方法
public V put(K key, V value) {
Segment<K,V> s;
//步驟①注意valus不能爲空!!!
if (value == null)
throw new NullPointerException();
//根據key計算hash值,key也不能爲null,不然hash(key)報空指針
int hash = hash(key);
//步驟②派上用場了,根據hash值計算在segments數組中的位置
int j = (hash >>> segmentShift) & segmentMask;
//步驟③查看當前數組中指定位置Segment是否爲空
//若爲空,先建立初始化Segment再put值,不爲空,直接put值。
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
複製代碼
步驟①能夠看到和 HashMap 的區別,這裏的 key/value 爲空會報空指針異常;步驟②先根據 key 值計算 hash 值,再和前面算出來的兩個變量計算出這個 key 應該放在哪一個Segment中(具體怎麼計算的有興趣能夠去研究下,先高位運算再取與),假設咱們算出來該鍵值對應該放在5號,步驟③判斷5號爲空,看下 ensureSegment() 方法
private Segment<K,V> ensureSegment(int k) {
//獲取segments
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
//拷貝一份和segment 0同樣的segment
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
//大小和segment 0一致,爲2
int cap = proto.table.length;
//負載因子和segment 0一致,爲0.75
float lf = proto.loadFactor;
//閾值和segment 0一致,爲1
int threshold = (int)(cap * lf);
//根據大小建立HashEntry數組tab
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
//再次檢查
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
根據已有屬性建立指定位置的Segment
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
複製代碼
該方法重點在於拷貝了segments[0],所以新建立的Segment與segment[0]的配置相同,因爲多個線程都會有可能執行該方法,所以這裏經過UNSAFE的一些原子性操做的方法作了屢次的檢查,到目前爲止畫圖存檔:
如今「舞臺」也有了,請開始你的表演,看下 Segment 的put方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//步驟① start
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
//步驟① end
V oldValue;
try {
//步驟② start
//獲取Segment中的HashEntry[]
HashEntry<K,V>[] tab = table;
//算出在HashEntry[]中的位置
int index = (tab.length - 1) & hash;
//找到HashEntry[]中的指定位置的第一個節點
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
//若是不爲空,遍歷這條鏈
if (e != null) {
K k;
//狀況① 以前已存過,則替換原值
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
//狀況② 另外一個線程的準備工做
if (node != null)
//鏈表頭插入方式
node.setNext(first);
else //狀況③ 該位置爲空,則新建一個節點(注意這裏採用鏈表頭插入方式)
node = new HashEntry<K,V>(hash, key, value, first);
//鍵值對數量+1
int c = count + 1;
//若是鍵值對數量超過閾值
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//擴容
rehash(node);
else //未超過閾值,直接放在指定位置
setEntryAt(tab, index, node);
++modCount;
count = c;
//插入成功返回null
oldValue = null;
break;
}
}
//步驟② end
} finally {
//步驟③
//解鎖
unlock();
}
//修改爲功,返回原值
return oldValue;
}
複製代碼
上面的 put 方法其實和 Java7 HashMap裏大體是同樣的,只是多了加鎖/解鎖兩步,也正由於這樣才保證了同一時刻只有一個線程擁有修改的權限。按步驟分析下上面的流程:
假設如今Thread1進來存值,前面沒人來過,它能夠成功拿到鎖,根據計算,得出它要存的鍵值對應該放在HashEntry[] 的0號位置,0號位置爲空,因而新建一個 HashEntry,並經過 setEntryAt() 方法,放在0號位置,然而還沒等 Thread1 釋放鎖,系統的時間片切到了 Thread2 ,先畫圖存檔
Thread2 也來存值,經過前面的計算,剛好 Thread2 也被定位到 segments[5],接下來 Thread2 嘗試獲取鎖,沒有成功(Thread1 還未釋放),執行 scanAndLockForPut() 方法:
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//經過Segment和hash值尋找匹配的HashEntry
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
//重試次數
int retries = -1; // negative while locating node
//循環嘗試獲取鎖
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
//步驟①
if (retries < 0) {
//狀況① 沒找到,以前表中不存在
if (e == null) {
if (node == null) // speculatively create node
//新建 HashEntry 備用,retries改爲0
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
//狀況② 找到,恰好第一個節點就是,retries改爲0
else if (key.equals(e.key))
retries = 0;
//狀況③ 第一個節點不是,移到下一個,retries仍是-1,繼續找
else
e = e.next;
}
//步驟②
//嘗試了MAX_SCAN_RETRIES次還沒拿到鎖,簡直B了dog!
else if (++retries > MAX_SCAN_RETRIES) {
//泉水掛機
lock();
break;
}
//步驟③
//在MAX_SCAN_RETRIES次過程當中,key對應的entry發生了變化,則從頭開始
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
複製代碼
經過上面的註釋分析能夠看出,Thread2 雖然此刻沒有權限修改,可是它也沒閒着,利用等鎖的這個時間,把本身要放的鍵值對在數組中哪一個位置計算出來了,這樣當 Thread2 一拿到鎖就能夠立馬定位到具體位置操做,節省時間。上面的步驟③稍微解釋下,好比 Thread2 經過查找得知本身要修改的值在0號位置,但在 Thread1 裏面又把該值改到了1號位置,若是它還去0號操做那確定出問題了,因此須要從新肯定。
假設 Thread2 put 值爲("亞索",「98」),對應1號位置,那麼在 scanAndLockForPut 方法中對應狀況①,畫圖存檔:
再回到 Segment put 方法中的狀況②,當 Thread1 釋放鎖後,Thread2 持有鎖,並準備把亞索放在1號位置,然而此時 Segment[5] 裏的鍵值對數量2 > 閾值1,因此調用 rehash() 方法擴容,
private void rehash(HashEntry<K,V> node) {
/*
* Reclassify nodes in each list to new table. Because we
* are using power-of-two expansion, the elements from
* each bin must either stay at same index, or move with a
* power of two offset. We eliminate unnecessary node
* creation by catching cases where old nodes can be
* reused because their next fields won't change. * Statistically, at the default threshold, only about * one-sixth of them need cloning when a table * doubles. The nodes they replace will be garbage * collectable as soon as they are no longer referenced by * any reader thread that may be in the midst of * concurrently traversing table. Entry accesses use plain * array indexing because they are followed by volatile * table write. */ //舊數組引用 HashEntry<K,V>[] oldTable = table; //舊數組長度 int oldCapacity = oldTable.length; //新數組長度爲舊數組的2倍 int newCapacity = oldCapacity << 1; //修改新的閾值 threshold = (int)(newCapacity * loadFactor); //建立新表 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; //遍歷舊錶 for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; //肯定在新表中的位置 int idx = e.hash & sizeMask; //狀況① 鏈表只有一個節點,指定轉移到新表指定位置 if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { //狀況② 擴容先後位置發生改變 int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } //將改變的鍵值對放到新表的對應位置 newTable[lastIdx] = lastRun; // Clone remaining nodes //狀況③ 把鏈表中剩下的節點拷到新表中 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } //添加新的節點(鏈表頭插入方式) int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; } 複製代碼
一樣是擴容轉移,這裏的代碼比 HashMap 中的 transfer 多了一些操做,在上上篇學習 HashMap 擴容可知,擴容後鍵值對的新位置要麼和原位置同樣,要麼等於原位置+舊數組的長度,因此畫個圖來理解下上面代碼這麼寫的緣由:
前提:當前 HashEntry[] 長度爲8,閾值爲 8*0.75 = 6,因此 put 第7個鍵值對須要擴容 ,蓋倫和亞索擴容先後位置不變,妖姬和卡特擴容後位置須要加上原數組長度,因此執行上面代碼流程:
上面的代碼先找出擴容先後須要轉移的節點,先執行轉移,而後再把該條鏈上剩下的節點轉移,之因此這麼寫是起到複用的效果,註釋中也說了,在使用默認閾值的狀況下,只有大約 1/6 的節點須要被 clone 。注意到目前爲止,能夠看到不管是擴容轉移仍是新增節點,Java7都是採用的頭插入方式,流程圖以下:
相比之下,get 方法沒有加鎖/解鎖的操做,代碼比較簡單就不分析了。
Java8 對比Java7有很大的不一樣,好比取消了Segments數組,容許併發擴容。
先看下ConcurrentHashMap的初始化
public ConcurrentHashMap() {
}
複製代碼
和Java7不同,這裏是個空方法,那麼它具體的初始化操做呢?直接看下 put 方法
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key/value不能爲空!!!
if (key == null || value == null) throw new NullPointerException();
//計算hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//註釋① 表爲null則初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//CAS方法判斷指定位置是否爲null,爲空則經過建立新節點,經過CAS方法設置在指定位置
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//當前節點正在擴容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//指定位置不爲空
else {
V oldVal = null;
//註釋② 加鎖
synchronized (f) {
if (tabAt(tab, i) == f) {
//節點是鏈表的狀況
if (fh >= 0) {
binCount = 1;
//遍歷總體鏈
for (Node<K,V> e = f;; ++binCount) {
K ek;
//若是已存在,替換原值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//若是是新加節點,則以尾部插入實現添加
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//節點是紅黑樹的狀況
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//遍歷紅黑樹
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
//鏈表中節點個數超過8轉成紅黑樹
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//註釋③ 添加節點
addCount(1L, binCount);
return null;
}
複製代碼
代碼有點長,第一次看頗有可能引發身體不適,主要是由於引入了紅黑樹的判斷和操做,以及線程安全的操做。一樣key/value 爲空會報空指針異常,這也是和 HashMap 一個明顯的區別。
調用 initTable 初始化數組
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// sizeCtl小於0,當前線程讓出執行權
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//CAS 操做將 sizeCtl 值改成-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//默認建立大小爲16的數組
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
//初始化完再改回來
sizeCtl = sc;
}
break;
}
}
return tab;
}
複製代碼
put方法並無加鎖,那麼它是如何保證建立新表的時候併發安全呢?答案就是這裏的 sizeCtl ,sizeCtl 默認值爲0,當一個線程初始化數組時,會將 sizeCtl 改爲 -1,因爲被 volatile 修飾,對於其餘線程來講這個變化是可見的,上面代碼看到後續線程判斷 sizeCtl 小於0 就會讓出執行權。
Java8 摒棄了Segment,而是對數組中單個位置加鎖。當指定位置節點不爲 null 時,狀況與 Java8 HashMap 操做相似,新節點的添加仍是尾部插入方式。
無論是鏈表的仍是紅黑樹,肯定以後總的節點數會加1,可能會引發擴容,Java8 ConcunrrentHashMap 支持併發擴容,以前擴容老是由一個線程將舊數組中的鍵值對轉移到新的數組中,支持併發的話,轉移所須要的時間就能夠縮短了,固然相應的併發處理控制邏輯也就更復雜了,擴容轉移經過 transfer 方法完成,Java8中該方法很長,感興趣的能夠看下源碼。。。
用一個圖來表示 Java8 ConcurrentHashMap的樣子
經過分析源碼對比了 HashMap 與 ConcurrentHashMap的差異,以及Java7和Java8上 ConcurrentHashMap 設計的不一樣,固然還有不少坑沒有填,好比其中調用了不少UNSAFE的CAS方法,能夠減小性能上的消耗,平時不多用,瞭解的比較少;以及紅黑樹的具體原理和實現,後續慢慢填。。。