這一篇咱們來講map系列的最後一個--ConcurrentHashMap。jdk1.7與jdk1.8中此類的實現有很大差別,因爲筆者使用jdk1.8,因此如下內容均爲jdk1.8版本。html
熟悉的套路,再來一次!java
ConcurrentHashMap 支持檢索的徹底併發和更新的高預期併發性。換句話說,ConcurrentHashMap是同步容器。即便全部操做都是線程安全的,可是檢索操做不須要鎖,而且不支持以阻止全部訪問操做的方式鎖住全表。因此說ConcurrentHashMap性能是很是高的。數組
檢索操做包括get通常來講不會阻塞,因此可能會與一些更新操做重疊。檢索操做的結果反映了最近完成的更新操做,換句話說,對一個給定key的更新操做happen-before 對這個key的非null檢索操做。可是對於聚合操做,好比putAll以及clear,併發的檢索可能只會表現出一部分元素的插入或者刪除。一樣,Iterators、Spliterators、Enumerations返回的元素反映了哈希表某一刻或者說iterator以及enumeration建立的時刻的狀態。而且不會拋出ConcurrentModificationException。iterators被設計爲一次只能由一個線程使用。請記住,包括size、isEmpty以及containsValue這樣的聚合狀態方法的結果只在map並無在其餘線程中被同步更新的狀況下有用,不然,這些結果可能反映了足夠用來監視或者估計目的的瞬態,可是不能用來做爲程序控制。安全
在任何狀況下,可以估計ConcurrentHashMap中將要存放鍵值對的數量並在構造函數中將這個值體如今initialCapacity中的作法可以避免因爲擴容帶來的性能的下降。多線程
ConcurrentHashMap能夠用使用java.util.concurrent.atomic.LongAdder來作頻率記錄,好比在ConcurrentHashMap<String,LongAdder> freqs中增長一個數可使用freqs.computeIfAbsent(Key,k -> new LongAdder()).increment();併發
像Hashtable但不像HashMap,它不容許null值用來作key或者value。app
ConcurrentHashMaps支持一組順序和並行批量操做,與大多數 Stream方法不一樣,它們被設計爲安全且一般合理地應用,即便是由其餘線程同時更新的映射;這些批處理操做容許一個parallelismThreshold參數來決定是否進行並行進行操做,好比使用Long.MAX_VALUE能夠抑制全部的並行性,使用1的話能夠充分利用用於並行計算的ForkJoinPoll的commonPool;。一般來講,在實際使用中,咱們經過這二者在其之間找尋一個最佳性能的值。dom
有人說java.util.concurrent的實現徹底依賴於CAS,那啥是CAS? CAS(Compare and Swap)比較與交換,是一個樂觀鎖,採用自旋的方式去更新值,能高效的完成原子操做。CAS有3個操做數,內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改成B,不然什麼都不作。函數
ABA問題。由於CAS須要在操做值的時候檢查下值有沒有發生變化,若是沒有發生變化則更新,可是若是一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時會發現它的值沒有發生變化,可是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那麼A-B-A 就會變成1A-2B-3A。 從Java1.5開始JDK的atomic包裏提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法做用是首先檢查當前引用是否等於預期引用,而且當前標誌是否等於預期標誌,若是所有相等,則以原子方式將該引用和該標誌的值設置爲給定的更新值。 關於ABA問題參考文檔: blog.hesey.net/2011/09/res…性能
循環時間長開銷大。自旋CAS若是長時間不成功,會給CPU帶來很是大的執行開銷。若是JVM能支持處理器提供的pause指令那麼效率會有必定的提高,pause指令有兩個做用,第一它能夠延遲流水線執行指令(de-pipeline),使CPU不會消耗過多的執行資源,延遲的時間取決於具體實現的版本,在一些處理器上延遲時間是零。第二它能夠避免在退出循環的時候因內存順序衝突(memory order violation)而引發CPU流水線被清空(CPU pipeline flush),從而提升CPU的執行效率。
只能保證一個共享變量的原子操做。當對一個共享變量執行操做時,咱們可使用循環CAS的方式來保證原子操做,可是對多個共享變量操做時,循環CAS就沒法保證操做的原子性,這個時候就能夠用鎖,或者有一個取巧的辦法,就是把多個共享變量合併成一個共享變量來操做。好比有兩個共享變量i=2,j=a,合併一下ij=2a,而後用CAS來操做ij。從Java1.5開始JDK提供了AtomicReference類來保證引用對象之間的原子性,你能夠把多個變量放在一個對象裏來進行CAS操做。
【參考連接】www.jianshu.com/p/450925729…
其實單看ConcurrentHashMap的存儲結構來講,跟HashMap的很像能夠說同樣,都是數組+鏈表或者數組+紅黑樹的方式。因此不作多解釋。
咱們以putVal爲例查看,看一下源碼:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
複製代碼
前面咱們說到了擴容,也提到了擴容的話,ConcurrentHashMap是支持多線程的。那咱們具體來看一下: 在putVal操做中是從下面這個方法中進入擴容機制的。
咱們看一下addCount的源碼:
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
複製代碼
看到用了不少的私有屬性,咱們看看這些屬性表明啥意思: 撿着重要的說,
如今咱們開始講講addCount的流程:
addCount中有兩個大if分支,第一個if分支做用是將增長的元素數量增長到baseCount上,若是CAS失敗則添加到counterCells上。
第二個分支則是擴容的主要步驟,固然只有check>=0的時候進行擴容判斷。
在第二個判斷中首先判斷當前元素數量是否已經超出sizeCtl而且table的值不爲null且table的長度不超過默認最大的容量。若是是則進行下面的擴容判斷。
以後進行sc判斷,小於0的話說明正在進行擴容,則判斷是否擴容完成,若是完成的話則break出去結束,沒有的話則調用transfer方法幫助擴容。並使用CAS更新正在擴容的線程數。
若是sc>0說明本身是第一個發起擴容的線程,則調用transfer進行擴容。
ConcurrentHashMap 是同步容器,採用CAS+synchronized方式保證線程安全。與java1.7不能1.7中採用分段鎖的方式。
ConcurrentHashMap擴容能夠多線程進行協助擴容。
存儲結構也是數組+鏈表以及數組+紅黑樹
擴容時默認增加爲原來的二倍
擴容時同HashMap同樣,也會將一個鏈表上的元素分紅兩個鏈表並插入到新數組的這個索引處以及(這個索引+舊數組的長度)索引處。