ConcurrentHashMap源碼閱讀java
稍微有點粗糙的圖片node
ConcurrentHashMap
是屬於Java併發包,能夠稱之爲是線程安全的HashMap.
(如下有簡稱CHM
)數組
總所周知,HashMap
有良好的存取性能,但並不支持併發環境,HashTable
支持併發環境,而在存取方法上直接加Synchronized
的方式會使性能明顯降低,儘管Synchronize
在JDK1.6
以後進行了大量的優化,但依舊不是最優選.安全
在HashMap
中數組+鏈表/紅黑樹的結構基礎上,區別於HashTable
中的對整個數組對象上鎖,ConcurrentHashMap
使用爲數組中的每一個桶上鎖的機制,不知道還能不能成爲分段鎖一個桶就是一個段(JDK1.7
採用的是segment,1.8的代碼中雖然保留了但很是簡短僅爲兼容)多線程
僅表明我的意見,有錯誤歡迎留言,謝謝!併發
1. transient volatile Node<K,V>[] table;
複製代碼
實際存儲數據的Node數組,volatile
保證可見性。dom
2. private transient volatile Node<K,V>[] nextTable;
複製代碼
下一個使用的數組,僅在擴容更新的時候不爲空,擴容時會慢慢把數據移動到這個數組.ide
該數組做爲擴容的過分,類外沒法訪問函數
3. private transient volatile long baseCount;
複製代碼
在沒有發生爭用時的元素統計工具
4. private transient volatile int transferIndex;
複製代碼
擴容索引值,表示已經分配給擴容線程的table數組索引位置,主要用來協調多個線程間遷移任務的併發安全.
private transient volatile int sizeCtl;
複製代碼
重要程度堪比AQS
的state
,是一個在多線程間共享的競態變量,用於維護各類狀態,保存各種信息.
sizeCtl > 0
時可分爲兩種狀況:
sizeCtl
表示初始容量.sizeCtl = -1
: 表示正在初始化或者擴容階段.
sizeCtl < -1
: sizeCtl
承擔起了擴容時標識符(高16位)和參與線程數目(低16位)的存儲
在addCount
和helpTransfer
的方法代碼中,若是須要幫助擴容,則會CAS替換爲sizeCtl+1
在完成當前擴容內容,且沒有再分配的區域時,線程會退出擴容,此時會CAS替換爲sizeCtl-1
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/** * Virtualized support for map.get(); overridden in subclasses. * 爲map.get()提供虛擬化支持;在子類中覆蓋. */
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
// 裏面就是普通的鏈表循環,直到拿到相應的值
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
複製代碼
ConcurrentHashMap
,HashMap
中Node
的差異
val
和next
使用volatile
關鍵字修飾,確保多線程之間的可見性.hashCode
方法略有不一樣,由於ConcurrentHashMap
不支持key
或value
爲NULL值,因此直接使用key.hashCode() ^ val.hashCode()
跳過了爲空判斷.find()
方法用來在特定時間段幫忙獲取節點後的元素.通常做爲桶的頭節點調用,用來查詢桶中元素.轉發節點
該類僅僅存活在擴容階段,做爲一個標記節點放在桶的首位,而且指向是nextTable
(擴容的中間數組)
從構造函數可知,ForwardingNode
的hash
爲-1,其餘爲空,是個完徹底全的輔助類.
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
// 構造函數中默認以MOVED:-1爲Hash,其它爲空
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
// 幫助擴容時的元素查找
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}
複製代碼
find()
方法實在擴容期間幫助get
方法獲取桶中元素.public V put(K key, V value) {
return putVal(key, value, false);
}
複製代碼
/** * 方法參數: * 1. key,value 天然不用說就是k/v的兩個值 * 2. onlyIfAbsent 若爲true,則僅僅在值爲空時覆蓋 * 返回值: * 返回舊值,如果新增就爲null. */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// CHM不支持NULL值的鐵證.
if (key == null || value == null) throw new NullPointerException();
// 得到key的Hash,spread能夠稱之爲擾動函數
int hash = spread(key.hashCode());
int binCount = 0;
// 無限循環
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 在tab爲空時負責初始化Table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 使用`(n-1)&hash`肯定了元素的下標位置,獲取對應節點
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 若是對應位置節點爲空,直接以當前信息爲桶的頭節點
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 若是獲取的桶的頭結點的`Hash`爲`MOVED`,表示該節點是`ForwardingNode`
// 也就表示數組正在進行擴容
else if ((fh = f.hash) == MOVED)
// 幫助擴容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 上鎖保證原子性,volatile僅能保證可見性
// f爲key獲取到的節點元素,以此爲鎖對象
synchronized (f) {
// f在上文就是根據`tabAt(tab,i)`獲取的
// 此處是再次獲取驗證有沒有被修改
if (tabAt(tab, i) == f) {
// 與else.if比較,得知
// fh >= 0表示當前節點爲鏈表節點,即當前桶結構爲鏈表 ???
if (fh >= 0) {
// 鏈表中的元素個數統計
binCount = 1;
// 循環遍歷整個桶
// 跳出循環的兩種狀況:
// 1. 找到相同的值,binCount此時表示遍歷的節點個數
// 2. 遍歷到末尾,binCount就表示桶中的節點個數
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 源碼中大量運用了表達式的短路特性,來展現判斷的優先級
// 1. 若hash不相等,則直接跳過判斷
// 2. hash相等以後,若key的地址相同,則直接進入if
// 3. 地址不一樣時在進入判斷內容是否相等
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// onlyIfAbsent爲true,表示存在時不覆蓋內容
if (!onlyIfAbsent)
e.val = value;
// 已經找到肯定的元素了,更新不更新都跳出
break;
}
// 由於e就在同步代碼塊中,桶已經被上鎖,不可能有別的線程改變
// 因此不須要從新獲取
Node<K,V> pred = e;
// 1. 若是e爲空,則直接將元素掛接到e後面,跳出循環
// 2. e不爲空,繼續遍歷
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 相似HashMap,樹節點獨立操做.
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 表示進入了上面的同步表達式,對桶進行修改以後
if (binCount != 0) {
// 若是binCount大於樹的臨界值,就將鏈表轉化爲紅黑樹
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 若是oldVal部位空,則返回
if (oldVal != null)
return oldVal;
break;
}
}
}
// 添加元素計數,並在binCount大於0時檢查是否須要擴容
addCount(1L, binCount);
return null;
}
複製代碼
判斷並排除key,value非空,ConcurrentHashMap
不支持key或value爲空.
獲得擾動後的hash,進入tab數組的遍歷,若數組爲空則進行初始化
經過(n - 1) & hash
的公式獲取桶的下標 ,若桶爲空則直接填充key,value爲桶的頭節點
判斷桶的頭節點hash,若hash == -1
表示數組在擴容並幫助擴容.
進入synchronize
的同步代碼塊,若是桶的頭節點的hash大於0表示桶的結構爲鏈表,接下去就是正常的鏈表遍歷,新增或者覆蓋.
若是桶的頭節點是TreeBin
類型表示桶的結構爲紅黑樹,按紅黑樹的操做進行遍歷.
退出同步代碼塊,判斷在遍歷期間統計的binCount
是否須要轉化爲紅黑樹結構.
判斷oldVal
是否爲空,這步也挺關鍵的,若是不爲空表示時覆蓋操做,直接return
就好,不須要檢查擴容.
若是oldVal
不爲空調用addCount
方法新增元素個數,並檢測是否須要擴容.
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 獲取hash,並進過擾動
int h = spread(key.hashCode());
// 判斷以進入獲取方法
// 1. 數組不爲空 & 數組長度大於0
// 2. 獲取的桶不爲空
if ((tab = table) != null && (n = tab.length) > 0 &&
// 獲取桶下標的公式都是通用的 `(n -1) & h`
(e = tabAt(tab, (n - 1) & h)) != null)
{// 對於桶中頭節點的hash,對比成功就不須要遍歷整個列表了
if ((eh = e.hash) == h) {
// 返回匹配的元素value
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 元素hash < 0的狀況有如下三種:
// 1. 數組正在擴容,Node的實際類型是ForwardingNode
// 2. 節點爲樹的root節點,TreeNode
// 3. 暫時保留的Hash, Node
// 不一樣的Node都會調用各自的find()方法
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 若是頭節點不是所需節點,且Map此時並未擴容
// 直接遍歷桶中元素查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
複製代碼
key
的hash,在獲取以前會先判斷tab是否爲空以及長度(n -1)& hash
獲取的桶下表獲取桶.key
的hash和桶的頭節點是否相等,相等則直接返回.hash < 0
,表示處於如下三種狀態,則是經過調用各自實際節點類型的find
方法獲取元素.
ForwardingNode
TreeNode
addCount
的做用:
ConcurrentHashMap
的元素計數/** * 參數: * x -> 具體增長的元素個數 * check -> 若是check<0不檢查時都須要擴容, */
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 1. counterCells不爲空
// 2. CAS修改baseCount屬性成功
if ((as = counterCells) != null ||
// CAS增長baseCOunt
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
// 線程爭用的狀態標記
boolean uncontended = true;
// 1. 計數cell爲null,或長度小於1
// 2. 隨機去一個數組位置爲爲空
// 3. CAS替換CounterCell的value失敗
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// CAS增長CounterCell的value值失敗會調用fullAddCount方法
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// 根據`check >= 0`判斷是否須要檢查擴容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 1. 若是元素總數大於sizeCtl,表示達到了擴容閾值
// 2. tab數組不能爲空,已經初始化
// 3. table.length小於最大容,有擴容空間
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 根據數組長度獲取一個擴容標誌
int rs = resizeStamp(n);
if (sc < 0) {
// 若是sc的低16位不等於rs,表示標識符已經改變. // 待補充
// 若是nextTable爲空,表示擴容已經結束
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// CAS替換sc值爲sc+1,成功則開始擴容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 調用transfer開始擴容,此時nextTable已經指定
transfer(tab, nt);
}
// `sc > 0`表示數組此時並不在擴容階段,更新sizeCtl並開始擴容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 調用transfer,nextTable待生成
transfer(tab, null);
s = sumCount();
}
}
}
複製代碼
/** * 參數: * tab -> 擴容的數組,通常爲table * f -> 線程持有的鎖對應的桶的頭節點 * 調用地方: * 1. `putVal`檢測到頭節點Hash爲MOVED */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 1.參數數組不能爲空
// 2.參數f必須爲ForwardingNode類型
// 3.f.nextTab不能爲空
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// resizeStamp一頓位操做打的我頭昏腦漲
// 獲取擴容的標識
int rs = resizeStamp(tab.length);
// Map仍處在擴容狀態的判斷
// 1. 判斷節點f的nextTable是否和成員變量的nextTable相同
// 2. 判斷傳入的tab和成員變量的table是否相同
// 3. sizeCtl是否小於0
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 兩種不一樣的狀況判斷
// 一. 不須要幫助擴容的狀況
// 1. sc的高16位不等於rs
// 2. sc等於rs+1
// 3. sc等於rs+MAX_RESIZERS
// 4. transferIndex <= 0, 這個好理解由於擴容時會分配並減去transferIndex,
// 小於0時表示數組的區域已分配完畢
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 二. CAS `sc+1`並調用transfer幫助擴容.
// 線程在幫助擴容時會對sizeCtl+1,完成時-1,表示標記
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
複製代碼
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride爲這次須要遷移的桶的數目
// NCPU爲當前主機CPU數目
// MIN_TRANSFER_STRIDE爲每一個線程最小處理的組數目
// 1. 在多核中stride爲當前容量的1/8對CPU數目取整,例如容量爲16時,CPU爲2時結果是1
// 2. 在單核中stride爲n就爲當前數組容量
// !!! stride最小爲16,被限定死.
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// nextTab是擴容的過渡對象,因此必需要先初始化
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// !!! 重點就在這 擴容後的大小爲當前的兩倍 --> n << 1
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 擴容失敗,直接填充int的最大值
sizeCtl = Integer.MAX_VALUE;
// 直接退出
return;
}
// 更新成員變量
nextTable = nextTab;
// transferIndex爲數組長度
transferIndex = n;
}
// 記錄過渡數組的長度
int nextn = nextTab.length;
// 此處新建了一個ForwardingNode用於後續佔位
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
/** * 以上爲數據準備部分,初始化過渡數組,記錄長度,建立填充節點等操做 * 如下時真正擴容的主要邏輯 */
// 該變量控制遷移的進行,
boolean advance = true;
boolean finishing = false; // 兩個變量做用未知 finishing多是這次擴容標記
// 擴容的for循環裏面能夠分爲兩部分
// 一. while循環裏面肯定須要遷移的桶的區域,以及本次須要遷移的桶的下標
// 這個i就是須要遷移的桶的下標
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 該while代碼塊根據if的順序功能分別是
// --i: 負責遷移區域的向前推薦,i爲桶下標
// nextIndex: 在沒有獲取負責區域時,檢查是否還須要擴容
// CAS: 負責獲取這次for循環的區域,每次都爲stride個桶
while (advance) {
int nextIndex, nextBound;
// 這個`--i`每次都會進行,每次都會向前推動一個位置
if (--i >= bound || finishing)
advance = false;
// 所以若是當transferIndex<=0時,表示擴容的區域分配完
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
// CAS替換transferIndex的值,新值爲舊值減去分到的stride
// stride就表示這次的遷移區域,nextIndex就表明了下次起點
// 從這裏能夠看出擴容是從數組末尾開始向前推動的
}else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// bount爲這次擴容的推動終點,下次起點
bound = nextBound;
// i這次擴容開始的桶下表
i = nextIndex - 1;
advance = false;
}
}
// 二. 擴容的邏輯代碼
// 1. 此if斷定擴容的結果,中間是三種異常值
// 1). i < 0的狀況時上面第二個if跳出的線程
// 2). i > 舊數組的長度
// 3). i+n大於新數組的長度
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 此階段擴容結束後的操做
// 1. 將nextTable置空,
// 2. 將中間過渡的數組賦值給table
// 3. sizeCtl變爲1.5倍(2n-0.5n)
if (finishing) {
nextTable = null;
table = nextTab;
// 分別使用有符號左移,無符號右移
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// CAS替換`sizeCtl-1`,表示本線程的擴容任務已經完成
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 表達式成立表示還有別的線程在執行擴容,直接退出
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 表達式成立,表示已經所有擴容完成.
finishing = advance = true;
// 提交前從新檢查
i = n;
}
}
// 2. 擴容時發現負責的區域有空的桶直接使用ForwardingNode填充
// ForwardingNode持有nextTable的引用
else if ((f = tabAt(tab, i)) == null)
// CAS替換
advance = casTabAt(tab, i, null, fwd);
// 3. 表示處理完畢
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// 4. 遷移桶的操做
else {
// sync保證原子性和可見性
synchronized (f) {
// 獲取數組中的第i個桶的頭節點
// 進入synchronized以後從新判斷,保證數據的正確性沒有在中間被修改
if (tabAt(tab, i) == f) {
// 此處擴容和HashMap有點像,分爲了lowNode和highNode兩個頭結點
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
// true的話會從新
advance = true;
}
// 樹的桶遷移操做
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
複製代碼
在addCount
方法中間檢查元素個數是否達到擴容閾值(0.75 * table.length),超過則觸發擴容,調用方法transfer
.
sizeCtl
會被CAS
替換爲(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
接下來就是teansfer
的代碼:
根據CPU
和當前容量算出每次擴容該分配的區域大小,最小爲16,表示爲stride
.
若過渡數組nextTab
未初始化,則先初始化數組.並使用transferIndex
記錄下舊數組長度,做爲擴容長度.
以上擴容須要的數據準備徹底開始具體的擴容操做:
在一個while
循環中獲取本次擴容包含的桶的範圍,即[transferIndex,transferIndex-stride]
的範圍,i
表示當前擴容的桶的下標.
三個判斷,四段代碼分別完成不一樣狀況下的操做
i
數值異常,< 0 || >= n || + n > nextn
,表示擴容已完成,且在while
循環中沒有分配的擴容任務.
若是此時finishing
參數爲true
表示總體擴容完成,且完成結束前的檢查.
若是finishing
爲false
,則**CAS
替換sizeCtl爲sizeCtl-1**,表示一個線程完成擴容任務並須要退出.
替換成功以後還會檢查sc
是否等於addCount
進來時的值,不相等就直接return
,表示還有線程未完成擴容任務.
i
對應的桶爲空,直接使用ForwardingNode
填充頭節點,表示此處正在擴容.並設advance
爲true
若是檢查到節點hash
爲Moved
表示當前節點爲ForwardingNode
,advance
爲true
.
排除了上面三種狀況,就是對應的桶的遷移工做,和HashMap
有點像.結束後設置advance
爲true
以後會再回到第4步.
putVal
等元素操做方法中,發現獲取的桶頭節點爲ForwdingNode
就表示ConcurrentHashMap
當前正在擴容,會立刻調用helpTransfer
幫助擴容.helpTransfer
中會有各類正確性判斷,只有在如下三個條件都都知足時纔會幫助擴容.
tab
是否不空ForwardingNode
nextTable
是否初始化.sc >> 16 != rs
- 標識符已經改變.sc == rs+1
- 觸發擴容的線程已退出,擴容已經完成sc == rs+MAX_RESIZER
- 參與擴容的線程達最大值transferIndex <= 0
- 擴容區域已經分配完transfer
幫助擴容.addCount
-> sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
helpTransfer
中會有判斷sizeCtl
高16位的操做,resizeStamp(n)
的值推高16位,賦值給sizeCtl
,而低16位則保存了這個2.也就是說在擴容的時候sizeCtl
的高16爲保存了標識符,而低16位保存了參與線程數目.有線程參與擴容 -> sizeCtl = sizeCtl - 1
線程退出擴容 -> sizeCtl = sizeCtl + 1
擴容完成 -> sizeCtl = nextTab.length * 0.75
HashMap
同樣,ConcurrentHashMap
並非在構造函數中就直接初始化底層的數組,而是在put
等存方法中,判斷是否須要擴容.private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// `sizeCtl`表示有別的數組正在初始化,讓出CPU時間
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// CAS操做,以-1置換`sizeCtl`的值
// 能夠看出 `sizeCtl==-1`時,表示數組正在某個線程初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 置換以後須要從新檢測數組是否未初始化
if ((tab = table) == null || tab.length == 0) {
// sc就是置換以前的sizeCtl.
// 此時sizeCtl做爲初始容量.
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 初始化結束以後sc變爲0.75n,是擴容閾值
sc = n - (n >>> 2);
}
} finally {
// 爲避免異常退出致使sizeCtl永久爲-1,此處強制賦值.
sizeCtl = sc;
}
break;
}
}
// 返回了新建的數組地址
return tab;
}
複製代碼
initTable
方法時在putVal
而非構造函數,也算是CHM
中的一種懶加載機制.sizeCtl
經過CAS
置換爲-1,表示正在初始化sizeCtl
以前的值爲初始容量,sizeCtl
<=0時使用默認容量16sizeCtl
賦值爲0.75*數組容量(sizeCtl貫穿全篇,真的很重要)static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
複製代碼
Integer.numberOfLeadingZeros(n)
返回的是n的32位二進制形式前面的0的個數,例如值位16的int(32位)
類型二進制表示爲000000...0010000
,1前面的就有27個0,返回就是27.|
操做如今此處能夠簡單理解爲加法。static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
複製代碼
HashMap
中的hash()
方法功能相似.CHM
中的擾動函數除了將高16位於低16位異或以外又與上HASH_BITS,能夠有效下降哈希衝突的機率,使元素分散更加均勻.@SuppressWarnings("unchecked")
// tab: 數組 i : 下標
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
複製代碼
CAS
形式替換數組元素// tab: 原始數組 i:下標 c:對比元素 v:替換元素
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
複製代碼
// tab:原始數組 i:下標 v:替換元素
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
複製代碼
Unsafe
是一塊Java開發人員都不多接觸的區域,但這裏仍是簡單瞭解一下private static final sun.misc.Unsafe U;
// sizeCtl屬性的偏移地址
private static final long SIZECTL;
// transferIndex屬性的偏移地址
private static final long TRANSFERINDEX;
// baseCount的偏移地址
private static final long BASECOUNT;
// cellsBusy的偏移地址
private static final long CELLSBUSY;
// CounterCell類中value的偏移地址
private static final long CELLVALUE;
// Node數組第一個元素的偏移地址
private static final long ABASE;
// Node數組中元素的增量地址,與ABASE配合使用能訪問到數組的各元素
private static final int ASHIFT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
// 先經過反射獲取到對應的屬性值,再經過Unsafe類獲取屬性的偏移地址
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
// 獲取數組中第一個元素的偏移地址
ABASE = U.arrayBaseOffset(ak);
// 獲取數組的增量地址
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
複製代碼