併發工具類咱們已經講了不少,這些工具類的「目標」是讓咱們只關注任務自己,而且忽視線程間合做細節,簡化了併發編程難度的同時,也增長了不少安全性。工具類的對使用者的「目標」雖然一致,但每個工具類自己都有它獨特的應用場景,好比:html
將上面三種通用場景形象化展現一下:java
結合上圖相信你的腦海裏已經浮現出這幾個工具類的具體實現方式,感受這已經涵蓋了全部的併發場景。程序員
TYTS,以上這些方式的子線程接到任務後不會再繼續拆分紅「子子」任務,也就是說,子線程即使接到很大或很複雜的任務也得硬着頭皮努力執行完,很顯然這個大任務是問題關鍵算法
若是能把大任務拆分紅更小的子問題,直到子問題簡單到能夠直接求解就行了,這就是分治的思想編程
在計算機科學中,分治法是一種很重要的算法。字面上的解釋是「分而治之」,就是把一個複雜的問題分紅兩個或更多的相同或類似的子問題,再把子問題分紅更小的子問題……直到最後子問題能夠簡單的直接求解,原問題的解就變成了子問題解的合併。設計模式
這個技巧是不少高效算法的基礎,如排序算法 (快速排序,歸併排序),傅立葉變換 (快速傅立葉變換)……,若是你是搞大數據的,MapReduce 就是分支思想的典型,若是你想更詳細的理解分治相關的算法,請參考這篇一文圖解分治算法和思想api
結合上面的描述,相信你腦海中已經構建出來分治的模型了:數組
那全部的大任務都能用分治算法來解決嗎?很顯然不是的安全
整體來講,分治法所能解決的問題通常具備如下幾個特徵:多線程
瞭解了分治算法的核心思想,咱們就來看看 Java 是如何利用分治思想拆分與合併任務的吧
有子任務,天然要用到多線程。咱們很早以前說過,執行子任務的線程不容許單首創建,要用線程池管理。秉承相同設計理念,再結合分治算法, ForkJoin 框架中就出現了 ForkJoinPool 和 ForkJoinTask。正所謂:
天對地,雨對風。大陸對長空。山花對海樹,赤曰對蒼穹
套用已有知識,簡單理解就是這樣滴:
咱們以前說過無數次,JDK 不會重複造輪子,這裏談及類似是爲了讓你們有個簡單的直觀印象,內裏確定有所差異,咱們先大體看一下這兩個類:
又是這個男人,Doug Lea
,怎麼就那麼牛(破音)
/** * Abstract base class for tasks that run within a {@link ForkJoinPool}. * A {@code ForkJoinTask} is a thread-like entity that is much * lighter weight than a normal thread. Huge numbers of tasks and * subtasks may be hosted by a small number of actual threads in a * ForkJoinPool, at the price of some usage limitations. * * @since 1.7 * @author Doug Lea */ public abstract class ForkJoinTask<V> implements Future<V>, Serializable
能夠看到 ForkJoinTask
實現了 Future
接口(那就是具備 Future 接口的特性),一樣如其名,fork()
和 join()
天然是它的兩個核心方法
fork()
: 異步執行一個子任務(上面說的拆分)join()
: 阻塞當前線程等待子任務的執行結果(上面說的合併)另外,從上面代碼中能夠看出,ForkJoinTask
是一個抽象類,在分治模型中,它還有兩個抽象子類 RecursiveAction
和 RecursiveTask
那這兩個子抽象類有什麼差異呢?若是你打開 IDE,你應該一眼就能看出差異,so easy
public abstract class RecursiveAction extends ForkJoinTask<Void>{ ... /** * The main computation performed by this task. */ protected abstract void compute(); ... } public abstract class RecursiveTask<V> extends ForkJoinTask<V>{ ... protected abstract void compute(); ... }
兩個類裏面都定義了一個抽象方法 compute()
,須要子類重寫實現具體邏輯
那子類要遵循什麼邏輯重寫這個方法呢?
遵循分治思想,重寫的邏輯很簡單,就是回答三個問題:
用「僞代碼」再翻譯一下上面這段話,大概就是這樣滴:
if(任務小到不用繼續拆分){ 直接計算獲得結果 }else{ 拆分子任務 調用子任務的fork()進行計算 調用子任務的join()合併計算結果 }
(做爲程序員,若是你寫過遞歸運算,這個邏輯理解起來是很是簡單的)
介紹到這裏,就能夠用 ForkJoin 幹些事情了——經典 Fibonacci 計算就能夠用分治思想(不信,你逐條按照上面分治算法適用狀況自問自答一下?),直接借用官方 Docs (注意看 compute 方法),額外添加個 main 方法來看一下:
@Slf4j public class ForkJoinDemo { public static void main(String[] args) { int n = 20; // 爲了追蹤子線程名稱,須要重寫 ForkJoinWorkerThreadFactory 的方法 final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> { final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); worker.setName("my-thread" + worker.getPoolIndex()); return worker; }; //建立分治任務線程池,能夠追蹤到線程名稱 ForkJoinPool forkJoinPool = new ForkJoinPool(4, factory, null, false); // 快速建立 ForkJoinPool 方法 // ForkJoinPool forkJoinPool = new ForkJoinPool(4); //建立分治任務 Fibonacci fibonacci = new Fibonacci(n); //調用 invoke 方法啓動分治任務 Integer result = forkJoinPool.invoke(fibonacci); log.info("Fibonacci {} 的結果是 {}", n, result); } } @Slf4j class Fibonacci extends RecursiveTask<Integer> { final int n; Fibonacci(int n) { this.n = n; } @Override public Integer compute() { //和遞歸相似,定義可計算的最小單元 if (n <= 1) { return n; } // 想查看子線程名稱輸出的能夠打開下面註釋 //log.info(Thread.currentThread().getName()); Fibonacci f1 = new Fibonacci(n - 1); // 拆分紅子任務 f1.fork(); Fibonacci f2 = new Fibonacci(n - 2); // f1.join 等待子任務執行結果 return f2.compute() + f1.join(); } }
執行結果以下:
進展到這裏,相信基本的使用就已經搞定了,上面代碼中使用了 ForkJoinPool,那問題來了:
池化既然是一類思想,Java 已經有了ThreadPoolExecutor
,爲何又要搞出個ForkJoinPool
呢?
藉助下面這張圖,先來回憶一下 ThreadPoolExecutor 的實現原理(詳情請看爲何要使用線程池):
一眼就能看出來這是典型的生產者/消費者
模式,消費者線程都從一個共享的 Task Queue 中消費提交的任務。ThreadPoolExecutor 簡單的並行操做主要是爲了執行時間不肯定的任務(I/O 或定時任務等)
JDK 重複造輪子是不可能的,分治思想其實也能夠理解成一種父子任務依賴的關係,當依賴層級很是深,用 ThreadPoolExecutor
來處理這種關係很顯然是不太現實的,因此 ForkJoinPool
做爲功能補充就出現了
任務拆分後有依賴關係,還得減小線程之間的競爭,那就讓線程執行屬於本身的 task 就能夠了唄,因此較 ThreadPoolExecutor
的單個 TaskQueue 的形式,ForkJoinPool
是多個 TaskQueue的形式,簡單用圖來表示,就是這樣滴:
有多個任務隊列,因此在 ForkJoinPool 中就有一個數組形式的成員變量 WorkQueue[]
。那問題又來了
任務隊列有多個,提交的任務放到哪一個隊列中呢?(上圖中的
Router Rule
部分)
這就須要一套路由規則,從上面的代碼 Demo 中能夠理解,提交的任務主要有兩種:
爲了進一步區分這兩種 task,Doug Lea 就設計一個簡單的路由規則:
submission task
放到 WorkQueue 數組的「偶數」
下標中worker task
放在 WorkQueue 的「奇數」
下標中,而且只有奇數下標纔有線程( worker )與之相對應局部豐富一下上圖就是這樣滴:
每一個任務執行時間都是不同的(固然是在 CPU 眼裏),執行快的線程的工做隊列的任務就多是空的,爲了最大化利用 CPU 資源,就容許空閒線程拿取其它任務隊列中的內容,這個過程就叫作 work-stealing
(工做竊取)
當前線程要執行一個任務,其餘線程還有可能過來竊取任務,這就會產生競爭,爲了減小競爭,WorkQueue 就設計成了一個雙端隊列:
線程(worker)操做本身的 WorkQueue 默認是 LIFO 操做(可選FIFO),當線程(worker)嘗試竊取其餘 WorkQueue 裏的任務時,這個時候執行的是FIFO操做,即從 base 端竊取,用圖豐富一下就是這樣滴:
這樣的好處很是明顯了:
從 WorkQueue 的成員變量的修飾符中也能看出一二了(base 有 volatile 修飾,而 top 卻沒有):
volatile int base; // index of next slot for poll int top; // index of next slot for push
到這裏,相信你已經瞭解 ForkJoinPool 的基本實現原理了,但也會伴隨着不少疑問(這都是怎麼實現的?),好比:
保留住這些問題,一點點看源碼來了解一下吧:
ForkJoinPool 的源碼涉及到大量的位運算,這裏會把核心部分說清楚,想要理解的更深刻,還須要你們本身一點點追蹤查看
結合上面的鋪墊,你應該知道 ForkJoinPool 裏有三個重要的角色:
源碼分析的整個流程也是圍繞這幾個類的方法來講明,但在瞭解這三個角色以前,咱們須要先了解 ForkJoinPool 都爲這三個角色鋪墊了哪些內容
故事就得從 ForkJoinPool 的構造方法提及
public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }
除了以上三個構造方法以外,在 JDK1.8 中還增長了另一種初始化 ForkJoinPool 對象的方式(QQ:這是什麼設計模式?):
static final ForkJoinPool common; /** * @return the common pool instance * @since 1.8 */ public static ForkJoinPool commonPool() { // assert common != null : "static init error"; return common; }
Common 是在靜態塊裏面初始化的(只會被執行一次):
common = java.security.AccessController.doPrivileged (new java.security.PrivilegedAction<ForkJoinPool>() { public ForkJoinPool run() { return makeCommonPool(); }}); private static ForkJoinPool makeCommonPool() { int parallelism = -1; ... 其餘默認初始化內容 if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP; // 執行上面的構造方法 return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-"); }
由於這是一個單例通用的 ForkJoinPool,因此切記:
若是使用通用 ForkJoinPool,最好只作 CPU 密集型的計算操做,不要有不肯定性的 I/O 內容在任務裏面,以防拖垮總體
上面全部的構造方法最後都會調用這個私有方法:
private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
參數有點多,在這裏解釋一下每一個參數的含義:
序號 | 參數名 | 描述/解釋 |
---|---|---|
1 | parallelism | 並行度,這並非定義的線程數,具體線程數,以及 WorkQueue 的長度等都是根據這個並行度來計算的,經過上面 makeCommonPool 方法能夠知道,parallelism 默認值是 CPU 核心線程數減 1 |
2 | factory | 很常見了,建立 ForkJoinWorkerThread 的工廠接口 |
3 | handler | 每一個線程的異常處理器 |
4 | mode | 上面說的 WorkQueue 的模式,LIFO/FIFO; |
5 | workerNamePrefix | ForkJoinWorkerThread的前綴名稱 |
6 | ctl | 線程池的核心控制線程字段 |
在構造方法中就已經有位運算了,太難了:
想知道 ForkJoinPool 的成員變量 config 要表達的意思,就要仔細拆開來看
static final int SMASK = 0xffff; // short bits == max index this.config = (parallelism & SMASK) | mode;
parallelism & SMASK
其實就是要保證並行度的值不能大於 SMASK,上面全部的構造方法在傳入 parallelism 的時候都會調用 checkParallelism
來檢查合法性:
static final int MAX_CAP = 0x7fff; // max #workers - 1 private static int checkParallelism(int parallelism) { if (parallelism <= 0 || parallelism > MAX_CAP) throw new IllegalArgumentException(); return parallelism; }
能夠看到 parallelism 的最大值就是 MAX_CAP
了,0x7fff
確定小於0xffff
。因此 config 的值其實就是:
this.config = parallelism | mode;
這裏假設 parallelism 就是 MAX_CAP
, 而後與 mode 進行或運算
,其中 mode 有三種:
下面以 LIFO_QUEUE 和 FIFO_QUEUE 舉例說明:
// Mode bits for ForkJoinPool.config and WorkQueue.config static final int MODE_MASK = 0xffff << 16; // top half of int static final int LIFO_QUEUE = 0; static final int FIFO_QUEUE = 1 << 16; static final int SHARED_QUEUE = 1 << 31; // must be negative
因此 parallelism | mode
根據 mode 的不一樣會產生兩種結果,可是會獲得一個確認的信息:
config 的第 17 位表示模式,低 15 位表示並行度 parallelism
當咱們須要從 config 中獲取模式 mode 時候,只須要用mode 掩碼 (MODE_MASK)和 config 作與運算
就能夠了
因此一張圖歸納 config 就是:
long np = (long)(-parallelism); // offset ctl counts
上面這段代碼就是將並行度 parallelism 補碼轉換爲 long 型,以 MAX_CAP
做爲並行度爲例,np 的值就是
這個 np 的值,就會用做 ForkJoinPool 成員變量 ctl 的計算:
// Active counts 活躍線程數 private static final int AC_SHIFT = 48; private static final long AC_UNIT = 0x0001L << AC_SHIFT; private static final long AC_MASK = 0xffffL << AC_SHIFT; // Total counts 總線程數 private static final int TC_SHIFT = 32; private static final long TC_UNIT = 0x0001L << TC_SHIFT; private static final long TC_MASK = 0xffffL << TC_SHIFT; private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign // 計算 ctl this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
np << AC_SHIFT
即 np 向左移動 48 位,這樣原來的低 16 位變成了高 16 位,再用 AC 掩碼(AC_MASK) 作與運算
,也就是說 ctl 的 49 ~ 64 位表示活躍線程數np << TC_SHIFT
即 np 向左移動 32 位,這樣原來的低 16 位變成了 33 ~ 48 位,再用 TC 掩碼作與運算
,也就是說 ctl 的 33 ~ 48 位表示總線程數最後兩者再進行或運算,若是並行度仍是 MAX_CAP
,那 ctl 的最後結果就是:
到這裏,咱們才閱讀完一個構造函數的內容,從最終的結論能夠看出,初始化後 AC = TC,而且 ctl 是一個小於零的數,ctl 是 64 位的 long 類型,低 32 位是如何構造的並無在構造函數中體現出來,但註釋給了明確的說明:
/* * Bits and masks for field ctl, packed with 4 16 bit subfields: * AC: Number of active running workers minus target parallelism * TC: Number of total workers minus target parallelism * SS: version count and status of top waiting thread * ID: poolIndex of top of Treiber stack of waiters * * When convenient, we can extract the lower 32 stack top bits * (including version bits) as sp=(int)ctl. The offsets of counts * by the target parallelism and the positionings of fields makes * it possible to perform the most common checks via sign tests of * fields: When ac is negative, there are not enough active * workers, when tc is negative, there are not enough total * workers. When sp is non-zero, there are waiting workers. To * deal with possibly negative fields, we use casts in and out of * "short" and/or signed shifts to maintain signedness. * * Because it occupies uppermost bits, we can add one active count * using getAndAddLong of AC_UNIT, rather than CAS, when returning * from a blocked join. Other updates entail multiple subfields * and masking, requiring CAS. */
這段註釋主要說明了低 32 位的做用(後面會從源碼中體現出來,這裏先有個印象會對後面源碼閱讀有幫助),按註釋含義先完善一下 ctl 的值:
1:不活動(inactive)
; 0:活動(active)
,後15表示版本號,防止 ABA 問題註釋中還說,另 sp=(int)ctl
,即獲取 64 位 ctl 的低 32 位(SS | ID
),由於低 32 位都是建立出線程以後纔會存在的值,因此推斷出,若是 sp != 0, 就存在等待的工做線程,喚醒使用就行,不用建立新的線程。這樣就經過 ctl 能夠獲取到有關線程所須要的一切信息了
除了構造方法所構建的成員變量,ForkJoinPool 還有一個很是重要的成員變量 runState
,和你以前瞭解的知識同樣,線程池也須要狀態來進行管理
volatile int runState; // lockable status // runState bits: SHUTDOWN must be negative, others arbitrary powers of two private static final int RSLOCK = 1; //線程池被鎖定 private static final int RSIGNAL = 1 << 1; //線程池有線程須要喚醒 private static final int STARTED = 1 << 2; //線程池已經初始化 private static final int STOP = 1 << 29; //線程池中止 private static final int TERMINATED = 1 << 30; //線程池終止 private static final int SHUTDOWN = 1 << 31; //線程池關閉
runState
有上面 6 種狀態切換,按註釋所言,只有 SHUTDOWN
狀態是負數,其餘都是整數,在併發環境更改狀態必然要用到鎖,ForkJoinPool 對線程池加鎖和解鎖分別由 lockRunState
和 unlockRunState
來實現 (這兩個方法能夠暫且不用深刻理解,能夠暫時跳過,只須要理解它們是幫助安全更改線程池狀態的鎖便可)
不深刻了解能夠,可是我不能不寫啊...... 你待會不是得回看嗎?
/** * Acquires the runState lock; returns current (locked) runState. */ // 從方法註釋中看到,該方法必定會返回 locked 的 runState,也就是說必定會加鎖成功 private int lockRunState() { int rs; return ((((rs = runState) & RSLOCK) != 0 || !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ? awaitRunStateLock() : rs); }
或運算
的下半段 CASawaitRunStateLock
方法/** * Spins and/or blocks until runstate lock is available. See * above for explanation. */ private int awaitRunStateLock() { Object lock; boolean wasInterrupted = false; for (int spins = SPINS, r = 0, rs, ns;;) { //判斷是否加鎖(==0表示未加鎖) if (((rs = runState) & RSLOCK) == 0) { // 經過CAS加鎖 if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) { if (wasInterrupted) { try { // 重置線程終端標記 Thread.currentThread().interrupt(); } catch (SecurityException ignore) { // 這裏居然 catch 了個寂寞 } } // 加鎖成功返回最新的 runState,for 循環的惟一正常出口 return ns; } } else if (r == 0) r = ThreadLocalRandom.nextSecondarySeed(); else if (spins > 0) { r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift if (r >= 0) --spins; } // Flag1 若是是其餘線程正在初始化佔用鎖,則調用 yield 方法讓出 CPU,讓其快速初始化 else if ((rs & STARTED) == 0 || (lock = stealCounter) == null) Thread.yield(); // initialization race // Flag2 若是其它線程持有鎖,而且線程池已經初始化,則將喚醒位標記爲1 else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) { // 進入互斥鎖 synchronized (lock) { // 再次判斷,若是等於0,說明進入互斥鎖前恰好有線程進行了喚醒,就不用等待,直接進行喚醒操做便可,不然就進入等待 if ((runState & RSIGNAL) != 0) { try { lock.wait(); } catch (InterruptedException ie) { if (!(Thread.currentThread() instanceof ForkJoinWorkerThread)) wasInterrupted = true; } } else lock.notifyAll(); } } } }
上面代碼 33 ~ 34 (Flag1)行以及 36 ~ 50 (Flag2) 行,若是你沒看後續代碼,如今來理解是有些困難的,我這裏先提早說明一下:
Flag1: 當完整的初始化 ForkJoinPool 時,直接利用了 stealCounter 這個原子變量,由於初始化時(調用 externalSubmit 時),纔會對 StealCounter 賦值。因此,這裏的邏輯是,當狀態不是 STARTED 或者 stealCounter 爲空,讓出線程等待,也就是說,別的線程還沒初始化徹底,讓其繼續佔用鎖初始化便可
Flag2: 咱們在講等待/通知模型時就說,不要讓無限自旋嘗試,若是資源不知足就等待,若是資源知足了就通知,因此,若是 (runState & RSIGNAL) == 0
成立,說明有線程須要喚醒,直接喚醒就好,不然也別浪費資源,主動等待一會
當閱讀到這的代碼時,立刻就拋出來兩個問題:
Q1: 既然是加鎖,爲何不用已有的輪子 ReentrantLock 呢?
PS:若是你讀過併發系列 Java AQS隊列同步器以及ReentrantLock的應用 ,你會知道 ReentrantLock 是用一個完整字段 state 來控制同步狀態。但這裏在競爭鎖的時候還會判斷線程池的狀態,若是是初始化狀態主動 yield 放棄 CPU 來減小競爭;另外,用一個完整的 runState 不一樣位來表示狀態也體現出更細的粒度吧
Q2: synchronized 大法雖好,可是咱們都知道這是比較重量級的鎖,爲何還在這裏應用了呢?
PS: 首先 synchronized 通過不斷優化,沒有它剛誕生時那麼重,另外按照 Flag 2 的代碼含義,進入 synchronized 同步塊的機率仍是很低的,能夠用最簡單的方式穩穩兜底(奧卡姆剃刀了原理?)
有加鎖天然要解鎖,向下看 unlockRunState
解鎖的邏輯相對簡單多了,整體目標是清除鎖標記位。若是順利將狀態修改成目標狀態,天然解鎖成功;不然表示有別的線程進入了wait,須要調用notifyAll喚醒,從新嘗試競爭
/** * Unlocks and sets runState to newRunState. * * @param oldRunState a value returned from lockRunState * @param newRunState the next value (must have lock bit clear). */ private void unlockRunState(int oldRunState, int newRunState) { if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) { Object lock = stealCounter; runState = newRunState; // clears RSIGNAL bit if (lock != null) synchronized (lock) { lock.notifyAll(); } } }
這兩個方法貫穿着後續代碼分析的始終,多注意 unlockRunState
的入參便可,另外你也看到了通知都是用的 notifyAll,而不是 notify,這個問題咱們以前重點說明過,你還記得爲何嗎?若是不記得,打開併發編程之等待通知機制 回憶一下吧
第一層知識鋪墊已經差很少了,前進
回到本文最開始帶有 main 函數的 demo,咱們向 ForkJoinPool 提交任務調用的是 invoke 方法, 其實 ForkJoinPool 還支持 submit 和 execute 兩種方式來提交任務。併發的玩法很是相似,這三類方法的做業也很好區分:
在這三大類基礎上又重載了幾個更細粒度的方法,這裏不一一列舉:
public <T> T invoke(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task.join(); } public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; } public void execute(ForkJoinTask<?> task) { if (task == null) throw new NullPointerException(); externalPush(task); }
相信你已經發現了,提交任務的方法都會調用 externalPush(task) 這個用法,源碼的主角終於要登場了
可是......
若是你看 externalPush 代碼,第一行就是聲明一個 WorkQueue 數組變量,爲了後續流程更加絲滑,咱還得鋪墊一點 WorkQueue 的知識(又要鋪墊)
一看這麼多成員變量,仍是很慌的,不過,咱們只須要把我幾個主要的就足夠了
//初始隊列容量 static final int INITIAL_QUEUE_CAPACITY = 1 << 13; //最大隊列容量 static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M // Instance fields volatile int scanState; // versioned, <0: inactive; odd:scanning int stackPred; // pool stack (ctl) predecessor 前任池(WorkQueue[])索引,由此構成一個棧 int nsteals; // number of steals 偷取的任務個數 int hint; // randomization and stealer index hint 記錄偷取者的索引,方便後面順藤摸瓜 int config; // pool index and mode volatile int qlock; // 1: locked, < 0: terminate; else 0 volatile int base; // index of next slot for poll int top; // index of next slot for push ForkJoinTask<?>[] array; // the elements (initially unallocated) 任務數組 final ForkJoinPool pool; // the containing pool (may be null) final ForkJoinWorkerThread owner; // owning thread or null if shared 當前工做隊列的工做線程,共享模式下爲null volatile Thread parker; // == owner during call to park; else null 調用park阻塞期間爲owner,其餘狀況爲null volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin 記錄當前join來的任務 volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer 記錄從其餘工做隊列偷取過來的任務
咱們上面說了,WorkQueue 是一個雙端隊列,線程池有 runState,WorkQueue 有 scanState
操做線程池須要鎖,操做隊列也是須要鎖的,qlock 就派上用場了
WorkQueue 中也有個 config,可是和 ForkJoinPool 中的是不同的,WorkQueue 中的config 記錄了該 WorkQueue 在 WorkQueue[] 數組的下標以及 mode
其餘字段的含義咱們就寫在代碼註釋中吧,主角從新登場,此次是真的
文章前面說過,task 會細分紅 submission task
和 worker task
,worker task
是 fork
出來的,那從這個入口進入的,天然也就是 submission task
了,也就是說:
- 經過
invoke()
|submit()
|execute()
等方法提交的 task, 是submission task
,會放到 WorkQueue 數組的偶數索引位置- 調用
fork()
方法生成出的任務,叫 worker task,會放到 WorkQueue 數組的奇數索引位置
該方法上的註釋也寫的很清楚,具體請參考代碼註釋
/** * Tries to add the given task to a submission queue at * submitter's current queue. Only the (vastly) most common path * is directly handled in this method, while screening for need * for externalSubmit. * * @param task the task. Caller must ensure non-null. */ final void externalPush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue q; int m; //Flag1: 經過ThreadLocalRandom產生隨機數,用於下面計算槽位索引 int r = ThreadLocalRandom.getProbe(); int rs = runState; //初始狀態爲0 //Flag2: 若是ws,即ForkJoinPool中的WorkQueue數組已經完成初始化,且根據隨機數定位的index存在workQueue,且cas的方式加鎖成功 if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && //對WorkQueue操做加鎖 U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a; int am, n, s; //WorkQueue中的任務數組不爲空 if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) { //組長度大於任務個數,不須要擴容 int j = ((am & s) << ASHIFT) + ABASE; //WorkQueue中的任務數組不爲空 U.putOrderedObject(a, j, task); //向Queue中放入任務 U.putOrderedInt(q, QTOP, s + 1);//top值加一 U.putIntVolatile(q, QLOCK, 0); //對WorkQueue操做解鎖 //任務個數小於等於1,那麼此槽位上的線程有可能等待,若是你們都沒任務,可能都在等待,新任務來了,喚醒,起來幹活了 if (n <= 1) //喚醒可能存在等待的線程 signalWork(ws, q); return; } //任務入隊失敗,前面加鎖了,這裏也要解鎖 U.compareAndSwapInt(q, QLOCK, 1, 0); } //Flag3: 不知足上述條件,也就是說上面的這些 WorkQueue[]等都不存在,就要經過這個方法一切從頭開始建立 externalSubmit(task); }
上面加了三處 Flag,爲了讓你們更好的理解代碼仍是有必要作進一步說明的:
Flag1: ThreadLocalRandom 是 ThreadLocal 的衍生物,每一個線程默認的 probe 是 0,當線程調用ThreadLocalRandom.current()時,會初始化 seed 和 probe,維護在線程內部,這裏就知道是生成一個隨機數就好,具體細節仍是值得你們自行看一下
Flag2: 這裏包含的信息仍是很是多的
// 二進制爲:0000 0000 0000 0000 0000 0000 0111 1110 static final int SQMASK = 0x007e; // max 64 (even) slots
Flag3: 看過 flag2 的描述,你也就很好理解 Flag 3 了,若是是第一次提交任務,必走 Flag 3 的 externalSubmit
方法
這個方法很長,但沒超過 80 行,具體請看方法註釋
//初始化所須要的一切 private void externalSubmit(ForkJoinTask<?> task) { int r; // initialize caller's probe //生成隨機數 if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } for (;;) { WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false; // 若是線程池的狀態爲終止狀態,則幫助終止 if ((rs = runState) < 0) { tryTerminate(false, false); // help terminate throw new RejectedExecutionException(); } //Flag1: 再判斷一次狀態是否爲初始化,由於在lockRunState過程當中有可能狀態被別的線程更改了 else if ((rs & STARTED) == 0 || // initialize ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { int ns = 0; //Flag1.1: 加鎖 rs = lockRunState(); try { if ((rs & STARTED) == 0) { // 初始化stealcounter的值(任務竊取計數器,原子變量) U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); // create workQueues array with size a power of two //取config的低16位(確切說是低15位),獲取並行度 int p = config & SMASK; // ensure at least 2 slots //Flag1.2: 若是你看過HashMap 的源碼,這個就很好理解了,獲取2次冪大小 int n = (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; //初始化 WorkQueue 數組 workQueues = new WorkQueue[n]; // 標記初始化完成 ns = STARTED; } } finally { // 解鎖 unlockRunState(rs, (rs & ~RSLOCK) | ns); } } //Flag2 上面分析過,取偶數位槽位,將任務放進偶數槽位 else if ((q = ws[k = r & m & SQMASK]) != null) { // 對 WorkQueue 加鎖 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a = q.array; int s = q.top; // 初始化任務提交標識 boolean submitted = false; // initial submission or resizing try { // locked version of push //計算內存偏移量,聽任務,更新top值 if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { int j = (((a.length - 1) & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); //提交任務成功 submitted = true; } } finally { //WorkQueue解鎖 U.compareAndSwapInt(q, QLOCK, 1, 0); } // 任務提交成功了 if (submitted) { //天然要喚醒可能存在等待的線程來處理任務了 signalWork(ws, q); return; } } //任務提交沒成功,能夠從新計算隨機數,再走一次流程 move = true; // move on failure } //Flag3: 接Flag2,若是找到的槽位是空,則要初始化一個WorkQueue else if (((rs = runState) & RSLOCK) == 0) { // create new queue q = new WorkQueue(this, null); // 設置工做隊列的竊取線索值 q.hint = r; // 如上面 WorkQueue 中config 的介紹,記錄當前WorkQueue在WorkQueue[]數組中的值,和隊列模式 q.config = k | SHARED_QUEUE; // 初始化爲 inactive 狀態 q.scanState = INACTIVE; //加鎖 rs = lockRunState(); // publish index if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) ws[k] = q; // else terminated //解鎖 unlockRunState(rs, rs & ~RSLOCK); } else move = true; // move if busy if (move) r = ThreadLocalRandom.advanceProbe(r); } }
Flag1.1 : 有個細節須要說一下,咱們在 Java AQS隊列同步器以及ReentrantLock的應用 時提到過使用鎖的範式以及爲何要這樣用,ForkJoinPool
這裏一樣遵循這種範式
Lock lock = new ReentrantLock(); lock.lock(); try{ ... }finally{ lock.unlock(); }
Flag1.2: 簡單描述這個過程,就是根據不一樣的並行度來初始化不一樣大小的 WorkQueue[]數組,數組大小要求是 2 的 n 次冪,因此給你們個表格直觀理解一下並行度和隊列容量的關係:
並行度p | 容量 |
---|---|
1,2 | 4 |
3,4 | 8 |
5 ~ 8 | 16 |
9 ~ 16 | 32 |
Flag 1,2,3: 若是你理解了上面這個方法,很顯然,第一次執行這個方法內部的邏輯順序應該是 Flag1
——> Flag3
——>Flag2
externalSubmit 若是任務成功提交,就會調用 signalWork
方法了
前面鋪墊的知識要大規模派上用場(一大波殭屍來襲),are you ready?
若是 ForkJoinPool 的 ctl 成員變量的做用已經忘了,趕忙向上翻從新記憶一下
//常量值 static final int SS_SEQ = 1 << 16; // version count final void signalWork(WorkQueue[] ws, WorkQueue q) { long c; int sp, i; WorkQueue v; Thread p; // ctl 小於零,說明活動的線程數 AC 不夠 while ((c = ctl) < 0L) { // too few active // 取ctl的低32位,若是爲0,說明沒有等待的線程 if ((sp = (int)c) == 0) { // no idle workers // 取TC的高位,若是不等於0,則說明目前的工做着尚未達到並行度 if ((c & ADD_WORKER) != 0L) // too few workers //添加 Worker,也就是說要建立線程了 tryAddWorker(c); break; } //未開始或者已中止,直接跳出 if (ws == null) // unstarted/terminated break; //i=空閒線程棧頂端所屬的工做隊列索引 if (ws.length <= (i = sp & SMASK)) // terminated break; if ((v = ws[i]) == null) // terminating break; //程序執行到這裏,說明有空閒線程,計算下一個scanState,增長了版本號,而且調整爲 active 狀態 int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState int d = sp - v.scanState; // screen CAS //計算下一個ctl的值,活動線程數 AC + 1,經過stackPred取得前一個WorkQueue的索引,從新設置回sp,行程最終的ctl值 long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); //更新 ctl 的值 if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { v.scanState = vs; // activate v //若是有線程阻塞,則調用unpark喚醒便可 if ((p = v.parker) != null) U.unpark(p); break; } //沒有任務,直接跳出 if (q != null && q.base == q.top) // no more work break; } }
假設程序剛開始執行,那麼活動線程數以及總線程數確定都沒達到並行度要求,這時就會調用 tryAddWorker
方法了
tryAddWorker 的邏輯就很是簡單了,由於是操做線程池,一樣會用到 lockRunState
/unlockRunState
的鎖控制
private void tryAddWorker(long c) { //初始化添加worker表識 boolean add = false; do { //由於要添加Worker,因此AC和TC都要加一 long nc = ((AC_MASK & (c + AC_UNIT)) | (TC_MASK & (c + TC_UNIT))); //ctl還沒被改變 if (ctl == c) { int rs, stop; // check if terminating if ((stop = (rs = lockRunState()) & STOP) == 0) //更新ctl 的值, add = U.compareAndSwapLong(this, CTL, c, nc); unlockRunState(rs, rs & ~RSLOCK); if (stop != 0) break; //ctl值更新成功,開始真正的建立Worker if (add) { createWorker(); break; } } // 從新獲取ctl,而且沒有達到最大線程數,而且沒有空閒的線程 } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); }
一切順利,就要調用 createWorker 方法來建立真正的 Worker 了,形勢逐漸明朗
介紹過了 WorkerQueue 和 ForkJoinTask,上文說的三個重要角色中的最後一個 ForkJoinWorkerThread
終於登場了
private boolean createWorker() { ForkJoinWorkerThreadFactory fac = factory; Throwable ex = null; ForkJoinWorkerThread wt = null; try { //若是工廠已經存在了,就用factory來建立線程,會去註冊線程,這裏的this就是ForkJoinPool對象 if (fac != null && (wt = fac.newThread(this)) != null) { //啓動線程 wt.start(); return true; } } catch (Throwable rex) { ex = rex; } //若是建立線程失敗,就要逆向註銷線程,包括前面對ctl等的操做 deregisterWorker(wt, ex); return false; }
Worker 線程是如何與 WorkQueue 對應的,就藏在 fac.newThread(this)
這個方法裏面,下面這點代碼展現一下調用過程
public ForkJoinWorkerThread newThread(ForkJoinPool pool); static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } } protected ForkJoinWorkerThread(ForkJoinPool pool) { // Use a placeholder until a useful name can be set in registerWorker super("aForkJoinWorkerThread"); this.pool = pool; this.workQueue = pool.registerWorker(this); }
很顯然核心內容在 registerWorker
方法裏面了
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) { this.pool = pool; this.owner = owner; // Place indices in the center of array (that is not yet allocated) base = top = INITIAL_QUEUE_CAPACITY >>> 1; } final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; //這裏線程被設置爲守護線程,由於,當只剩下守護線程時,JVM就會推出 wt.setDaemon(true); // configure thread //填補處理異常的handler if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); //建立一個WorkQueue,而且設置當前WorkQueue的owner是當前線程 WorkQueue w = new WorkQueue(this, wt); int i = 0; // assign a pool index //又用到了config的知識,提取出咱們指望的WorkQueue模式 int mode = config & MODE_MASK; //加鎖 int rs = lockRunState(); try { WorkQueue[] ws; int n; // skip if no array //判斷ForkJoinPool的WorkQueue[]都初始化徹底 if ((ws = workQueues) != null && (n = ws.length) > 0) { //一種魔數計算方式,用以減小衝突 int s = indexSeed += SEED_INCREMENT; // unlikely to collide //假設WorkQueue的初始長度是16,那這裏的m就是15,最終目的就是爲了獲得一個奇數 int m = n - 1; //和獲得偶數的計算方式同樣,獲得一個小於m的奇數i i = ((s << 1) | 1) & m; // odd-numbered indices //若是這個槽位不爲空,說明已經被其餘線程初始化過了,也就是有衝突,選取別的槽位 if (ws[i] != null) { // collision int probes = 0; // step by approx half n //步長加2,也就保證step仍是奇數 int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; //一直遍歷,直到找到空槽位,若是都遍歷了一遍,那就須要對WorkQueue[]擴容了 while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; probes = 0; } } } //初始化一個隨機數 w.hint = s; // use as random seed //如文章前面所說,config記錄索引值和模式 w.config = i | mode; //掃描狀態也記錄爲索引值,如文章前面所說,奇數表示爲scanning狀態 w.scanState = i; // publication fence //把初始化好的WorkQueue放到ForkJoinPool的WorkQueue[]數組中 ws[i] = w; } } finally { //解鎖 unlockRunState(rs, rs & ~RSLOCK); } //設置worker的前綴名,用於業務區分 wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); //返回當前線程建立的WorkQueue,回到上一層調用棧,也就將WorkQueue註冊到ForkJoinWorkerThread裏面了 return w; }
到這裏線程是順利建立成功了,但是若是線程沒有建立成功,就須要 deregisterWorker來作善後工做了
deregisterWorker 方法接收剛剛建立的線程引用和異常做爲參數,來作善後工做,將 registerWorker 相關工做撤銷回來
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; if (wt != null && (w = wt.workQueue) != null) { WorkQueue[] ws; // remove index from array //獲取當前線程註冊的索引值 int idx = w.config & SMASK; //加鎖 int rs = lockRunState(); //若是奇數槽位都不爲空,則清空內容 if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) ws[idx] = null; //解鎖 unlockRunState(rs, rs & ~RSLOCK); } long c; // decrement counts //死循環式CAS更改ctl的值,將前面AC和TC加1的值再減1,ctl就在那裏,不增不減 do {} while (!U.compareAndSwapLong (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (SP_MASK & c)))); //清空WorkQueue,將其中的task取消掉 if (w != null) { w.qlock = -1; // ensure set w.transferStealCount(this); w.cancelAll(); // cancel remaining tasks } //可能的替換操做 for (;;) { // possibly replace WorkQueue[] ws; int m, sp; //若是線程池終止了,那就跳出循環便可 if (tryTerminate(false, false) || w == null || w.array == null || (runState & STOP) != 0 || (ws = workQueues) == null || (m = ws.length - 1) < 0) // already terminating break; //當前線程建立失敗,經過sp判斷,若是還存在空閒線程,則調用tryRelease來喚醒這個線程,而後跳出 if ((sp = (int)(c = ctl)) != 0) { // wake up replacement if (tryRelease(c, ws[sp & m], AC_UNIT)) break; } //若是沒空閒線程,而且尚未達到知足並行度的條件,那就得再次嘗試建立一個線程,彌補剛剛的失敗 else if (ex != null && (c & ADD_WORKER) != 0L) { tryAddWorker(c); // create replacement break; } else // don't need replacement break; } if (ex == null) // help clean on way out //處理異常 ForkJoinTask.helpExpungeStaleExceptions(); else // rethrow ForkJoinTask.rethrow(ex); }
總之 deregisterWorker 方法從線程池裏註銷線程,清空WorkQueue,同時更新ctl,最後作可能的替換,根據線程池的狀態決定是否找一個本身的替代者:
deregisterWorker 線程解釋清楚了是爲了幫助你們完整理解流程,但 registerWorker 成功後的流程還沒走完,咱得繼續,有了 Worker,那就調用 wt.start()
幹活吧
ForkJoinWorkerThread 繼承自Thread,調用start() 方法後,天然要調用本身重寫的 run() 方法
public void run() { if (workQueue.array == null) { // only run once Throwable exception = null; try { onStart(); //Work開始工做,處理workQueue中的任務 pool.runWorker(workQueue); } catch (Throwable ex) { exception = ex; } finally { try { onTermination(exception); } catch (Throwable ex) { if (exception == null) exception = ex; } finally { pool.deregisterWorker(this, exception); } } } }
方法的重點天然是進入到 runWorker
runWorker 是很常規的三部曲操做:
具體請看註釋
final void runWorker(WorkQueue w) { //初始化隊列,並根據須要是否擴容爲原來的2倍 w.growArray(); // allocate queue int seed = w.hint; // initially holds randomization hint int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift //死循環更新偏移r,爲掃描任務做準備 for (ForkJoinTask<?> t;;) { //掃描任務 if ((t = scan(w, r)) != null) //掃描到就執行任務 w.runTask(t); //沒掃描到就等待,若是等也等不到任務,那就跳出循環別死等了 else if (!awaitWork(w, r)) break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } }
先來看 scan 方法
ForkJoinPool 的任務竊取機制要來了,如何 steal 的,就藏在scan 方法中
private ForkJoinTask<?> scan(WorkQueue w, int r) { WorkQueue[] ws; int m; //再次驗證workQueue[]數組的初始化狀況 if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { //獲取當前掃描狀態 int ss = w.scanState; // initially non-negative //又一個死循環,注意到出口位置就好 //和前面邏輯相似,隨機一個起始位置,並賦值給k for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t; int b, n; long c; //若是k槽位不爲空 if ((q = ws[k]) != null) { //base-top小於零,而且任務q不爲空 if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { // non-empty //獲取base的偏移量,賦值給i long i = (((a.length - 1) & b) << ASHIFT) + ABASE; //從base端獲取任務,和前文的描述的steal搭配上了,是從base端steal if ((t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i))) != null && q.base == b) { //是active狀態 if (ss >= 0) { //更新WorkQueue中數組i索引位置爲空,而且更新base的值 if (U.compareAndSwapObject(a, i, t, null)) { q.base = b + 1; //n<-1,說明當前隊列還有剩餘任務,繼續喚醒可能存在的其餘線程 if (n < -1) // signal others signalWork(ws, q); //直接返回任務 return t; } } else if (oldSum == 0 && // try to activate w.scanState < 0) tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); } //若是獲取任務失敗,則準備換位置掃描 if (ss < 0) // refresh ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } checkSum += b; } //k一直在變,掃描到最後,若是等於origin,說明已經掃描了一圈還沒掃描到任務 if ((k = (k + 1) & m) == origin) { // continue until stable if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive break; //準備inactive當前工做隊列 int ns = ss | INACTIVE; // try to inactivate //活動線程數AC減1 long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); w.stackPred = (int)c; // hold prev stack top U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns; else w.scanState = ss; // back out } checkSum = 0; } } } return null; }
若是順利掃描到任務,那就要調用 runTask 方法來真正的運行這個任務了
立刻就接近真相了,steal 到任務了,就乾點正事吧
final void runTask(ForkJoinTask<?> task) { if (task != null) { scanState &= ~SCANNING; // mark as busy //Flag1: 記錄當前的任務是偷來的,至於如何執行task,是咱們寫在compute方法中的,咱們一會看doExec() 方法 (currentSteal = task).doExec(); U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC execLocalTasks(); ForkJoinWorkerThread thread = owner; //累加偷來的數量,親兄弟明算賬啊,雖然算完也沒啥實際意義 if (++nsteals < 0) // collect on overflow transferStealCount(pool); //任務執行完後,就從新更新scanState爲SCANNING scanState |= SCANNING; if (thread != null) thread.afterTopLevelExec(); } }
Flag1: doExec 方法纔是真正執行任務的關鍵,它是連接咱們自定義 compute 方法的核心,來看 doExec 方法
形勢一片大好,挺住,揭開 exec 的面紗,就看到本質了
//ForkJoinTask中的抽象方法,RecursiveTask 和 RecursiveAction 都重寫了它 protected abstract boolean exec(); final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; } //RecursiveTask重寫的內容,終於看到咱們文章開頭 demo 中的compute 了 protected final boolean exec() { result = compute(); return true; }
到這裏,咱們已經看到本質了,繞了這麼一大圈,終於和咱們本身重寫的compute方法聯繫到了一塊兒,真是不容易,可是 runWorker 三部曲還差最後一曲 awaitWork 沒譜,咱們來看看
上面說的是 scan 到了任務,要是沒有scan到任務,那就得將當前線程阻塞一下,具體標註在註釋中,能夠簡單瞭解一下
private boolean awaitWork(WorkQueue w, int r) { if (w == null || w.qlock < 0) // w is terminating return false; for (int pred = w.stackPred, spins = SPINS, ss;;) { if ((ss = w.scanState) >= 0) break; else if (spins > 0) { r ^= r << 6; r ^= r >>> 21; r ^= r << 7; if (r >= 0 && --spins == 0) { // randomize spins WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc; if (pred != 0 && (ws = workQueues) != null && (j = pred & SMASK) < ws.length && //前驅任務隊列還在 (v = ws[j]) != null && // see if pred parking //而且工做隊列已經激活,說明任務來了了 (v.parker == null || v.scanState >= 0)) //繼續自旋等一會,別返回false spins = SPINS; // continue spinning } } //自旋以後,再次檢查工做隊列是否終止,如果,退出掃描 else if (w.qlock < 0) // recheck after spins return false; else if (!Thread.interrupted()) { long c, prevctl, parkTime, deadline; int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK); if ((ac <= 0 && tryTerminate(false, false)) || (runState & STOP) != 0) // pool terminating return false; if (ac <= 0 && ss == (int)c) { // is last waiter prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); int t = (short)(c >>> TC_SHIFT); // shrink excess spares if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl)) return false; // else use timed wait parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; } else prevctl = parkTime = deadline = 0L; Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport w.parker = wt; if (w.scanState < 0 && ctl == c) // recheck before park U.park(false, parkTime); U.putOrderedObject(w, QPARKER, null); U.putObject(wt, PARKBLOCKER, null); if (w.scanState >= 0) break; if (parkTime != 0L && ctl == c && deadline - System.nanoTime() <= 0L && U.compareAndSwapLong(this, CTL, c, prevctl)) return false; // shrink pool } } return true; }
到這裏,ForkJoinPool 的完整流程算是有個基本瞭解了,可是咱們前面講的這些內容都是從 submission task 做爲切入點的。剛剛聊到的 compute 方法,咱們按照分治算法範式寫了本身的邏輯,具體請回看文中開頭的demo,很關鍵的一點是,咱們在 compute 中調用了 fork 方法,這就給咱們瞭解 worker task 的機會了,繼續來看 fork 方法
Fork 方法的邏輯很簡單,若是當前線程是 ForkJoinWorkerThread 類型,也就是說已經經過上文註冊的 Worker,那麼直接調用 push 方法將 task 放到當前線程擁有的 WorkQueue 中,不然就再調用 externalPush 重走咱們已上說的全部邏輯(你敢再走一遍嗎?)
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; } //push 方法很簡單,這裏就再也不過多解釋了 final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } }
有 fork 就有 join,繼續看一下 join 方法()
join 的核心調用在 doJoin,可是看到這麼多級聯三元運算符,我慌了
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; //status,task 的運行狀態 return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }
咱們將 doJoin 方法用咱們最熟悉的 if/else 作個改動,是否是就豁然開朗了
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; if((s = status) < 0) { // 有結果,直接返回 return s; }else { if((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { // 若是是 ForkJoinWorkerThread Worker if((w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) // 相似上面提到的 scan,可是是專項嘗試從本工做隊列裏取出等待的任務 // 取出了任務,就去執行它,並返回結果 && (s = doExec()) < 0) { return s; }else { // 也有可能別的線程把這個任務偷走了,那就執行內部等待方法 return wt.pool.awaitJoin(w, this, 0L); } }else { // 若是不是 ForkJoinWorkerThread,執行外部等待方法 return externalAwaitDone(); } } }
其中 awaitJoin 和 externalAwaitDone 都用到了 Helper(幫助) 和 Compensating(補償) 兩種策略,這兩種策略你們徹底能夠自行閱讀了,尤爲是 awaitJoin 方法,強烈推薦你們自行閱讀,其中 pop 的過程在這裏,這裏再也不展開
到這裏,有關 ForkJoinPool 相關的內容就算是結束了,爲了讓你們有個更好的理解 fork/join 機制,咱們仍是畫幾張圖解釋一下
假設咱們的大任務是 Task(8), 最終被分治成可執行的最小單元是 Task(1)
按照分治思想拆分任務的總體目標就是這樣滴:
從外部先提交一個大的 Task(8),將其放在偶數槽位中(請注意顏色對應)
不知足並行度,會建立 Worker 1 來掃描,並從 base 端竊取到任務 task(8),執行到 compute, fork
出兩個 task(4), 並 push到 WorkQueue 中
在執行任務時始終會確認是否知足並行度要求,若是沒有就會繼續建立新的Worker,與此同時,也會繼續 fork 任務,直到最小單元。Worker1 會從 top 端 pop 出來 task(4) 來繼續 compute 和 fork,並從新 push 到 WorkQueue 中
task(2) 還不是最小單元,因此會繼續 pop 出 task(2),並最終 fork 出兩個 task(1) push 到 WorkQueue中
task(1) 已是最小粒度了,能夠直接 pop 出來執行,獲取最終結果;在 Worker1 進行這些 pop 操做的同時,爲了知足並行度要求也會建立的其餘Worker,好比 Worker 2,這時 Worker2 會從 Worker 1 所在隊列的 base 端竊取任務
Worker 2 依舊是按照這個規則進行 pop->fork,到最終能夠 exec 任務,假設 Worker 1 的任務先執行完,要 join 結果,當 join task(4) 時,經過 hint 定位到是誰偷走了 task(4),這時順藤摸瓜找到 Worker2,若是 Worker2 還有任務沒執行完,Worker1 再竊取回來幫着執行,這樣互幫互助,最終快速完成任務
protected Long compute() { if (任務足夠小) { return cal(); } SumTask subtask1 = new SumTask(...); SumTask subtask2 = new SumTask(...); // 分別對子任務調用fork(): subtask1.fork(); subtask2.fork(); // 分別獲取合併結果: Long subresult1 = subtask1.join(); Long subresult2 = subtask2.join(); return subresult1 + subresult2; }
這又是一篇長文,不少小夥伴私下都建議我將長文拆開,一方面讀者好消化,另外一方面我本身也在數量的體現上變得高產。幾回想拆開,但好多文章拆開就失去了連續性(你們都有遺忘曲線)。過年沒回老家,就有時間擼文章了。爲了更好的理解源碼,文章的基礎鋪墊內容不少,看到這,你應該很累了,想要將更零散的知識點串起來,那就多看代碼註釋回味一下,而後一塊兒膜拜 Doug Lea 吧