Java 併發之 Fork/Join 框架

什麼是 Fork/Join 框架

Fork/Join 框架是一種在 JDk 7 引入的線程池,用於並行執行把一個大任務拆成多個小任務並行執行,最終彙總每一個小任務結果獲得大任務結果的特殊任務。經過其命名也很容易看出框架主要分爲 Fork 和 Join 兩個階段,第一階段 Fork 是把一個大任務拆分爲多個子任務並行的執行,第二階段 Join 是合併這些子任務的全部執行結果,最後獲得大任務的結果。算法

這裏不難發現其執行主要流程:首先判斷一個任務是否足夠小,若是任務足夠小,則直接計算,不然,就拆分紅幾個更小的小任務分別計算,這個過程能夠反覆的拆分紅一系列小任務。Fork/Join 框架是一種基於 分治 的算法,經過拆分大任務成多個獨立的小任務,而後並行執行這些小任務,最後合併小任務的結果獲得大任務的最終結果,經過並行計算以提升效率。。數組

Fork/Join 框架使用示例

下面經過一個計算列表中全部元素的總和的示例來看看 Fork/Join 框架是如何使用的,總的思路是:將這個列表分紅許多子列表,而後對每一個子列表的元素進行求和,而後,咱們再計算全部這些值的總和就獲得原始列表的和了。Fork/Join 框架中定義了 ForkJoinTask 來表示一個 Fork/Join 任務,其提供了 fork()、join() 等操做,一般狀況下,咱們並不須要直接繼承這個 ForkJoinTask 類,而是使用框架提供的兩個 ForkJoinTask 的子類:數據結構

  • RecursiveAction 用於表示沒有返回結果的 Fork/Join 任務。
  • RecursiveTask 用於表示有返回結果的 Fork/Join 任務。

很顯然,在這個示例中是須要返回結果的,能夠定義 SumAction 類繼承自 RecursiveTask,代碼入下:多線程

/**
 * @author mghio
 * @since 2021-07-25
 */
public class SumTask extends RecursiveTask<Long> {

  private static final int SEQUENTIAL_THRESHOLD = 50;

  private final List<Long> data;

  public SumTask(List<Long> data) {
    this.data = data;
  }

  @Override
  protected Long compute() {
    if (data.size() <= SEQUENTIAL_THRESHOLD) {
      long sum = computeSumDirectly();
      System.out.format("Sum of %s: %d\n", data.toString(), sum);
      return sum;
    } else {
      int mid = data.size() / 2;
      SumTask firstSubtask = new SumTask(data.subList(0, mid));
      SumTask secondSubtask = new SumTask(data.subList(mid, data.size()));
      // 執行子任務
      firstSubtask.fork();
      secondSubtask.fork();
      // 等待子任務執行完成,並獲取結果
      long firstSubTaskResult = firstSubtask.join();
      long secondSubTaskResult = secondSubtask.join();
      return firstSubTaskResult + secondSubTaskResult;
    }
  }

  private long computeSumDirectly() {
    long sum = 0;
    for (Long l : data) {
      sum += l;
    }
    return sum;
  }

  public static void main(String[] args) {
    Random random = new Random();

    List<Long> data = random
        .longs(1_000, 1, 100)
        .boxed()
        .collect(Collectors.toList());

    ForkJoinPool pool = new ForkJoinPool();
    SumTask task = new SumTask(data);
    pool.invoke(task);

    System.out.println("Sum: " + pool.invoke(task));
  }
}

這裏當列表大小小於 SEQUENTIAL_THRESHOLD 變量的值(閾值)時視爲小任務,直接計算求和列表元素結果,不然再次拆分爲小任務,運行結果以下:併發

1.png

經過這個示例代碼能夠發現,Fork/Join 框架 中 ForkJoinTask 任務與日常的通常任務的主要不一樣點在於:ForkJoinTask 須要實現抽象方法 compute() 來定義計算邏輯,在這個方法裏通常通用的實現模板是,首先先判斷當前任務是不是小任務,若是是,就執行執行任務,若是不是小任務,則再次拆分爲兩個子任務,而後當每一個子任務調用 fork() 方法時,會再次進入到 compute() 方法中,檢查當前任務是否須要再拆分爲子任務,若是已是小任務,則執行當前任務並返回結果,不然繼續分割,最後調用 join() 方法等待全部子任務執行完成並得到執行結果。僞代碼以下:框架

if (problem is small) {
  directly solve problem.
} else {
  Step 1. split problem into independent parts.
  Step 2. fork new subtasks to solve each part.
  Step 3. join all subtasks.
  Step 4. compose result from subresults.
}

Fork/Join 框架設計

Fork/Join 框架核心思想是把一個大任務拆分紅若干個小任務,而後彙總每一個小任務的結果最終獲得大任務的結果,若是讓你設計一個這樣的框架,你會如何實現呢?(建議思考一下),Fork/Join 框架的整個流程正如其名所示,分爲兩個步驟:dom

  1. 大任務分割 須要有這麼一個的類,用來將大任務拆分爲子任務,可能一次拆分後的子任務仍是比較大,須要屢次拆分,直到拆分出來的子任務符合咱們定義的小任務才結束。
  2. 執行任務併合並任務結果 第一步拆分出來的子任務分別存放在一個個 雙端隊列 裏面(P.S. 這裏爲何要使用雙端隊列請看下文),而後每一個隊列啓動一個線程從隊列中獲取任務執行。這些子任務的執行結果都會放到一個統一的隊列中,而後再啓動一個線程從這個隊列中拿數據,最後合併這些數據返回。

Fork/Join 框架使用了以下兩個類來完成以上兩個步驟:異步

  • ForkJoinTask 類 在上文的實例中也有提到,表示 ForkJoin 任務,在使用框架時首先必須先定義任務,一般只須要繼承自 ForkJoinTask 類的子類 RecursiveAction(無返回結果) 或者 RecursiveTask(有返回結果)便可。
  • ForkJoinPool 從名字也能夠猜到一二了,就是用來執行 ForkJoinTask 的線程池。大任務拆分出的子任務會添加到當前線程的雙端隊列的頭部。

喜歡思考的你,心中想必會想到這麼一種場景,當咱們須要完成一個大任務時,會先把這個大任務拆分爲多個獨立的子任務,這些子任務會放到獨立的隊列中,併爲每一個隊列都建立一個單獨的線程去執行隊列裏的任務,即這裏線程和隊列時一對一的關係,那麼當有的線程可能會先把本身隊列的任務執行完成了,而有的線程則沒有執行完成,這就致使一些先執行完任務的線程乾等了,這是個好問題。ide

既然是作併發的,確定要最大程度壓榨計算機的性能,對於這種場景併發大師 Doug Lea 使用了工做竊取算法處理,使用工做竊取算法後,先完成本身隊列中任務的線程會去其它線程的隊列中」竊取「一個任務來執行,哈哈,一方有難,八方支援。可是此時這個線程和隊列的持有線程會同時訪問同一個隊列,因此爲了減小竊取任務的線程和被竊取任務的線程之間的競爭,ForkJoin 選擇了雙端隊列這種數據結構,這樣就能夠按照這種規則執行任務了:被竊取任務的線程始終從隊列頭部獲取任務並執行,竊取任務的線程使用從隊列尾部獲取任務執行。這個算法在絕大部分狀況下均可以充分利用多線程進行並行計算,可是在雙端隊列裏只有一個任務等極端狀況下仍是會存在必定程度的競爭。性能

2.png

Fork/Join 框架實現原理

Fork/Join 框架的實現核心是 ForkJoinPool 類,該類的重要組成部分爲 ForkJoinTask 數組和 ForkJoinWorkerThread 數組,其中 ForkJoinTask 數組用來存放框架使用者給提交給 ForkJoinPool 的任務,ForkJoinWorkerThread 數組則負責執行這些任務。任務有以下四種狀態:

  • NORMAL 已完成
  • CANCELLED 被取消
  • SIGNAL 信號
  • EXCEPTIONAL 發生異常

下面來看看這兩個類的核心方法實現原理,首先來看 ForkJoinTask 的 fork() 方法,源碼以下:

6.png

方法對於 ForkJoinWorkerThread 類型的線程,首先會調用 ForkJoinWorkerThread 的 workQueue 的 push() 方法異步的去執行這個任務,而後立刻返回結果。繼續跟進 ForkJoinPool 的 push() 方法,源碼以下:

8.png

方法將當前任務添加到 ForkJoinTask 任務隊列數組中,而後再調用 ForkJoinPool 的 signalWork 方法建立或者喚醒一個工做線程來執行該任務。而後再來看看 ForkJoinTask 的 join() 方法,方法源碼以下:

3.png

4.png

方法首先調用了 doJoin() 方法,該方法返回當前任務的狀態,根據返回的任務狀態作不一樣的處理:

  1. 已完成狀態則直接返回結果
  2. 被取消狀態則直接拋出異常(CancellationException)
  3. 發生異常狀態則直接拋出對應的異常

繼續跟進 doJoin() 方法,方法源碼以下:

5.png

方法首先判斷當前任務狀態是否已經執行完成,若是執行完成則直接返回任務狀態。若是沒有執行完成,則從任務數組中(workQueue)取出任務並執行,任務執行完成則設置任務狀態爲 NORMAL,若是出現異常則記錄異常並設置任務狀態爲 EXCEPTIONAL(在 doExec() 方法中)。

總結

本文主要介紹了 Java 併發框架中的 Fork/Join 框架的基本原理和其使用的工做竊取算法(work-stealing)、設計方式和部分實現源碼。Fork/Join 框架在 JDK 的官方標準庫中也有應用。好比 JDK 1.8+ 標準庫提供的 Arrays.parallelSort(array) 能夠進行並行排序,它的原理就是內部經過 Fork/Join 框架對大數組分拆進行並行排序,能夠提升排序的速度,還有集合中的 Collection.parallelStream() 方法底層也是基於 Fork/Join 框架實現的,最後就是定義小任務的閾值每每是須要經過測試驗證才能合理給出,而且保證程序能夠達到最好的性能。

相關文章
相關標籤/搜索