ForkJoinPool大型圖文現場(一閱到底 vs 直接收藏)

知識回顧

併發工具類咱們已經講了不少,這些工具類的「目標」是讓咱們只關注任務自己,而且忽視線程間合做細節,簡化了併發編程難度的同時,也增長了不少安全性。工具類的對使用者的「目標」雖然一致,但每個工具類自己都有它獨特的應用場景,好比:html

將上面三種通用場景形象化展現一下:java

結合上圖相信你的腦海裏已經浮現出這幾個工具類的具體實現方式,感受這已經涵蓋了全部的併發場景。程序員

TYTS,以上這些方式的子線程接到任務後不會再繼續拆分紅「子子」任務,也就是說,子線程即使接到很大或很複雜的任務也得硬着頭皮努力執行完,很顯然這個大任務是問題關鍵算法

若是能把大任務拆分紅更小的子問題,直到子問題簡單到能夠直接求解就行了,這就是分治的思想編程

分治思想

在計算機科學中,分治法是一種很重要的算法。字面上的解釋是「分而治之」,就是把一個複雜的問題分紅兩個或更多的相同或類似的子問題,再把子問題分紅更小的子問題……直到最後子問題能夠簡單的直接求解,原問題的解就變成了子問題解的合併設計模式

這個技巧是不少高效算法的基礎,如排序算法 (快速排序,歸併排序),傅立葉變換 (快速傅立葉變換)……,若是你是搞大數據的,MapReduce 就是分支思想的典型,若是你想更詳細的理解分治相關的算法,請參考這篇一文圖解分治算法和思想api

結合上面的描述,相信你腦海中已經構建出來分治的模型了:數組

那全部的大任務都能用分治算法來解決嗎?很顯然不是的安全

分治法適用的狀況

整體來講,分治法所能解決的問題通常具備如下幾個特徵:多線程

  1. 該問題的規模縮小到必定的程度就能夠容易地解決
  2. 該問題能夠分解爲若干個規模較小的相同問題,即該問題具備最優子結構性質。
  3. 利用該問題分解出的子問題的解能夠合併爲該問題的解;
  4. 該問題所分解出的各個子問題是相互獨立的,即子問題之間不包含公共的子子問題

瞭解了分治算法的核心思想,咱們就來看看 Java 是如何利用分治思想拆分與合併任務的吧

ForkJoin

有子任務,天然要用到多線程。咱們很早以前說過,執行子任務的線程不容許單首創建,要用線程池管理。秉承相同設計理念,再結合分治算法, ForkJoin 框架中就出現了 ForkJoinPool 和 ForkJoinTask。正所謂:

天對地,雨對風。大陸對長空。山花對海樹,赤曰對蒼穹

套用已有知識,簡單理解就是這樣滴:

咱們以前說過無數次,JDK 不會重複造輪子,這裏談及類似是爲了讓你們有個簡單的直觀印象,內裏確定有所差異,咱們先大體看一下這兩個類:

ForkJoinTask

又是這個男人,Doug Lea,怎麼就那麼牛(破音)

/**
 * Abstract base class for tasks that run within a {@link ForkJoinPool}.
 * A {@code ForkJoinTask} is a thread-like entity that is much
 * lighter weight than a normal thread.  Huge numbers of tasks and
 * subtasks may be hosted by a small number of actual threads in a
 * ForkJoinPool, at the price of some usage limitations.
 *
 * @since 1.7
 * @author Doug Lea
 */
public abstract class ForkJoinTask<V> implements Future<V>, Serializable

能夠看到 ForkJoinTask 實現了 Future 接口(那就是具備 Future 接口的特性),一樣如其名,fork()join() 天然是它的兩個核心方法

  • fork() : 異步執行一個子任務(上面說的拆分)
  • join() : 阻塞當前線程等待子任務的執行結果(上面說的合併)

另外,從上面代碼中能夠看出,ForkJoinTask 是一個抽象類,在分治模型中,它還有兩個抽象子類 RecursiveActionRecursiveTask

那這兩個子抽象類有什麼差異呢?若是你打開 IDE,你應該一眼就能看出差異,so easy

public abstract class RecursiveAction extends ForkJoinTask<Void>{
    ...
  /**
   * The main computation performed by this task.
   */
  protected abstract void compute();
  ...
}



public abstract class RecursiveTask<V> extends ForkJoinTask<V>{
    ...
  protected abstract void compute();
  ...
}

兩個類裏面都定義了一個抽象方法 compute() ,須要子類重寫實現具體邏輯

那子類要遵循什麼邏輯重寫這個方法呢?

遵循分治思想,重寫的邏輯很簡單,就是回答三個問題:

  • 何時進一步拆分任務?
  • 何時知足最小可執行任務,即再也不進行拆分?
  • 何時彙總子任務結果

用「僞代碼」再翻譯一下上面這段話,大概就是這樣滴:

if(任務小到不用繼續拆分){
    直接計算獲得結果
}else{
    拆分子任務
    調用子任務的fork()進行計算
    調用子任務的join()合併計算結果
}

(做爲程序員,若是你寫過遞歸運算,這個邏輯理解起來是很是簡單的)

介紹到這裏,就能夠用 ForkJoin 幹些事情了——經典 Fibonacci 計算就能夠用分治思想(不信,你逐條按照上面分治算法適用狀況自問自答一下?),直接借用官方 Docs (注意看 compute 方法),額外添加個 main 方法來看一下:

@Slf4j
public class ForkJoinDemo {
    public static void main(String[] args) {
        int n = 20;

        // 爲了追蹤子線程名稱,須要重寫 ForkJoinWorkerThreadFactory 的方法
        final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("my-thread" + worker.getPoolIndex());
            return worker;
        };

        //建立分治任務線程池,能夠追蹤到線程名稱
        ForkJoinPool forkJoinPool = new ForkJoinPool(4, factory, null, false);

        // 快速建立 ForkJoinPool 方法
        // ForkJoinPool forkJoinPool = new ForkJoinPool(4);

        //建立分治任務
        Fibonacci fibonacci = new Fibonacci(n);

        //調用 invoke 方法啓動分治任務
        Integer result = forkJoinPool.invoke(fibonacci);
        log.info("Fibonacci {} 的結果是 {}", n, result);
    }
}

@Slf4j
class Fibonacci extends RecursiveTask<Integer> {
    final int n;
    Fibonacci(int n) {
        this.n = n;
    }

    @Override
    public Integer compute() {
        //和遞歸相似,定義可計算的最小單元
        if (n <= 1) {
            return n;
        }
        // 想查看子線程名稱輸出的能夠打開下面註釋
        //log.info(Thread.currentThread().getName());

        Fibonacci f1 = new Fibonacci(n - 1);
        // 拆分紅子任務
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        // f1.join 等待子任務執行結果
        return f2.compute() + f1.join();
    }
}

執行結果以下:

進展到這裏,相信基本的使用就已經搞定了,上面代碼中使用了 ForkJoinPool,那問題來了:

池化既然是一類思想,Java 已經有了 ThreadPoolExecutor ,爲何又要搞出個 ForkJoinPool 呢?

藉助下面這張圖,先來回憶一下 ThreadPoolExecutor 的實現原理(詳情請看爲何要使用線程池):

一眼就能看出來這是典型的生產者/消費者模式,消費者線程都從一個共享的 Task Queue 中消費提交的任務。ThreadPoolExecutor 簡單的並行操做主要是爲了執行時間不肯定的任務(I/O 或定時任務等)

JDK 重複造輪子是不可能的,分治思想其實也能夠理解成一種父子任務依賴的關係,當依賴層級很是深,用 ThreadPoolExecutor 來處理這種關係很顯然是不太現實的,因此 ForkJoinPool 做爲功能補充就出現了

ForkJoinPool

任務拆分後有依賴關係,還得減小線程之間的競爭,那就讓線程執行屬於本身的 task 就能夠了唄,因此較 ThreadPoolExecutor 的單個 TaskQueue 的形式,ForkJoinPool 是多個 TaskQueue的形式,簡單用圖來表示,就是這樣滴:

有多個任務隊列,因此在 ForkJoinPool 中就有一個數組形式的成員變量 WorkQueue[]。那問題又來了

任務隊列有多個,提交的任務放到哪一個隊列中呢?(上圖中的 Router Rule 部分)

這就須要一套路由規則,從上面的代碼 Demo 中能夠理解,提交的任務主要有兩種:

  • 有外部直接提交的(submission task
  • 也有任務本身 fork 出來的(worker task

爲了進一步區分這兩種 task,Doug Lea 就設計一個簡單的路由規則:

  • submission task 放到 WorkQueue 數組的「偶數」下標中
  • worker task 放在 WorkQueue 的「奇數」下標中,而且只有奇數下標纔有線程( worker )與之相對

應局部豐富一下上圖就是這樣滴:

每一個任務執行時間都是不同的(固然是在 CPU 眼裏),執行快的線程的工做隊列的任務就多是空的,爲了最大化利用 CPU 資源,就容許空閒線程拿取其它任務隊列中的內容,這個過程就叫作 work-stealing (工做竊取)

當前線程要執行一個任務,其餘線程還有可能過來竊取任務,這就會產生競爭,爲了減小競爭,WorkQueue 就設計成了一個雙端隊列:

  • 支持 LIFO(last-in-first-out) 的push(放)和pop(拿)操做——操做 top 端
  • 支持 FIFO (first-in-first-out) 的 poll (拿)操做——操做 base 端

線程(worker)操做本身的 WorkQueue 默認是 LIFO 操做(可選FIFO),當線程(worker)嘗試竊取其餘 WorkQueue 裏的任務時,這個時候執行的是FIFO操做,即從 base 端竊取,用圖豐富一下就是這樣滴:

這樣的好處很是明顯了:

  1. LIFO 操做只有對應的 worker 才能執行,push和pop不須要考慮併發
  2. 拆分時,越大的任務越在WorkQueue的base端,儘早分解,可以儘快進入計算

從 WorkQueue 的成員變量的修飾符中也能看出一二了(base 有 volatile 修飾,而 top 卻沒有):

volatile int base;         // index of next slot for poll
int top;                   // index of next slot for push

到這裏,相信你已經瞭解 ForkJoinPool 的基本實現原理了,但也會伴隨着不少疑問(這都是怎麼實現的?),好比:

  • 有競爭就須要鎖,ForkJoinPool 是如何控制狀態的呢?
  • ForkJoinPool 的線程數是怎麼控制的呢?
  • 上面說的路由規則的具體邏輯是什麼呢?
  • ......

保留住這些問題,一點點看源碼來了解一下吧:

源碼分析(JDK 1.8)

ForkJoinPool 的源碼涉及到大量的位運算,這裏會把核心部分說清楚,想要理解的更深刻,還須要你們本身一點點追蹤查看

結合上面的鋪墊,你應該知道 ForkJoinPool 裏有三個重要的角色:

  • ForkJoinWorkerThread(繼承 Thread):就是咱們上面說的線程(Worker)
  • WorkQueue:雙向的任務隊列
  • ForkJoinTask:Worker 執行的對象

源碼分析的整個流程也是圍繞這幾個類的方法來講明,但在瞭解這三個角色以前,咱們須要先了解 ForkJoinPool 都爲這三個角色鋪墊了哪些內容

故事就得從 ForkJoinPool 的構造方法提及

ForkJoinPool 構造方法

public ForkJoinPool() {
  this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
       defaultForkJoinWorkerThreadFactory, null, false);
}


public ForkJoinPool(int parallelism) {
  this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}


public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
  this(checkParallelism(parallelism),
       checkFactory(factory),
       handler,
       asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
       "ForkJoinPool-" + nextPoolId() + "-worker-");
  checkPermission();
}

除了以上三個構造方法以外,在 JDK1.8 中還增長了另一種初始化 ForkJoinPool 對象的方式(QQ:這是什麼設計模式?):

static final ForkJoinPool common;

/**
     * @return the common pool instance
     * @since 1.8
     */
public static ForkJoinPool commonPool() {
  // assert common != null : "static init error";
  return common;
}

Common 是在靜態塊裏面初始化的(只會被執行一次):

common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});

private static ForkJoinPool makeCommonPool() {
  int parallelism = -1;

  ... 其餘默認初始化內容 

    if (parallelism < 0 && // default 1 less than #cores
        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
      parallelism = 1;
  if (parallelism > MAX_CAP)
    parallelism = MAX_CAP;

  // 執行上面的構造方法
  return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                          "ForkJoinPool.commonPool-worker-");
}

由於這是一個單例通用的 ForkJoinPool,因此切記:

若是使用通用 ForkJoinPool,最好只作 CPU 密集型的計算操做,不要有不肯定性的 I/O 內容在任務裏面,以防拖垮總體

上面全部的構造方法最後都會調用這個私有方法:

private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
  this.workerNamePrefix = workerNamePrefix;
  this.factory = factory;
  this.ueh = handler;
  this.config = (parallelism & SMASK) | mode;
  long np = (long)(-parallelism); // offset ctl counts
  this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

參數有點多,在這裏解釋一下每一個參數的含義:

序號 參數名 描述/解釋
1 parallelism 並行度,這並非定義的線程數,具體線程數,以及 WorkQueue 的長度等都是根據這個並行度來計算的,經過上面 makeCommonPool 方法能夠知道,parallelism 默認值是 CPU 核心線程數減 1
2 factory 很常見了,建立 ForkJoinWorkerThread 的工廠接口
3 handler 每一個線程的異常處理器
4 mode 上面說的 WorkQueue 的模式,LIFO/FIFO;
5 workerNamePrefix ForkJoinWorkerThread的前綴名稱
6 ctl 線程池的核心控制線程字段

在構造方法中就已經有位運算了,太難了:

想知道 ForkJoinPool 的成員變量 config 要表達的意思,就要仔細拆開來看

static final int SMASK        = 0xffff;        // short bits == max index

this.config = (parallelism & SMASK) | mode;

parallelism & SMASK 其實就是要保證並行度的值不能大於 SMASK,上面全部的構造方法在傳入 parallelism 的時候都會調用 checkParallelism 來檢查合法性:

static final int MAX_CAP      = 0x7fff;        // max #workers - 1


private static int checkParallelism(int parallelism) {
        if (parallelism <= 0 || parallelism > MAX_CAP)
            throw new IllegalArgumentException();
        return parallelism;
    }

能夠看到 parallelism 的最大值就是 MAX_CAP 了,0x7fff 確定小於0xffff。因此 config 的值其實就是:

this.config = parallelism | mode;

這裏假設 parallelism 就是 MAX_CAP , 而後與 mode 進行或運算,其中 mode 有三種:

  • LIFO_QUEUE
  • FIFO_QUEUE
  • SHARED_QUEUE

下面以 LIFO_QUEUE 和 FIFO_QUEUE 舉例說明:

// Mode bits for ForkJoinPool.config and WorkQueue.config
 static final int MODE_MASK    = 0xffff << 16;  // top half of int
 static final int LIFO_QUEUE   = 0;
 static final int FIFO_QUEUE   = 1 << 16;
 static final int SHARED_QUEUE = 1 << 31;       // must be negative

因此 parallelism | mode 根據 mode 的不一樣會產生兩種結果,可是會獲得一個確認的信息:

config 的第 17 位表示模式,低 15 位表示並行度 parallelism

當咱們須要從 config 中獲取模式 mode 時候,只須要用mode 掩碼 (MODE_MASK)和 config 作與運算就能夠了

因此一張圖歸納 config 就是:

long np = (long)(-parallelism); // offset ctl counts

上面這段代碼就是將並行度 parallelism 補碼轉換爲 long 型,以 MAX_CAP 做爲並行度爲例,np 的值就是

這個 np 的值,就會用做 ForkJoinPool 成員變量 ctl 的計算:

// Active counts 活躍線程數
private static final int  AC_SHIFT   = 48;
private static final long AC_UNIT    = 0x0001L << AC_SHIFT;
private static final long AC_MASK    = 0xffffL << AC_SHIFT;

// Total counts 總線程數
private static final int  TC_SHIFT   = 32;
private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
private static final long TC_MASK    = 0xffffL << TC_SHIFT;
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign

// 計算 ctl 
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
  • np << AC_SHIFT 即 np 向左移動 48 位,這樣原來的低 16 位變成了高 16 位,再用 AC 掩碼(AC_MASK) 作與運算,也就是說 ctl 的 49 ~ 64 位表示活躍線程數
  • np << TC_SHIFT 即 np 向左移動 32 位,這樣原來的低 16 位變成了 33 ~ 48 位,再用 TC 掩碼作與運算,也就是說 ctl 的 33 ~ 48 位表示總線程數

最後兩者再進行或運算,若是並行度仍是 MAX_CAP ,那 ctl 的最後結果就是:

到這裏,咱們才閱讀完一個構造函數的內容,從最終的結論能夠看出,初始化後 AC = TC,而且 ctl 是一個小於零的數,ctl 是 64 位的 long 類型,低 32 位是如何構造的並無在構造函數中體現出來,但註釋給了明確的說明:

/*
* Bits and masks for field ctl, packed with 4 16 bit subfields:
* AC: Number of active running workers minus target parallelism
* TC: Number of total workers minus target parallelism
* SS: version count and status of top waiting thread
* ID: poolIndex of top of Treiber stack of waiters
*
* When convenient, we can extract the lower 32 stack top bits
* (including version bits) as sp=(int)ctl.  The offsets of counts
* by the target parallelism and the positionings of fields makes
* it possible to perform the most common checks via sign tests of
* fields: When ac is negative, there are not enough active
* workers, when tc is negative, there are not enough total
* workers.  When sp is non-zero, there are waiting workers.  To
* deal with possibly negative fields, we use casts in and out of
* "short" and/or signed shifts to maintain signedness.
*
* Because it occupies uppermost bits, we can add one active count
* using getAndAddLong of AC_UNIT, rather than CAS, when returning
* from a blocked join.  Other updates entail multiple subfields
* and masking, requiring CAS.
*/

這段註釋主要說明了低 32 位的做用(後面會從源碼中體現出來,這裏先有個印象會對後面源碼閱讀有幫助),按註釋含義先完善一下 ctl 的值:

  • SS:棧頂工做線程狀態和版本數(每個線程在掛起時都會持有前一個等待線程所在工做隊列的索引,由此構成一個等待的工做線程棧,棧頂是最新等待的線程),第一位表示狀態 1:不活動(inactive)0:活動(active),後15表示版本號,防止 ABA 問題
  • ID: 棧頂工做線程所在工做隊列的索引

註釋中還說,另 sp=(int)ctl,即獲取 64 位 ctl 的低 32 位(SS | ID),由於低 32 位都是建立出線程以後纔會存在的值,因此推斷出,若是 sp != 0, 就存在等待的工做線程,喚醒使用就行,不用建立新的線程。這樣就經過 ctl 能夠獲取到有關線程所須要的一切信息了

除了構造方法所構建的成員變量,ForkJoinPool 還有一個很是重要的成員變量 runState,和你以前瞭解的知識同樣,線程池也須要狀態來進行管理

volatile int runState;               // lockable status

// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
private static final int  RSLOCK     = 1;       //線程池被鎖定
private static final int  RSIGNAL    = 1 << 1;    //線程池有線程須要喚醒
private static final int  STARTED    = 1 << 2;  //線程池已經初始化
private static final int  STOP       = 1 << 29;    //線程池中止
private static final int  TERMINATED = 1 << 30; //線程池終止
private static final int  SHUTDOWN   = 1 << 31; //線程池關閉

runState 有上面 6 種狀態切換,按註釋所言,只有 SHUTDOWN 狀態是負數,其餘都是整數,在併發環境更改狀態必然要用到鎖,ForkJoinPool 對線程池加鎖和解鎖分別由 lockRunStateunlockRunState 來實現 (這兩個方法能夠暫且不用深刻理解,能夠暫時跳過,只須要理解它們是幫助安全更改線程池狀態的鎖便可)

不深刻了解能夠,可是我不能不寫啊...... 你待會不是得回看嗎?

lockRunState

/**
* Acquires the runState lock; returns current (locked) runState.
*/
// 從方法註釋中看到,該方法必定會返回 locked 的 runState,也就是說必定會加鎖成功
private int lockRunState() {
  int rs;
  return ((((rs = runState) & RSLOCK) != 0 ||
           !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
          awaitRunStateLock() : rs);
}
  • 由於 RSLOCK = 1,若是 runState & RSLOCK == 0,則說明目前沒有加鎖,進入或運算的下半段 CAS
  • 先經過 CAS 嘗試加鎖,嘗試成功直接返回,嘗試失敗則要調用 awaitRunStateLock 方法
/**
* Spins and/or blocks until runstate lock is available.  See
* above for explanation.
*/
private int awaitRunStateLock() {
  Object lock;
  boolean wasInterrupted = false;
  for (int spins = SPINS, r = 0, rs, ns;;) {
    //判斷是否加鎖(==0表示未加鎖)
    if (((rs = runState) & RSLOCK) == 0) {
      // 經過CAS加鎖
      if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {
        if (wasInterrupted) {
          try {
            // 重置線程終端標記
            Thread.currentThread().interrupt();
          } catch (SecurityException ignore) {
            // 這裏居然 catch 了個寂寞
          }
        }
        // 加鎖成功返回最新的 runState,for 循環的惟一正常出口
        return ns;
      }
    }
    else if (r == 0)
      r = ThreadLocalRandom.nextSecondarySeed();
    else if (spins > 0) {
      r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
      if (r >= 0)
        --spins;
    }
    // Flag1 若是是其餘線程正在初始化佔用鎖,則調用 yield 方法讓出 CPU,讓其快速初始化
    else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
      Thread.yield();   // initialization race
    // Flag2 若是其它線程持有鎖,而且線程池已經初始化,則將喚醒位標記爲1
    else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
      // 進入互斥鎖
      synchronized (lock) {
        // 再次判斷,若是等於0,說明進入互斥鎖前恰好有線程進行了喚醒,就不用等待,直接進行喚醒操做便可,不然就進入等待
        if ((runState & RSIGNAL) != 0) {
          try {
            lock.wait();
          } catch (InterruptedException ie) {
            if (!(Thread.currentThread() instanceof
                  ForkJoinWorkerThread))
              wasInterrupted = true;
          }
        }
        else
          lock.notifyAll();
      }
    }
  }
}

上面代碼 33 ~ 34 (Flag1)行以及 36 ~ 50 (Flag2) 行,若是你沒看後續代碼,如今來理解是有些困難的,我這裏先提早說明一下:

Flag1: 當完整的初始化 ForkJoinPool 時,直接利用了 stealCounter 這個原子變量,由於初始化時(調用 externalSubmit 時),纔會對 StealCounter 賦值。因此,這裏的邏輯是,當狀態不是 STARTED 或者 stealCounter 爲空,讓出線程等待,也就是說,別的線程還沒初始化徹底,讓其繼續佔用鎖初始化便可

Flag2: 咱們在講等待/通知模型時就說,不要讓無限自旋嘗試,若是資源不知足就等待,若是資源知足了就通知,因此,若是 (runState & RSIGNAL) == 0 成立,說明有線程須要喚醒,直接喚醒就好,不然也別浪費資源,主動等待一會

當閱讀到這的代碼時,立刻就拋出來兩個問題:

Q1: 既然是加鎖,爲何不用已有的輪子 ReentrantLock 呢?

PS:若是你讀過併發系列 Java AQS隊列同步器以及ReentrantLock的應用 ,你會知道 ReentrantLock 是用一個完整字段 state 來控制同步狀態。但這裏在競爭鎖的時候還會判斷線程池的狀態,若是是初始化狀態主動 yield 放棄 CPU 來減小競爭;另外,用一個完整的 runState 不一樣位來表示狀態也體現出更細的粒度吧

Q2: synchronized 大法雖好,可是咱們都知道這是比較重量級的鎖,爲何還在這裏應用了呢?

PS: 首先 synchronized 通過不斷優化,沒有它剛誕生時那麼重,另外按照 Flag 2 的代碼含義,進入 synchronized 同步塊的機率仍是很低的,能夠用最簡單的方式穩穩兜底(奧卡姆剃刀了原理?)

有加鎖天然要解鎖,向下看 unlockRunState

unlockRunState

解鎖的邏輯相對簡單多了,整體目標是清除鎖標記位。若是順利將狀態修改成目標狀態,天然解鎖成功;不然表示有別的線程進入了wait,須要調用notifyAll喚醒,從新嘗試競爭

/**
     * Unlocks and sets runState to newRunState.
     *
     * @param oldRunState a value returned from lockRunState
     * @param newRunState the next value (must have lock bit clear).
     */
    private void unlockRunState(int oldRunState, int newRunState) {
        if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
            Object lock = stealCounter;
            runState = newRunState;              // clears RSIGNAL bit
            if (lock != null)
                synchronized (lock) { lock.notifyAll(); }
        }
    }

這兩個方法貫穿着後續代碼分析的始終,多注意 unlockRunState 的入參便可,另外你也看到了通知都是用的 notifyAll,而不是 notify,這個問題咱們以前重點說明過,你還記得爲何嗎?若是不記得,打開併發編程之等待通知機制 回憶一下吧

第一層知識鋪墊已經差很少了,前進

invoke/submit/execute

回到本文最開始帶有 main 函數的 demo,咱們向 ForkJoinPool 提交任務調用的是 invoke 方法, 其實 ForkJoinPool 還支持 submit 和 execute 兩種方式來提交任務。併發的玩法很是相似,這三類方法的做業也很好區分:

  • invoke:提交任務,並等待返回執行結果
  • submit:提交併馬上返回任務,ForkJoinTask實現了Future,能夠充分利用 Future 的特性
  • execute:只提交任務

在這三大類基礎上又重載了幾個更細粒度的方法,這裏不一一列舉:

public <T> T invoke(ForkJoinTask<T> task) {
  if (task == null)
    throw new NullPointerException();
  externalPush(task);
  return task.join();
}

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
  if (task == null)
    throw new NullPointerException();
  externalPush(task);
  return task;
}

public void execute(ForkJoinTask<?> task) {
  if (task == null)
    throw new NullPointerException();
  externalPush(task);
}

相信你已經發現了,提交任務的方法都會調用 externalPush(task) 這個用法,源碼的主角終於要登場了

可是......

若是你看 externalPush 代碼,第一行就是聲明一個 WorkQueue 數組變量,爲了後續流程更加絲滑,咱還得鋪墊一點 WorkQueue 的知識(又要鋪墊)

WorkQueue

一看這麼多成員變量,仍是很慌的,不過,咱們只須要把我幾個主要的就足夠了

//初始隊列容量
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
//最大隊列容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

// Instance fields
volatile int scanState;    // versioned, <0: inactive; odd:scanning
int stackPred;             // pool stack (ctl) predecessor  前任池(WorkQueue[])索引,由此構成一個棧
int nsteals;               // number of steals  偷取的任務個數
int hint;                  // randomization and stealer index hint  記錄偷取者的索引,方便後面順藤摸瓜
int config;                // pool index and mode
volatile int qlock;        // 1: locked, < 0: terminate; else 0
volatile int base;         // index of next slot for poll
int top;                   // index of next slot for push
ForkJoinTask<?>[] array;   // the elements (initially unallocated)  任務數組
final ForkJoinPool pool;   // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared  當前工做隊列的工做線程,共享模式下爲null
volatile Thread parker;    // == owner during call to park; else null  調用park阻塞期間爲owner,其餘狀況爲null
volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin  記錄當前join來的任務
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer  記錄從其餘工做隊列偷取過來的任務

咱們上面說了,WorkQueue 是一個雙端隊列,線程池有 runState,WorkQueue 有 scanState

  • 小於零:inactive (未激活狀態)
  • 奇數:scanning (掃描狀態)
  • 偶數:running (運行狀態)

操做線程池須要鎖,操做隊列也是須要鎖的,qlock 就派上用場了

  • 1: 鎖定
  • 0:未鎖定
  • 小於零:終止狀態

WorkQueue 中也有個 config,可是和 ForkJoinPool 中的是不同的,WorkQueue 中的config 記錄了該 WorkQueue 在 WorkQueue[] 數組的下標以及 mode

其餘字段的含義咱們就寫在代碼註釋中吧,主角從新登場,此次是真的

externalPush

文章前面說過,task 會細分紅 submission taskworker taskworker taskfork 出來的,那從這個入口進入的,天然也就是 submission task 了,也就是說:

  • 經過invoke()submit() | execute() 等方法提交的 task, 是 submission task,會放到 WorkQueue 數組的偶數索引位置
  • 調用 fork() 方法生成出的任務,叫 worker task,會放到 WorkQueue 數組的奇數索引位置

該方法上的註釋也寫的很清楚,具體請參考代碼註釋

/**
     * Tries to add the given task to a submission queue at
     * submitter's current queue. Only the (vastly) most common path
     * is directly handled in this method, while screening for need
     * for externalSubmit.
     *
     * @param task the task. Caller must ensure non-null.
     */
    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue[] ws; WorkQueue q; int m;
          //Flag1: 經過ThreadLocalRandom產生隨機數,用於下面計算槽位索引
        int r = ThreadLocalRandom.getProbe();
        int rs = runState; //初始狀態爲0
          //Flag2: 若是ws,即ForkJoinPool中的WorkQueue數組已經完成初始化,且根據隨機數定位的index存在workQueue,且cas的方式加鎖成功
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            //對WorkQueue操做加鎖
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {
            ForkJoinTask<?>[] a; int am, n, s;
              //WorkQueue中的任務數組不爲空
            if ((a = q.array) != null && 
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {  //組長度大於任務個數,不須要擴容
                int j = ((am & s) << ASHIFT) + ABASE; //WorkQueue中的任務數組不爲空
                U.putOrderedObject(a, j, task); //向Queue中放入任務
                U.putOrderedInt(q, QTOP, s + 1);//top值加一
                U.putIntVolatile(q, QLOCK, 0);  //對WorkQueue操做解鎖
                  //任務個數小於等於1,那麼此槽位上的線程有可能等待,若是你們都沒任務,可能都在等待,新任務來了,喚醒,起來幹活了
                  if (n <= 1)
                      //喚醒可能存在等待的線程
                    signalWork(ws, q);
                return;
            }
              //任務入隊失敗,前面加鎖了,這裏也要解鎖
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
          //Flag3: 不知足上述條件,也就是說上面的這些 WorkQueue[]等都不存在,就要經過這個方法一切從頭開始建立
        externalSubmit(task);
    }

上面加了三處 Flag,爲了讓你們更好的理解代碼仍是有必要作進一步說明的:

Flag1: ThreadLocalRandom 是 ThreadLocal 的衍生物,每一個線程默認的 probe 是 0,當線程調用ThreadLocalRandom.current()時,會初始化 seed 和 probe,維護在線程內部,這裏就知道是生成一個隨機數就好,具體細節仍是值得你們自行看一下

Flag2: 這裏包含的信息仍是很是多的

// 二進制爲:0000 0000 0000 0000 0000 0000 0111 1110 
static final int SQMASK       = 0x007e;        // max 64 (even) slots
  • m 的值表明 WorkQueue 數組的最大下表
  • m & r 會保證隨機數 r 大於 m 的部分不可用
  • m & r & SQMASK 由於 SQMASK 最後一位是 0,最終的結果就會是偶數
  • r != 0 說明當前線程已經初始化過一些內容
  • rs > 0 說明 ForkJoinPool 的 runState 也已經被初始化過

Flag3: 看過 flag2 的描述,你也就很好理解 Flag 3 了,若是是第一次提交任務,必走 Flag 3 的 externalSubmit 方法

externalSubmit

這個方法很長,但沒超過 80 行,具體請看方法註釋

//初始化所須要的一切  
    private void externalSubmit(ForkJoinTask<?> task) {
        int r;                                    // initialize caller's probe
          //生成隨機數
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            WorkQueue[] ws; WorkQueue q; int rs, m, k;
            boolean move = false;
              // 若是線程池的狀態爲終止狀態,則幫助終止
            if ((rs = runState) < 0) {
                tryTerminate(false, false);     // help terminate
                throw new RejectedExecutionException();
            }
              //Flag1: 再判斷一次狀態是否爲初始化,由於在lockRunState過程當中有可能狀態被別的線程更改了
            else if ((rs & STARTED) == 0 ||     // initialize
                     ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                int ns = 0;
                  //Flag1.1: 加鎖
                rs = lockRunState();
                try {
                    if ((rs & STARTED) == 0) {
                          // 初始化stealcounter的值(任務竊取計數器,原子變量)
                        U.compareAndSwapObject(this, STEALCOUNTER, null,
                                               new AtomicLong());
                        // create workQueues array with size a power of two
                          //取config的低16位(確切說是低15位),獲取並行度
                        int p = config & SMASK; // ensure at least 2 slots
                          //Flag1.2: 若是你看過HashMap 的源碼,這個就很好理解了,獲取2次冪大小
                        int n = (p > 1) ? p - 1 : 1;
                        n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                        n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                          //初始化 WorkQueue 數組
                        workQueues = new WorkQueue[n];
                          // 標記初始化完成
                        ns = STARTED;
                    }
                } finally {
                      // 解鎖
                    unlockRunState(rs, (rs & ~RSLOCK) | ns);
                }
            }
          
              //Flag2 上面分析過,取偶數位槽位,將任務放進偶數槽位
            else if ((q = ws[k = r & m & SQMASK]) != null) {
                  // 對 WorkQueue 加鎖
                if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                    ForkJoinTask<?>[] a = q.array;
                    int s = q.top;
                      // 初始化任務提交標識
                    boolean submitted = false; // initial submission or resizing
                    try {                      // locked version of push
                          //計算內存偏移量,聽任務,更新top值
                        if ((a != null && a.length > s + 1 - q.base) ||
                            (a = q.growArray()) != null) {
                            int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                            U.putOrderedObject(a, j, task);
                            U.putOrderedInt(q, QTOP, s + 1);
                              //提交任務成功
                            submitted = true;
                        }
                    } finally {
                          //WorkQueue解鎖
                        U.compareAndSwapInt(q, QLOCK, 1, 0);
                    }
                      // 任務提交成功了
                    if (submitted) {
                          //天然要喚醒可能存在等待的線程來處理任務了
                        signalWork(ws, q);
                        return;
                    }
                }
                  //任務提交沒成功,能夠從新計算隨機數,再走一次流程
                move = true;                   // move on failure
            }
              //Flag3: 接Flag2,若是找到的槽位是空,則要初始化一個WorkQueue
            else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                q = new WorkQueue(this, null);
                  // 設置工做隊列的竊取線索值
                q.hint = r;
                  // 如上面 WorkQueue 中config 的介紹,記錄當前WorkQueue在WorkQueue[]數組中的值,和隊列模式
                q.config = k | SHARED_QUEUE;
                  // 初始化爲 inactive 狀態
                q.scanState = INACTIVE;
                  //加鎖
                rs = lockRunState();           // publish index
                if (rs > 0 &&  (ws = workQueues) != null &&
                    k < ws.length && ws[k] == null)
                    ws[k] = q;                 // else terminated
                  //解鎖
                unlockRunState(rs, rs & ~RSLOCK);
            }
            else
                move = true;                   // move if busy
            if (move)
                r = ThreadLocalRandom.advanceProbe(r);
        }
    }

Flag1.1 : 有個細節須要說一下,咱們在 Java AQS隊列同步器以及ReentrantLock的應用 時提到過使用鎖的範式以及爲何要這樣用,ForkJoinPool 這裏一樣遵循這種範式

Lock lock = new ReentrantLock();
lock.lock();
try{
    ...
}finally{
    lock.unlock();
}

Flag1.2: 簡單描述這個過程,就是根據不一樣的並行度來初始化不一樣大小的 WorkQueue[]數組,數組大小要求是 2 的 n 次冪,因此給你們個表格直觀理解一下並行度和隊列容量的關係:

並行度p 容量
1,2 4
3,4 8
5 ~ 8 16
9 ~ 16 32

Flag 1,2,3: 若是你理解了上面這個方法,很顯然,第一次執行這個方法內部的邏輯順序應該是 Flag1——> Flag3——>Flag2

externalSubmit 若是任務成功提交,就會調用 signalWork 方法了

signalWork

前面鋪墊的知識要大規模派上用場(一大波殭屍來襲),are you ready?

若是 ForkJoinPool 的 ctl 成員變量的做用已經忘了,趕忙向上翻從新記憶一下

//常量值
static final int SS_SEQ       = 1 << 16;       // version count
   

final void signalWork(WorkQueue[] ws, WorkQueue q) {
        long c; int sp, i; WorkQueue v; Thread p;
          // ctl 小於零,說明活動的線程數 AC 不夠
        while ((c = ctl) < 0L) {                       // too few active
              // 取ctl的低32位,若是爲0,說明沒有等待的線程
            if ((sp = (int)c) == 0) {                  // no idle workers
                  // 取TC的高位,若是不等於0,則說明目前的工做着尚未達到並行度
                if ((c & ADD_WORKER) != 0L)            // too few workers
                      //添加 Worker,也就是說要建立線程了
                    tryAddWorker(c);
                break;
            }
              //未開始或者已中止,直接跳出
            if (ws == null)                            // unstarted/terminated
                break;
              //i=空閒線程棧頂端所屬的工做隊列索引
            if (ws.length <= (i = sp & SMASK))         // terminated
                break;
            if ((v = ws[i]) == null)                   // terminating
                break;
          
              //程序執行到這裏,說明有空閒線程,計算下一個scanState,增長了版本號,而且調整爲 active 狀態
            int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
            int d = sp - v.scanState;                  // screen CAS
            //計算下一個ctl的值,活動線程數 AC + 1,經過stackPred取得前一個WorkQueue的索引,從新設置回sp,行程最終的ctl值
              long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
              //更新 ctl 的值
            if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;                      // activate v
                  //若是有線程阻塞,則調用unpark喚醒便可 
                  if ((p = v.parker) != null)
                    U.unpark(p);
                break;
            }
              //沒有任務,直接跳出
            if (q != null && q.base == q.top)          // no more work
                break;
        }
    }

假設程序剛開始執行,那麼活動線程數以及總線程數確定都沒達到並行度要求,這時就會調用 tryAddWorker 方法了

tryAddWorker

tryAddWorker 的邏輯就很是簡單了,由於是操做線程池,一樣會用到 lockRunState/unlockRunState 的鎖控制

private void tryAddWorker(long c) {
          //初始化添加worker表識
        boolean add = false;
        do {
              //由於要添加Worker,因此AC和TC都要加一
            long nc = ((AC_MASK & (c + AC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
              //ctl還沒被改變
            if (ctl == c) {
                int rs, stop;                 // check if terminating
                if ((stop = (rs = lockRunState()) & STOP) == 0)
                      //更新ctl 的值,
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK);
                if (stop != 0)
                    break;
                  //ctl值更新成功,開始真正的建立Worker
                if (add) {
                    createWorker();
                    break;
                }
            }
         // 從新獲取ctl,而且沒有達到最大線程數,而且沒有空閒的線程
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }

一切順利,就要調用 createWorker 方法來建立真正的 Worker 了,形勢逐漸明朗

createWorker

介紹過了 WorkerQueue 和 ForkJoinTask,上文說的三個重要角色中的最後一個 ForkJoinWorkerThread 終於登場了

private boolean createWorker() {
        ForkJoinWorkerThreadFactory fac = factory;
        Throwable ex = null;
        ForkJoinWorkerThread wt = null;
        try {
              //若是工廠已經存在了,就用factory來建立線程,會去註冊線程,這裏的this就是ForkJoinPool對象
            if (fac != null && (wt = fac.newThread(this)) != null) {
              //啓動線程  
              wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;
        }
          //若是建立線程失敗,就要逆向註銷線程,包括前面對ctl等的操做
        deregisterWorker(wt, ex);
        return false;
    }

Worker 線程是如何與 WorkQueue 對應的,就藏在 fac.newThread(this) 這個方法裏面,下面這點代碼展現一下調用過程

public ForkJoinWorkerThread newThread(ForkJoinPool pool);

static final class DefaultForkJoinWorkerThreadFactory
  implements ForkJoinWorkerThreadFactory {
  public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    return new ForkJoinWorkerThread(pool);
  }
}

protected ForkJoinWorkerThread(ForkJoinPool pool) {
  // Use a placeholder until a useful name can be set in registerWorker
  super("aForkJoinWorkerThread");
  this.pool = pool;
  this.workQueue = pool.registerWorker(this);
}

很顯然核心內容在 registerWorker 方法裏面了

registerWorker

WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
  this.pool = pool;
  this.owner = owner;
  // Place indices in the center of array (that is not yet allocated)
  base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}  

final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
  UncaughtExceptionHandler handler;
  //這裏線程被設置爲守護線程,由於,當只剩下守護線程時,JVM就會推出
  wt.setDaemon(true);                           // configure thread
  //填補處理異常的handler
  if ((handler = ueh) != null)
    wt.setUncaughtExceptionHandler(handler);
  //建立一個WorkQueue,而且設置當前WorkQueue的owner是當前線程
  WorkQueue w = new WorkQueue(this, wt);
  int i = 0;                                    // assign a pool index
  //又用到了config的知識,提取出咱們指望的WorkQueue模式
  int mode = config & MODE_MASK;
  //加鎖
  int rs = lockRunState();
  try {
    WorkQueue[] ws; int n;                    // skip if no array
    //判斷ForkJoinPool的WorkQueue[]都初始化徹底
    if ((ws = workQueues) != null && (n = ws.length) > 0) {
      //一種魔數計算方式,用以減小衝突
      int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
      //假設WorkQueue的初始長度是16,那這裏的m就是15,最終目的就是爲了獲得一個奇數
      int m = n - 1;
      //和獲得偶數的計算方式同樣,獲得一個小於m的奇數i
      i = ((s << 1) | 1) & m;               // odd-numbered indices
      //若是這個槽位不爲空,說明已經被其餘線程初始化過了,也就是有衝突,選取別的槽位
      if (ws[i] != null) {                  // collision
        int probes = 0;                   // step by approx half n
        //步長加2,也就保證step仍是奇數
        int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
        //一直遍歷,直到找到空槽位,若是都遍歷了一遍,那就須要對WorkQueue[]擴容了
        while (ws[i = (i + step) & m] != null) {
          if (++probes >= n) {
            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
            m = n - 1;
            probes = 0;
          }
        }
      }
      //初始化一個隨機數
      w.hint = s;                           // use as random seed
      //如文章前面所說,config記錄索引值和模式
      w.config = i | mode;
      //掃描狀態也記錄爲索引值,如文章前面所說,奇數表示爲scanning狀態
      w.scanState = i;                      // publication fence
      //把初始化好的WorkQueue放到ForkJoinPool的WorkQueue[]數組中
      ws[i] = w;
    }
  } finally {
    //解鎖
    unlockRunState(rs, rs & ~RSLOCK);
  }
  //設置worker的前綴名,用於業務區分
  wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
  //返回當前線程建立的WorkQueue,回到上一層調用棧,也就將WorkQueue註冊到ForkJoinWorkerThread裏面了
  return w;
}

到這裏線程是順利建立成功了,但是若是線程沒有建立成功,就須要 deregisterWorker來作善後工做了

deregisterWorker

deregisterWorker 方法接收剛剛建立的線程引用和異常做爲參數,來作善後工做,將 registerWorker 相關工做撤銷回來

final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
  WorkQueue w = null;
  if (wt != null && (w = wt.workQueue) != null) {
    WorkQueue[] ws;                           // remove index from array
    //獲取當前線程註冊的索引值
    int idx = w.config & SMASK;
    //加鎖
    int rs = lockRunState();
    //若是奇數槽位都不爲空,則清空內容
    if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
      ws[idx] = null;
    //解鎖
    unlockRunState(rs, rs & ~RSLOCK);
  }
  long c;                                       // decrement counts
  //死循環式CAS更改ctl的值,將前面AC和TC加1的值再減1,ctl就在那裏,不增不減
  do {} while (!U.compareAndSwapLong
               (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                     (TC_MASK & (c - TC_UNIT)) |
                                     (SP_MASK & c))));
  
  //清空WorkQueue,將其中的task取消掉
  if (w != null) {
    w.qlock = -1;                             // ensure set
    w.transferStealCount(this);
    w.cancelAll();                            // cancel remaining tasks
  }
  
  //可能的替換操做
  for (;;) {                                    // possibly replace
    WorkQueue[] ws; int m, sp;
    //若是線程池終止了,那就跳出循環便可
    if (tryTerminate(false, false) || w == null || w.array == null ||
        (runState & STOP) != 0 || (ws = workQueues) == null ||
        (m = ws.length - 1) < 0)              // already terminating
      break;
    
    //當前線程建立失敗,經過sp判斷,若是還存在空閒線程,則調用tryRelease來喚醒這個線程,而後跳出
    if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
      if (tryRelease(c, ws[sp & m], AC_UNIT))
        break;
    }
    //若是沒空閒線程,而且尚未達到知足並行度的條件,那就得再次嘗試建立一個線程,彌補剛剛的失敗
    else if (ex != null && (c & ADD_WORKER) != 0L) {
      tryAddWorker(c);                      // create replacement
      break;
    }
    else                                      // don't need replacement
      break;
  }
  if (ex == null)                               // help clean on way out
    //處理異常
    ForkJoinTask.helpExpungeStaleExceptions();
  else                                          // rethrow
    ForkJoinTask.rethrow(ex);
}

總之 deregisterWorker 方法從線程池裏註銷線程,清空WorkQueue,同時更新ctl,最後作可能的替換,根據線程池的狀態決定是否找一個本身的替代者:

  • 有空閒線程,則喚醒一個
  • 沒有空閒線程,再次嘗試建立一個新的工做線程

deregisterWorker 線程解釋清楚了是爲了幫助你們完整理解流程,但 registerWorker 成功後的流程還沒走完,咱得繼續,有了 Worker,那就調用 wt.start() 幹活吧

run

ForkJoinWorkerThread 繼承自Thread,調用start() 方法後,天然要調用本身重寫的 run() 方法

public void run() {
  if (workQueue.array == null) { // only run once
    Throwable exception = null;
    try {
      onStart();
      //Work開始工做,處理workQueue中的任務
      pool.runWorker(workQueue);
    } catch (Throwable ex) {
      exception = ex;
    } finally {
      try {
        onTermination(exception);
      } catch (Throwable ex) {
        if (exception == null)
          exception = ex;
      } finally {
        pool.deregisterWorker(this, exception);
      }
    }
  }
}

方法的重點天然是進入到 runWorker

runWorker

runWorker 是很常規的三部曲操做:

  • scan: 經過掃描獲取任務
  • runTask:執行掃描到的任務
  • awaitWork:沒任務進入等待

具體請看註釋

final void runWorker(WorkQueue w) {
          //初始化隊列,並根據須要是否擴容爲原來的2倍
        w.growArray();                   // allocate queue
        int seed = w.hint;               // initially holds randomization hint
        int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
          
          //死循環更新偏移r,爲掃描任務做準備  
          for (ForkJoinTask<?> t;;) {
              //掃描任務
            if ((t = scan(w, r)) != null)
                  //掃描到就執行任務
                w.runTask(t);
              //沒掃描到就等待,若是等也等不到任務,那就跳出循環別死等了
            else if (!awaitWork(w, r))
                break;
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }

先來看 scan 方法

scan

ForkJoinPool 的任務竊取機制要來了,如何 steal 的,就藏在scan 方法中

private ForkJoinTask<?> scan(WorkQueue w, int r) {
  WorkQueue[] ws; int m;
  //再次驗證workQueue[]數組的初始化狀況
  if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
    //獲取當前掃描狀態
    int ss = w.scanState;                     // initially non-negative
    
    //又一個死循環,注意到出口位置就好
    //和前面邏輯相似,隨機一個起始位置,並賦值給k
    for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
      WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
      int b, n; long c;
      //若是k槽位不爲空
      if ((q = ws[k]) != null) {
        //base-top小於零,而且任務q不爲空
        if ((n = (b = q.base) - q.top) < 0 &&
            (a = q.array) != null) {      // non-empty
          //獲取base的偏移量,賦值給i
          long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
          //從base端獲取任務,和前文的描述的steal搭配上了,是從base端steal
          if ((t = ((ForkJoinTask<?>)
                    U.getObjectVolatile(a, i))) != null &&
              q.base == b) {
            //是active狀態
            if (ss >= 0) {
              //更新WorkQueue中數組i索引位置爲空,而且更新base的值
              if (U.compareAndSwapObject(a, i, t, null)) {
                q.base = b + 1;
                //n<-1,說明當前隊列還有剩餘任務,繼續喚醒可能存在的其餘線程
                if (n < -1)       // signal others
                  signalWork(ws, q);
                //直接返回任務
                return t;
              }
            }
            else if (oldSum == 0 &&   // try to activate
                     w.scanState < 0)
              tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
          }
          
          //若是獲取任務失敗,則準備換位置掃描
          if (ss < 0)                   // refresh
            ss = w.scanState;
          r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
          origin = k = r & m;           // move and rescan
          oldSum = checkSum = 0;
          continue;
        }
        checkSum += b;
      }
      
      //k一直在變,掃描到最後,若是等於origin,說明已經掃描了一圈還沒掃描到任務
      if ((k = (k + 1) & m) == origin) {    // continue until stable
        if ((ss >= 0 || (ss == (ss = w.scanState))) &&
            oldSum == (oldSum = checkSum)) {
          if (ss < 0 || w.qlock < 0)    // already inactive
            break;
          //準備inactive當前工做隊列
          int ns = ss | INACTIVE;       // try to inactivate
          //活動線程數AC減1
          long nc = ((SP_MASK & ns) |
                     (UC_MASK & ((c = ctl) - AC_UNIT)));
          w.stackPred = (int)c;         // hold prev stack top
          U.putInt(w, QSCANSTATE, ns);
          if (U.compareAndSwapLong(this, CTL, c, nc))
            ss = ns;
          else
            w.scanState = ss;         // back out
        }
        checkSum = 0;
      }
    }
  }
  return null;
}

若是順利掃描到任務,那就要調用 runTask 方法來真正的運行這個任務了

runTask

立刻就接近真相了,steal 到任務了,就乾點正事吧

final void runTask(ForkJoinTask<?> task) {
            if (task != null) {
                scanState &= ~SCANNING; // mark as busy
                  //Flag1: 記錄當前的任務是偷來的,至於如何執行task,是咱們寫在compute方法中的,咱們一會看doExec() 方法
                (currentSteal = task).doExec();
                U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
                execLocalTasks();
                ForkJoinWorkerThread thread = owner;
                  //累加偷來的數量,親兄弟明算賬啊,雖然算完也沒啥實際意義
                if (++nsteals < 0)      // collect on overflow
                    transferStealCount(pool);
                  //任務執行完後,就從新更新scanState爲SCANNING
                scanState |= SCANNING;
                if (thread != null)
                    thread.afterTopLevelExec();
            }
        }

Flag1: doExec 方法纔是真正執行任務的關鍵,它是連接咱們自定義 compute 方法的核心,來看 doExec 方法

doExec

形勢一片大好,挺住,揭開 exec 的面紗,就看到本質了

//ForkJoinTask中的抽象方法,RecursiveTask 和 RecursiveAction 都重寫了它
protected abstract boolean exec();

final int doExec() {
  int s; boolean completed;
  if ((s = status) >= 0) {
    try {
      
      completed = exec();
    } catch (Throwable rex) {
      return setExceptionalCompletion(rex);
    }
    if (completed)
      s = setCompletion(NORMAL);
  }
  return s;
}

//RecursiveTask重寫的內容,終於看到咱們文章開頭 demo 中的compute 了
protected final boolean exec() {
  result = compute();
  return true;
}

到這裏,咱們已經看到本質了,繞了這麼一大圈,終於和咱們本身重寫的compute方法聯繫到了一塊兒,真是不容易,可是 runWorker 三部曲還差最後一曲 awaitWork 沒譜,咱們來看看

awaitWork

上面說的是 scan 到了任務,要是沒有scan到任務,那就得將當前線程阻塞一下,具體標註在註釋中,能夠簡單瞭解一下

private boolean awaitWork(WorkQueue w, int r) {
  if (w == null || w.qlock < 0)                 // w is terminating
    return false;
  for (int pred = w.stackPred, spins = SPINS, ss;;) {
    if ((ss = w.scanState) >= 0)
      break;
    else if (spins > 0) {
      r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
      if (r >= 0 && --spins == 0) {         // randomize spins
        WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
        if (pred != 0 && (ws = workQueues) != null &&
            (j = pred & SMASK) < ws.length &&
            //前驅任務隊列還在
            (v = ws[j]) != null &&        // see if pred parking
            //而且工做隊列已經激活,說明任務來了了
            (v.parker == null || v.scanState >= 0))
          //繼續自旋等一會,別返回false
          spins = SPINS;                // continue spinning
      }
    }
    //自旋以後,再次檢查工做隊列是否終止,如果,退出掃描
    else if (w.qlock < 0)                     // recheck after spins
      return false;
    else if (!Thread.interrupted()) {
      long c, prevctl, parkTime, deadline;
      int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
      if ((ac <= 0 && tryTerminate(false, false)) ||
          (runState & STOP) != 0)           // pool terminating
        return false;
      if (ac <= 0 && ss == (int)c) {        // is last waiter
        prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
        int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
        if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
          return false;                 // else use timed wait
        parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
        deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
      }
      else
        prevctl = parkTime = deadline = 0L;
      Thread wt = Thread.currentThread();
      U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
      w.parker = wt;
      if (w.scanState < 0 && ctl == c)      // recheck before park
        U.park(false, parkTime);
      U.putOrderedObject(w, QPARKER, null);
      U.putObject(wt, PARKBLOCKER, null);
      if (w.scanState >= 0)
        break;
      if (parkTime != 0L && ctl == c &&
          deadline - System.nanoTime() <= 0L &&
          U.compareAndSwapLong(this, CTL, c, prevctl))
        return false;                     // shrink pool
    }
  }
  return true;
}

到這裏,ForkJoinPool 的完整流程算是有個基本瞭解了,可是咱們前面講的這些內容都是從 submission task 做爲切入點的。剛剛聊到的 compute 方法,咱們按照分治算法範式寫了本身的邏輯,具體請回看文中開頭的demo,很關鍵的一點是,咱們在 compute 中調用了 fork 方法,這就給咱們瞭解 worker task 的機會了,繼續來看 fork 方法

fork

Fork 方法的邏輯很簡單,若是當前線程是 ForkJoinWorkerThread 類型,也就是說已經經過上文註冊的 Worker,那麼直接調用 push 方法將 task 放到當前線程擁有的 WorkQueue 中,不然就再調用 externalPush 重走咱們已上說的全部邏輯(你敢再走一遍嗎?)

public final ForkJoinTask<V> fork() {
  Thread t;
  if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    ((ForkJoinWorkerThread)t).workQueue.push(this);
  else
    ForkJoinPool.common.externalPush(this);
  return this;
}

//push 方法很簡單,這裏就再也不過多解釋了
final void push(ForkJoinTask<?> task) {
  ForkJoinTask<?>[] a; ForkJoinPool p;
  int b = base, s = top, n;
  if ((a = array) != null) {    // ignore if queue removed
    int m = a.length - 1;     // fenced write for task visibility
    U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
    U.putOrderedInt(this, QTOP, s + 1);
    if ((n = s - b) <= 1) {
      if ((p = pool) != null)
        p.signalWork(p.workQueues, this);
    }
    else if (n >= m)
      growArray();
  }
}

有 fork 就有 join,繼續看一下 join 方法()

join

join 的核心調用在 doJoin,可是看到這麼多級聯三元運算符,我慌了

public final V join() {
  int s;
  if ((s = doJoin() & DONE_MASK) != NORMAL)
    reportException(s);
  return getRawResult();
}

private int doJoin() {
  int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
  //status,task 的運行狀態
  return (s = status) < 0 ? s :
  ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
    (w = (wt = (ForkJoinWorkerThread)t).workQueue).
    tryUnpush(this) && (s = doExec()) < 0 ? s :
  wt.pool.awaitJoin(w, this, 0L) :
  externalAwaitDone();
}

咱們將 doJoin 方法用咱們最熟悉的 if/else 作個改動,是否是就豁然開朗了

private int doJoin() {
  int s;
  Thread t;
  ForkJoinWorkerThread wt;
  ForkJoinPool.WorkQueue w;

  if((s = status) < 0) { // 有結果,直接返回
    return s;
  }else {
    if((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {         
      // 若是是 ForkJoinWorkerThread Worker
      if((w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) // 相似上面提到的 scan,可是是專項嘗試從本工做隊列裏取出等待的任務
         // 取出了任務,就去執行它,並返回結果
         && (s = doExec()) < 0) { 
        return s;
      }else {
        // 也有可能別的線程把這個任務偷走了,那就執行內部等待方法
        return wt.pool.awaitJoin(w, this, 0L); 
      }
    }else { 
      // 若是不是 ForkJoinWorkerThread,執行外部等待方法
      return externalAwaitDone();
    }
  }

}

其中 awaitJoin 和 externalAwaitDone 都用到了 Helper(幫助) 和 Compensating(補償) 兩種策略,這兩種策略你們徹底能夠自行閱讀了,尤爲是 awaitJoin 方法,強烈推薦你們自行閱讀,其中 pop 的過程在這裏,這裏再也不展開

到這裏,有關 ForkJoinPool 相關的內容就算是結束了,爲了讓你們有個更好的理解 fork/join 機制,咱們仍是畫幾張圖解釋一下

Fork/Join 圖解

假設咱們的大任務是 Task(8), 最終被分治成可執行的最小單元是 Task(1)

按照分治思想拆分任務的總體目標就是這樣滴:

從外部先提交一個大的 Task(8),將其放在偶數槽位中(請注意顏色對應

不知足並行度,會建立 Worker 1 來掃描,並從 base 端竊取到任務 task(8),執行到 compute, fork

出兩個 task(4), 並 push到 WorkQueue 中

在執行任務時始終會確認是否知足並行度要求,若是沒有就會繼續建立新的Worker,與此同時,也會繼續 fork 任務,直到最小單元。Worker1 會從 top 端 pop 出來 task(4) 來繼續 compute 和 fork,並從新 push 到 WorkQueue 中

task(2) 還不是最小單元,因此會繼續 pop 出 task(2),並最終 fork 出兩個 task(1) push 到 WorkQueue中

task(1) 已是最小粒度了,能夠直接 pop 出來執行,獲取最終結果;在 Worker1 進行這些 pop 操做的同時,爲了知足並行度要求也會建立的其餘Worker,好比 Worker 2,這時 Worker2 會從 Worker 1 所在隊列的 base 端竊取任務

Worker 2 依舊是按照這個規則進行 pop->fork,到最終能夠 exec 任務,假設 Worker 1 的任務先執行完,要 join 結果,當 join task(4) 時,經過 hint 定位到是誰偷走了 task(4),這時順藤摸瓜找到 Worker2,若是 Worker2 還有任務沒執行完,Worker1 再竊取回來幫着執行,這樣互幫互助,最終快速完成任務

靈魂追問

  1. 爲何說 ForkjoinPool 效率要更高?同時建議使用 commonPool?
  2. JDK1.8 Stream 底層就充分用到了 ForkJoinPool,你知道還有哪裏用到了 ForkJoinPool 了嗎?
  3. ForkJoinPool 最多會有多少個槽位?
  4. 下面代碼有人說不能充分利用 ForkJoinPool,多個 task 的提交要用 invokeAll,你知道爲何嗎?若是不用 invokeAll,要怎樣使用 fork/join 呢?
protected Long compute() {
    if (任務足夠小) {
        return cal();
    }
  
    SumTask subtask1 = new SumTask(...);
    SumTask subtask2 = new SumTask(...);
  
    // 分別對子任務調用fork():
    subtask1.fork();
    subtask2.fork();
  
    // 分別獲取合併結果:
    Long subresult1 = subtask1.join();
    Long subresult2 = subtask2.join();
  
    return subresult1 + subresult2;
}

總結

這又是一篇長文,不少小夥伴私下都建議我將長文拆開,一方面讀者好消化,另外一方面我本身也在數量的體現上變得高產。幾回想拆開,但好多文章拆開就失去了連續性(你們都有遺忘曲線)。過年沒回老家,就有時間擼文章了。爲了更好的理解源碼,文章的基礎鋪墊內容不少,看到這,你應該很累了,想要將更零散的知識點串起來,那就多看代碼註釋回味一下,而後一塊兒膜拜 Doug Lea 吧

參考

  1. Java 併發編程實戰
  2. https://www.liaoxuefeng.com/a...
  3. https://www.cnblogs.com/aniao...
  4. https://cloud.tencent.com/dev...
相關文章
相關標籤/搜索