ForkJoinPool 是Java 1.7 引入的一種新的併發框架—— ForkJoin Framework。如下是重要的幾點特性:java
ForkJoinPool & ForkJoinTask 概述:算法
引用:https://www.infoq.cn/article/fork-join-introduction併發
好比計算1至1000的正整數之和,使用ForkJoinPool 如何進行並行的計算。框架
package common.forkjoinpool; public interface Calculator { long sumUp(long[] numbers); }
package common.forkjoinpool; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class ForkJoinCalculator implements Calculator { private ForkJoinPool pool; public ForkJoinCalculator() { // 也可使用公用的 ForkJoinPool: // pool = ForkJoinPool.commonPool() pool = new ForkJoinPool(); } @Override public long sumUp(long[] numbers) { return pool.invoke(new SumTask(numbers, 0, numbers.length - 1)); } private static class SumTask extends RecursiveTask<Long> { private long[] numbers; private int from; private int to; public SumTask(long[] numbers, int from, int to) { this.numbers = numbers; this.from = from; this.to = to; } @Override protected Long compute() { // 當須要計算的數字小於6時,直接計算結果 if (to - from < 6) { long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } return total; // 不然,把任務一分爲二,遞歸計算 } else { int middle = (from + to) / 2; SumTask taskLeft = new SumTask(numbers, from, middle); SumTask taskRight = new SumTask(numbers, middle + 1, to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } } } }
package common.forkjoinpool; import java.util.stream.LongStream; public class Main { public static void main(String[] args) { long[] numbers = LongStream.rangeClosed(1, 1000).toArray(); Calculator calculator = new ForkJoinCalculator(); System.out.println(calculator.sumUp(numbers)); // 打印結果500500 } }
這段代碼的核心方法是 less
@Override protected Long compute() { // 當須要計算的數字小於6時,直接計算結果 if (to - from < 6) { long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } return total; // 不然,把任務一分爲二,遞歸計算 } else { int middle = (from + to) / 2; SumTask taskLeft = new SumTask(numbers, from, middle); SumTask taskRight = new SumTask(numbers, middle + 1, to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } }
經過 compute() 方法,分解任務,分而治之。async
ForkJoinTask 繼承關係以下,ide
RecursiveTask 和 RecursiveAction 的區別:函數
RecursiveAction性能
它是一種沒有任何返回值的任務。只是作一些工做,好比寫數據到磁盤,而後就退出了。 一個RecursiveAction能夠把本身的工做分割成更小的幾塊, 這樣它們能夠由獨立的線程或者CPU執行。 咱們能夠經過繼承來實現一個RecursiveAction。ui
RecursiveTask
它是一種會返回結果的任務。能夠將本身的工做分割爲若干更小任務,並將這些子任務的執行合併到一個集體結果。 能夠有幾個水平的分割和合並。
RecursiveAction以下使用方法,
package common.forkjointask; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; public class MyRecursiveAction extends RecursiveAction { /** * 每一個"小任務"最多隻打印20個數 */ private static final int MAX = 20; private int start; private int end; public MyRecursiveAction(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { //當end-start的值小於MAX時,開始打印 if ((end - start) < MAX) { for (int i = start; i < end; i++) { System.out.println(Thread.currentThread().getName() + "-i的值" + i); } } else { // 將大任務分解成兩個小任務 int middle = (start + end) / 2; MyRecursiveAction left = new MyRecursiveAction(start, middle); MyRecursiveAction right = new MyRecursiveAction(middle, end); left.fork(); right.fork(); } } public static void main(String[] args) throws InterruptedException { // 建立包含Runtime.getRuntime().availableProcessors()返回值做爲個數的並行線程的ForkJoinPool ForkJoinPool forkJoinPool = new ForkJoinPool(); // 提交可分解的PrintTask任務 forkJoinPool.submit(new MyRecursiveAction(0, 1000)); while (!forkJoinPool.isTerminated()) { forkJoinPool.awaitTermination(2, TimeUnit.SECONDS); } // 關閉線程池 forkJoinPool.shutdown(); } }
這段代碼雖然打印了0-999這一千個數字,可是並非連續打印的,這是由於程序將這個打印任務進行了分解,分解後的任務會並行執行,因此不會按順序打印。
翻看源碼,ForkJoinPool 有這麼一個 字段,
/** * Common (static) pool. Non-null for public use unless a static * construction exception, but internal usages null-check on use * to paranoically avoid potential initialization circularities * as well as to simplify generated code. */ static final ForkJoinPool common;
這就是 commonPool ,是ForkJoinPool 在類加載時候建立的,
/** * Creates and returns the common pool, respecting user settings * specified via system properties. */ private static ForkJoinPool makeCommonPool() { int parallelism = -1; ForkJoinWorkerThreadFactory factory = null; UncaughtExceptionHandler handler = null; try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); String fp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.threadFactory"); String hp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); if (pp != null) parallelism = Integer.parseInt(pp); if (fp != null) factory = ((ForkJoinWorkerThreadFactory)ClassLoader. getSystemClassLoader().loadClass(fp).newInstance()); if (hp != null) handler = ((UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(hp).newInstance()); } catch (Exception ignore) { } if (factory == null) { if (System.getSecurityManager() == null) factory = defaultForkJoinWorkerThreadFactory; else // use security-managed default factory = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP; return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-"); }
在makeCommonPool 方法中建立commonPool的時候 調用的構造函數以下,
/** * Creates a {@code ForkJoinPool} with the given parameters, without * any security checks or parameter validation. Invoked directly by * makeCommonPool. */ private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
重要參數解釋(咱們仍是結合英文註釋來看):
1. parallelism:並行度( the parallelism level),默認狀況下跟咱們機器的cpu個數保持一致,使用 Runtime.getRuntime().availableProcessors()能夠獲得咱們機器運行時可用的CPU個數
2. factory:建立新線程的工廠( the factory for creating new threads)。默認狀況下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory。
3. handler:線程異常狀況下的處理器,該處理器在線程執行任務時因爲某些沒法預料到的錯誤而致使任務線程中斷時進行一些處理,默認狀況爲null。
4. asyncMode:這個參數要注意,在ForkJoinPool中,每個工做線程都有一個獨立的任務隊列,asyncMode表示工做線程內的任務隊列是採用何種方式進行調度,能夠是先進先出FIFO,也能夠是後進先出LIFO。若是爲true,則線程池中的工做線程則使用先進先出方式進行任務調度,默認狀況下是false。
ForkJoinPool
有一個 Async Mode ,效果是工做線程在處理本地任務時也使用 FIFO 順序。這種模式下的ForkJoinPool
更接近因而一個消息隊列,而不是用來處理遞歸式的任務。
fork()
作的工做只有一件事,既是把任務推入當前工做線程的工做隊列裏。能夠參看如下的源代碼:
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }
join()
的工做則複雜得多,也是 join()
可使得線程免於被阻塞的緣由——不像同名的 Thread.join()
。
join()
的線程是不是 ForkJoinThread 線程。若是不是(例如 main 線程),則阻塞當前線程,等待任務完成。若是是,則不阻塞。將上述流程畫成序列圖的話就是這個樣子:
public static void main(String[] args) throws InterruptedException { // 建立包含Runtime.getRuntime().availableProcessors()返回值做爲個數的並行線程的ForkJoinPool ForkJoinPool forkJoinPool = new ForkJoinPool(); // 提交可分解的PrintTask任務 forkJoinPool.submit(new MyRecursiveAction(0, 1000)); while (!forkJoinPool.isTerminated()) { forkJoinPool.awaitTermination(2, TimeUnit.SECONDS); } // 關閉線程池 forkJoinPool.shutdown(); }
其實除了前面介紹過的每一個工做線程本身擁有的工做隊列之外,ForkJoinPool
自身也擁有工做隊列,這些工做隊列的做用是用來接收由外部線程(非 ForkJoinThread
線程)提交過來的任務,而這些工做隊列被稱爲 submitting queue 。
submit()
和 fork()
其實沒有本質區別,只是提交對象變成了 submitting queue 而已(還有一些同步,初始化的操做)。submitting queue 和其餘 work queue 同樣,是工做線程」竊取「的對象,所以當其中的任務被一個工做線程成功竊取時,就意味着提交的任務真正開始進入執行階段。
參考:http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/
=======END=======