(1)ConcurrentHashMap與HashMap的數據結構是否同樣?java
(2)HashMap在多線程環境下什麼時候會出現併發安全問題?node
(3)ConcurrentHashMap是怎麼解決併發安全問題的?算法
(4)ConcurrentHashMap使用了哪些鎖?segmentfault
(5)ConcurrentHashMap的擴容是怎麼進行的?數組
(6)ConcurrentHashMap是不是強一致性的?安全
(7)ConcurrentHashMap不能解決哪些問題?數據結構
(8)ConcurrentHashMap中有哪些不常見的技術值得學習?多線程
ConcurrentHashMap是HashMap的線程安全版本,內部也是使用(數組 + 鏈表 + 紅黑樹)的結構來存儲元素。併發
相比於一樣線程安全的HashTable來講,效率等各方面都有極大地提升。dom
這裏先簡單介紹一下各類鎖,以便下文講到相關概念時能有個印象。
(1)synchronized
java中的關鍵字,內部實現爲監視器鎖,主要是經過對象監視器在對象頭中的字段來代表的。
synchronized從舊版本到如今已經作了不少優化了,在運行時會有三種存在方式:偏向鎖,輕量級鎖,重量級鎖。
偏向鎖,是指一段同步代碼一直被一個線程訪問,那麼這個線程會自動獲取鎖,下降獲取鎖的代價。
輕量級鎖,是指當鎖是偏向鎖時,被另外一個線程所訪問,偏向鎖會升級爲輕量級鎖,這個線程會經過自旋的方式嘗試獲取鎖,不會阻塞,提升性能。
重量級鎖,是指當鎖是輕量級鎖時,當自旋的線程自旋了必定的次數後,尚未獲取到鎖,就會進入阻塞狀態,該鎖升級爲重量級鎖,重量級鎖會使其餘線程阻塞,性能下降。
(2)CAS
CAS,Compare And Swap,它是一種樂觀鎖,認爲對於同一個數據的併發操做不必定會發生修改,在更新數據的時候,嘗試去更新數據,若是失敗就不斷嘗試。
(3)volatile(非鎖)
java中的關鍵字,當多個線程訪問同一個變量時,一個線程修改了這個變量的值,其餘線程可以當即看獲得修改的值。(這裏牽涉到java內存模型的知識,感興趣的同窗能夠本身查查相關資料)
volatile只保證可見性,不保證原子性,好比 volatile修改的變量 i,針對i++操做,不保證每次結果都正確,由於i++操做是兩步操做,至關於 i = i +1,先讀取,再加1,這種狀況volatile是沒法保證的。
(4)自旋鎖
自旋鎖,是指嘗試獲取鎖的線程不會阻塞,而是循環的方式不斷嘗試,這樣的好處是減小線程的上下文切換帶來的開鎖,提升性能,缺點是循環會消耗CPU。
(5)分段鎖
分段鎖,是一種鎖的設計思路,它細化了鎖的粒度,主要運用在ConcurrentHashMap中,實現高效的併發操做,當操做不須要更新整個數組時,就只鎖數組中的一項就能夠了。
(5)ReentrantLock
可重入鎖,是指一個線程獲取鎖以後再嘗試獲取鎖時會自動獲取鎖,可重入鎖的優勢是避免死鎖。
其實,【本篇文章由公衆號「彤哥讀源碼」原創】synchronized也是可重入鎖。
public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }
構造方法與HashMap對比能夠發現,沒有了HashMap中的threshold和loadFactor,而是改用了sizeCtl來控制,並且只存儲了容量在裏面,那麼它是怎麼用的呢?官方給出的解釋以下:
/** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */ private transient volatile int sizeCtl;
(1)-1,表示有線程正在進行初始化操做
(2)-(1 + nThreads),表示有n個線程正在一塊兒擴容
(3)0,默認值,後續在真正初始化的時候使用默認容量
(4)> 0,初始化或擴容完成後下一次的擴容門檻
至於,官方這個解釋對不對咱們後面再討論。
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { // key和value都不能爲null if (key == null || value == null) throw new NullPointerException(); // 計算hash值 int hash = spread(key.hashCode()); // 要插入的元素所在桶的元素個數 int binCount = 0; // 死循環,結合CAS使用(若是CAS失敗,則會從新取整個桶進行下面的流程) for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) // 若是桶未初始化或者桶個數爲0,則初始化桶 tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 若是要插入的元素所在的桶尚未元素,則把這個元素插入到這個桶中 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // 若是使用CAS插入元素時,發現已經有元素了,則進入下一次循環,從新操做 // 若是使用CAS插入元素成功,則break跳出循環,流程結束 break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) // 若是要插入的元素所在的桶的第一個元素的hash是MOVED // 表示正在擴容,則當前線程幫忙一塊兒遷移元素 tab = helpTransfer(tab, f); else { // 若是這個桶不爲空且不在遷移元素,則鎖住這個桶(分段鎖) // 並查找要插入的元素是否在這個桶中 // 存在,則替換值(onlyIfAbsent=false) // 不存在,則插入到鏈表結尾或插入樹中 V oldVal = null; synchronized (f) { // 再次檢測第一個元素是否有變化,若是有變化則進入下一次循環,從頭來過 if (tabAt(tab, i) == f) { // 若是第一個元素的hash值大於等於0(說明不是在遷移,也不是樹) // 那就是桶中的元素使用的是鏈表方式存儲 if (fh >= 0) { // 桶中元素個數賦值爲1 binCount = 1; // 遍歷整個桶,每次結束binCount加1 for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 若是找到了這個元素,則賦值了新值(onlyIfAbsent=false) // 並退出循環 oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { // 若是到鏈表尾部尚未找到元素 // 就把它插入到鏈表結尾並退出循環 pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 若是第一個元素是樹節點 Node<K,V> p; // todo 桶中元素個數賦值爲2? binCount = 2; // 調用紅黑樹的插入方法插入元素 // 若是成功插入則返回null // 不然返回尋找到的節點 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { // 若是找到了這個元素,則賦值了新值(onlyIfAbsent=false) // 並退出循環 oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // 若是binCount不爲0,說明成功插入了元素或者尋找到了元素 if (binCount != 0) { // 若是鏈表元素個數達到了8,則嘗試樹化 // 由於上面把元素插入到樹中時,binCount只賦值了2,並無計算整個樹中元素的個數 // 因此不會重複樹化 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 若是要插入的元素已經存在,則返回舊值 if (oldVal != null) return oldVal; // 退出外層大循環,流程結束 break; } } } // 成功插入元素,元素個數加1(是否要擴容在這個裏面) addCount(1L, binCount); // 成功插入元素返回null return null; }
總體流程跟HashMap比較相似,大體是如下幾步:
(1)若是桶數組未初始化,則初始化;
(2)若是待插入的元素所在的桶爲空,則嘗試把此元素直接插入到桶的第一個位置;
(3)若是正在擴容,則當前線程一塊兒加入到擴容的過程當中;
(4)若是待插入的元素所在的桶不爲空且不在遷移元素,則鎖住這個桶(分段鎖);
(5)若是當前桶中元素以鏈表方式存儲,則在鏈表中尋找該元素或者插入元素;
(6)若是當前桶中元素以紅黑樹方式存儲,則在紅黑樹中尋找該元素或者插入元素;
(7)若是元素存在,則返回舊值;
(8)若是元素不存在,整個Map的元素個數加1,並檢查是否須要擴容;
添加元素操做中使用的鎖主要有(自旋鎖 + CAS + synchronized)。
P.S.1.爲何使用synchronized而不是ReentrantLock?
由於synchronized已經獲得了極大地優化,在特定狀況下並不比ReentrantLock差。
P.S.2.Node中的hash值有哪些?
static final int MOVED = -1; // hash for forwarding nodes static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
MOVED(-1),表示正在擴容,做用在ForwardingNode上;
TREEBIN(-2),表示樹節點,做用在TreeBin上;(TreeBin是桶中的第一個元素,真正的元素存儲在TreeBin裏面的TreeNode上)
RESERVED(-3),表示保留節點,做用在ReservationNode上,ReservationNode是用在compute()和computeIfAbsent()方法裏的;
HASH_BITS,正常元素的hash掩碼,與HASH_BITS作&操做後的hash值都是大於0的。
第一次放元素時,初始化桶數組。
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) // 若是sizeCtl<0說明正在初始化或者擴容,讓出CPU Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 若是把sizeCtl原子更新爲-1成功,則當前線程進入初始化 // 若是原子更新失敗則說明有其它線程先一步進入初始化了,則進入下一次循環 // 若是下一次循環時還沒初始化完畢,則sizeCtl<0進入上面if的邏輯讓出CPU // 若是下一次循環更新完畢了,則table.length!=0,退出循環 try { // 再次檢查table是否爲空,防止ABA問題 if ((tab = table) == null || tab.length == 0) { // 若是sc爲0則使用默認值16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 新建數組 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 賦值給table桶數組 table = tab = nt; // 設置sc爲數組長度的0.75倍 // n - (n >>> 2) = n - n/4 = 0.75n // 可見這裏裝載因子和擴容門檻都是寫死了的 // 這也正是沒有threshold和loadFactor屬性的緣由 sc = n - (n >>> 2); } } finally { // 把sc賦值給sizeCtl,這時存儲的是擴容門檻 sizeCtl = sc; } break; } } return tab; }
(1)使用CAS鎖控制只有一個線程初始化桶數組;
(2)sizeCtl在初始化後存儲的是擴容門檻;
(3)擴容門檻寫死的是桶數組大小的0.75倍,桶數組大小即map的容量,也就是最多存儲多少個元素。
每次添加元素後,元素數量加1,並判斷是否達到擴容門檻,達到了則進行擴容或協助擴容。
private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 這裏使用的思想跟LongAdder類是如出一轍的(後面會講) // 把數組的大小存儲根據不一樣的線程存儲到不一樣的段上(也是分段鎖的思想) // 而且有一個baseCount,優先更新baseCount,若是失敗了再更新不一樣線程對應的段 // 這樣能夠保證儘可能小的減小衝突 // 先嚐試把數量加到baseCount上,若是失敗再加到分段的CounterCell上 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; // 若是as爲空【本篇文章由公衆號「彤哥讀源碼」原創】 // 或者長度爲0 // 或者當前線程所在的段爲null // 或者在當前線程的段上加數量失敗 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 強制增長數量(不管如何數量是必定要加上的,並非簡單地自旋) // 不一樣線程對應不一樣的段都更新失敗了 // 說明已經發生衝突了,那麼就對counterCells進行擴容 // 以減小多個線程hash到同一個段的機率 fullAddCount(x, uncontended); return; } if (check <= 1) return; // 計算元素個數 s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 若是元素個數達到了擴容門檻,則進行擴容 // 注意,正常狀況下sizeCtl存儲的是擴容門檻,即容量的0.75倍 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // rs是擴容時的一個郵戳標識 int rs = resizeStamp(n); if (sc < 0) { // sc<0說明正在擴容中 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) // 擴容已經完成了,退出循環 // 正常應該只會觸發nextTable==null這個條件,其它條件沒看出來什麼時候觸發 break; // 擴容未完成,則當前線程加入遷移元素中 // 並把擴容線程數加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 這裏是觸發擴容的那個線程進入的地方 // sizeCtl的高16位存儲着rs這個擴容郵戳 // sizeCtl的低16位存儲着擴容線程數加1,即(1+nThreads) // 因此官方說的擴容時sizeCtl的值爲 -(1+nThreads)是錯誤的 // 進入遷移元素 transfer(tab, null); // 從新計算元素個數 s = sumCount(); } } }
(1)元素個數的存儲方式相似於LongAdder類,存儲在不一樣的段上,減小不一樣線程同時更新size時的衝突;
(2)計算元素個數時把這些段的值及baseCount相加算出總的元素個數;
(3)正常狀況下sizeCtl存儲着擴容門檻,擴容門檻爲容量的0.75倍;
(4)擴容時sizeCtl高位存儲擴容郵戳(resizeStamp),低位存儲擴容線程數加1(1+nThreads);
(5)其它線程添加元素後若是發現存在擴容,也會加入的擴容行列中來;
線程添加元素時發現正在擴容且當前元素所在的桶元素已經遷移完成了,則協助遷移其它桶的元素。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; // 若是桶數組不爲空,而且當前桶第一個元素爲ForwardingNode類型,而且nextTab不爲空 // 說明當前桶已經遷移完畢了,纔去幫忙遷移其它桶的元素 // 擴容時會把舊桶的第一個元素置爲ForwardingNode,並讓其nextTab指向新桶數組 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); // sizeCtl<0,說明正在擴容 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 擴容線程數加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 當前線程幫忙遷移元素 transfer(tab, nextTab); break; } } return nextTab; } return table; }
當前桶元素遷移完成了纔去協助遷移其它桶元素;
擴容時容量變爲兩倍,並把部分元素遷移到其它桶中。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating // 若是nextTab爲空,說明還沒開始遷移 // 就新建一個新桶數組 try { // 新桶數組是原桶的兩倍 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } // 新桶數組大小 int nextn = nextTab.length; // 新建一個ForwardingNode類型的節點,並把新桶數組存儲在裏面 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 整個while循環就是在算i的值,過程太複雜,不用太關心 // i的值會從n-1依次遞減,感興趣的能夠打下斷點就知道了 // 其中n是舊桶數組的大小,也就是說i從15開始一直減到1這樣去遷移元素 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { // 若是一次遍歷完成了 // 也就是整個map全部桶中的元素都遷移完成了 int sc; if (finishing) { // 若是所有遷移完成了,則替換舊桶數組 // 並設置下一次擴容門檻爲新桶數組容量的0.75倍 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 當前線程擴容完成,把擴容線程數-1 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 擴容完成兩邊確定相等 return; // 把finishing設置爲true // finishing爲true纔會走到上面的if條件 finishing = advance = true; // i從新賦值爲n // 這樣會再從新遍歷一次桶數組,看看是否是都遷移完成了 // 也就是第二次遍歷都會走到下面的(fh = f.hash) == MOVED這個條件 i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) // 若是桶中無數據,直接放入ForwardingNode標記該桶已遷移 advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) // 若是桶中第一個元素的hash值爲MOVED // 說明它是ForwardingNode節點 // 也就是該桶已遷移 advance = true; // already processed else { // 鎖定該桶並遷移元素 // 【本篇文章由公衆號「彤哥讀源碼」原創】 synchronized (f) { // 再次判斷當前桶第一個元素是否有修改 // 也就是可能其它線程先一步遷移了元素 if (tabAt(tab, i) == f) { // 把一個鏈表分化成兩個鏈表 // 規則是桶中各元素的hash與桶大小n進行與操做 // 等於0的放到低位鏈表(low)中,不等於0的放到高位鏈表(high)中 // 其中低位鏈表遷移到新桶中的位置相對舊桶不變 // 高位鏈表遷移到新桶中位置正好是其在舊桶的位置加n // 這也正是爲何擴容時容量在變成兩倍的緣由 Node<K,V> ln, hn; if (fh >= 0) { // 第一個元素的hash值大於等於0 // 說明該桶中元素是以鏈表形式存儲的 // 這裏與HashMap遷移算法基本相似 // 惟一不一樣的是多了一步尋找lastRun // 這裏的lastRun是提取出鏈表後面不用處理再特殊處理的子鏈表 // 好比全部元素的hash值與桶大小n與操做後的值分別爲 0 0 4 4 0 0 0 // 則最後後面三個0對應的元素確定仍是在同一個桶中 // 這時lastRun對應的就是倒數第三個節點 // 至於爲啥要這樣處理,我也沒太搞明白 int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } // 看看最後這幾個元素歸屬於低位鏈表仍是高位鏈表 if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } // 遍歷鏈表,把hash&n爲0的放在低位鏈表中 // 不爲0的放在高位鏈表中 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 低位鏈表的位置不變 setTabAt(nextTab, i, ln); // 高位鏈表的位置是原位置加n setTabAt(nextTab, i + n, hn); // 標記當前桶已遷移 setTabAt(tab, i, fwd); // advance爲true,返回上面進行--i操做 advance = true; } else if (f instanceof TreeBin) { // 若是第一個元素是樹節點 // 也是同樣,分化成兩顆樹 // 也是根據hash&n爲0放在低位樹中 // 不爲0放在高位樹中 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; // 遍歷整顆樹,根據hash&n是否爲0分化成兩顆樹 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 若是分化的樹中元素個數小於等於6,則退化成鏈表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 低位樹的位置不變 setTabAt(nextTab, i, ln); // 高位樹的位置是原位置加n setTabAt(nextTab, i + n, hn); // 標記該桶已遷移 setTabAt(tab, i, fwd); // advance爲true,返回上面進行--i操做 advance = true; } } } } } }
(1)新桶數組大小是舊桶數組的兩倍;
(2)遷移元素先從靠後的桶開始;
(3)遷移完成的桶在裏面放置一ForwardingNode類型的元素,標記該桶遷移完成;
(4)遷移時根據hash&n是否等於0把桶中元素分化成兩個鏈表或樹;
(5)低位鏈表(樹)存儲在原來的位置;
(6)高們鏈表(樹)存儲在原來的位置加n的位置;
(7)遷移元素時會鎖住當前桶,也是分段鎖的思想;
刪除元素跟添加元素同樣,都是先找到元素所在的桶,而後採用分段鎖的思想鎖住整個桶,再進行操做。
public V remove(Object key) { // 調用替換節點方法 return replaceNode(key, null, null); } final V replaceNode(Object key, V value, Object cv) { // 計算hash int hash = spread(key.hashCode()); // 自旋 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) // 若是目標key所在的桶不存在,跳出循環返回null break; else if ((fh = f.hash) == MOVED) // 若是正在擴容中,協助擴容 tab = helpTransfer(tab, f); else { V oldVal = null; // 標記是否處理過 boolean validated = false; synchronized (f) { // 再次驗證當前桶第一個元素是否被修改過 if (tabAt(tab, i) == f) { if (fh >= 0) { // fh>=0表示是鏈表節點 validated = true; // 遍歷鏈表尋找目標節點 for (Node<K,V> e = f, pred = null;;) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 找到了目標節點 V ev = e.val; // 檢查目標節點舊value是否等於cv if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) // 若是value不爲空則替換舊值 e.val = value; else if (pred != null) // 若是前置節點不爲空 // 刪除當前節點 pred.next = e.next; else // 若是前置節點爲空 // 說明是桶中第一個元素,刪除之 setTabAt(tab, i, e.next); } break; } pred = e; // 遍歷到鏈表尾部還沒找到元素,跳出循環 if ((e = e.next) == null) break; } } else if (f instanceof TreeBin) { // 若是是樹節點 validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; // 遍歷樹找到了目標節點 if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; // 檢查目標節點舊value是否等於cv if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) // 若是value不爲空則替換舊值 p.val = value; else if (t.removeTreeNode(p)) // 若是value爲空則刪除元素 // 若是刪除後樹的元素個數較少則退化成鏈表 // t.removeTreeNode(p)這個方法返回true表示刪除節點後樹的元素個數較少 setTabAt(tab, i, untreeify(t.first)); } } } } } // 若是處理過,無論有沒有找到元素都返回 if (validated) { // 若是找到了元素,返回其舊值 if (oldVal != null) { // 若是要替換的值爲空,元素個數減1 if (value == null) addCount(-1L, -1); return oldVal; } break; } } } // 沒找到元素返回空 return null; }
(1)計算hash【本篇文章由公衆號「彤哥讀源碼」原創】;
(2)若是所在的桶不存在,表示沒有找到目標元素,返回;
(3)若是正在擴容,則協助擴容完成後再進行刪除操做;
(4)若是是以鏈表形式存儲的,則遍歷整個鏈表查找元素,找到以後再刪除;
(5)若是是以樹形式存儲的,則遍歷樹查找元素,找到以後再刪除;
(6)若是是以樹形式存儲的,刪除元素以後樹較小,則退化成鏈表;
(7)若是確實刪除了元素,則整個map元素個數減1,並返回舊值;
(8)若是沒有刪除元素,則返回null;
獲取元素,根據目標key所在桶的第一個元素的不一樣採用不一樣的方式獲取元素,關鍵點在於find()方法的重寫。
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 計算hash int h = spread(key.hashCode()); // 若是元素所在的桶存在且裏面有元素 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 若是第一個元素就是要找的元素,直接返回 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) // hash小於0,說明是樹或者正在擴容 // 使用find尋找元素,find的尋找方式依據Node的不一樣子類有不一樣的實現方式 return (p = e.find(h, key)) != null ? p.val : null; // 遍歷整個鏈表尋找元素 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
(1)hash到元素所在的桶;
(2)若是桶中第一個元素就是該找的元素,直接返回;
(3)若是是樹或者正在遷移元素,則調用各自Node子類的find()方法尋找元素;
(4)若是是鏈表,遍歷整個鏈表尋找元素;
(5)獲取元素沒有加鎖;
元素個數的存儲也是採用分段的思想,獲取元素個數時須要把全部段加起來。
public int size() { // 調用sumCount()計算元素個數 long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { // 計算CounterCell全部段及baseCount的數量之和 CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
(1)元素的個數依據不一樣的線程存在在不一樣的段裏;(見addCounter()分析)
(2)計算CounterCell全部段及baseCount的數量之和;
(3)獲取元素個數沒有加鎖;
(1)ConcurrentHashMap是HashMap的線程安全版本;
(2)ConcurrentHashMap採用(數組 + 鏈表 + 紅黑樹)的結構存儲元素;
(3)ConcurrentHashMap相比於一樣線程安全的HashTable,效率要高不少;
(4)ConcurrentHashMap採用的鎖有 synchronized,CAS,自旋鎖,分段鎖,volatile等;
(5)ConcurrentHashMap中沒有threshold和loadFactor這兩個字段,而是採用sizeCtl來控制;
(6)sizeCtl = -1,表示正在進行初始化;
(7)sizeCtl = 0,默認值,表示後續在真正初始化的時候使用默認容量;
(8)sizeCtl > 0,在初始化以前存儲的是傳入的容量,在初始化或擴容後存儲的是下一次的擴容門檻;
(9)sizeCtl = (resizeStamp << 16) + (1 + nThreads),表示正在進行擴容,高位存儲擴容郵戳,低位存儲擴容線程數加1;
(10)更新操做時若是正在進行擴容,當前線程協助擴容;
(11)更新操做會採用synchronized鎖住當前桶的第一個元素,這是分段鎖的思想;
(12)整個擴容過程都是經過CAS控制sizeCtl這個字段來進行的,這很關鍵;
(13)遷移完元素的桶會放置一個ForwardingNode節點,以標識該桶遷移完畢;
(14)元素個數的存儲也是採用的分段思想,相似於LongAdder的實現;
(15)元素個數的更新會把不一樣的線程hash到不一樣的段上,減小資源爭用;
(16)元素個數的更新若是仍是出現多個線程同時更新一個段,則會擴容段(CounterCell);
(17)獲取元素個數是把全部的段(包括baseCount和CounterCell)相加起來獲得的;
(18)查詢操做是不會加鎖的,因此ConcurrentHashMap不是強一致性的;
(19)ConcurrentHashMap中不能存儲key或value爲null的元素;
ConcurrentHashMap中有哪些值得學習的技術呢?
我認爲有如下幾點:
(1)CAS + 自旋,樂觀鎖的思想,減小線程上下文切換的時間;
(2)分段鎖的思想,減小同一把鎖爭用帶來的低效問題;
(3)CounterCell,分段存儲元素個數,減小多線程同時更新一個字段帶來的低效;
(4)@sun.misc.Contended(CounterCell上的註解),避免僞共享;(p.s.僞共享咱們後面也會講的^^)
(5)多線程協同進行擴容【本篇文章由公衆號「彤哥讀源碼」原創】;
(6)你又學到了哪些呢?
ConcurrentHashMap不能解決什麼問題呢?
請看下面的例子:
private static final Map<Integer, Integer> map = new ConcurrentHashMap<>(); public void unsafeUpdate(Integer key, Integer value) { Integer oldValue = map.get(key); if (oldValue == null) { map.put(key, value); } }
這裏若是有多個線程同時調用unsafeUpdate()這個方法,ConcurrentHashMap還能保證線程安全嗎?
答案是不能。由於get()以後if以前可能有其它線程已經put()了這個元素,這時候再put()就把那個線程put()的元素覆蓋了。
那怎麼修改呢?
答案也很簡單,使用putIfAbsent()方法,它會保證元素不存在時才插入元素,以下:
public void safeUpdate(Integer key, Integer value) { map.putIfAbsent(key, value); }
那麼,若是上面oldValue不是跟null比較,而是跟一個特定的值好比1進行比較怎麼辦?也就是下面這樣:
public void unsafeUpdate(Integer key, Integer value) { Integer oldValue = map.get(key); if (oldValue == 1) { map.put(key, value); } }
這樣的話就沒辦法使用putIfAbsent()方法了。
其實,ConcurrentHashMap還提供了另外一個方法叫replace(K key, V oldValue, V newValue)能夠解決這個問題。
replace(K key, V oldValue, V newValue)這個方法可不能亂用,若是傳入的newValue是null,則會刪除元素。
public void safeUpdate(Integer key, Integer value) { map.replace(key, 1, value); }
那麼,若是if以後不是簡單的put()操做,而是還有其它業務操做,以後纔是put(),好比下面這樣,這該怎麼辦呢?
public void unsafeUpdate(Integer key, Integer value) { Integer oldValue = map.get(key); if (oldValue == 1) { System.out.println(System.currentTimeMillis()); /** * 其它業務操做 */ System.out.println(System.currentTimeMillis()); map.put(key, value); } }
這時候就沒辦法使用ConcurrentHashMap提供的方法了,只能業務本身來保證線程安全了,好比下面這樣:
public void safeUpdate(Integer key, Integer value) { synchronized (map) { Integer oldValue = map.get(key); if (oldValue == null) { System.out.println(System.currentTimeMillis()); /** * 其它業務操做 */ System.out.println(System.currentTimeMillis()); map.put(key, value); } } }
這樣雖然不太友好,可是最起碼能保證業務邏輯是正確的。
固然,這裏使用ConcurrentHashMap的意義也就不大了,能夠換成普通的HashMap了。
上面只是舉一個簡單的例子,咱們不能據說ConcurrentHashMap是線程安全的,就認爲它不管什麼狀況下都是線程安全的,仍是那句話盡信書不如無書。
這也正是咱們讀源碼的目的之一,瞭解其本質,才能在咱們的實際工做中少挖坑,不管是挖給別人仍是挖給本身^^。
好了,整個ConcurrentHashMap就講完了。
文章暫不支持留言功能,若是您有任何建議或意見請在公衆號後臺給我留言,留言必回覆。
喜歡這篇關於ConcurrentHashMap講解的,賞個雞腿唄~~
歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。