Java 多線程(5):Fork/Join 型線程池與 Work-Stealing 算法

JDK 1.7 時,標準類庫添加了 ForkJoinPool,做爲對 Fork/Join 型線程池的實現。Fork 在英文中有 分叉 的意思,而 Join合併 的意思。ForkJoinPool 的功能也是如此:Fork 將大任務分叉爲多個小任務,而後讓小任務執行,Join 是得到小任務的結果,而後進行合併,將合併的結果做爲大任務的結果 —— 而且這會是一個遞歸的過程 —— 由於任務若是足夠大,能夠將任務多級分叉直到任務足夠小。算法

Fork-Join

因而可知,ForkJoinPool 能夠知足 並行 地實現 分治算法(Divide-and-Conquer) 的須要。併發


ForkJoinPool 的類圖以下:
ForkJoinPool 的類圖異步

能夠看到 ForkJoinPool 實現了 ExecutorService 接口,因此首先 ForkJoinPool 也是一個 ExecutorService線程池)。於是 RunnableCallable 類型的任務,ForkJoinPool 也能夠經過 submitinvokeAllinvokeAny 等方法來執行。可是標準類庫還爲 ForkJoinPool 定義了一種新的任務,它就是 ForkJoinTask<V>ide

ForkJoinTask 類圖:
ForkJoinTask 類圖高併發

ForkJoinTask<V> 用來專門定義 Fork/Join 型任務 —— 完成將大任務分割爲小任務以及合併結果的工做。通常咱們不須要直接使用 ForkJoinTask<V>,而是經過繼承它的子類 RecursiveActionRecursiveTask 並實現對應的抽象方法 —— compute ,來定義咱們本身的任務。其中,RecursiveAction 是不帶返回值的 Fork/Join 型任務,因此使用此類任務並不產生結果,也就不涉及到結果的合併;而 RecursiveTask 是帶返回值的 Fork/Join 型任務,使用此類任務的話,在任務結束前,咱們須要進行結果的合併。其中,經過 ForkJoinTask<V>fork 方法,咱們能夠產生子任務並執行;經過 join 方法,咱們能夠得到子任務的結果。this


ForkJoinPool 可使用三種方法用來執行 ForkJoinTaskspa

invoke 方法:
invoke 方法線程

invoke 方法用來執行一個帶返回值的任務(一般繼承自RecursiveTask),而且該方法是阻塞的,直到任務執行完畢,該方法纔會中止阻塞並返回任務的執行結果。3d

submit 方法:
submit 方法code

除了從 ExecutorService 繼承的 submit 方法外,ForkJoinPool 還定義了用來執行 ForkJoinTasksubmit 方法 —— 通常該 submit 方法用來執行帶返回值的ForkJoinTask(一般繼承自RecursiveTask)。該方法是非阻塞的,調用以後將任務提交給 ForkJoinPool 去執行便當即返回,返回的即是已經提交到 ForkJoinPool 去執行的 task —— 由類圖可知 ForkJoinTask 實現了 Future 接口,因此能夠直接經過 task 來和已經提交的任務進行交互。

execute 方法:
execute 方法

除了從 Executor 得到的 execute 方法外,ForkJoinPool 也定義了用來執行ForkJoinTaskexecute 方法 —— 通常該 execute 方法用來執行不帶返回值的ForkJoinTask(一般繼承自RecursiveAction) ,該方法一樣是非阻塞的。


如今讓咱們來實踐下 ForkJoinPool 的功能:計算 π 的值。計算 π 的值有一個經過多項式方法,即:π = 4 * (1 - 1/3 + 1/5 - 1/7 + 1/9 - ……),並且多項式的項數越多,計算出的 π 的值越精確。

首先咱們定義用來估算 π 的 PiEstimateTask

class PiEstimateTask extends RecursiveTask<Double> {

    private final long begin;
    private final long end;
    private final long threshold; // 分割任務的臨界值

    public PiEstimateTask(long begin, long end, long threshold) {
        this.begin = begin;
        this.end = end;
        this.threshold = threshold;
    }

    @Override
    protected Double compute() {  // 實現 compute 方法
        if (end - begin <= threshold) {  // 臨界值之下,再也不分割,直接計算

            int sign; // 符號,多項式中偶數位取 1,奇數位取 -1(位置從 0 開始)
            double result = 0.0;
            
            for (long i = begin; i < end; i++) {
                sign = (i & 1) == 0 ? 1 : -1;
                result += sign / (i * 2.0 + 1);
            }

            return result * 4;
        }

        // 分割任務
        long middle = (begin + end) / 2;
        PiEstimateTask leftTask = new PiEstimateTask(begin, middle, threshold);
        PiEstimateTask rightTask = new PiEstimateTask(middle, end, threshold);

        leftTask.fork();  // 異步執行 leftTask
        rightTask.fork(); // 異步執行 rightTask

        double leftResult = leftTask.join();   // 阻塞,直到 leftTask 執行完畢返回結果
        double rightResult = rightTask.join(); // 阻塞,直到 rightTask 執行完畢返回結果

        return leftResult + rightResult; // 合併結果
    }

}

而後咱們使用 ForkJoinPoolinvoke 執行 PiEstimateTask

public class ForkJoinPoolTest {

    public static void main(String[] args) throws Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
    
        // 計算 10 億項,分割任務的臨界值爲 1 千萬
        PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000);
    
        double pi = forkJoinPool.invoke(task); // 阻塞,直到任務執行完畢返回結果
    
        System.out.println("π 的值:" + pi);
        
        forkJoinPool.shutdown(); // 向線程池發送關閉的指令
    }
}

運行結果:
運行結果

咱們也可使用 submit 方法異步的執行任務(此處 submit 方法返回的 future 指向的對象即提交任務時的 task):

public static void main(String[] args) throws Exception {
    ForkJoinPool forkJoinPool = new ForkJoinPool(4);

    PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000);
    Future<Double> future = forkJoinPool.submit(task); // 不阻塞
    
    double pi = future.get();
    System.out.println("π 的值:" + pi);
    System.out.println("future 指向的對象是 task 嗎:" + (future == task));
    
    forkJoinPool.shutdown(); // 向線程池發送關閉的指令
}

運行結果:
運行結果


值得注意的是,選取一個合適的分割任務的臨界值,對 ForkJoinPool 執行任務的效率有着相當重要的影響。臨界值選取過大,任務分割的不夠細,則不能充分利用 CPU;臨界值選取太小,則任務分割過多,可能產生過多的子任務,致使過多的線程間的切換和加劇 GC 的負擔從而影響了效率。因此,須要根據實際的應用場景選擇一個合適的分割任務的臨界值。


ForkJoinPool 相比於 ThreadPoolExecutor,還有一個很是重要的特色(優勢)在於,ForkJoinPool具備 Work-Stealing (工做竊取)的能力。所謂 Work-Stealing,在 ForkJoinPool 中的實現爲:線程池中每一個線程都有一個互不影響的任務隊列(雙端隊列),線程每次都從本身的任務隊列的隊頭中取出一個任務來運行;若是某個線程對應的隊列已空而且處於空閒狀態,而其餘線程的隊列中還有任務須要處理可是該線程處於工做狀態,那麼空閒的線程能夠從其餘線程的隊列的隊尾取一個任務來幫忙運行 —— 感受就像是空閒的線程去偷人家的任務來運行同樣,因此叫 「工做竊取」。

Work-Stealing 的適用場景是不一樣的任務的耗時相差比較大,即某些任務須要運行較長時間,而某些任務會很快的運行完成,這種狀況下用 Work-Stealing 很合適;可是若是任務的耗時很平均,則此時 Work-Stealing 並不適合,由於竊取任務時不一樣線程須要搶佔鎖,這可能會形成額外的時間消耗,並且每一個線程維護雙端隊列也會形成更大的內存消耗。因此 ForkJoinPool 並非 ThreadPoolExecutor 的替代品,而是做爲對 ThreadPoolExecutor 的補充。


總結:
ForkJoinPoolThreadPoolExecutor 都是 ExecutorService(線程池),但ForkJoinPool 的獨特色在於:

  1. ThreadPoolExecutor 只能執行 RunnableCallable 任務,而 ForkJoinPool 不只能夠執行 RunnableCallable 任務,還能夠執行 Fork/Join 型任務 —— ForkJoinTask —— 從而知足並行地實現分治算法的須要;
  2. ThreadPoolExecutor 中任務的執行順序是按照其在共享隊列中的順序來執行的,因此後面的任務須要等待前面任務執行完畢後才能執行,而 ForkJoinPool 每一個線程有本身的任務隊列,並在此基礎上實現了 Work-Stealing 的功能,使得在某些狀況下 ForkJoinPool 能更大程度的提升併發效率。
相關文章
相關標籤/搜索