先預先說明,我這邊jdk的代碼版本爲1.8.0_11,同時,由於我直接在本地jdk源碼上進行了部分修改、調試,因此,致使你們看到的我這邊貼的代碼,和你們的不太同樣。html
不過,我對源碼進行修改、重構時,會保證和原始代碼的功能、邏輯嚴格一致,更多時候,可能只是修改變量名,方便理解。java
你們也知道,jdk代碼寫得實在是比較深奧,變量名常常都是單字符,i,j,k啥的,實在是很難理解,因此,我通常會根據本身的理解,去重命名,爲了減輕咱們的頭腦負擔。node
至於怎麼去修改代碼並調試,能夠參考我以前的文章:git
曹工力薦:調試 jdk 中 rt.jar 包部分的源碼(可自由增長註釋,修改代碼並debug)數組
文章中,我改過的代碼放在:多線程
https://gitee.com/ckl111/jdk-debugjvm
你們知道,concurrentHashMap底層是數組+鏈表+紅黑樹,數組的長度假設爲n,在hashmap初始化的時候,這個n除了做爲數組長度,還會做爲另外一個關鍵field的值。ide
/** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */ private transient volatile int sizeCtl;
該字段很是關鍵,根據取值不一樣,有不一樣的功能。函數
public ConcurrentHashMap() { }
此時,sizeCtl被初始化爲0.fetch
此時,sizeCtl也是32,和容量一致。
public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); }
此時,sizeCtl,直接使用了默認值,16.
public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); }
這裏重載了:
這裏,咱們傳入的負載因子爲0.75,這也是默認的負載因子,傳入的初始容量爲14.
這裏面會根據: 1 + 14/0.75 = 19,拿到真正的size,而後根據size,獲取到第一個大於19的2的n次方,即32,來做爲數組容量,而後sizeCtl也被設置爲32.
實際上,new一個hashmap的時候,咱們並無建立支撐數組,那,何時建立數組呢?是在真正往裏面放數據的時候,好比put的時候。
/** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; ConcurrentHashMapPutResultVO vo = new ConcurrentHashMapPutResultVO(); vo.setBinCount(0); for (Node<K,V>[] tab = table;;) { int tableLength; // 1 if (tab == null) { tab = initTable(); continue; } ... }
1處,即會去初始化table。
/** * Initializes table, using the size recorded in sizeCtl. * 初始化hashmap,使用sizeCtl做爲容量 */ private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { sc = sizeCtl; if (sc < 0){ Thread.yield(); // lost initialization race; just spin continue; } /** * 走到這裏,說明sizeCtl大於0,大於0,表明什麼,能夠去看下其構造函數,此時,sizeCtl表示 * capacity的大小。 * {@link #ConcurrentHashMap(int)} * * cas修改成-1,若是成功修改成-1,則表示搶到了鎖,能夠進行初始化 * */ // 1 boolean bGotChanceToInit = U.compareAndSwapInt(this, SIZECTL, sc, -1); if (bGotChanceToInit) { try { tab = table; /** * 若是當前表爲空,還沒有初始化,則進行初始化,分配空間 */ if (tab == null || tab.length == 0) { /** * sc大於0,則以sc爲準,不然使用默認的容量 */ int n = (sc > 0) ? sc : DEFAULT_CAPACITY; Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n]; table = tab = nt; /** * n >>> 2,無符號右移2位,則是n的四分之一。 * n- n/4,結果爲3/4 * n * 則,這裏修改sc爲 3/4 * n * 好比,默認容量爲16,則修改sc爲12 */ // 2 sc = n - (n >>> 2); } } finally { /** * 修改sizeCtl到field */ // 3 sizeCtl = sc; } break; } } return tab; }
通過上面的分析,initTable時,這個字段可能有兩種取值:
上面說的是,在put的時候去initTable,實際上,這個initTable,也會在如下函數中被調用,其共同點就是,都是往裏面放數據的操做:
上面說了不少,目前,咱們知道的是,在initTable後,sizeCtl的值,是舊的數組的長度 * 0.75。
接下來,咱們看看擴容時機,在put時,會調用putVal,這個函數的大致步驟:
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 1 int hash = spread(key.hashCode()); int binCount = 0; System.out.println("binCount:" + binCount); // 2 ConcurrentHashMapPutResultVO vo = new ConcurrentHashMapPutResultVO(); vo.setBinCount(0); for (Node<K,V>[] tab = table;;) { int tableLength; // 3 if (tab == null) { tab = initTable(); continue; } tableLength = tab.length; if (tableLength == 0) { tab = initTable(); continue; } int entryNodeHashCode; // 4 int entryNodeIndex = (tableLength - 1) & hash; Node<K,V> entryNode = tabAt(tab,entryNodeIndex); /** * 5 若是咱們要放的桶,仍是個空的,則直接cas放進去 */ if (entryNode == null) { Node<K, V> node = new Node<>(hash, key, value, null); // no lock when adding to empty bin boolean bSuccess = casTabAt(tab, entryNodeIndex, null, node); if (bSuccess) { break; } else { /** * 若是沒成功,則繼續下一輪循環 */ continue; } } entryNodeHashCode = entryNode.hash; /** * 6 若是要放的這個桶,正在遷移,則幫助遷移 */ if (entryNodeHashCode == MOVED){ tab = helpTransfer(tab, entryNode); continue; } /** * 7 對entryNode加鎖 */ V oldVal = null; System.out.println("sync"); synchronized (entryNode) { /** * 這一行是判斷,在咱們執行前面的一堆方法的時候,看看entryNodeIndex處的node是否變化 */ if (tabAt(tab, entryNodeIndex) != entryNode) { continue; } /** * 8 hashCode大於0,說明不是處於遷移狀態 */ if (entryNodeHashCode >= 0) { /** * 9 鏈表中找到合適的位置並放入 */ findPositionAndPut(key, value, onlyIfAbsent, hash, vo, entryNode); binCount = vo.getBinCount(); oldVal = (V) vo.getOldValue(); } else if (entryNode instanceof TreeBin) { ... } } System.out.println("binCount:" + binCount); // 10 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, entryNodeIndex); if (oldVal != null) return oldVal; break; } } // 11 addCount(1L, binCount); return null; }
1處,計算key的hashcode
2處,我這邊new了一個對象,裏面兩個字段:
public class ConcurrentHashMapPutResultVO<V> { int binCount; V oldValue; }
其中,oldValue用來存放,若是put進去的key/value,其中key已經存在的話,通常會直接覆蓋以前的舊值,這裏主要存放以前的舊值,由於咱們須要返回舊值。
binCount,則存放:在找到對應的hash桶以後,在鏈表中,遍歷了多少個元素,該值後面會使用,做爲一個標誌,當該標誌大於0的時候,纔去進一步檢查,看看是否擴容。
3處,若是table爲null,說明table裏沒有任何一個鍵值對,數組也還沒建立,則初始化table
4處,根據hashcode,和(數組長度 - 1)相與,計算出應該存放的哈希桶在數組中的索引
5處,若是要放的哈希桶,仍是空的,則直接cas設置進去,成功則跳出循環,不然重試
6處,若是要放的這個桶,該節點的hashcode爲MOVED(一個常量,值爲-1),說明有其餘線程正在擴容該hashmap,則幫助擴容
7處,對要存放的hash桶的頭節點加鎖
8處,若是頭節點的hashcode大於0,說明是拉了一條鏈表,則調用子方法(我這邊本身抽的),去找到合適的位置並插入到鏈表
9處,findPositionAndPut,在鏈表中,找到合適的位置,並插入
10處,在findPositionAndPut函數中,會返回:爲了找到合適的位置,遍歷了多少個元素,這個值,就是binCount。
若是這個binCount大於8,則說明遍歷了8個元素,則須要轉紅黑樹了。
11處,由於咱們新增了一個元素,總數天然要加1,這裏面會去增長總數,和檢查是否須要擴容。
其中,第9步,由於是本身抽的函數,因此這裏貼出來給你們看下:
/** * 遍歷鏈表,找到應該放的位置;若是遍歷完了還沒找到,則放到最後 * @param key * @param value * @param onlyIfAbsent * @param hash * @param vo * @param entryNode */ private void findPositionAndPut(K key, V value, boolean onlyIfAbsent, int hash, ConcurrentHashMapPutResultVO vo, Node<K, V> entryNode) { vo.setBinCount(1); for (Node<K,V> currentIterateNode = entryNode; ; vo.setBinCount(vo.getBinCount() + 1)) { /** * 若是當前遍歷指向的節點的hash值,與參數中的key的hash值相等,則, * 繼續判斷 */ K currentIterateNodeKey = currentIterateNode.key; boolean bKeyEqualOrNot = Objects.equals(currentIterateNodeKey, key); /** * key的hash值相等,且equals比較也相等,則就是咱們要找的 */ if (currentIterateNode.hash == hash && bKeyEqualOrNot) { /** * 獲取舊的值 */ vo.setOldValue(currentIterateNode.val); /** * 覆蓋舊的node的val */ if (!onlyIfAbsent) currentIterateNode.val = value; // 這裏直接break跳出循環 break; } /** * 把當前節點保存起來 */ Node<K,V> pred = currentIterateNode; /** * 獲取下一個節點 */ currentIterateNode = currentIterateNode.next; /** * 若是下一個節點爲null,說明當前已是鏈表的最後一個node了 */ if ( currentIterateNode == null) { /** * 則在當前節點後面,掛上新的節點 */ pred.next = new Node<K,V>(hash, key, value, null); break; } } }
第11步,也是咱們要看的重點:
private final void addCount(long delta, int check) { CounterCell[] counterCellsArray = counterCells; // 1 long b = baseCount; // 2 long newBaseCount = b + delta; /** * 3 直接cas在baseCount上增長 */ boolean bSuccess = U.compareAndSwapLong(this, BASECOUNT, b, newBaseCount); if ( counterCellsArray != null || !bSuccess) { ... newBaseCount = sumCount(); } // 4 if (check >= 0) { while (true) { Node<K,V>[] tab = table; Node<K,V>[] nt; int n = 0; // 5 int sc = sizeCtl; // 6 boolean bSumExteedSizeControl = newBaseCount >= (long) sc; // 7 boolean bContinue = bSumExteedSizeControl && tab != null && (n = tab.length) < MAXIMUM_CAPACITY; if (bContinue) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 8 transfer(tab, null); newBaseCount = sumCount(); } else { break; } } } }
1處,baseCount是一個field,存儲當前hashmap中,有多少個鍵值對,你put一次,就一個;remove一次,就減一個。
2處,b + delta,其中,b就是baseCount,是舊的數量;dalta,咱們傳入的是1,就是要增長的元素數量
因此,b + delta,獲得的,就是通過此次put後,預期的數量
3處,直接cas,修改baseCount這個field爲 新值,也就是第二步拿到的值。
4處,這裏檢查check是否大於0,check,是第二個形參;這個參數,咱們外邊怎麼傳的?
addCount(1L, binCount);
不就是bincount嗎,也就是說,這裏檢查:咱們在put過程當中,在鏈表中遍歷了幾個元素,若是遍歷了至少1個元素,這裏要進入下面的邏輯:檢查是否要擴容,由於,你binCount大於0,說明可能已經開始出現哈希衝突了。
5處,取field:sizeCtl的值,給局部變量sc
6處,判斷當前的新的鍵值對總數,是否大於sc了;好比容量是16,那麼sizeCtl是12,若是此時,hashmap中存放的鍵值對已經大於等於12了,則要檢查是否擴容了
7處,幾個組合條件,查看是否要擴容,其中,主要的條件就是第6步的那個。
8處,調用transfer,進行擴容
總結一下,通過前面的第6處,咱們知道,若是存放的鍵值對總數,已經大於等於0.75*哈希桶(也就是底層數組的長度)的數量了,那麼,就基本要擴容了。
擴容也是一個相對複雜的過程,這裏只說大概,詳細的放下講。
假設,如今底層數組長度,128,也就是128個哈希桶,當存放的鍵值對數量,大於等於 128 * 0.75的時候,就會開始擴容,擴容的過程,大概是:
這個過程,昨天的博文,畫了個圖,這裏再貼一下。
擴容後:
但是,若是咱們要一個個去遍歷全部哈希桶,而後遍歷對應的鏈表/紅黑樹,會不會太慢了?徹底是單線程工做啊。
換個思路,咱們能不能加快點呢?好比,線程1能夠去處理數組的 0 -15這16個桶,16- 31這16個桶,徹底可讓線程2去作啊,這樣的話,不就多線程了嗎,不是就快了嗎?
沒錯,jdk就是這麼幹的。
jdk維護了一個field,這個field,專門用來存當前能夠獲取的任務的索引,舉個例子:
你們看上圖就懂了,一開始,這裏假設咱們有128個桶,每次每一個線程,去拿16個桶來處理。
剛開始的時候,field:transferIndex就等於127,也就是最後一個桶的位置,而後咱們要從後往前取,那麼,127 到112,恰好就是16個桶,因此,申請任務的時候,就會用cas去更新field爲112,則表示,本身取到了112 到127這一個區間的hash桶遷移任務。
若是自始至終,只有一個線程呢,它處理完了112 - 127這一批hash桶後,會繼續取下一波任務,96 - 112;以此類推。
若是多線程的話呢,也是相似的,反正都是去嘗試cas更新transferIndex的值爲任務區間的開始下標的值,成功了,就算任務認領成功了。
多線程,怎麼知道須要去幫助擴容呢? 發起擴容的線程,在處理完bucket[k]時,會把老的table中的對應的bucket[k]的頭節點,修改成下面這種類型的節點:
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } }
其餘線程,在put或者其餘操做時,發現頭結點變成了這個,就會去協助擴容了。
我我的感受,差異不大,多線程擴容,就是多線程去獲取本身的那一段任務,而後來完成。我這邊寫了簡單的demo,不過感受仍是頗有用的,能夠幫助咱們理解。
import sun.misc.Unsafe; import java.lang.reflect.Field; import java.util.concurrent.*; import java.util.concurrent.locks.LockSupport; public class ConcurrentTaskFetch { /** * 空閒任務索引,獲取任務時,從該下標開始,往前獲取。 * 好比當前下標爲10,表示tasks數組中,0-10這個區間的任務,沒人領取 */ // 0 private volatile int freeTaskIndexForFetch; // 1 private static final int TASK_COUNT_PER_FETCH = 16; // 2 private String[] tasks = new String[128]; public static void main(String[] args) { ConcurrentTaskFetch fetch = new ConcurrentTaskFetch(); // 3 fetch.init(); ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); executor.prestartAllCoreThreads(); CyclicBarrier cyclicBarrier = new CyclicBarrier(10); // 4 for (int i = 0; i < 10; i++) { executor.execute(new Runnable() { @Override public void run() { try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } // 5 FetchedTaskInfo fetchedTaskInfo = fetch.fetchTask(); if (fetchedTaskInfo != null) { System.out.println("thread:" + Thread.currentThread().getName() + ",get task success:" + fetchedTaskInfo); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread:" + Thread.currentThread().getName() + ", process task finished"); } } }); } LockSupport.park(); } public void init() { for (int i = 0; i < 128; i++) { tasks[i] = "task" + i; } freeTaskIndexForFetch = tasks.length; } // 6 public FetchedTaskInfo fetchTask() { System.out.println("Thread start fetch task:"+Thread.currentThread().getName()+",time: "+System.currentTimeMillis()); while (true){ // 6.1 if (freeTaskIndexForFetch == 0) { System.out.println("thread:" + Thread.currentThread().getName() + ",get task failed,there is no task"); return null; } /** * 6.2 獲取當前任務的集合的上界 */ int subTaskListEndIndex = this.freeTaskIndexForFetch; /** * 6.3 獲取當前任務的集合的下界 */ int subTaskListStartIndex = subTaskListEndIndex > TASK_COUNT_PER_FETCH ? subTaskListEndIndex - TASK_COUNT_PER_FETCH : 0; /** * 6.4 * 如今,咱們拿到了集合的上下界,即[subTaskListStartIndex,subTaskListEndIndex) * 該區間爲前開後閉,因此,實際的區間爲: * [subTaskListStartIndex,subTaskListEndIndex - 1] */ /** * 6.5 使用cas,嘗試更新{@link freeTaskIndexForFetch} 爲 subTaskListStartIndex */ if (U.compareAndSwapInt(this, FREE_TASK_INDEX_FOR_FETCH, subTaskListEndIndex, subTaskListStartIndex)) { // 6.6 FetchedTaskInfo info = new FetchedTaskInfo(); info.setStartIndex(subTaskListStartIndex); info.setEndIndex(subTaskListEndIndex - 1); return info; } } } // Unsafe mechanics private static final sun.misc.Unsafe U; private static final long FREE_TASK_INDEX_FOR_FETCH; static { try { // U = sun.misc.Unsafe.getUnsafe(); Field f = Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); U = (Unsafe) f.get(null); Class<?> k = ConcurrentTaskFetch.class; FREE_TASK_INDEX_FOR_FETCH = U.objectFieldOffset (k.getDeclaredField("freeTaskIndexForFetch")); } catch (Exception e) { throw new Error(e); } } static class FetchedTaskInfo{ int startIndex; int endIndex; public int getStartIndex() { return startIndex; } public void setStartIndex(int startIndex) { this.startIndex = startIndex; } public int getEndIndex() { return endIndex; } public void setEndIndex(int endIndex) { this.endIndex = endIndex; } @Override public String toString() { return "FetchedTaskInfo{" + "startIndex=" + startIndex + ", endIndex=" + endIndex + '}'; } } }
0處,定義了一個field,相似於前面的transferIndex
/** * 空閒任務索引,獲取任務時,從該下標開始,往前獲取。 * 好比當前下標爲10,表示tasks數組中,0-10這個區間的任務,沒人領取 */ // 0 private volatile int freeTaskIndexForFetch;
1,定義了每次取多少個任務,這裏也是16個
private static final int TASK_COUNT_PER_FETCH = 16;
2,定義任務列表,共128個任務
3,main函數中,進行任務初始化
public void init() { for (int i = 0; i < 128; i++) { tasks[i] = "task" + i; } freeTaskIndexForFetch = tasks.length; }
主要初始化任務列表,其次,將freeTaskIndexForFetch 賦值爲128,後續取任務,從這個下標開始
4處,啓動10個線程,每一個線程去執行取任務,按理說,咱們128個任務,每一個線程取16個,只能有8個線程取到任務,2個線程取不到
5處,線程邏輯裏,去獲取任務
6處,獲取任務的方法定義
6.1 ,若是可獲取的任務索引爲0了,說明沒任務了,直接返回
6.2,獲取當前任務的集合的上界
6.3,獲取當前任務的集合的下界,減去16就好了
6.4,拿到了集合的上下界,即[subTaskListStartIndex,subTaskListEndIndex)
6.5, 使用cas,更新field爲:6.4中的任務下界。
執行效果演示:
能夠看到,8個線程取到任務,2個線程沒取到。
其實jvm內存分配時,也是相似的思路,好比,設置堆內存爲200m,那這200m是啓動時立馬從操做系統分配了的。
接下來,就是每次new對象的時候,去這個大內存裏,找個小空間,這個過程,也是須要cas去競爭的,好比確定也有個全局的字段,來表示當前可用內存的索引,好比該索引爲100,表示,第100個字節後的空間是能夠用的,那我要new個對象,這個對象有3個字段,須要大概30個字節,那我是否是須要把這個索引更新爲130。
這中間是多線程的,因此也是要cas操做。
道理都是相似的。
時間倉促,有問題在所不免,歡迎及時指出或加羣討論。