ForkJoinPool 的使用以及原理

ForkJoinPool 是Java 1.7 引入的一種新的併發框架—— ForkJoin Framework。如下是重要的幾點特性:java

  1. ForkJoinPool 不是爲了替代 ExecutorService,而是它的補充,在某些應用場景下性能比 ExecutorService 更好。(見 Java Tip: When to use ForkJoinPool vs ExecutorService )
  2. ForkJoinPool 主要用於實現「分而治之」的算法,特別是分治以後遞歸調用的函數,例如 quick sort 等。
  3. ForkJoinPool 最適合的是計算密集型的任務,若是存在 I/O,線程間同步,sleep() 等會形成線程長時間阻塞的狀況時,最好配合使用 ManagedBlocker。

 

ForkJoinPool & ForkJoinTask 概述:算法

  • ForkJoinTask:咱們要使用 ForkJoin 框架,必須首先建立一個 ForkJoin 任務。它提供在任務中執行 fork() 和 join() 操做的機制,一般狀況下咱們不須要直接繼承 ForkJoinTask 類,而只須要繼承它的子類,ForkJoin 框架提供瞭如下兩個子類:
    • RecursiveAction:用於沒有返回結果的任務。
    • RecursiveTask :用於有返回結果的任務。
  • ForkJoinPool :ForkJoinTask 須要經過 ForkJoinPool 來執行,任務分割出的子任務會添加到當前工做線程所維護的雙端隊列中,進入隊列的頭部。當一個工做線程的隊列裏暫時沒有任務時,它會隨機從其餘工做線程的隊列的尾部獲取一個任務。

引用:https://www.infoq.cn/article/fork-join-introduction併發

 

ForkJoinPool 的使用

好比計算1至1000的正整數之和,使用ForkJoinPool 如何進行並行的計算。框架

package common.forkjoinpool;

public interface Calculator {
    long sumUp(long[] numbers);
}
package common.forkjoinpool;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinCalculator implements Calculator {
    private ForkJoinPool pool;

    public ForkJoinCalculator() {
        // 也可使用公用的 ForkJoinPool:
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    @Override
    public long sumUp(long[] numbers) {
        return pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
    }

    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }


        @Override
        protected Long compute() {

            // 當須要計算的數字小於6時,直接計算結果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
                // 不然,把任務一分爲二,遞歸計算
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle + 1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }
}
package common.forkjoinpool;

import java.util.stream.LongStream;

public class Main {

    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 1000).toArray();
        Calculator calculator = new ForkJoinCalculator();
        System.out.println(calculator.sumUp(numbers)); // 打印結果500500
    }
}

這段代碼的核心方法是 less

@Override
protected Long compute() {

    // 當須要計算的數字小於6時,直接計算結果
    if (to - from < 6) {
        long total = 0;
        for (int i = from; i <= to; i++) {
            total += numbers[i];
        }
        return total;
        // 不然,把任務一分爲二,遞歸計算
    } else {
        int middle = (from + to) / 2;
        SumTask taskLeft = new SumTask(numbers, from, middle);
        SumTask taskRight = new SumTask(numbers, middle + 1, to);
        taskLeft.fork();
        taskRight.fork();
        return taskLeft.join() + taskRight.join();
    }
}

經過 compute() 方法,分解任務,分而治之。async

 

ForkJoinTask 兩種實現類型

ForkJoinTask 繼承關係以下,ide

RecursiveTask 和 RecursiveAction 的區別:函數

RecursiveAction性能

它是一種沒有任何返回值的任務。只是作一些工做,好比寫數據到磁盤,而後就退出了。 一個RecursiveAction能夠把本身的工做分割成更小的幾塊, 這樣它們能夠由獨立的線程或者CPU執行。 咱們能夠經過繼承來實現一個RecursiveAction。ui

RecursiveTask

它是一種會返回結果的任務。能夠將本身的工做分割爲若干更小任務,並將這些子任務的執行合併到一個集體結果。 能夠有幾個水平的分割和合並。

RecursiveAction以下使用方法,

package common.forkjointask;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class MyRecursiveAction extends RecursiveAction {

    /**
     * 每一個"小任務"最多隻打印20個數
     */
    private static final int MAX = 20;

    private int start;
    private int end;

    public MyRecursiveAction(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        //當end-start的值小於MAX時,開始打印
        if ((end - start) < MAX) {
            for (int i = start; i < end; i++) {
                System.out.println(Thread.currentThread().getName() + "-i的值" + i);
            }
        } else {
            // 將大任務分解成兩個小任務
            int middle = (start + end) / 2;
            MyRecursiveAction left = new MyRecursiveAction(start, middle);
            MyRecursiveAction right = new MyRecursiveAction(middle, end);
            left.fork();
            right.fork();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 建立包含Runtime.getRuntime().availableProcessors()返回值做爲個數的並行線程的ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 提交可分解的PrintTask任務
        forkJoinPool.submit(new MyRecursiveAction(0, 1000));

        while (!forkJoinPool.isTerminated()) {
            forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
        }
        // 關閉線程池
        forkJoinPool.shutdown();
    }

}

這段代碼雖然打印了0-999這一千個數字,可是並非連續打印的,這是由於程序將這個打印任務進行了分解,分解後的任務會並行執行,因此不會按順序打印。

 

ForkJoinPool 簡單分析

commonPool的建立 - ForkJoinPool.commPool() 

翻看源碼,ForkJoinPool 有這麼一個 字段,

/**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

這就是 commonPool ,是ForkJoinPool 在類加載時候建立的,

/**
     * Creates and returns the common pool, respecting user settings
     * specified via system properties.
     */
    private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = defaultForkJoinWorkerThreadFactory;
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }

在makeCommonPool 方法中建立commonPool的時候 調用的構造函數以下,

/**
     * Creates a {@code ForkJoinPool} with the given parameters, without
     * any security checks or parameter validation.  Invoked directly by
     * makeCommonPool.
     */
    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

重要參數解釋(咱們仍是結合英文註釋來看):

1. parallelism:並行度( the parallelism level),默認狀況下跟咱們機器的cpu個數保持一致,使用 Runtime.getRuntime().availableProcessors()能夠獲得咱們機器運行時可用的CPU個數

2. factory:建立新線程的工廠( the factory for creating new threads)。默認狀況下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory。

3. handler:線程異常狀況下的處理器,該處理器在線程執行任務時因爲某些沒法預料到的錯誤而致使任務線程中斷時進行一些處理,默認狀況爲null。

4. asyncMode:這個參數要注意,在ForkJoinPool中,每個工做線程都有一個獨立的任務隊列,asyncMode表示工做線程內的任務隊列是採用何種方式進行調度,能夠是先進先出FIFO,也能夠是後進先出LIFO。若是爲true,則線程池中的工做線程則使用先進先出方式進行任務調度,默認狀況下是false。

ForkJoinPool 有一個 Async Mode ,效果是工做線程在處理本地任務時也使用 FIFO 順序。這種模式下的 ForkJoinPool 更接近因而一個消息隊列,而不是用來處理遞歸式的任務。

 

ForkJoinPool work stealing 算法

  1. ForkJoinPool 的每一個工做線程都維護着一個工做隊列(WorkQueue),這是一個雙端隊列(Deque),裏面存放的對象是任務(ForkJoinTask)。
  2. 每一個工做線程在運行中產生新的任務(一般是由於調用了 fork())時,會放入工做隊列的隊尾,而且工做線程在處理本身的工做隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。
  3. 每一個工做線程在處理本身的工做隊列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool 的任務,或是來自於其餘工做線程的工做隊列),竊取的任務位於其餘線程的工做隊列的隊首,也就是說工做線程在竊取其餘工做線程的任務時,使用的是 FIFO 方式。
  4. 在遇到 join() 時,若是須要 join 的任務還沒有完成,則會先處理其餘任務,並等待其完成。
  5. 在既沒有本身的任務,也沒有能夠竊取的任務時,進入休眠。

 

ForkJoinTask fork 方法

fork() 作的工做只有一件事,既是把任務推入當前工做線程的工做隊列裏。能夠參看如下的源代碼:

public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

 

ForkJoinTask join 方法

join() 的工做則複雜得多,也是 join() 可使得線程免於被阻塞的緣由——不像同名的 Thread.join()

  1. 檢查調用 join() 的線程是不是 ForkJoinThread 線程。若是不是(例如 main 線程),則阻塞當前線程,等待任務完成。若是是,則不阻塞。
  2. 查看任務的完成狀態,若是已經完成,直接返回結果。
  3. 若是任務還沒有完成,但處於本身的工做隊列內,則完成它。
  4. 若是任務已經被其餘的工做線程偷走,則竊取這個小偷的工做隊列內的任務(以 FIFO 方式),執行,以期幫助它早日完成欲 join 的任務。
  5. 若是偷走任務的小偷也已經把本身的任務所有作完,正在等待須要 join 的任務時,則找到小偷的小偷,幫助它完成它的任務。
  6. 遞歸地執行第5步。

將上述流程畫成序列圖的話就是這個樣子:

 

ForkJoinPool.submit 方法

public static void main(String[] args) throws InterruptedException {
        // 建立包含Runtime.getRuntime().availableProcessors()返回值做爲個數的並行線程的ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 提交可分解的PrintTask任務
        forkJoinPool.submit(new MyRecursiveAction(0, 1000));

        while (!forkJoinPool.isTerminated()) {
            forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
        }
        // 關閉線程池
        forkJoinPool.shutdown();
    }

其實除了前面介紹過的每一個工做線程本身擁有的工做隊列之外,ForkJoinPool 自身也擁有工做隊列,這些工做隊列的做用是用來接收由外部線程(非 ForkJoinThread 線程)提交過來的任務,而這些工做隊列被稱爲 submitting queue 。

submit() 和 fork() 其實沒有本質區別,只是提交對象變成了 submitting queue 而已(還有一些同步,初始化的操做)。submitting queue 和其餘 work queue 同樣,是工做線程」竊取「的對象,所以當其中的任務被一個工做線程成功竊取時,就意味着提交的任務真正開始進入執行階段。

參考:http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/

=======END=======

相關文章
相關標籤/搜索