(1)ConcurrentHashMap與HashMap的數據結構是否同樣?java
(2)HashMap在多線程環境下什麼時候會出現併發安全問題?數組
(3)ConcurrentHashMap是怎麼解決併發安全問題的?安全
(4)ConcurrentHashMap使用了哪些鎖?數據結構
(5)ConcurrentHashMap的擴容是怎麼進行的?多線程
(6)ConcurrentHashMap是不是強一致性的?併發
(7)ConcurrentHashMap不能解決哪些問題?源碼分析
(8)ConcurrentHashMap中有哪些不常見的技術值得學習?性能
ConcurrentHashMap是HashMap的線程安全版本,內部也是使用(數組 + 鏈表 + 紅黑樹)的結構來存儲元素。學習
相比於一樣線程安全的HashTable來講,效率等各方面都有極大地提升。優化
這裏先簡單介紹一下各類鎖,以便下文講到相關概念時能有個印象。
(1)synchronized
java中的關鍵字,內部實現爲監視器鎖,主要是經過對象監視器在對象頭中的字段來代表的。
synchronized從舊版本到如今已經作了不少優化了,在運行時會有三種存在方式:偏向鎖,輕量級鎖,重量級鎖。
偏向鎖,是指一段同步代碼一直被一個線程訪問,那麼這個線程會自動獲取鎖,下降獲取鎖的代價。
輕量級鎖,是指當鎖是偏向鎖時,被另外一個線程所訪問,偏向鎖會升級爲輕量級鎖,這個線程會經過自旋的方式嘗試獲取鎖,不會阻塞,提升性能。
重量級鎖,是指當鎖是輕量級鎖時,當自旋的線程自旋了必定的次數後,尚未獲取到鎖,就會進入阻塞狀態,該鎖升級爲重量級鎖,重量級鎖會使其餘線程阻塞,性能下降。
(2)CAS
CAS,Compare And Swap,它是一種樂觀鎖,認爲對於同一個數據的併發操做不必定會發生修改,在更新數據的時候,嘗試去更新數據,若是失敗就不斷嘗試。
(3)volatile(非鎖)
java中的關鍵字,當多個線程訪問同一個變量時,一個線程修改了這個變量的值,其餘線程可以當即看獲得修改的值。(這裏牽涉到java內存模型的知識,感興趣的同窗能夠本身查查相關資料)
volatile只保證可見性,不保證原子性,好比 volatile修改的變量 i,針對i++操做,不保證每次結果都正確,由於i++操做是兩步操做,至關於 i = i +1,先讀取,再加1,這種狀況volatile是沒法保證的。
(4)自旋鎖
自旋鎖,是指嘗試獲取鎖的線程不會阻塞,而是循環的方式不斷嘗試,這樣的好處是減小線程的上下文切換帶來的開鎖,提升性能,缺點是循環會消耗CPU。
(5)分段鎖
分段鎖,是一種鎖的設計思路,它細化了鎖的粒度,主要運用在ConcurrentHashMap中,實現高效的併發操做,當操做不須要更新整個數組時,就只鎖數組中的一項就能夠了。
(5)ReentrantLock
可重入鎖,是指一個線程獲取鎖以後再嘗試獲取鎖時會自動獲取鎖,可重入鎖的優勢是避免死鎖。
其實,synchronized也是可重入鎖。
public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }
構造方法與HashMap對比能夠發現,沒有了HashMap中的threshold和loadFactor,而是改用了sizeCtl來控制,並且只存儲了容量在裏面,那麼它是怎麼用的呢?官方給出的解釋以下:
(1)-1,表示有線程正在進行初始化操做
(2)-(1 + nThreads),表示有n個線程正在一塊兒擴容
(3)0,默認值,後續在真正初始化的時候使用默認容量
(4)> 0,初始化或擴容完成後下一次的擴容門檻
至於,官方這個解釋對不對咱們後面再討論。
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { // key和value都不能爲null if (key == null || value == null) throw new NullPointerException(); // 計算hash值 int hash = spread(key.hashCode()); // 要插入的元素所在桶的元素個數 int binCount = 0; // 死循環,結合CAS使用(若是CAS失敗,則會從新取整個桶進行下面的流程) for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) // 若是桶未初始化或者桶個數爲0,則初始化桶 tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 若是要插入的元素所在的桶尚未元素,則把這個元素插入到這個桶中 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // 若是使用CAS插入元素時,發現已經有元素了,則進入下一次循環,從新操做 // 若是使用CAS插入元素成功,則break跳出循環,流程結束 break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) // 若是要插入的元素所在的桶的第一個元素的hash是MOVED,則當前線程幫忙一塊兒遷移元素 tab = helpTransfer(tab, f); else { // 若是這個桶不爲空且不在遷移元素,則鎖住這個桶(分段鎖) // 並查找要插入的元素是否在這個桶中 // 存在,則替換值(onlyIfAbsent=false) // 不存在,則插入到鏈表結尾或插入樹中 V oldVal = null; synchronized (f) { // 再次檢測第一個元素是否有變化,若是有變化則進入下一次循環,從頭來過 if (tabAt(tab, i) == f) { // 若是第一個元素的hash值大於等於0(說明不是在遷移,也不是樹) // 那就是桶中的元素使用的是鏈表方式存儲 if (fh >= 0) { // 桶中元素個數賦值爲1 binCount = 1; // 遍歷整個桶,每次結束binCount加1 for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 若是找到了這個元素,則賦值了新值(onlyIfAbsent=false) // 並退出循環 oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { // 若是到鏈表尾部尚未找到元素 // 就把它插入到鏈表結尾並退出循環 pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 若是第一個元素是樹節點 Node<K,V> p; // 桶中元素個數賦值爲2 binCount = 2; // 調用紅黑樹的插入方法插入元素 // 若是成功插入則返回null // 不然返回尋找到的節點 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { // 若是找到了這個元素,則賦值了新值(onlyIfAbsent=false) // 並退出循環 oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // 若是binCount不爲0,說明成功插入了元素或者尋找到了元素 if (binCount != 0) { // 若是鏈表元素個數達到了8,則嘗試樹化 // 由於上面把元素插入到樹中時,binCount只賦值了2,並無計算整個樹中元素的個數 // 因此不會重複樹化 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 若是要插入的元素已經存在,則返回舊值 if (oldVal != null) return oldVal; // 退出外層大循環,流程結束 break; } } } // 成功插入元素,元素個數加1(是否要擴容在這個裏面) addCount(1L, binCount); // 成功插入元素返回null return null; }
總體流程跟HashMap比較相似,大體是如下幾步:
(1)若是桶數組未初始化,則初始化;
(2)若是待插入的元素所在的桶爲空,則嘗試把此元素直接插入到桶的第一個位置;
(3)若是正在擴容,則當前線程一塊兒加入到擴容的過程當中;
(4)若是待插入的元素所在的桶不爲空且不在遷移元素,則鎖住這個桶(分段鎖);
(5)若是當前桶中元素以鏈表方式存儲,則在鏈表中尋找該元素或者插入元素;
(6)若是當前桶中元素以紅黑樹方式存儲,則在紅黑樹中尋找該元素或者插入元素;
(7)若是元素存在,則返回舊值;
(8)若是元素不存在,整個Map的元素個數加1,並檢查是否須要擴容;
添加元素操做中使用的鎖主要有(自旋鎖 + CAS + synchronized + 分段鎖)。
爲何使用synchronized而不是ReentrantLock?
由於synchronized已經獲得了極大地優化,在特定狀況下並不比ReentrantLock差。
未完待續~~