該系列文章收錄在公衆號【Ccww技術博客】,原創技術文章早於博客推出
在平時中集合使用中,當涉及多線程開發時,若是使用HashMap
可能會致使死鎖問題,使用HashTable
效率又不高。而ConcurrentHashMap
在保持同步同時併發效率比較高,ConcurrentHashmap
是最好的選擇,那面試中也會被經常問到,那可能的問題是:java
ConcurrentHashMap的實現原理node
ConcurrentHashMap的put()方法面試
其餘問題算法
ConcurrentHashMap的出現主要爲了解決hashmap在併發環境下不安全,JDK1.8ConcurrentHashMap的設計與實現很是精巧,大量的利用了volatile,CAS等樂觀鎖技術來減小鎖競爭對於性能的影響,ConcurrentHashMap保證線程安全的方案是:segmentfault
在JDK1.7中ConcurrentHashMap由Segment(分段鎖)數組結構和HashEntry數組組成,且主要經過Segment(分段鎖)段技術實現線程安全。數組
Segment是一種可重入鎖,是一種數組和鏈表的結構,一個Segment中包含一個HashEntry數組,每一個HashEntry又是一個鏈表結構,所以在ConcurrentHashMap查詢一個元素的過程須要進行兩次Hash操做,以下所示:安全
正是經過Segment分段鎖技術,將數據分紅一段一段的存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問,可以實現真正的併發訪問。數據結構
這樣結構會使Hash的過程要比普通的HashMap要長,影響性能,但寫操做的時候能夠只對元素所在的Segment進行加鎖便可,不會影響到其餘的Segment,ConcurrentHashMap提高了併發能力。多線程
在JDK8ConcurrentHashMap內部機構:數組+鏈表+紅黑樹,Java 8在鏈表長度超過必定閾值(8)時將鏈表(尋址時間複雜度爲O(N))轉換爲紅黑樹(尋址時間複雜度爲O(long(N))),結構基本上與功能和JDK8的HashMap同樣,只不過ConcurrentHashMap保證線程安全性。併發
但在JDK1.8中摒棄了Segment分段鎖的數據結構,基於CAS操做保證數據的獲取以及使用synchronized關鍵字對相應數據段加鎖來實現線程安全,這進一步提升了併發性。(CAS原理詳情《面試:爲了進阿里,又把併發CAS(Compare and Swap)實現從新精讀一遍》)
))
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; //使用了volatile屬性 volatile Node<K,V> next; //使用了volatile屬性 ... }
ConcurrentHashMap採用Node類做爲基本的存儲單元,每一個鍵值對(key-value)都存儲在一個Node中,使用了volatile關鍵字修飾value和next,保證併發的可見性。其中Node子類有:
ConcurrentHashMap中查找元素、替換元素和賦值元素都是基於sun.misc.Unsafe
中原子操做實現多併發的無鎖化操做。
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE); } static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v); }
ConcurrentHashMap的put的流程步驟
if (key == null || value == null) throw new NullPointerException();
for的死循環,爲了實現CAS的無鎖化更新,若是table爲null或者table的長度爲0,則初始化table,調用initTable()
方法(第一次put數據,調用默認參數實現,其中重要的sizeCtl
參數)。
//計算索引的第一步,傳入鍵值的hash值 int hash = spread(key.hashCode()); int binCount = 0; //保存當前節點的長度 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; K fk; V fv; if (tab == null || (n = tab.length) == 0) tab = initTable(); //初始化Hash表 ... }
肯定元素在Hash表的索引
經過hash算法能夠將元素分散到哈希桶中。在ConcurrentHashMap中經過以下方法肯定數組索引:
第一步:
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
第二步:(length-1) & (h ^ (h >>> 16)) & HASH_BITS);
經過tableAt()
方法找到位置tab[i]
的Node
,當Node爲null時爲沒有hash
衝突的話,使用casTabAt()
方法CAS
操做將元素插入到Hash
表中,ConcurrentHashmap
使用CAS
無鎖化操做,這樣在高併發hash
衝突低的狀況下,性能良好。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //利用CAS操做將元素插入到Hash表中 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value))) break; // no lock when adding to empty bin(插入null的節點,無需加鎖) }
當f不爲null時,說明發生了hash衝突,當f.hash == MOVED==-1 時,說明ConcurrentHashmap
正在發生resize
操做,使用helpTransfer()
方法幫助正在進行resize操做。
else if ((fh = f.hash) == MOVED) //f.hash == -1 //hash爲-1 說明是一個forwarding nodes節點,代表正在擴容 tab = helpTransfer(tab, f);
以上狀況都不知足的時,使用synchronized
同步塊上鎖當前節點Node
,並判斷有沒有線程對數組進行了修改,若是沒有則進行:
binCount
,查找是否有和key相同的節點,若是有則將查找到節點的val值替換爲新的value值,並返回舊的value值,不然根據key,value,hash建立新Node並將其放在鏈表的尾部Node f
是TreeBin
的類型,則使用紅黑樹的方式進行插入。而後則退出synchronized(f)
鎖住的代碼塊//當前節點加鎖 synchronized (f) { //判斷下有沒有線程對數組進行了修改 if (tabAt(tab, i) == f) { //若是hash值是大於等於0的說明是鏈表 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //插入的元素鍵值的hash值有節點中元素的hash值相同,替換當前元素的值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) //替換當前元素的值 e.val = value; break; } Node<K,V> pred = e; //若是循環到鏈表結尾還沒發現,那麼進行插入操做 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value); break; } } }else if (f instanceof TreeBin) { //節點爲樹 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) //替換舊值 p.val = value; } } else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update"); } }
執行完synchronized(f)
同步代碼塊以後會先檢查binCount
,若是大於等於TREEIFY_THRESHOLD = 8則進行treeifyBin操做嘗試將該鏈表轉換爲紅黑樹。
if (binCount != 0) { //若是節點長度大於8,轉化爲樹 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; }
執行了一個addCount
方法,主要用於統計數量以及決定是否須要擴容.
addCount(1L, binCount);
ConcurrentHashmap
和hashMap
不一樣的是,concurrentHashMap
的key
和value
都不容許爲null,
由於concurrenthashmap
它們是用於多線程的,併發的 ,若是map.get(key)
獲得了null,不能判斷究竟是映射的value是null,仍是由於沒有找到對應的key而爲空,
而用於單線程狀態的hashmap
卻能夠用containKey(key)
去判斷究竟是否包含了這個null。
initTable()
方法/** * Hash表的初始化和調整大小的控制標誌。爲負數,Hash表正在初始化或者擴容; * (-1表示正在初始化,-N表示有N-1個線程在進行擴容) * 不然,當表爲null時,保存建立時使用的初始化大小或者默認0; * 初始化之後保存下一個調整大小的尺寸。 */ private transient volatile int sizeCtl; //第一次put,初始化數組 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //若是已經有別的線程在初始化了,這裏等待一下 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //-1 表示正在初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { ... } finally { sizeCtl = sc; } break; } } return tab; }
使用sizeCtl
參數做爲控制標誌的做用,當在從插入元素時,纔會初始化Hash表。在開始初始化的時候,
sizeCtl
的值,若是sizeCtl < 0,說明有線程在初始化,當前線程便放棄初始化操做。不然,將SIZECTL
設置爲-1,Hash表進行初始化。sizeCtl
的值設置爲當前的容量值else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //利用CAS操做將元素插入到Hash表中 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value))) break; // no lock when adding to empty bin(插入null的節點,無需加鎖) }
(f = tabAt(tab, i = (n - 1) & hash)) == null
中使用tabAt原子操做獲取數組,並利用casTabAt(tab, i, null, new Node<K,V>(hash, key, value))
CAS操做將元素插入到Hash表中
synchronized
加鎖,而後再使用tabAt()
原子操做判斷下有沒有線程對數組進行了修改,最後再進行其餘操做。爲何要鎖住更新操做的代碼塊?
由於發生了哈希衝突,當前線程正在f所在的鏈表上進行更新操做,假如此時另一個線程也須要到這個鏈表上進行更新操做,則須要等待當前線程更新完後再執行
//當前節點加鎖 synchronized (f) { //這裏判斷下有沒有線程對數組進行了修改 if (tabAt(tab, i) == f) { ......//do something } }
因爲篇幅過於長,分紅兩部分來說講,接下來的內容請看[《面試:爲了進阿里,死磕了ConcurrentHashMap源碼和麪試題(二)》]()
各位看官還能夠嗎?喜歡的話,動動手指點個💗,點個關注唄!!謝謝支持
歡迎關注公衆號【Ccww技術博客】,原創技術文章第一時間推出