本文首發於一世流雲的專欄: https://segmentfault.com/blog...
算法領域有一種基本思想叫作「分治」,所謂「分治」就是將一個難以直接解決的大問題,分割成一些規模較小的子問題,以便各個擊破,分而治之。java
好比:對於一個規模爲N的問題,若該問題能夠容易地解決,則直接解決;不然將其分解爲K個規模較小的子問題,這些子問題互相獨立且與原問題性質相同,遞歸地解這些子問題,而後將各子問題的解合併獲得原問題的解,這種算法設計策略叫作分治法。算法
許多基礎算法都運用了「分治」的思想,好比二分查找、快速排序等等。編程
基於「分治」的思想,J.U.C在JDK1.7時引入了一套Fork/Join框架。Fork/Join框架的基本思想就是將一個大任務分解(Fork)成一系列子任務,子任務能夠繼續往下分解,當多個不一樣的子任務都執行完成後,能夠將它們各自的結果合併(Join)成一個大結果,最終合併成大任務的結果:segmentfault
從上述Fork/Join框架的描述能夠看出,咱們須要一些線程來執行Fork出的任務,在實際中,若是每次都建立新的線程執行任務,對系統資源的開銷會很大,因此Fork/Join框架利用了線程池來調度任務。數組
另外,這裏能夠思考一個問題,既然由線程池調度,根據咱們以前學習普通/計劃線程池的經驗,必然存在兩個要素:併發
通常的線程池只有一個任務隊列,可是對於Fork/Join框架來講,因爲Fork出的各個子任務實際上是平行關係,爲了提升效率,減小線程競爭,應該將這些平行的任務放到不一樣的隊列中去,如上圖中,大任務分解成三個子任務:子任務一、子任務二、子任務3,那麼就建立三個任務隊列,而後再建立3個工做線程與隊列一一對應。框架
因爲線程處理不一樣任務的速度不一樣,這樣就可能存在某個線程先執行完了本身隊列中的任務的狀況,這時爲了提高效率,咱們可讓該線程去「竊取」其它任務隊列中的任務,這就是所謂的工做竊取算法。dom
「工做竊取」的示意圖以下,當線程1執行完自身任務隊列中的任務後,嘗試從線程2的任務隊列中「竊取」任務:異步
對於通常的隊列來講,入隊元素都是在「隊尾」,出隊元素在「隊首」,要知足「工做竊取」的需求,任務隊列應該支持從「隊尾」出隊元素,這樣能夠減小與其它工做線程的衝突(由於正常狀況下,其它工做線程從「隊首」獲取本身任務隊列中的任務),知足這一需求的任務隊列其實就是咱們在juc-collections框架中介紹過的雙端阻塞隊列—— LinkedBlockingDeque。
固然,出於性能考慮,J.U.C中的Fork/Join框架並無直接利用LinkedBlockingDeque做爲任務隊列,而是本身從新實現了一個。
爲了給接下來的分析F/J框架組件作鋪墊,咱們先經過一個簡單示例看下Fork/Join框架的基本使用。async
假設有個很是大的long[]數組,經過FJ框架求解數組全部元素的和。
任務類定義,由於須要返回結果,因此繼承RecursiveTask,並覆寫compute方法。任務的fork經過ForkJoinTask的fork方法執行,join方法方法用於等待任務執行後返回:
public class ArraySumTask extends RecursiveTask<Long> { private final int[] array; private final int begin; private final int end; private static final int THRESHOLD = 100; public ArraySumTask(int[] array, int begin, int end) { this.array = array; this.begin = begin; this.end = end; } @Override protected Long compute() { long sum = 0; if (end - begin + 1 < THRESHOLD) { // 小於閾值, 直接計算 for (int i = begin; i <= end; i++) { sum += array[i]; } } else { int middle = (end + begin) / 2; ArraySumTask subtask1 = new ArraySumTask(this.array, begin, middle); ArraySumTask subtask2 = new ArraySumTask(this.array, middle + 1, end); subtask1.fork(); subtask2.fork(); long sum1 = subtask1.join(); long sum2 = subtask2.join(); sum = sum1 + sum2; } return sum; } }
調用方以下:
public class Main { public static void main(String[] args) { ForkJoinPool executor = new ForkJoinPool(); ArraySumTask task = new ArraySumTask(new int[10000], 0, 9999); ForkJoinTask future = executor.submit(task); // some time passed... if (future.isCompletedAbnormally()) { System.out.println(future.getException()); } try { System.out.println("result: " + future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
注意:ForkJoinTask在執行的時候可能會拋出異常,可是沒辦法在主線程裏直接捕獲異常,因此ForkJoinTask提供了isCompletedAbnormally()
方法來檢查任務是否已經拋出異常或已經被取消了,而且能夠經過ForkJoinTask的getException
方法獲取異常.
在前幾小節中,咱們簡要介紹了Fork/Join框架和它的使用。本節咱們將更進一步,深刻F/J框架,瞭解它的各個組件的關係和核心設計思想,本節不會涉及太多的源碼分析,而是參考 Doug Lea的這篇論文《A Java Fork/Join Framework》,從宏觀上分析F/J框架,而後分析整個框架的調度流程,閱讀完本節後,在下一節——Fork/Join框架(2) 實現中,咱們再去深刻源碼會輕鬆不少。
F/J框架的實現很是複雜,內部大量運用了位操做和無鎖算法,撇開這些實現細節不談,該框架主要涉及三大核心組件:ForkJoinPool
(線程池)、ForkJoinTask
(任務)、ForkJoinWorkerThread
(工做線程),外加WorkQueue
(任務隊列):
ForkJoinPool做爲Executors框架的一員,從外部看與其它線程池並無什麼區別,僅僅是ExecutorService的一個實現類:
ForkJoinPool的主要工做以下:
invoke
/execute
/submit
方法提交任務);WorkQueue[]
)的初始化和管理;注意:ForkJoinPool提供了3類外部提交任務的方法: invoke、 execute、 submit,它們的主要區別在於任務的執行方式上。
ForkJoinPool對象的構建有兩種方式:
ForkJoinPool.commonPool()
靜態方法構造。JDK8之後,ForkJoinPool又提供了一個靜態方法commonPool(),這個方法返回一個ForkJoinPool內部聲明的靜態ForkJoinPool實例,主要是爲了簡化線程池的構建,這個ForkJoinPool實例能夠知足大多數的使用場景:
public static ForkJoinPool commonPool() { // assert common != null : "static init error"; return common; }
ForkJoinPool對外提供的3種構造器,其實最終都調用了下面這個構造器:
/** * @param parallelism 並行級別, 默認爲CPU核心數 * @param factory 工做線程工廠 * @param handler 異常處理器 * @param mode 調度模式: true表示FIFO_QUEUE; false表示LIFO_QUEUE * @param workerNamePrefix 工做線程的名稱前綴 */ private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long) (-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
這些入參目前不用關注,咱們重點是mode
這個字段,ForkJoinPool支持兩種模式:
mode = asyncMode ? FIFO_QUEUE : LIFO_QUEUE
注意:這裏的同步/異步並非指F/J框架自己是採用同步模式仍是採用異步模式工做,而是指其中的工做線程的工做方式。在F/J框架中,每一個工做線程(Worker)都有一個屬於本身的任務隊列(WorkQueue),這是一個底層採用數組實現的 雙向隊列。
同步是指:對於工做線程(Worker)自身隊列中的任務,採用 後進先出(LIFO)的方式執行;異步是指:對於工做線程(Worker)自身隊列中的任務,採用 先進先出(FIFO)的方式執行。
從Fork/Join框架的描述上來看,「任務」必需要知足必定的條件:
所以,J.U.C提供了一個抽象類——ForkJoinTask,來做爲該類Fork/Join任務的抽象定義:
ForkJoinTask實現了Future接口,是一個異步任務,咱們在使用Fork/Join框架時,通常須要使用線程池來調度任務,線程池內部調度的其實都是ForkJoinTask任務(即便提交的是一個Runnable或Callable任務,也會被適配成ForkJoinTask)。
除了ForkJoinTask,Fork/Join框架還提供了兩個它的抽象實現,咱們在自定義ForkJoin任務時,通常繼承這兩個類:
public abstract class RecursiveAction extends ForkJoinTask<Void> { /** * 該任務的執行,子類覆寫該方法 */ protected abstract void compute(); public final Void getRawResult() { return null; } protected final void setRawResult(Void mustBeNull) { } protected final boolean exec() { compute(); return true; } }
public abstract class RecursiveTask<V> extends ForkJoinTask<V> { /** * 該任務的執行結果. */ V result; /** * 該任務的執行,子類覆寫該方法 */ protected abstract V compute(); public final V getRawResult() { return result; } protected final void setRawResult(V value) { result = value; } protected final boolean exec() { result = compute(); return true; } }
ForkJoinTask除了和ForkJoinPool 結合使用外,也能夠單獨使用,當咱們調用ForkJoinTask的fork方法時,其內部會經過
ForkJoinPool.commonPool()
方法建立線程池,而後將本身做爲任務提交給線程池。
Fork/Join框架中,每一個工做線程(Worker)都有一個本身的任務隊列(WorkerQueue), 因此須要對通常的Thread作些特性化處理,J.U.C提供了ForkJoinWorkerThread類做爲ForkJoinPool中的工做線程:
public class ForkJoinWorkerThread extends Thread { final ForkJoinPool pool; // 該工做線程歸屬的線程池 final ForkJoinPool.WorkQueue workQueue; // 對應的任務隊列 protected ForkJoinWorkerThread(ForkJoinPool pool) { super("aForkJoinWorkerThread"); // 指定工做線程名稱 this.pool = pool; this.workQueue = pool.registerWorker(this); } // ... }
ForkJoinWorkerThread 在構造過程當中,會保存所屬線程池信息和與本身綁定的任務隊列信息。同時,它會經過ForkJoinPool的registerWorker
方法將本身註冊到線程池中。
線程池中的每一個工做線程(ForkJoinWorkerThread)都有一個本身的任務隊列(WorkQueue),工做線程優先處理自身隊列中的任務(LIFO或FIFO順序,由線程池構造時的參數
mode
決定),自身隊列爲空時,以FIFO的順序隨機竊取其它隊列中的任務。
任務隊列(WorkQueue)是ForkJoinPool與其它線程池區別最大的地方,在ForkJoinPool內部,維護着一個WorkQueue[]
數組,它會在外部首次提交任務)時進行初始化:
volatile WorkQueue[] workQueues; // main registry
當經過線程池的外部方法( submit、 invoke、 execute)提交任務時,若是WorkQueue[]
沒有初始化,則會進行初始化;而後根據數組大小和線程隨機數(ThreadLocalRandom.probe
)等信息,計算出任務隊列所在的數組索引(這個索引必定是 偶數),若是索引處沒有任務隊列,則初始化一個,再將任務入隊。也就是說,經過外部方法提交的任務必定是在偶數隊列,沒有綁定工做線程。
WorkQueue做爲ForkJoinPool的內部類,表示一個雙端隊列。雙端隊列既能夠做爲棧使用(LIFO),也能夠做爲隊列使用(FIFO)。ForkJoinPool的「工做竊取」正是利用了這個特色,當工做線程從本身的隊列中獲取任務時,默認老是以棧操做(LIFO)的方式從棧頂取任務;當工做線程嘗試竊取其它任務隊列中的任務時,則是FIFO的方式。
咱們在ForkJoinPool一節中曾講過,能夠指定線程池的同步/異步模式(mode參數),其做用就在於此。同步模式就是「棧操做」,異步模式就是「隊列操做」,影響的就是工做線程從本身隊列中取任務的方式。
ForkJoinPool中的工做隊列能夠分爲兩類:
文字描述不太好理解,咱們經過示意圖來看下任務入隊和「工做竊取」的整個過程:
假設如今經過ForkJoinPool的submit方法提交了一個FuturetTask任務,參考 使用示例。
初始狀態下,線程池中的任務隊列爲空,workQueues == null
,也沒有工做線程:
此時會初始化任務隊列數組WorkQueue[]
,大小爲2的冪次,而後在某個槽位(偶數位)初始化一個任務隊列(WorkQueue
),並插入任務:
注意,因爲是非工做線程經過外部方法提交的任務,因此這個任務隊列並無綁定工做線程。
之因此是2的冪次,是因爲ForkJoinPool採用了一種隨機算法(相似 ConcurrentHashMap的隨機算法),該算法經過線程池隨機數(ThreadLocalRandom的probe值)和數組的大小計算出工做線程所映射的數組槽位,這種算法要求數組大小爲2的冪次。
首次提交任務後,因爲沒有工做線程,因此會建立一個工做線程,同時在某個奇數槽的位置建立一個與它綁定的任務隊列,以下圖:
ForkJoinWorkThread_1會隨機掃描workQueues中的隊列,直到找到一個能夠竊取的隊列——workQueues[2]
,而後從該隊列的base
端獲取任務並執行,並將base
加1:
竊取到的任務是FutureTask,ForkJoinWorkThread_1最終會調用它的compute
方法(子類繼承ForkJoinTask,覆寫compute,參考本文的使用示例),該方法中會新建兩個子任務,並執行它們的fork
方法:
@Override protected Long compute() { long sum = 0; if (end - begin + 1 < THRESHOLD) { // 小於閾值, 直接計算 for (int i = begin; i <= end; i++) { sum += array[i]; } } else { int middle = (end + begin) / 2; ArraySumTask subtask1 = new ArraySumTask(this.array, begin, middle); ArraySumTask subtask2 = new ArraySumTask(this.array, middle + 1, end); subtask1.fork(); subtask2.fork(); long sum1 = subtask1.join(); long sum2 = subtask2.join(); sum = sum1 + sum2; } return sum; }
以前說過,因爲是由工做線程ForkJoinWorkThread_1來調用FutureTask的fork
方法,因此會將這兩個子任務放入ForkJoinWorkThread_1自身隊列中:
而後,ForkJoinWorkThread_1會阻塞等待任務1和任務2的結果(先在subtask1.join
等待):
long sum1 = subtask1.join(); long sum2 = subtask2.join();
從這裏也能夠看出,任務放到哪一個隊列,實際上是 由調用線程來決定的(根據線程探針值probe計算隊列索引)。若是調用線程是工做線程,則必然有本身的隊列( task queue),則任務都會放到本身的隊列中;若是調用線程是其它線程(如主線程),則建立沒有工做線程綁定的任務隊列( submissions queue),而後存入任務。
ForkJoinWorkThread_1調用兩個子任務1和2的fork
方法,除了將它們放入本身的任務隊列外,還會致使新增一個工做線程ForkJoinWorkThread_2:
ForkJoinWorkThread_2運行後會像ForkJoinWorkThread_1那樣從其它隊列竊取任務,以下圖,從ForkJoinWorkThread_1隊列的base
端竊取一個任務(直接執行,並不會放入本身隊列):
竊取完成後,ForkJoinWorkThread_2會直接執行任務1,又回到了FutureTask子類的compute
方法,假設此時又fork
出兩個任務——任務三、任務4,則ForkJoinWorkThread_2最終會在任務3的join
方法上等待:
若是此時還有其它工做線程,則重複上述步驟:
竊取、執行、入隊、join阻塞、返回
。ForkJoinTask的join方法內部邏輯很是複雜,上述ForkJoinWorkThread_1和ForkJoinWorkThread_2目前都在等待任務的完成,但事實上,ForkJoinTask存在一種
互助機制,即工做線程之間能夠互相幫助執行任務,這裏不詳細展開,只須要知道,ForkJoinWorkThread_1和ForkJoinWorkThread_2可能會被其它工做線程喚醒。
咱們這裏假設ForkJoinWorkThread_2被其它某個工做線程喚醒,任務3和任務4的join方法依次返回告終果,那麼任務1的結果也會返回,因而ForkJoinWorkThread_1也被喚醒(它在任務1的join上等待),而後ForkJoinWorkThread_1會繼續執行任務2的join方法,若是任務2再也不分解,則最終返回任務1和任務2的合併結果,計算結束。
ForkJoinWorkThread_1和ForkJoinWorkThread_2喚醒執行完竊取到的任務後,尚未結束,它們還會去看看自身隊列中有無任務能夠執行。
/** * Executes the given task and any remaining local tasks. */ final void runTask(ForkJoinTask<?> task) { if (task != null) { scanState &= ~SCANNING; // mark as busy (currentSteal = task).doExec(); U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC execLocalTasks(); ForkJoinWorkerThread thread = owner; if (++nsteals < 0) // collect on overflow transferStealCount(pool); scanState |= SCANNING; if (thread != null) thread.afterTopLevelExec(); } }
上述ForkJoinPool.WorkQueue.runTask
方法中,doExec()
就是執行竊取的任務,而execLocalTasks
用於執行隊列自己的任務。
咱們假設此時的線程池是下面這種狀態:
工做線程ForkJoinWorkThread_1調用execLocalTasks方法一次性執行本身隊列中的全部任務,這時分紅兩種狀況:
1.異步模式(asyncMode==true)
若是構造線程池時,asyncMode爲true,表示以異步模式執行工做線程自身隊列中的任務,此時會從 base -> top
遍歷並執行全部任務。
2.同步模式(asyncMode==false)
若是構造線程池時,asyncMode爲false(默認狀況),表示以同步模式執行工做線程自身隊列中的任務,此時會從 top -> base
遍歷並執行全部任務。
任務的入隊老是在top
端,因此當以同步模式遍歷時,其實至關於棧操做(從棧頂pop元素);
若是是異步模式,至關於隊列的出隊操做(從base端poll元素)。
異步模式比較適合於那些不須要返回結果的任務。其實若是將隊列中的任務當作一棵樹(無環連通圖)的話,異步模式相似於圖的廣度優先遍歷,同步模式相似於圖的深度優先遍歷
假設此處以默認的同步模式遍歷,ForkJoinWorkThread_1從棧頂開始執行並移除任務,先執行任務2並移除,再執行任務1並:
本章簡要概述了Fork/Join框架的思想、主要組件及基本使用,Fork/Join框架的核心包含四大組件:ForkJoinTask任務類、ForkJoinPool線程池、ForkJoinWorkerThread工做線程、WorkQueue任務隊列。
本章經過示例,描述了各個組件的關係以及ForkJoin線程池的整個調度流程,F/J框架的核心來自於它的工做竊取及調度策略,能夠總結爲如下幾點:
下一章將經過源碼分析更深刻的理解Fork/Join調度過程。