推薦文章:
網上的文章,jdk7版本的比較多,因爲本身本地是jdk8,因此仍是整理jdk8的邏輯吧。
------------------------------------------------------------------------------------------
1、結構跟思路說明,主要字段解釋
HashMap的源碼咱們比較清楚,大概結構就是數組加鏈表,數組默認長度爲16,負載因子爲0.75,超事後擴容,長度爲當前數組長度*2,數組下的鏈表元素超過8個時轉換爲紅黑樹結構,優化查找性能。jdk8中ConcurrentHashMap的結構跟HashMap同樣,也是直接數組加鏈表,每一個鏈表的首節點做爲synchronized的同步對象使用,對該鏈表數據的訪問都要通過同步塊synchronized,跟jdk7不同,這點最後再說。jdk8的難點在於多線程擴容部分。
主要字段說明:
int DEFAULT_CAPACITY = 16; // 默認容量,哈希表數組的初始長度
int DEFAULT_CONCURRENCY_LEVEL = 16; // 默認併發級別,也就是容許的最大併發線程數
float LOAD_FACTOR = 0.75f; // 負載因子
int TREEIFY_THRESHOLD = 8; // 轉化爲紅黑樹的閾值
volatile Node<K,V>[] table; // 哈希表數組
volatile int sizeCtl; // 用來控制初始化跟擴容的字段。-1表示初始化,-(1+擴容線程數)表示在擴容或者縮容,默認值0,未初始化且大於0則表示初始容量,擴容後值爲下一次應該擴容的閾值(當前容量*0.75)
2、put源碼
put總體邏輯就是根據key哈希值計算在哈希桶的位置,而後將數據放入鏈表,具體以下:
一、若是未初始化則先進行初始化,完成後再次進入循環;
二、若找到位置,該處元素爲null,則用cas方式進行設置,成功則退出,不然進入下一次循環;
三、若發現對應位置的鏈表首節點hash值爲-1,說明在進行擴容,則當前線程也幫助進行擴容,擴容完成後繼續進行put操做;
四、以上判斷都結束後,synchronized鎖定鏈表首節點,進入鏈表遍歷,進行設置。
相關代碼:
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) { // onlyIfAbsent,是否保留原來的值,默認false,也就是覆蓋舊值
if (key == null || value == null) throw new NullPointerException(); // 不容許null的key跟value,由於有可能會做爲對象鎖使用,synchronized是基於monitor機制的,null沒有對象頭
int hash = spread(key.hashCode()); // 對hashcode進行散列,獲取一個相對分佈更加均勻的hash值。這個散列是用hashcode的高16位跟低16位進行異或運算獲得的一個結果。
int binCount = 0;
for (Node<K,V>[] tab = table;;) { // 哈希數組賦值給tab----爲啥要
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) // 哈希數組默認是空的,要初始化
tab = initTable(); // 見下文,new了一個node數組
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 根據hash值取table中元素,該元素爲空
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // cas設置該元素的值,失敗則說明有線程競爭,break進入下次循環
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) // 從名字來看,正在擴容,MOVED 值爲-1,這個擴容部分再看
tab = helpTransfer(tab, f); //
else { // 找到哈希表中的位置,並且該位置元素不爲null,hash值也不是-1,進入正常的遍歷鏈表賦值
V oldVal = null;
synchronized (f) { // 就是這個操做,把鏈表首節點做爲鎖對象進行同步(也多是紅黑樹頭節點)
if (tabAt(tab, i) == f) { // 普通Node節點
if (fh >= 0) { // 首節點的hash值
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // key 的hash值相等且equals比較也相同,則覆蓋
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { // 在鏈表追加
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 紅黑樹節點
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) // 鏈表長度達到8個,轉換爲紅黑樹節點--這個並不徹底是
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
對於鏈表過長而轉換爲紅黑樹,實際是並非,還要看哈希表容量,容量小於64的話,哈希表擴容,通常來講就解決了單節點過長問題。若單鏈表過長,且哈希表長度超過64,這纔會轉換爲紅黑樹。
哈希桶(數組)初始化過程,也是整個map的初始化過程:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) { // 數組爲空
if ((sc = sizeCtl) < 0) // 初始化的"功勞"被別人搶走了
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 搶奪初始化"功勞"的操做,cas設置sizeCtl = -1
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 數組大小默認爲16
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; //建了個node數組,長度爲sizeCtl大小(默認16)
table = tab = nt; // 數組賦給哈希表
sc = n - (n >>> 2); // sc 變爲原來的3/4,爲啥是這個數,由於負載因子0.75,正好用這個數作記錄
}
} finally {
sizeCtl = sc; // 第12行的數據給了sizeCtl
}
break;
}
}
return tab;
}
這裏的這個spread(),就是把高位拿下來也參與了hash。由於若是僅僅取餘的話,其實看的只是最低的幾位,即便高位不一樣,最後也會被分到同一個位置,這會致使數據在hash表中分佈不均勻。將高位跟低位一塊兒參與運算,能夠適當的減輕這種狀況。
3、get源碼
get的主要邏輯也比較清晰,就是根據哈希值進行取餘來肯定位置,而後根據equals來比較是否相等,是則取出。hash值爲負數,則認爲是在進行擴容致使的數據遷移。數據遷移的總體邏輯最後整理。邏輯以下:
一、判斷是否已初始化,若未初始化,則直接返回null;hash值對應的鏈表首節點爲null,也直接返回null;
二、hash值跟鏈表首節點相等,且key地址相同或者equals相等,則認爲鏈表首節點就是要找的節點,返回其值;
三、若鏈表首節點的hash值爲負數,說明map在擴容,鏈表可能數據不全;經過節點Node的find方法進行查找;(這個方法須要分析一下)
四、正常的遍歷鏈表進行查找;
相關代碼:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); // 跟put同樣的散列操做,結果最後會被修正爲正數
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //table已經初始化,並且hash值對應的位置,首節點有值,未初始化直接返回null
if ((eh = e.hash) == h) { // 首節點hash值跟要查找的key的hash值同樣
if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 地址或者equals結果相同
return e.val;
}
else if (eh < 0) // hash值不同的狀況(元素能被分到這個地方,按道理hash應該是取餘相等):hash值 < 0,說明在擴容或者爲紅黑樹
return (p = e.find(h, key)) != null ? p.val : null; //查找元素,支持紅黑樹節點
while ((e = e.next) != null) { // hash值不同,挨個查找
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
4、擴容源碼
先理清楚方法中幾個變量的含義:
int size ---- 本次想要擴充的數量(通常是數組長度的一半(右移1位),putAll的時候是新map的長度)
int c ---- 當前數組擴容後的大小
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); // 計算本次擴容的目標值
int sc;
while ((sc = sizeCtl) >= 0) { // 目標值不必定足夠大,要跟sizeCtl進行比較
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) { //沒有初始化
n = (sc > c) ? sc : c; //擴容的閾值跟計算的目標值比較,較大者爲目標值(這裏至關於用一個map來初始化concurrentHashMap了)
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 搶奪初始化,Unsafe的這個方法說明見下文
try {
if (table == tab) {
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY) // 沒到達擴容閾值或者表已經太大,沒法再擴容
break;
else if (tab == table) { //
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
這裏是擴容的主要方法,咱們先理清楚擴容的容量是怎麼變化的:
直接調用這個tryPresize(int size)的有兩個地方putAll(Map m)跟treeifyBin(Node[] tab, int index),前者傳進來的參數是m.size(),後者是tab.length << 1。結合initTable,初始化後的數組長度爲16,若是觸發了treeifyBin,因爲數組長度小於64,會進行數組擴容而不是轉換爲紅黑樹,擴容時傳值過來的是16 << 1也就是32,而後這個tableSizeFor(32+(32 >>> 1) +1)也就是tableSizeFor(49),tableSizeFor()是個對入參往上取2的最小冪的過程,也就是64。看到沒,由於初始數組長度是16,若是在數組長度不是很長的狀況下觸發了紅黑樹轉換機制,那麼數組長度直接成了64,而不是32!!這是爲了省事兒吧,由於這種狀況下,hash表元素很少,而單鏈表的長度竟然到了8個,能夠說是元素分佈的很是不均勻了,極端一點一下擴容多點,相對來講能使元素分佈的分散一點。之後再有鏈表過長的,也不會觸發這個tryPresize了,由於第一次擴容後就已經達到紅黑樹要求的數組長度最低值了。
而後看putAll(Map m)這個方法,這個是將另外的map添加到ConcurrentHashMap中來的方法,擴容的目標數值是size + (size >>> 1) + 1,size爲新來的map的元素個數,這個具體最終要擴展到多少要跟sizeCtl進行比較,若是sizeCtl大則說明不用擴容,直接退出,不然進行擴容。
U.compareAndSwapInt(this, SIZECTL, sc, -1)這一行是調用的jdk的Unsafe類的一個方法,看名字是cas操做修改一個int值。Unsafe類是jdk用於操做內存等一些精細操做的類,僅限於jdk使用,咱們調用會報錯(原理是檢查類加載對象,咱們能夠有方法繞過去),有四個參數compareAndSwapInt(this, offset, expect, update),第一個參數是被修改的對象,offset是被修改字段在對象中的內存偏移量(從對象在內存中的起始位置算到該字段的偏移量),第三個是指望值,是被修改字段的初始值,至關於樂觀鎖的版本號,update是若是cas判斷符合後字段要新後的新值。這一部分的相關介紹,請參考:
https://blog.csdn.net/sherld/article/details/42492259
到這裏,總體思路相對來講比較明確。複雜的是擴容的具體執行transfer的代碼;這裏要考慮多個線程的併發擴容問題。讀了幾遍仍是有的地方沒太明白:
transfer的思路:
爲了便於多線程並行處理,不會引發衝突,這裏對哈希桶進行了分段,每一個線程處理的數據爲一個「步長」--也就是一個stride的長度,數據遷移完成後再繼續處理下一個分段。須要注意的是對於整個哈希桶來講,數據的遷移是從末尾開始倒着往前進行的。好比哈希桶數組的長度是64,則先進行遷移的是49-64這16個鏈表對應的數據,這也正是一個步長的長度。對於舊數組,擴容線程每完成一個鏈表的數據遷移,就會將該鏈表首節點元素修改成ForwardingNode,查找元素能夠經過該對象的find方法進行,remove,pudate,put則要優先執行擴容,而後再進行相關操做。
int stride :步長,每一個擴容線程要處理的哈希桶的位數;
int transferIndex :遷移下標,下一個要進行擴容的線程應該獲取步長的起始位置;注意是倒着來的;
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { //每次擴容結束,nextTab 會被置空,所以第一個擴容的線程的nextTab應該是null
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) // 步長的計算,默認最小值是16,so,數組不是很大的狀況下,通常是16。從計算過程看出,對8線程cpu來講,這個值要比16大,n最小應該是64*16=1024
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating //新數組的初始化,直接容量翻倍
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n; //由於是倒着來的,注意這裏是n而不是0
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true; // hash桶完成的標誌位,true已經處理過了,false未處理
boolean finishing = false; // to ensure sweep before committing nextTab //整個擴容完成的標誌位
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) { // 更新待遷移的hash桶索引
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { //本線程任務搶佔成功,將下個線程要開始的位置設置好
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // 全部節點完成
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 本線程遷移完成,
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 最後一個線程
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null) //該位置爲空,不須要遷移,直接放一個fwd對象,表示遷移過了
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED) //已是fwd對象,說明遷移完了,不用處理
advance = true; // already processed
else {
synchronized (f) { // 正常的遷移邏輯,跟hashmap相似,利用按位與操做將鏈表數據巧妙的分到兩個鏈表中,並且位置也已經計算好了
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
//普通節點遷移,略
advance = true;
}
else if (f instanceof TreeBin) {
//樹節點遷移,略
advance = true;
}
}
}
}
}
}
第3一、32行代碼的解釋:第一個擴容的線程,執行transfer方法以前,會設置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),後續幫其擴容的線程,執行transfer方法以前,會設置 sizeCtl = sizeCtl+1,每個退出transfer的方法的線程,退出以前,會設置 sizeCtl = sizeCtl-1,那麼最後一個線程退出時:必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT。
本覺得ConcurrentHashMap的結構瞭解了,翻看源碼應該會很容易,結果實際花費的時間比預期多的多。主要緣由是對於擴容的transfer思路以及其中各類標誌位的處理以及這麼處理的緣由不甚瞭解,須要進行反覆猜想推理。也幸而有各路大神提早研究過,這纔跟在大神身後亦步亦趨,終於理清楚。