分支合併框架的目的是以遞歸方式將能夠並行的任務拆分紅更小的任務。而後將每一個子任務的結果合併起來生成總體的結果。他是ExecutorService接口的一個實現,他把子任務分配給線程池(ForkJoinPool)中的工做線程。java
要把任務提交到這個池子,必須建立一個RecursiveTask<R>的一個子類,其中R是並行化的任務(以及全部子任務)產生的結果類型,或者若是任務不返回結果,則是RecursiveAction類型。要定義RecursiveTask,只需實現他的惟一抽象方法compute便可:算法
protected abstract R compute();
這個方法同時定義了將任務拆分紅子任務的邏輯,以及沒法在拆分或不方便在拆分時,生成單個子任務的結果的邏輯。正因爲此,這個方法的實現相似於下面的僞代碼。數組
public R compute(){ if(task < threadShold){ concurrent execution... }else{ split task to left and right; 遞歸調用本方法 拆分每一個子任務 等待全部子任務完成 合併每一個子任務結果 } }
compute的編寫過程很是像分治算法。框架
接下來咱們試着使用這個模式爲一個數字數組進行求和計算。異步
package com.sunrun.movieshow.concurrent.forkjoin; import java.util.concurrent.RecursiveTask; public class ForkJoinSumCalculator extends RecursiveTask<Long> { // compute array private final long[] numbers; private final int start; private final int end; // 再也不切分的閾值 private static final long THRESHOLD = 10_000L; private ForkJoinSumCalculator(long[] numbers, int start, int end){ this.numbers = numbers; this.start = start; this.end = end; } // 用於建立主任務 public ForkJoinSumCalculator(long[] numbers){ this(numbers,0,numbers.length); } @Override protected Long compute() { // 計算當前處理的數組長度 int length = end - start; // 判斷是否須要切分 if(length <= THRESHOLD){ // 小於當前閾值,順序計算結果 return computeSequentially(); } // == 不然切分計算 ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); // 利用另外一個ForkJoinPool線程異步執行新建立的子任務 leftTask.fork(); ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end); // 這裏也能夠調用join,但比起再次分配線程,最好仍是直接調用compute好些。 Long rightResult = rightTask.compute(); // 獲取切分的另外一個任務的結果,這一句代碼放在最後,避免阻塞 Long leftResult = leftTask.join(); // 返回結算結果 return leftResult + rightResult; } /** * 順序計算結果 * @return 當前順序計算的結果 */ private Long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } }
建立ForkJoin線程池並提交咱們建立的任務。ide
package com.sunrun.movieshow.concurrent.forkjoin; import java.util.Arrays; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; import java.util.stream.LongStream; public class ForkJoinSumCalculatorTest{ public static void main(String[] args) { // 須要計算的數組 long n = 1_000_000; // 1~1000000 long[] numbers = LongStream.rangeClosed(1, n).toArray(); // create ForkJoinSumCalculator instance. ForkJoinSumCalculator task = new ForkJoinSumCalculator(numbers); // create pool and set forkjoin calculator to it. System.out.println(new ForkJoinPool().invoke(task)); /** * 500000500000 */ } }
注意到這個計算過程會涉及到裝拆箱,所以性能可能會有所下降。性能
雖然分支合併框架還算簡單易用,可是須要注意使用他的場合,避免出現難以預估的錯誤。優化
錯誤還好,怕是沒發現產生了這種錯誤。異步框架的錯誤調試是很痛苦的。ui
對一個任務調用Join方法會阻塞調用方,直到該任務做出結果。所以,咱們最好將該方法的調用放在方法的最後(如2代碼所示)。不然,咱們獲得的版本會比原始的順序算法更慢,由於每一個子任務都必須等待另外一個子任務完成才能啓動;this
不該該在RecursieTask內部使用ForkJoinPool的invoke方法。相反,你應該時鐘直接調用compute或fork方法,只有順序代碼才應該用invoke來啓動並行計算。
對子任務調用fork方法能夠把它排進ForkJoinPool。同時對左邊和右邊的子任務調用他彷佛很天然。但這樣作的效率比直接對其中一個調用compute要低。這樣作咱們能夠Wie其中的一個子任務重用同一線程,從而避免在線程池中多分配一個任務形成的開銷,所以咱們對rightTask調用的是compute方法。
調試使用分支/合併框架的並行計算比較麻煩:由於compute的線程並非概念上的調用方,後者是調用fork的那個。
JIT編譯器優化問題:分支/合併框架須要預熱或者說執行即使纔會被JIT編譯器優化。這就是爲何在測量性能以前要跑幾遍程序的緣由。
最後,咱們必須選擇一個標準,來決定任務是要進一步拆分仍是已經小到能夠順序求值。這是一個經驗的問題,沒有一個定型的標準,好比咱們的程序以數組長度爲10000進行劃分,當前的計算數組長度小於等於該值時,就再也不進行切分,而直接順序計算結果。