簡介:html
本文主要介紹Java8中的併發容器ConcurrentHashMap的工做原理,和其它文章不一樣的是,本文重點分析了不一樣線程的各種併發操做如get,put,remove之間是如何同步的,以及這些操做和擴容操做之間同步可能出現的各類狀況。因爲源代碼的分析確定會有所紕漏,但願你們積極指出錯誤。java
歡迎探討,若有錯誤敬請指正 node
如需轉載,請註明出處 http://www.cnblogs.com/nullzx/數組
圖片來源(http://www.importnew.com/28263.html)安全
咱們將數組稱之爲表,將數組中每一個鏈表或紅黑樹稱之爲桶,將數組中的每一個結點稱之爲槽,也就是說「槽」存儲了鏈表的頭結點或者紅黑樹的根結點。源代碼中用內部類Node表示鏈表中的每一個結點。多線程
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; //…… //省略其它代碼 //…… Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } }
每一個結點都有一個hash屬性它表示了Node對象的哈希值,這個哈希值其實是key.hashCode()通過spread函數進一步散列後的值(後面的內容有spread函數源代碼)。特別須要注意的是val和next屬性都是用volatile修飾的。併發
而有關紅黑樹的內容不在本文的討論範圍以內,有興趣的同窗能夠參考個人另外三篇有關紅黑樹的技術博客。ide
構造函數
public ConcurrentHashMap(int initialCapacity, float loadFactor)
2.1 表的長度
實際上表的長度必須爲2的整數次冪。該類內部會用大於等於initialCapacity的最小2的整數次冪做爲長度。假設你構造ConcurrentHashMap對象時傳遞的initialCapacity的值是21,那麼實際上表的長度是32。通常教科書上設計哈希表時,會將表的長度設置爲較大的質數,而這裏將表的長度設置成2的整數次冪,我認爲有如下兩點緣由:
1)在教科書中咱們是經過
(Node對象的hash屬性值)%表長度
來定位槽的位置。這樣作的前提是咱們假設求餘運算是很快就能夠完成的,但實際上CPU可能須要不少條指令才能實現求餘操做。若是槽的長度正好的2的整數次冪,那麼咱們就能夠經過下面的方式計算槽的位置 ,這和上面的計算方式等價,但位與運算明顯要快於求餘運算。
(Node對象的hash屬性值)&(表長度-1)
2)在多線程擴容的時,這樣的長度設置能夠避免在擴容時對新表加鎖,從而加快ConcurrentHashMap的擴容速度。關於擴容的細節問題,後面會進行講述。
2.2 負載因子的含義
默認負載因子爲0.75。咱們假設表的長度爲100(固然,實際上不多是這個值,這裏只是爲了方便分析)。那麼咱們最多存儲75個結點就要擴容(注意並非佔用75個槽之後纔會擴容)。因此負載因子是對查詢效率和存儲空間平衡關係的表示。
static final int HASH_BITS = 0x7fffffff; static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
經過key肯定槽的位置時,若是咱們直接使用
key.hashCode() &(表長度-1)
若是經過上述方法,仍然還存在較多的key衝突,那麼就會致使同一個槽中彙集了較多結點,Java8中就會將這個長的鏈表轉化爲一顆以key表示大小的紅黑樹,以減小查詢時間。默認狀況下鏈表長度大於8就會被轉化成紅黑樹。
這裏咱們先不考慮併發問題,先說說基本的擴容操做,當put操做完成後,都要統計當前ConcurrentHashMap中結點的個數(顯然結點個數不是一個準確值,只能是一個估計值)。若是結點個數大於設定的閥值(表的長度*負載因子),就要進行擴容操做,以提升查詢效率。
前面咱們說過表的長度是2的整數次冪,擴容時咱們讓表的長度翻倍,因此擴容後的新表長度也必然是2的整數次冪。咱們這裏假設舊錶的長度是8(實際上代碼中表的最小長度也是16,這樣假設是爲了畫圖方便),圖中的數字表示結點的hash值。
從圖中咱們能夠看出,擴容後表的長度變成了16。咱們如今要對比觀察擴容先後每一個結點的位置,顯然能夠獲得一個有意思的結論:每一個結點在擴容後要麼留在了新表原來的位置上,要麼去了新表 「原位置+8」的位置上,而8就是舊錶的長度。好比擴容前3號槽有[3,11,19]結點,擴容後[3,19]結點依然留在了原3號位置,而節點[11]去了「原位置3 + 8 = 11」的位置。計算新表中槽的位置有很巧妙的方法,有興趣的同窗能夠參照transfer函數的源代碼。
擴容長度翻倍並,且擴容後長度仍然是2的整數次冪的特性在多線程擴容有很大的優點。原表中不一樣桶上的結點,在新表上必定不會分配到相同位置的槽上。咱們可讓不一樣線程負責原表不一樣位置的桶中全部結點的遷移,這樣兩個線程的遷移操做是不會相互干擾的。
好比咱們可讓一個線程負責原表中3號桶中全部結點的遷移,另外一個線程負責原表中4號桶全部結點的遷移。原表中3號位置上的結點只能遷移到新表3號位置或11號位置上,絕對不會映射到其它位置上。而4號位置上的結點只能遷移到新表4號位置或12號位置上,因此在遷移結點的過程當中,兩個線程就沒必要在新表的對應槽上加鎖了。
5.1 get方法
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //說明這個桶遷移已完成 或者 槽中是紅黑樹的根 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
經過源代碼發現,整個get操做都沒有加鎖,也沒有用 CAS操做,那麼get方法是怎麼保證線程安全的呢?如今先不回答這個問題,不過咱們應該注意get方法中頭結點hash值小於0的狀況(即eh < 0)的狀況,結合後面的擴容操做進行解釋。
5.2 put方法
public V put(K key, V value) { return putVal(key, value, false); }
put方法實際上調用了putVal方法
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); //每次循環都會從新計算槽的位置,由於可能恰好完成擴容操做 //擴容完成後會使用新表,槽的位置可能會發生改變 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //槽爲空,先嚐試用CAS方式進行添加結點 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //當前線程先幫助遷移,遷移完成後在新表中進行put else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; //加鎖操做,防止其它線程對此桶同時進行put,remove,transfer操做 synchronized (f) { //頭結點發生改變,就說明當前鏈表(或紅黑樹)的頭節點已不是f了 //可能被前面的線程remove掉了或者遷移到新表上了 //若是被remove掉了,須要從新對鏈表新的頭節點加鎖 if (tabAt(tab, i) == f) { //ForwordingNode的hash值爲-1 //鏈表結點的hash值 >= 0 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //若是遍歷到了最後一個結點, //那麼就說明須要插入新的結點,就把新結點插入在鏈表尾部 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //紅黑樹的根結點的hash值爲-2 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //內部判斷是否須要擴容 addCount(1L, binCount); return null; }
put方法作了如下幾點事情:
1)若是沒有初始化就先調用initTable()方法來進行初始化過程2)若是沒有hash衝突就嘗試CAS方式插入
3)若是還在進行擴容操做就先幫助其它線程進一塊兒行擴容
4)若是存在hash衝突,就加鎖來保證put操做的線程安全。
有意思的是,ConcurrentHashMap中並無使用ReentrantLock,而是直接使用了synchronized關鍵字對槽加鎖。我的猜想,這樣作的緣由是避免建立過多的鎖對象。若是桶的長度是1024(別問我爲啥是這個值,我只是考慮到了它是2的整數次冪,若是你聯想到了其它不宜公開討論的內容,請告訴我地址),那麼咱們就須要在每一個桶的位置上分配一把鎖,也就要1024把鎖,考慮到每次擴容後都還要從新建立全部的鎖對象,這顯然是不划算的。
添加結點操做完成後會調用addCount方法,在addCount方法中會去判斷是否須要擴容操做。若是容量超過閥值了,就由這個線程發起擴容操做。若是已經處於擴容狀態(sizeCtl < -1),根據剩餘遷移的數據和已參加到擴容中的線程數來判斷是否須要當前線程來幫助擴容。
5.3 remove方法
public V remove(Object key) { return replaceNode(key, null, null); }
實際上調用了replaceNode方法
final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 || /*每次循環都會從新計算槽的位置,由於在擴容完成後會使用新表 槽的位置可能會發生改變*/ (f = tabAt(tab, i = (n - 1) & hash)) == null) break; //若是有線程正在擴容,先幫助它一塊兒擴容,而後在新表中進行put操做 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; //加鎖操做,防止其它線程對此桶同時進行put,remove,transfer操做 synchronized (f) { //頭結點發生改變,就說明當前鏈表(或紅黑樹)的頭節點已不是f了 //可能被前面的線程remove掉了或者遷移到新表上了 //若是被remove掉了,須要從新對鏈表新的頭節點加鎖 if (tabAt(tab, i) == f) { if (fh >= 0) { validated = true; for (Node<K,V> e = f, pred = null;;) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else setTabAt(tab, i, e.next); } break; } pred = e; if ((e = e.next) == null) break; } } else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } if (validated) { if (oldVal != null) { if (value == null) addCount(-1L, -1); return oldVal; } break; } } } return null; }
5.4 ForwardingNode類
static final class ForwardingNode<K,V> extends Node<K,V> { //新表的引用 final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } //進行get操做的線程若發現槽中的節點爲ForwordingNode類型 //說明該桶中全部結點已遷移完成,會調用ForwordingNode的find方法在新表中進行查找 Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes //重新表中查詢 outer: for (Node<K,V>[] tab = nextTable;;) { //n表示新表的長度 Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || //從新在新表中定位 (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; //繼續遞歸查詢?這裏沒看懂 if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } //下一個 if ((e = e.next) == null) return null; } } } }
ForwardingNode類繼承了Node類,因此ForwardingNode對象也是Node類型對象,因此它也能夠放到表中。
ForwardingNode在擴容中使用。每個ForwardingNode對象都包含擴容後的表的引用(新表保存在nextTable屬性中)。 ForwardingNode對象的key,value,next屬性值所有爲null,它的hash值爲-1(注意小於0哦,能夠去看看get方法中對應的部分了)。
ForwardingNode對象中也定義了find的方法,它是從擴容後的新表中查詢結點,而不是以自身爲頭結點進行查找。
5.5 擴容方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //頭結點加鎖,防止其它線程此時對該桶進行put和remove操做 synchronized (f) { //和put及remove操做判斷頭結點是否改變的原理相似 if (tabAt(tab, i) == f) { // fh >= 0 表示鏈表 Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } //按新表中槽的位置分爲兩部分 //注意新表中的節點都是新建的,而不是修改原的結點的next指針 //這樣作是爲了同其它線程的get方法併發時能get正確的結果 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //將頭結點設置爲fwd setTabAt(tab, i, fwd); advance = true; } //表示紅黑樹 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //將頭結點設置爲fwd setTabAt(tab, i, fwd); advance = true; } } } } } }
整個擴容操做分爲兩個部分
1)構建一個nextTable,它的容量是原來的兩倍,這個操做是單線程完成的。
2)是將原來table中的結點遷移到nextTable中,這裏容許多線程進行操做。
在每一個位置擴容時,會對頭結點加鎖,避免其它線程在該位置進行put及remove操做,這個位置擴容結束時會將頭結點設置成ForwardingNode,而後釋放鎖。ForwardingNode結點中包含新表的引用,ForwardingNode結點的hash屬性的值爲-1,next屬性的值爲null。原表中引用爲null的槽一樣被設置成ForwardingNode結點。
多線程遷移的過程不是一個線程處理一個槽,而是一個線程處理多個連續的多個槽。在ConcurrentHashMap類中還定義下面屬性值,開始擴容時這個值表示了舊錶的長度,也就是說搬運工做是從舊錶的末尾開始的。
private transient volatile int transferIndex;
transfer函數中定義一個局部變量stride,它表示了每一個線程的一次遷移處理的桶的個數,當一個線程處理完成後 transferIndex就自減一個stride,那麼下一個線程就應該從transerIndex – stride處開始,往前處理stride個桶,以此類推完成協做。爲何要設計成從舊錶的後部開始往頭部的方向搬運呢?我的猜測是搬運結束的時候條件是統一的,只是寫代碼的技巧吧。固然怎麼肯定整個舊錶上的內容所有都遷移了,還須要讀更多的源代碼,這裏就不做分析了。
上圖表示了擴容操做過程當中舊錶和新表之間的一種可能的狀態,在圖中fw表示ForwordingNode類型結點,數字表示Node類型結點(上圖中的擴容過程和前面論述過的「4.擴容操做」章節中的的擴容過程不是同一個過程,對應的數據會有所差別)。如今咱們就經過如下幾種狀況解釋上圖所表達的意思。
首先,多個線程在同一個位置上的get操做時顯然不須要同步,因此這種狀況不須要討論,咱們來討論剩下幾種狀況。
6.1初化的同步問題
表的建立並非在構造函數中進行的,而是在put方法中進行的,也就是說這其實是個懶漢模式。可是若是多個線程同時建立表,顯然是非線程安全的。因此只能有一個線程來進行建立表,其它線程會等待建立完成後完成其它操做。ConcurrentHashMap類中設定一個volatile變量sizeCtl
private transient volatile int sizeCtl;
而後經過CAS方法去修改它,若是有其它線程發現sieCtl爲-1
U.compareAndSwapInt(this, SIZECTL, sc, -1)
就表示已經有線程正在建立表了,那麼當前線程就會放棄CPU使用權(調用Thread.yield()方法),等待分初始化完成後繼續進行put操做。不然當前線程嘗試將siezeCtl修改成-1,若成功,就由當前線程來建立表。
6.2 put方法和remove方法之間的同步問題
在表的同一個桶上,一個線程調用put方法和另外一個線程調用put方法是互斥的;在表的同一個桶上,一個線程調用remove方法和另外一個線程調用remove方法也是互斥的;在表的同一個桶上,一個線程調用remove方法和另外一個線程調用put方法也是互斥的。這些互斥操做在代碼中都是經過鎖來保證的,每一個線程執行這些操做時都會先鎖住槽。
6.3 put(或remove)方法和get方法的同步問題
實際上這兩類操做是不須要同步,先到先得。這主要因爲Node定義中value和next都定義成了volatile類型。一個線程可否get到另外一個線程剛剛put(或remove)的值,這主要由兩個線程當前訪問的結點所處的位置決定的。
6.4 get方法和擴容操做的同步問題
能夠分紅兩種狀況討論
1)該位置的頭結點是Node類型對象,直接get,即便這個桶正在進行遷移,在get方法未完成前,遷移操做已完成,即槽被設置成了ForwordingNode對象,也不要緊,並不影響get的結果。由於get線程仍然持有舊鏈表的引用,能夠從當前結點位置訪問到全部的後續結點。這是由於新表中的節點是經過複製舊錶中的結點獲得的,因此新表的結點的next值不會影響舊錶中對應結點的next值。當get方法結束後,舊鏈表就出於不可達的狀態,會被垃圾回收線程回收。
2)該位置的頭結點是ForwordingNode類型對象(頭結點的hash值 == -1),頭結點是ForwordingNode類型的對象,調用該對象的find方法,在新表中查找。
因此不管哪一種狀況,都能get到正確的值。
6.5 put(或remove)方法和擴容操做的同步問題
一樣能夠分爲兩種狀況討論:
1)該位置的頭結點是Node類型對象,那就看誰先獲取鎖,若是put操做先獲取鎖,則先將Node對象放入到舊錶中,而後調用addCount方法,判斷是否須要幫助擴容。
2)該位置的頭結點是ForwordingNode類型對象,那就會先幫助擴容,而後在新表中進行put操做。
[1] Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析