Java多線程進階(四三)—— J.U.C之executors框架:Fork/Join框架(1) 原理

圖片描述

本文首發於一世流雲的專欄: https://segmentfault.com/blog...

1、引言

算法領域有一種基本思想叫作「分治」,所謂「分治」就是將一個難以直接解決的大問題,分割成一些規模較小的子問題,以便各個擊破,分而治之。java

好比:對於一個規模爲N的問題,若該問題能夠容易地解決,則直接解決;不然將其分解爲K個規模較小的子問題,這些子問題互相獨立且與原問題性質相同,遞歸地解這些子問題,而後將各子問題的解合併獲得原問題的解,這種算法設計策略叫作分治法。算法

許多基礎算法都運用了「分治」的思想,好比二分查找、快速排序等等。編程

基於「分治」的思想,J.U.C在JDK1.7時引入了一套Fork/Join框架。Fork/Join框架的基本思想就是將一個大任務分解(Fork)成一系列子任務,子任務能夠繼續往下分解,當多個不一樣的子任務都執行完成後,能夠將它們各自的結果合併(Join)成一個大結果,最終合併成大任務的結果:segmentfault

圖片參考自《Java併發編程的藝術》

2、工做竊取算法

從上述Fork/Join框架的描述能夠看出,咱們須要一些線程來執行Fork出的任務,在實際中,若是每次都建立新的線程執行任務,對系統資源的開銷會很大,因此Fork/Join框架利用了線程池來調度任務。數組

另外,這裏能夠思考一個問題,既然由線程池調度,根據咱們以前學習普通/計劃線程池的經驗,必然存在兩個要素:併發

  • 工做線程
  • 任務隊列

通常的線程池只有一個任務隊列,可是對於Fork/Join框架來講,因爲Fork出的各個子任務實際上是平行關係,爲了提升效率,減小線程競爭,應該將這些平行的任務放到不一樣的隊列中去,如上圖中,大任務分解成三個子任務:子任務一、子任務二、子任務3,那麼就建立三個任務隊列,而後再建立3個工做線程與隊列一一對應。框架

因爲線程處理不一樣任務的速度不一樣,這樣就可能存在某個線程先執行完了本身隊列中的任務的狀況,這時爲了提高效率,咱們可讓該線程去「竊取」其它任務隊列中的任務,這就是所謂的工做竊取算法dom

「工做竊取」的示意圖以下,當線程1執行完自身任務隊列中的任務後,嘗試從線程2的任務隊列中「竊取」任務:異步

圖片參考自《Java併發編程的藝術》

對於通常的隊列來講,入隊元素都是在「隊尾」,出隊元素在「隊首」,要知足「工做竊取」的需求,任務隊列應該支持從「隊尾」出隊元素,這樣能夠減小與其它工做線程的衝突(由於正常狀況下,其它工做線程從「隊首」獲取本身任務隊列中的任務),知足這一需求的任務隊列其實就是咱們在juc-collections框架中介紹過的雙端阻塞隊列—— LinkedBlockingDeque
固然,出於性能考慮,J.U.C中的Fork/Join框架並無直接利用LinkedBlockingDeque做爲任務隊列,而是本身從新實現了一個。

3、使用示例

爲了給接下來的分析F/J框架組件作鋪墊,咱們先經過一個簡單示例看下Fork/Join框架的基本使用。async

假設有個很是大的long[]數組,經過FJ框架求解數組全部元素的和。

任務類定義,由於須要返回結果,因此繼承RecursiveTask,並覆寫compute方法。任務的fork經過ForkJoinTask的fork方法執行,join方法方法用於等待任務執行後返回:

public class ArraySumTask extends RecursiveTask<Long> {
 
    private final int[] array;
    private final int begin;
    private final int end;
 
    private static final int THRESHOLD = 100;
 
    public ArraySumTask(int[] array, int begin, int end) {
        this.array = array;
        this.begin = begin;
        this.end = end;
    }
 
    @Override
    protected Long compute() {
        long sum = 0;
 
        if (end - begin + 1 < THRESHOLD) {      // 小於閾值, 直接計算
            for (int i = begin; i <= end; i++) {
                sum += array[i];
            }
        } else {
            int middle = (end + begin) / 2;
            ArraySumTask subtask1 = new ArraySumTask(this.array, begin, middle);
            ArraySumTask subtask2 = new ArraySumTask(this.array, middle + 1, end);
 
            subtask1.fork();
            subtask2.fork();
 
            long sum1 = subtask1.join();
            long sum2 = subtask2.join();
 
            sum = sum1 + sum2;
        }
        return sum;
    }
}

調用方以下:

public class Main {
    public static void main(String[] args) {
        ForkJoinPool executor = new ForkJoinPool();
        ArraySumTask task = new ArraySumTask(new int[10000], 0, 9999);
 
        ForkJoinTask future = executor.submit(task);
 
        // some time passed...
 
        if (future.isCompletedAbnormally()) {
            System.out.println(future.getException());
        }
 
        try {
            System.out.println("result: " + future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
 
    }
}
注意:ForkJoinTask在執行的時候可能會拋出異常,可是沒辦法在主線程裏直接捕獲異常,因此ForkJoinTask提供了 isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經被取消了,而且能夠經過ForkJoinTask的 getException方法獲取異常.

4、核心組件

在前幾小節中,咱們簡要介紹了Fork/Join框架和它的使用。本節咱們將更進一步,深刻F/J框架,瞭解它的各個組件的關係和核心設計思想,本節不會涉及太多的源碼分析,而是參考 Doug Lea的這篇論文《A Java Fork/Join Framework》,從宏觀上分析F/J框架,而後分析整個框架的調度流程,閱讀完本節後,在下一節——Fork/Join框架(2) 實現中,咱們再去深刻源碼會輕鬆不少。

F/J框架的實現很是複雜,內部大量運用了位操做和無鎖算法,撇開這些實現細節不談,該框架主要涉及三大核心組件:ForkJoinPool(線程池)、ForkJoinTask(任務)、ForkJoinWorkerThread(工做線程),外加WorkQueue(任務隊列):

  • ForkJoinPool:ExecutorService的實現類,負責工做線程的管理、任務隊列的維護,以及控制整個任務調度流程;
  • ForkJoinTask:Future接口的實現類,fork是其核心方法,用於分解任務並異步執行;而join方法在任務結果計算完畢以後纔會運行,用來合併或返回計算結果;
  • ForkJoinWorkerThread:Thread的子類,做爲線程池中的工做線程(Worker)執行任務;
  • WorkQueue:任務隊列,用於保存任務;

ForkJoinPool

ForkJoinPool做爲Executors框架的一員,從外部看與其它線程池並無什麼區別,僅僅是ExecutorService的一個實現類:

clipboard.png

ForkJoinPool的主要工做以下:

  1. 接受外部任務的提交(外部調用ForkJoinPool的invoke/execute/submit方法提交任務);
  2. 接受ForkJoinTask自身fork出的子任務的提交;
  3. 任務隊列數組(WorkQueue[])的初始化和管理;
  4. 工做線程(Worker)的建立/管理。
注意:ForkJoinPool提供了3類外部提交任務的方法: invokeexecutesubmit,它們的主要區別在於任務的執行方式上。
  • 經過invoke方法提交的任務,調用線程直到任務執行完成纔會返回,也就是說這是一個同步方法,且有返回結果
  • 經過execute方法提交的任務,調用線程會當即返回,也就是說這是一個異步方法,且沒有返回結果
  • 經過submit方法提交的任務,調用線程會當即返回,也就是說這是一個異步方法,且有返回結果(返回Future實現類,能夠經過get獲取結果)。

ForkJoinPool對象的構建有兩種方式:

  1. 經過3種構造器的任意一種進行構造;
  2. 經過ForkJoinPool.commonPool()靜態方法構造。
JDK8之後,ForkJoinPool又提供了一個靜態方法commonPool(),這個方法返回一個ForkJoinPool內部聲明的靜態ForkJoinPool實例,主要是爲了簡化線程池的構建,這個ForkJoinPool實例能夠知足大多數的使用場景:
public static ForkJoinPool commonPool() {
     // assert common != null : "static init error";
     return common;
 }

ForkJoinPool對外提供的3種構造器,其實最終都調用了下面這個構造器:

/**
 * @param parallelism      並行級別, 默認爲CPU核心數
 * @param factory          工做線程工廠
 * @param handler          異常處理器
 * @param mode        調度模式: true表示FIFO_QUEUE; false表示LIFO_QUEUE
 * @param workerNamePrefix 工做線程的名稱前綴
 */
private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler,
                     int mode, String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long) (-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);

}
  • parallelism:默認值爲CPU核心數,ForkJoinPool裏工做線程數量與該參數有關,但它不表示最大線程數;
  • factory:工做線程工廠,默認是DefaultForkJoinWorkerThreadFactory,其實就是用來建立工做線程對象——ForkJoinWorkerThread;
  • handler:異常處理器;
  • config:保存parallelism和mode信息,供後續讀取;
  • ctl:線程池的核心控制字段

這些入參目前不用關注,咱們重點是mode這個字段,ForkJoinPool支持兩種模式:

  1. 同步模式(默認方式)
  2. 異步模式

mode = asyncMode ? FIFO_QUEUE : LIFO_QUEUE

注意:這裏的同步/異步並非指F/J框架自己是採用同步模式仍是採用異步模式工做,而是指其中的工做線程的工做方式。在F/J框架中,每一個工做線程(Worker)都有一個屬於本身的任務隊列(WorkQueue),這是一個底層採用數組實現的 雙向隊列
同步是指:對於工做線程(Worker)自身隊列中的任務,採用 後進先出(LIFO)的方式執行;異步是指:對於工做線程(Worker)自身隊列中的任務,採用 先進先出(FIFO)的方式執行。

ForkJoinTask

從Fork/Join框架的描述上來看,「任務」必需要知足必定的條件:

  1. 支持Fork,即任務自身的分解
  2. 支持Join,即任務結果的合併

所以,J.U.C提供了一個抽象類——ForkJoinTask,來做爲該類Fork/Join任務的抽象定義:

clipboard.png

ForkJoinTask實現了Future接口,是一個異步任務,咱們在使用Fork/Join框架時,通常須要使用線程池來調度任務,線程池內部調度的其實都是ForkJoinTask任務(即便提交的是一個Runnable或Callable任務,也會被適配成ForkJoinTask)。

除了ForkJoinTask,Fork/Join框架還提供了兩個它的抽象實現,咱們在自定義ForkJoin任務時,通常繼承這兩個類:

  • RecursiveAction:表示具備返回結果的ForkJoin任務
  • RecursiveTask:表示沒有返回結果的ForkJoin任務
public abstract class RecursiveAction extends ForkJoinTask<Void> {
    /**
     * 該任務的執行,子類覆寫該方法
     */
    protected abstract void compute();
 
    public final Void getRawResult() { return null; }
 
    protected final void setRawResult(Void mustBeNull) { }
 
    protected final boolean exec() {
        compute();
        return true;
    }
}
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
 
    /**
     * 該任務的執行結果.
     */
    V result;
 
    /**
     * 該任務的執行,子類覆寫該方法
     */
    protected abstract V compute();
 
    public final V getRawResult() {
        return result;
    }
 
    protected final void setRawResult(V value) {
        result = value;
    }
 
    protected final boolean exec() {
        result = compute();
        return true;
    }

}
ForkJoinTask除了和ForkJoinPool 結合使用外,也能夠單獨使用,當咱們調用ForkJoinTask的fork方法時,其內部會經過 ForkJoinPool.commonPool()方法建立線程池,而後將本身做爲任務提交給線程池。

ForkJoinWorkerThread

Fork/Join框架中,每一個工做線程(Worker)都有一個本身的任務隊列(WorkerQueue), 因此須要對通常的Thread作些特性化處理,J.U.C提供了ForkJoinWorkerThread類做爲ForkJoinPool中的工做線程:

public class ForkJoinWorkerThread extends Thread {
    
    final ForkJoinPool pool;                    // 該工做線程歸屬的線程池
    final ForkJoinPool.WorkQueue workQueue;     // 對應的任務隊列
 
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        super("aForkJoinWorkerThread");         // 指定工做線程名稱
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }
  
    // ...
}

ForkJoinWorkerThread 在構造過程當中,會保存所屬線程池信息和與本身綁定的任務隊列信息。同時,它會經過ForkJoinPool的registerWorker方法將本身註冊到線程池中。

線程池中的每一個工做線程(ForkJoinWorkerThread)都有一個本身的任務隊列(WorkQueue),工做線程優先處理自身隊列中的任務(LIFO或FIFO順序,由線程池構造時的參數 mode 決定),自身隊列爲空時,以FIFO的順序隨機竊取其它隊列中的任務。

WorkQueue

任務隊列(WorkQueue)是ForkJoinPool與其它線程池區別最大的地方,在ForkJoinPool內部,維護着一個WorkQueue[]數組,它會在外部首次提交任務)時進行初始化:

volatile WorkQueue[] workQueues; // main registry
 

當經過線程池的外部方法( submitinvokeexecute)提交任務時,若是 WorkQueue[]沒有初始化,則會進行初始化;而後根據數組大小和線程隨機數( ThreadLocalRandom.probe)等信息,計算出任務隊列所在的數組索引(這個索引必定是 偶數),若是索引處沒有任務隊列,則初始化一個,再將任務入隊。也就是說,經過外部方法提交的任務必定是在偶數隊列,沒有綁定工做線程。

WorkQueue做爲ForkJoinPool的內部類,表示一個雙端隊列。雙端隊列既能夠做爲使用(LIFO),也能夠做爲隊列使用(FIFO)。ForkJoinPool的「工做竊取」正是利用了這個特色,當工做線程從本身的隊列中獲取任務時,默認老是以棧操做(LIFO)的方式從棧頂取任務;當工做線程嘗試竊取其它任務隊列中的任務時,則是FIFO的方式。

咱們在ForkJoinPool一節中曾講過,能夠指定線程池的同步/異步模式(mode參數),其做用就在於此。同步模式就是「棧操做」,異步模式就是「隊列操做」,影響的就是工做線程從本身隊列中取任務的方式。

ForkJoinPool中的工做隊列能夠分爲兩類:

  • 有工做線程(Worker)綁定的任務隊列:數組下標始終是奇數,稱爲task queue,該隊列中的任務均由工做線程調用產生(工做線程調用FutureTask.fork方法);
  • 沒有工做線程(Worker)綁定的任務隊列:數組下標始終是偶數,稱爲submissions queue,該隊列中的任務所有由其它線程提交(也就是非工做線程調用execute/submit/invoke或者FutureTask.fork方法)。

5、線程池調度示例

文字描述不太好理解,咱們經過示意圖來看下任務入隊和「工做竊取」的整個過程:

假設如今經過ForkJoinPool的submit方法提交了一個FuturetTask任務,參考 使用示例

初始

初始狀態下,線程池中的任務隊列爲空,workQueues == null,也沒有工做線程:

clipboard.png


外部提交FutureTask任務

此時會初始化任務隊列數組WorkQueue[],大小爲2的冪次,而後在某個槽位(偶數位)初始化一個任務隊列(WorkQueue),並插入任務:

clipboard.png

注意,因爲是非工做線程經過外部方法提交的任務,因此這個任務隊列並無綁定工做線程。

之因此是2的冪次,是因爲ForkJoinPool採用了一種隨機算法(相似 ConcurrentHashMap的隨機算法),該算法經過線程池隨機數(ThreadLocalRandom的probe值)和數組的大小計算出工做線程所映射的數組槽位,這種算法要求數組大小爲2的冪次。

建立工做線程

首次提交任務後,因爲沒有工做線程,因此會建立一個工做線程,同時在某個奇數槽的位置建立一個與它綁定的任務隊列,以下圖:

clipboard.png


竊取任務

ForkJoinWorkThread_1會隨機掃描workQueues中的隊列,直到找到一個能夠竊取的隊列——workQueues[2],而後從該隊列的base端獲取任務並執行,並將base加1:

clipboard.png

竊取到的任務是FutureTask,ForkJoinWorkThread_1最終會調用它的compute方法(子類繼承ForkJoinTask,覆寫compute,參考本文的使用示例),該方法中會新建兩個子任務,並執行它們的fork方法:

@Override
protected Long compute() {
    long sum = 0;
 
    if (end - begin + 1 < THRESHOLD) {      // 小於閾值, 直接計算
        for (int i = begin; i <= end; i++) {
            sum += array[i];
        }
    } else {
        int middle = (end + begin) / 2;
        ArraySumTask subtask1 = new ArraySumTask(this.array, begin, middle);
        ArraySumTask subtask2 = new ArraySumTask(this.array, middle + 1, end);
 
        subtask1.fork();
        subtask2.fork();
 
        long sum1 = subtask1.join();
        long sum2 = subtask2.join();
 
        sum = sum1 + sum2;
    }
    return sum;
}

以前說過,因爲是由工做線程ForkJoinWorkThread_1來調用FutureTask的fork方法,因此會將這兩個子任務放入ForkJoinWorkThread_1自身隊列中:

clipboard.png

而後,ForkJoinWorkThread_1會阻塞等待任務1和任務2的結果(先在subtask1.join等待):

long sum1 = subtask1.join();
  long sum2 = subtask2.join();
從這裏也能夠看出,任務放到哪一個隊列,實際上是 由調用線程來決定的(根據線程探針值probe計算隊列索引)。若是調用線程是工做線程,則必然有本身的隊列( task queue),則任務都會放到本身的隊列中;若是調用線程是其它線程(如主線程),則建立沒有工做線程綁定的任務隊列( submissions queue),而後存入任務。

新的工做線程

ForkJoinWorkThread_1調用兩個子任務1和2的fork方法,除了將它們放入本身的任務隊列外,還會致使新增一個工做線程ForkJoinWorkThread_2:

clipboard.png

ForkJoinWorkThread_2運行後會像ForkJoinWorkThread_1那樣從其它隊列竊取任務,以下圖,從ForkJoinWorkThread_1隊列的base端竊取一個任務(直接執行,並不會放入本身隊列):

clipboard.png

竊取完成後,ForkJoinWorkThread_2會直接執行任務1,又回到了FutureTask子類的compute方法,假設此時又fork出兩個任務——任務三、任務4,則ForkJoinWorkThread_2最終會在任務3的join方法上等待:

clipboard.png

若是此時還有其它工做線程,則重複上述步驟: 竊取、執行、入隊、join阻塞、返回。ForkJoinTask的join方法內部邏輯很是複雜,上述ForkJoinWorkThread_1和ForkJoinWorkThread_2目前都在等待任務的完成,但事實上,ForkJoinTask存在一種 互助機制,即工做線程之間能夠互相幫助執行任務,這裏不詳細展開,只須要知道,ForkJoinWorkThread_1和ForkJoinWorkThread_2可能會被其它工做線程喚醒。

咱們這裏假設ForkJoinWorkThread_2被其它某個工做線程喚醒,任務3和任務4的join方法依次返回告終果,那麼任務1的結果也會返回,因而ForkJoinWorkThread_1也被喚醒(它在任務1的join上等待),而後ForkJoinWorkThread_1會繼續執行任務2的join方法,若是任務2再也不分解,則最終返回任務1和任務2的合併結果,計算結束。


自身隊列的任務執行

ForkJoinWorkThread_1和ForkJoinWorkThread_2喚醒執行完竊取到的任務後,尚未結束,它們還會去看看自身隊列中有無任務能夠執行。

/**
 * Executes the given task and any remaining local tasks.
 */
final void runTask(ForkJoinTask<?> task) {
    if (task != null) {
        scanState &= ~SCANNING; // mark as busy
        (currentSteal = task).doExec();
        U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
        execLocalTasks();
        ForkJoinWorkerThread thread = owner;
        if (++nsteals < 0)      // collect on overflow
            transferStealCount(pool);
        scanState |= SCANNING;
        if (thread != null)
            thread.afterTopLevelExec();
    }
}

上述ForkJoinPool.WorkQueue.runTask方法中,doExec()就是執行竊取的任務,而execLocalTasks用於執行隊列自己的任務。

咱們假設此時的線程池是下面這種狀態:

clipboard.png

工做線程ForkJoinWorkThread_1調用execLocalTasks方法一次性執行本身隊列中的全部任務,這時分紅兩種狀況:

1.異步模式(asyncMode==true)

若是構造線程池時,asyncMode爲true,表示以異步模式執行工做線程自身隊列中的任務,此時會從 base -> top遍歷並執行全部任務。

2.同步模式(asyncMode==false)

若是構造線程池時,asyncMode爲false(默認狀況),表示以同步模式執行工做線程自身隊列中的任務,此時會從 top -> base 遍歷並執行全部任務。

任務的入隊老是在 top端,因此當以同步模式遍歷時,其實至關於棧操做(從棧頂pop元素);
若是是異步模式,至關於隊列的出隊操做(從base端poll元素)。
異步模式比較適合於那些不須要返回結果的任務。其實若是將隊列中的任務當作一棵樹(無環連通圖)的話,異步模式相似於圖的廣度優先遍歷,同步模式相似於圖的深度優先遍歷

假設此處以默認的同步模式遍歷,ForkJoinWorkThread_1從棧頂開始執行並移除任務,先執行任務2並移除,再執行任務1並:

clipboard.png

clipboard.png

6、總結

本章簡要概述了Fork/Join框架的思想、主要組件及基本使用,Fork/Join框架的核心包含四大組件:ForkJoinTask任務類、ForkJoinPool線程池、ForkJoinWorkerThread工做線程、WorkQueue任務隊列。

本章經過示例,描述了各個組件的關係以及ForkJoin線程池的整個調度流程,F/J框架的核心來自於它的工做竊取及調度策略,能夠總結爲如下幾點:

  1. 每一個Worker線程利用它本身的任務隊列維護可執行任務;
  2. 任務隊列是一種雙端隊列,支持LIFO的pushpop操做,也支持FIFO的take操做;
  3. 任務fork的子任務,只會push到它所在線程(調用fork方法的線程)的隊列;
  4. 工做線程既可使用LIFO經過pop處理本身隊列中的任務,也能夠FIFO經過poll處理本身隊列中的任務,具體取決於構造線程池時的asyncMode參數;
  5. 當工做線程本身隊列中沒有待處理任務時,它嘗試去隨機讀取(竊取)其它任務隊列的base端的任務;
  6. 當線程進入join操做,它也會去處理其它工做線程的隊列中的任務(本身的已經處理完了),直到目標任務完成(經過isDone方法);
  7. 當一個工做線程沒有任務了,而且嘗試從其它隊列竊取也失敗了,它讓出資源(經過使用yields, sleeps或者其它優先級調整)而且隨後會再次激活,直到全部工做線程都空閒了——此時,它們都阻塞在等待另外一個頂層線程的調用。

下一章將經過源碼分析更深刻的理解Fork/Join調度過程。

相關文章
相關標籤/搜索