- 若是有一個整型變量count,多個線程併發讓count自增1,你會怎麼設計?
- 你知道如何讓多個線程協做完成一件事件嗎?
前言
很高興碰見你~java
ConcurrentHashMap是個老生常談的集合類了,咱們都知道多線程環境下不能直接使用HashMap,而須要使用ConcurrentHashMap,但有沒有了解過ConcurrentHashMap究竟是如何實現線程安全的呢?他到底跟傳統的Hashtable和SynchronizeMap(沒聽過SynchronizeMap?他就是Collections.synchronizeMap方法返回的對象)到底好在哪?node
ConcurrentHashMap創建在HashMap的基礎上實現了線程安全,關於HashMap讀者能夠參考這篇文章:深刻剖析HashMap,從散列表的三大要素:哈希函數、哈希衝突、擴容方案、以及線程安全展開詳解HashMap的設計。關於HashMap的內容本文再也不贅述,讀者若對HashMap的底層設計不瞭解,必定要先去閱讀前面的文章。ConcurrentHashMap中蘊含的併發編程智慧是很是值得咱們學習的,正如文章開頭的兩個問題,你會如何解決呢?可能會直接上鎖或用更高性能的CAS,但ConcurrentHashMap給了咱們更不同的解決方案。git
本文的主要內容是講解ConcurrentHashMap中的併發設計,重點分析ConcurrentHashMap的四個方法源碼:putVal
、initTable
、addCount
、transfer
。分析每一個方法前會使用圖解介紹ConcurrentHashMap的核心思路。源碼中我加了很是詳細的註釋,有時間仍建議讀者閱讀完源碼,ConcurrentHashMap的併發智慧,都蘊含在源碼中。面試
那麼咱們開始吧~算法
CAS與自旋鎖
CAS是ConcurrentHashMap中的一個重點,也是ConcurrentHashMap提高性能的根基所在。在閱讀源碼中,能夠發現CAS無處不在。在介紹ConcurrentHashMap前,必須先介紹一下這兩個重點。編程
Java中的運算並非原子操做,如count++
可分爲:數組
- 獲取count副本count_
- 對count_進行自增
- 把count_賦值給count
若是在第一步以後,count被其餘的線程修改了,第三步的賦值會直接覆蓋掉其餘線程的修改。synchronize能夠解決這個問題,但上鎖爲重量級操做,嚴重影響性能,CAS是更好的解決方案。安全
CAS的思路並不複雜。仍是上面的例子:當咱們須要對變量count進行自增時,咱們能夠認爲沒有發生併發衝突,先存儲一個count副本,再對count進行自增,而後把副本和count自己進行比較,若是二者相同,則證實沒有發生併發衝突,修改count的值;若是不一樣,則說明count在咱們自增的過程當中被修改了,把上述整個過程從新來一遍,直到修改爲功爲止,以下圖:多線程
那,若是咱們在判斷count==count_以後,count被修改了怎麼辦?比較賦值的操做操做系統會保證的原子性,保證不會出現這種狀況。在java中常見的CAS方法有:架構
// 比較並替換 U.compareAndSwapInt(); U.compareAndSwapLong(); U.compareAndSwapObject();
在後續的源碼中,咱們會常常看到他們。經過這種思路,咱們不須要給count變量上鎖。但若是併發度太高,處理時間過長,則會致使某些線程一直在循環自旋,浪費cpu資源。
自旋鎖是利用CAS而設計的一種應用層面的鎖。以下代碼:
// 0表明鎖釋放,1表明鎖被某個線程拿走了 int lock = 0; while(true){ if(lock==0){ int lock_ ; if(U.compareAndSwapInt(this,lock_,0,1)){ ... // 獲取鎖後的邏輯處理 // 最後釋放鎖 lock = 0; break; } } }
上面就是很經典自旋鎖設計。判斷鎖是否被其餘線程擁有,若沒有則嘗試使用CAS得到鎖;前兩步失敗都會從新循環再次嘗試直到得到鎖。最後邏輯處理完成要令lock=0
來釋放鎖。衝突時間短的併發情景下這種方法能夠大大提高效率。
CAS和自旋鎖在ConcurrentHashMap應用地很是普遍,在源碼中咱們會常常看到他們的身影。同時這也是ConcurrentHashMap的設計核心所在。
ConcurrentHashMap的併發策略概述
Hashtable與SynchronizeMap採起的併發策略是對整個數組對象加鎖,致使性能及其低下。jdk1.7以前,ConcurrentHashMap採用的是鎖分段策略來優化性能,以下圖:
至關於把整個數組,拆分紅多個小數組。每次操做只須要鎖住操做的小數組便可,不一樣的segment之間不互相影響,提升了性能。jdk1.8以後,對整個策略進行了重構:鎖的不是segment,而是節點,以下圖:
鎖的粒度進一步被下降,併發的效率也提升了。jdk1.8作得優化不僅是細化鎖粒度,還帶來了CAS+synchronize的設計。那麼下面,咱們針對ConcurrentHashMap的常見方法:添加、刪除、擴容、初始化等進行詳解他的設計思路。
添加數據:putVal()
ConcurrentHashMap添加數據時,採起了CAS+synchronize結合策略。首先會判斷該節點是否爲null,若是爲null,嘗試使用CAS添加節點;若是添加失敗,說明發生了併發衝突,再對節點進行上鎖並插入數據。在併發較低的情景下無需加鎖,能夠顯著提升性能。同時只會CAS嘗試一次,也不會形成線程長時間等待浪費cpu時間的狀況。
ConcurrentHashMap的put方法總體流程以下(並非所有流程):
- 首先會判斷數組是否已經初始化,如若未初始化,會先去初始化數組;
- 若是當前要插入的節點爲null,嘗試使用CAS插入數據;
- 若是不爲null,則判斷節點hash值是否爲-1;-1表示數組正在擴容,會先去協助擴容,再回來繼續插入數據。(協助擴容後面會講)
- 最後會執行上鎖,並插入數據,最後判斷是否須要返回舊值;若是不是覆蓋舊值,須要更新map中的節點數,也就是圖中的addCount方法。
ConcurrentHashMap是基於HashMap改造的,其中的插入數據、hash算法和HashMap都大同小異,這裏再也不贅述。思路清晰以後,下面咱們看源碼分析:
final V putVal(K key, V value, boolean onlyIfAbsent) { // 不容許插入空值或空鍵 // 容許value空值會致使get方法返回null時有兩種狀況: // 1. 找不到對應的key2. 找到了可是value爲null; // 當get方法返回null時沒法判斷是哪一種狀況,在併發環境下containsKey方法已再也不可靠, // 須要返回null來表示查詢不到數據。容許key空值須要額外的邏輯處理,佔用了數組空間,且並無多大的實用價值。 // HashMap支持鍵和值爲null,但基於以上緣由,ConcurrentHashMap是不支持空鍵值。 if (key == null || value == null) throw new NullPointerException(); // 高低位異或擾動hashcode,和HashMap相似 // 但有一點點不一樣,後面會講,這裏能夠簡單認爲同樣的就能夠 int hash = spread(key.hashCode()); // bincount表示鏈表的節點數 int binCount = 0; // 嘗試多種方法循環處理,後續會有不少這種設計 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 狀況一:若是數組爲空則進行初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 狀況二:目標下標對象爲null else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 重點:採用CAS進行插入 if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null))) break; } // 狀況三:數組正在擴容,幫忙遷移數據到新的數組 // 同時會新數組,下次循環就是插入到新的數組 // 關於擴容的內容後面再講,這裏理解爲正在擴容便可 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 狀況四:直接對節點進行加鎖,插入數據 // 下面代碼不少,但邏輯和HashMap插入數據大同小異 // 由於已經上鎖,不涉及併發安全設計 else { V oldVal = null; // 同步加鎖 synchronized (f) { // 重複檢查一下剛剛獲取的對象有沒有發生變化 if (tabAt(tab, i) == f) { // 鏈表處理狀況 if (fh >= 0) { binCount = 1; // 循環鏈表 for (Node<K,V> e = f;; ++binCount) { K ek; // 找到相同的則記錄舊值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; // 判斷是否須要更新數值 if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; // 若未找到則插在鏈表尾 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 紅黑樹處理狀況 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update"); } } // 判斷是否須要轉化爲紅黑樹,和返回舊數值 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 總數+1;這是一個很是硬核的設計 // 這是ConcurrentHashMap設計中的一個重點,後面咱們詳細說 addCount(1L, binCount); return null; } // 這個方法和HashMap static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
咱們注意到源碼中有兩個關鍵方法:初始化數組的initTable()
,修改map中節點總數的addCount
。這兩個方法是如何實現線程安全的呢,咱們繼續分析。
初始化數組:initTable()
初始化操做的重點是:保證多個線程併發調用此方法,只能有一個線程成功。ConcurrentHashMap採起了CAS+自旋的方法來解決併發問題,總體流程以下圖:
- 首先會判斷數組是否爲null,若是否說明另外一個線程初始化結束了,直接返回該數組;
- 第二步判斷是否正在初始化,若是是會讓出cpu執行時間,當前線程自旋等待
- 若是數組爲null,且沒有另外的線程正在初始化,那麼會嘗試獲取自旋鎖,獲取成功則進行初始化,獲取失敗則表示發生了併發衝突,繼續循環判斷。
ConcurrentHashMap並無直接採用上鎖的方式,而是採用CAS+自旋鎖的方式,提升了性能。自旋鎖保證了只有一個線程能真正初始化數組,同時又無需承擔synchronize的高昂代價,一箭雙鵰。在看源碼分析以前,咱們先來了解一下ConcurrentHashMap中一個關鍵的變量:sizeCtl 。
sizeCtl
默認爲0,在正常狀況下,他表示ConcurrentHashMap的閾值,是一個正數。當數組正在擴容時,他的值爲-1,表示當前正在初始化,其餘線程只須要判斷sizeCtl==-1
,就知道當前數組正在初始化。但當ConcurrentHashMap正在擴容時,sizeCtl是一個表示當前有多少個線程正在協助擴容的負數 ,咱們下面講到擴容時再分析。咱們直接來看initTable()
的源碼分析:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; // 這裏的循環是採用自旋的方式而不是上鎖來初始化 // 首先會判斷數組是否爲null或長度爲0 // 沒有在構造函數中進行初始化,主要是涉及到懶加載的問題 while ((tab = table) == null || tab.length == 0) { // sizeCtl是一個很是關鍵的變量; // 默認爲0,-1表示正在初始化,<-1表示有多少個線程正在幫助擴容,>0表示閾值 if ((sc = sizeCtl) < 0) Thread.yield(); // 讓出cpu執行時間 // 經過CAS設置sc爲-1,表示得到自選鎖 // 其餘線程則沒法進入初始化,進行自選等待 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 重複檢查是否爲空 if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; // 設置sc爲閾值,n>>>2表示1/4*n,也就至關於0.75n sc = n - (n >>> 2); } } finally { // 把sc賦值給sizeCtl sizeCtl = sc; } break; } } // 最後返回tab數組 return tab; }
下面咱們繼續看一下addCount()
方法如何實現併發安全。
修改節點總數:addCount()
addCount方法的目標很簡單,就是把ConcurrentHashMap的節點總數進行+1,也就是我在文章開頭提出的問題。ConcurrentHashMap的做者設計了一套很是嚴謹的架構來保證併發安全與高性能。
ConcurrentHashMap並非一個單獨的size變量,他把size進行拆分,以下圖:
這樣ConcurrentHashMap的節點數size就等於這些拆分開的size一、size2…的總和。這樣拆分有什麼好處呢?好處就是每一個線程能夠單獨修改對應的變量。以下圖:
兩個線程能夠同時進行自增操做,且徹底沒有任何的性能消耗,是否是一個很是神奇的思路?而當須要獲取節點總數時,只須要把所有加起來便可。在ConcurrentHashMap中每一個size被用一個CounterCell對象包裝着,CounterCell類很簡單:
static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
僅僅只是對value值使用volatile關鍵字進行修飾。不知道volatile關鍵字?能夠參考這篇文章一文搞懂 | Java中volatile關鍵字,簡單來講就是保證當前線程對value的修改其餘線程立刻能夠知道。ConcurrentHashMap使用一個數組來存儲CounterCell,以下:
那麼每一個線程如何分配到對應的本身的CounterCell呢?ConcurrentHashMap中採用了相似HashMap的思路,獲取線程隨機數,再對這個隨機數進行取模獲得對應的CounterCell。獲取到對應的CounterCell以後,當前線程會嘗試使用CAS進行修改,若是修改失敗,則從新獲取線程隨機數,換一個CounterCell再來一次,直到修改爲功。
以上就是addCount方法的核心思路,但源碼的設計會複雜一點,還必須考慮CounterCell數組的初始化、CounterCell對象的建立、CounterCell數組的擴容。ConcurrentHashMap還保留了一個basecount,每一個線程會首先使用CAS嘗試修改basecount,若是修改失敗,纔會下發到counterCell數組中。總體的流程以下:
- 當前線程首先會使用CAS修改basecount的值,修改失敗則進入數組分配CounterCell修改;
- 判斷CounterCell數組是否爲空,
- 若是CounterCell數組爲空,則初始化數組
- 若是CounterCell數組不爲空,使用線程隨機數找到下標
- 若是該下標的的counterCell對象還沒初始化,則先建立一個CounterCell,這一步在圖中我沒有標出來。建立了CounterCell以後還須要考慮是否須要數組擴容
- 若是counterCell對象不爲null,使用CAS嘗試修改,失敗則從新來一次
- 若是上面兩種狀況都不知足,則會回去再嘗試CAS修改一下basecount
看起來好像挺複雜,但只要抓住size變量分割成多個CounterCell這個核心概念便可,其餘的步驟都是細節完善。咱們能夠看到整個思路徹底沒有提到synchronize加鎖,ConcurrentHashMap的做者採用CAS+自旋鎖代替了synchronize,這使得在高併發狀況下提高了很是大的性能。思路清晰以後,咱們看源碼也就簡單一些了。那接下來就 read the fucking code:
private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 若是數組不爲空 或者 數組爲空且直接更新basecount失敗 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; // 表示沒發生競爭 boolean uncontended = true; // 這裏有如下狀況會進入fullAddCount方法: // 1. 數組爲null且直接修改basecount失敗 // 2. hash後的數組下標CounterCell對象爲null // 3. CAS修改CounterCell對象失敗 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 該方法保證完成更新,重點方法!! fullAddCount(x, uncontended); return; } // 若是長度<=1不須要擴容(說實話我以爲這裏有點奇怪) if (check <= 1) return; s = sumCount(); } if (check >= 0) { // 擴容相關邏輯,下面再講 } }
前面源碼嘗試直接修改basecount失敗後,就會進入fullAddCount方法:
private final void fullAddCount(long x, boolean wasUncontended) { int h; // 若是當前線程隨機數爲0,強制初始化一個線程隨機數 // 這個隨機數的做用就相似於hashcode,不過他不須要被查找 // 下面每次循環都從新獲取一個隨機數,不會讓線程都堵在同一個地方 if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); h = ThreadLocalRandom.getProbe(); // wasUncontended表示沒有競爭 // 若是爲false表示以前CAS修改CounterCell失敗,須要從新獲取線程隨機數 wasUncontended = true; } // 直譯爲碰撞,若是他爲true,則表示須要進行擴容 boolean collide = false; // 下面分爲三種大的狀況: // 1. 數組不爲null,對應的子狀況爲CAS更新CounterCell失敗或者countCell對象爲null // 2. 數組爲null,表示以前CAS更新baseCount失敗,須要初始化數組 // 3. 第二步獲取不到鎖,再次嘗試CAS更新baseCount for (;;) { CounterCell[] as; CounterCell a; int n; long v; // 第一種狀況:數組不爲null if ((as = counterCells) != null && (n = as.length) > 0) { // 對應下標的CounterCell爲null的狀況 if ((a = as[(n - 1) & h]) == null) { // 判斷當前鎖是否被佔用 // cellsBusy是一個自旋鎖,0表示沒被佔用 if (cellsBusy == 0) { // 建立CounterCell對象 CounterCell r = new CounterCell(x); // 嘗試獲取鎖來添加一個新的CounterCell對象 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { CounterCell[] rs; int m, j; // recheck一次是否爲null if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; // created=true表示建立成功 created = true; } } finally { // 釋放鎖 cellsBusy = 0; } // 建立成功也就是+1成功,直接返回 if (created) break; // 拿到鎖後發現已經有別的線程插入數據了 // 繼續循環,重來一次 continue; } } // 到達這裏說明想建立一個對象,可是鎖被佔用 collide = false; } // 以前直接CAS改變CounterCell失敗,從新獲取線程隨機數,再循環一次 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // 嘗試對CounterCell進行CAS else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; // 若是發生過擴容或者長度已經達到虛擬機最大能夠核心數,直接認爲無碰撞 // 由於已經沒法再擴容了 // 因此併發線程數的理論最高值就是NCPU else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale // 若是上面都是false,說明發生了衝突,須要進行擴容 else if (!collide) collide = true; // 獲取自旋鎖,並進行擴容 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) { // Expand table unless stale // 擴大數組爲原來的2倍 CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { // 釋放鎖 cellsBusy = 0; } collide = false; // 繼續循環 continue; } // 這一步是從新hash,找下一個CounterCell對象 // 上面每一步失敗都會來到這裏獲取一個新的隨機數 h = ThreadLocalRandom.advanceProbe(h); } // 第二種狀況:數組爲null,嘗試獲取鎖來初始化數組 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // recheck判斷數組是否爲null if (counterCells == as) { // 初始化數組 CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { // 釋放鎖 cellsBusy = 0; } // 若是初始化完成,直接跳出循環, // 由於初始化過程當中也包括了新建CounterCell對象 if (init) break; } // 第三種狀況:數組爲null,可是拿不到鎖,意味着別的線程在新建數組,嘗試直接更新baseCount else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) // 更新成功直接返回 break; } }
源碼的總體思路跟咱們前面講的是差很少的,細節上使用了不少的CAS+自旋鎖來保證線程安全。上面的註釋很是詳細,這裏就再也不贅述。當初閱讀源碼看到這裏,不得不佩服ConcurrentHashMap做者,咱們可能以爲一個CAS+synchronize就解決了,可是他卻想出了多線程同時更新的思路,配合CAS和自旋鎖,在高併發環境下極大提升了性能。
若是說把一個變量拆分紅多個子變量,利用多線程協做是一個很神奇的思路,那麼多個線程同時協做完成擴容操做會不會更加神奇?ConcurrentHashMap不只避開了併發的性能消耗,甚至利用上了併發的優點,多個線程一塊兒幫忙完成一件事。那接下來就來看看ConcurrentHashMap的擴容方案。
擴容方案:transfer()
在講擴容以前,須要補充兩個知識點:siezeCtl和ForwardingNode。
sizeCtl在前面提到過,默認值爲0,通常狀況下表示ConcurrentHashMap的閾值,數組初始化時值爲-1,當數組擴容時,表示爲參與擴容的線程數。ConcurrentHashMap在擴容時把sizeCtl設置爲一個很小的負數,並記住這個負數。線程參與擴容,該負數+1,線程退出該負數-1,這樣就能夠記住線程數了。一個變量維護四個狀態,再次佩服ConcurrentHashMap的做者。
那這個負數設置爲多少呢?有一個算法。看擴容時sizeCtl的初始化代碼:
int rs = resizeStamp(n);// 這裏n表示數組的長度 sizeCtl = rs << RESIZE_STAMP_SHIFT +2 ; // RESIZE_STAMP_SHIFT是一個常量,值爲16 static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }
咱們一點一點來看這個算法。
Integer.numberOfLeadingZeros(n)
這個方法表示獲取n最高位1前面0的數目,如8的32位二進制爲00000000 0000000 00000000 00001000
。那麼返回就是28,前面有28個0。RESIZE_STAMP_BITS-1
值爲15,1<<RESIZE_STAMP_BITS-1
的結果就是00000000 00000000 10000000 00000000
。- 假設n=8,那麼
Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1))
的結果就是0000000 0000000 10000000 00011100
,這個數字就稱之爲檢驗碼,記爲rs。 - 最後執行
rs << RESIZE_STAMP_SHIFT +2
獲得sizeCtl的最終值:10000000 000111000 000000000 00000010
咱們會發現擴容時,高16位是校驗碼,低16位表示線程數,初始化時會+2,後續有新的線程加入會+1。那校驗碼有什麼用?當咱們須要判斷當前數組是否正在擴容時,只須要判斷sizeCtl>>>RESIZE_STAMP_BITS == rs
就能夠知道當前是否在擴容了。
而後再來看看ForwardingNode
。看名字就知道他是一個節點類,他的做用是標記當前節點已經遷移完成。以下圖:
ConcurrentHashMap會從後往前遍歷並遷移,已經遷移完成的節點會被賦值爲ForwardingNode,表示該節點下的全部數據已經遷移完成。ForwardingNode和普通的節點類似,但他的hash值爲MOVED
,也就是-1。還記得前面putVal嗎?在插入的時候會判斷當前節點是不是ForwardingNode,若是是則先幫忙遷移;不然若是正在擴容,說明擴容工做還沒到達當前下標,那麼能夠直接插入。
瞭解完sizeCtl和ForwardingNode,那麼就來看看ConcurrentHashMap的擴容方案。ConcurrentHashMap的擴容是多個線程協同工做的,提升了效率,以下圖:
ConcurrentHashMap把整個數組進行分段,每一個線程負責一段。bound表示該線程範圍的下限,i表示當前正在遷移的下標。每個遷移完成的節點都會被賦值ForwardingNode,表示遷移完成。stride表示線程遷移的「步幅」,當線程完成範圍內的任務後,就會繼續往前看看還有沒有須要遷移的,transferIndex就是記錄下個須要遷移的下標;當transferIndex==0時則表示不須要幫忙了。這就是ConcurrentHashMap擴容方案的核心思路了 。保證線程安全的思路和前面介紹的方法大同小異,都是經過 CAS+自旋鎖+synchronize來實現的。
另外ConcurrentHashMap遷移鏈表與二叉樹的思路與HashMap略有不一樣,這裏就不展開講了,瞭解了HashMap看ConcurrentHashMap的源碼很容易理解他的思路,也是大同小異。擴容方案就不打算畫總體流程圖了,只要瞭解核心思路,其餘都是細節的邏輯控制。咱們直接來看源碼分析。
首先要看到addCount方法,這個方法咱們前面介紹過他自增的邏輯,可是下半部分擴容的邏輯咱們沒有介紹,如今來看一下:
private final void addCount(long x, int check) { ... // 總數+1邏輯 // 這部分的邏輯主要是判斷是否須要擴容 // 同時保證只有一個線程可以建立新的數組 // 其餘的線程只能輔助遷移數據 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 當長度達到閾值且長度並未達到最大值時進行下一步擴容 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // 這個數配合後續的sizeCtr計算 // 他的格式是第16位確定爲1,低15位表示n前面連續的0個數,咱們前面介紹過 int rs = resizeStamp(n); // 小於0表示正在擴容或者正在初始化,不然進入下一步搶佔鎖進行建立新數組 if (sc < 0) { // 若是正在遷移右移16位後必定等於rs // ( sc == rs + 1 ||sc == rs + MAX_RESIZERS)這兩個條件我認爲不可能爲true // 有興趣能夠點擊下方網站查看 // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427 // nextTable==null說明下個數組還未建立 // transferIndex<=0說明遷移已經夠完成了 // 符合以上狀況的從新循環自旋 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 幫忙遷移,sc+1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 搶佔鎖進行擴容 // 對rs檢驗碼進行左移16位再+2,這部分咱們在上面介紹過 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 搶佔自旋鎖成功,進行擴容 transfer(tab, null); // 更新節點總數,繼續循環 s = sumCount(); } } }
上面的方法重點是利用sizeCtl充當自旋鎖,保證只有一個現場能建立新的數組,而其餘的線程只能協助遷移數組。那麼下面的方法就是擴容方案的重點方法:
// 這裏的兩個參數:tab表示舊數組,nextTab表示新數組 // 建立新數組的線程nextTab==null,其餘的線程nextTab等於第一個線程建立的數組 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // stride表示每次前進的步幅,最低是16 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 若是新的數組還未建立,則建立新數組 // 只有一個線程能進行建立數組 if (nextTab == null) { try { @SuppressWarnings("unchecked") // 擴展爲原數組的兩倍 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // 擴容失敗出現OOM,直接把閾值改爲最大值 sizeCtl = Integer.MAX_VALUE; return; } // 更改concurrentHashMap的內部變量nextTable nextTable = nextTab; // 遷移的起始值爲數組長度 transferIndex = n; } int nextn = nextTab.length; // 標誌節點,每一個遷移完成的數組下標都會設置爲這個節點 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // advance表示當前線程是否要前進 // finish表示遷移是否結束 // 官方的註釋表示在賦值爲true以前,必須再從新掃描一次確保遷移完成,後面會講到 boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab // i表示當前線程遷移數據的下標,bound表示下限,從後往前遷移 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 這個循環主要是判斷是否須要前進,若是須要則CAS更改下個bound和i while (advance) { int nextIndex, nextBound; // 若是還未到達下限或者已經結束了,advance=false if (--i >= bound || finishing) advance = false; // 每一輪循環更新transferIndex的下標 // 若是下一個下標是0,表示已經無需繼續前進 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // 利用CAS更改bound和i繼續前進遷移數據 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } // i已經達到邊界,說明當前線程的任務已經完成,無需繼續前進 // 若是是第一個線程須要更新table引用 // 協助的線程須要將sizeCtl減一再退出 if (i < 0 || i >= n || i + n >= nextn) { int sc; // 若是已經更新完成,則更新table引用 if (finishing) { nextTable = null; table = nextTab; // 同時更新sizeCtl爲閾值 sizeCtl = (n << 1) - (n >>> 1); return; } // 線程完成本身的遷移任務,將sizeCtl減一 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 這裏sc-2不等於校驗碼,說明此線程不是最後一個線程,還有其餘線程正在擴容 // 那麼就直接返回,他任務已經完成了 // 最後一個線程須要從新把整個數組再掃描一次,看看有沒有遺留的 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // finish設置爲true表示已經完成 // 這裏把i設置爲n,從新把整個數組掃描一次 finishing = advance = true; i = n; // recheck before commit } } // 若是當前節點爲null,表示遷移完成,設置爲標誌節點 else if ((f = tabAt(tab, i)) == null) // 這裏的設置有可能會失敗,因此不能直接設置advance爲true,須要再循環 advance = casTabAt(tab, i, null, fwd); // 當前節點是ForwardingNode,表示遷移完成,繼續前進 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // 給頭節點加鎖,進行遷移 // 加鎖後下面的內容就不涉及併發控制細節了,就是純粹的數據遷移 // 思路和HashMap差很少,但也有一些不一樣,多了一個lastRun // 讀者能夠閱讀一下下面源碼,這部分比較容易理解 synchronized (f) { // 上鎖以後再判斷一次看該節點是否仍是原來那個節點 // 若是不是則從新循環 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; // hash值大於等於0表示該節點是普通鏈表節點 if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; // ConcurrentHashMap並非直接把整個鏈表分爲兩個 // 而是先把尾部遷移到相同位置的一段先拿出來 // 例如該節點遷移後的位置可能爲 1或5 ,而鏈表的狀況是: // 1 -> 5 -> 1 -> 5 -> 5 -> 5 // 那麼concurrentHashMap會先把最後的三個5拿出來,lastRun指針指向倒數第三個5 for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } // 判斷尾部總體遷移到哪一個位置 if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; // 這個node節點是改造過的 // 至關於使用頭插法插入到鏈表中 // 這裏的頭插法不須擔憂鏈表環,由於已經加鎖了 if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 鏈表構造完成,把鏈表賦值給數組 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); // 設置標誌對象,表示遷移完成 setTabAt(tab, i, fwd); advance = true; } // 樹節點的處理,和鏈表思路相同,不過他沒有lastRun,直接分爲兩個鏈表,採用尾插法 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
擴容是一個相對重量級的操做,他須要建立一個新的數組再把原來的節點一個個搬過去,在高併發環境下,若是直接對整個表上鎖,會有不少線程被阻塞。而ConcurrentHashMap的設計使得多個線程可協同完成擴容操做,甚至擴容的同時還能夠進行數據的讀取與插入,極大提升了效率。和前面的拆分size變量有殊途同歸之妙:利用多線程協同工做來提升效率 。
關於擴容還有另一個方法:helpTransfer
。顧名思義,就是幫忙擴容,在putVal方法中,遇到ForwardingNode對象會調用此方法。看完前面的源碼,這部分的源碼就簡單多了,no bb,show the code:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; // 判斷當前節點爲ForwardingNode,且已經建立新的數組 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); // sizeCtl<0表示還在擴容 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { // 校驗是否已經擴容完成或者已經推動到0,則不須要幫忙擴容 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 嘗試讓讓sc+1並幫忙擴容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } // 返回擴容以後的數組 return nextTab; } // 若數組還沒有初始化或節點非ForwardingNode,返回原數組 return table; }
到此擴容方案的源碼就分析完畢了。擴容方案的思路雖然簡單,可是須要有大量的邏輯控制來保證線程安全,因此源碼量也很是多。關於ConcurrentHashMap的核心方法已經都分析完畢了,其餘的如remove
、replace
等思路都和上面講過的大同小異,讀者可自行閱讀源碼。
最後
到這裏,關於concurrentHashMap的內容就基本講完了。之後跟面試官吹水,就不僅是一句ConcurrentHashMap是安全的就沒有下文了。ConcurrentHashMap優秀的CAS+自旋鎖+synchronize併發設計,是整個框架的重點所在。
看完ConcurrentHashMap的源碼有什麼用?固然是面試要問啊!《java編程思想》中提到,對於併發問題,若是不是專家,老老實實上個鎖,不要整這些花裏胡哨的。從ConcurrentHashMap的源碼咱們能夠得知併發的問題,遠遠沒有咱們想的那麼簡單,他是一個很是複雜的問題。學習ConcurrentHashMap,也並非要學他寫同樣的代碼,除了面試,我想更重要的一點是感覺編程的智慧。ConcurrentHashMap做者神奇的設計、嚴謹的代碼,讓咱們得以擁有在併發環境下安全且高性能的ConcurrentHashMap可使用。他的思想是,若是能在實際實踐中運用到一點點,都是莫大的收穫了。
如今,文章開頭的兩個問題,有答案了嗎?
但願文章對你有幫助~
全文到此,原創不易,以爲有幫助能夠點贊收藏評論轉發。
筆者才疏學淺,有任何想法歡迎評論區交流指正。
如需轉載請評論區或私信交流。另外歡迎光臨筆者的我的博客:傳送門