這是一個JDK7引入的並行框架,它把流程劃分紅fork(分解)+join(合併)兩個步驟(怎麼那麼像MapReduce?),傳統線程池來實現一個並行任務的時候,常常須要花費大量的時間去等待其餘線程執行任務的完成,可是fork-join框架使用work stealing技術緩解了這個問題:java
依靠應用程序自己並行拆封任務,若是使用簡單的多線程程序的方法,複雜度必然很大。這就須要一個更好的範式或者工具來代程序員處理這類問題。Java 7也意識到了這個問題,才標準庫中集成了由Doug Lea開發的Fork/Join並行計算框架。經過使用 Fork/Join 模式,軟件開發人員可以方便地利用多核平臺的計算能力。儘管尚未作到對軟件開發人員徹底透明,Fork/Join 模式已經極大地簡化了編寫併發程序的瑣碎工做。對於符合 Fork/Join 模式的應用,軟件開發人員再也不須要處理各類並行相關事務,例如同步、通訊等,以難以調試而聞名的死鎖和 data race 等錯誤也就不會出現,提高了思考問題的層次。你能夠把 Fork/Join 模式看做並行版本的 Divide and Conquer 策略,僅僅關注如何劃分任務和組合中間結果,將剩下的事情丟給 Fork/Join 框架。可是Fork/Join並行計算框架,並非銀彈,並不能解決全部應用程序在超多核心處理器上的併發問題。程序員
若是一個應用能被分解成多個子任務,而且組合多個子任務的結果就可以得到最終的答案,那麼這個應用就適合用 Fork/Join 模式來解決。其原理以下圖。
數組
應用程序開發者須要作的就是拆分任務並組合每一個子任務的中間結果,而不用再考慮線程和鎖的問題。多線程
咱們首先看一個簡單的Fork/Join的任務定義。併發
public class Calculator extends RecursiveTask<Integer> { private static final int THRESHOLD = 100; private int start; private int end; public Calculator(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; if((start - end) < THRESHOLD){ for(int i = start; i< end;i++){ sum += i; } }else{ int middle = (start + end) /2; Calculator left = new Calculator(start, middle); Calculator right = new Calculator(middle + 1, end); left.fork(); right.fork(); sum = left.join() + right.join(); } return sum; } }
這段代碼中,定義了一個累加的任務,在compute方法中,判斷當前的計算範圍是否小於一個值,若是是則計算,若是沒有,就把任務拆分爲連個子任務,併合並連個子任務的中間結果。程序遞歸的完成了任務拆分和計算。框架
任務定義以後就是執行任務,Fork/Join提供一個和Executor框架 的擴展線程池來執行任務。dom
@Test public void run() throws Exception{ ForkJoinPool forkJoinPool = new ForkJoinPool(); Future<Integer> result = forkJoinPool.submit(new Calculator(0, 10000)); assertEquals(new Integer(49995000), result.get()); }
RecursiveAction供不須要返回值的任務繼續。ide
RecursiveTask經過泛型參數設置計算的返回值類型。工具
ForkJoinPool提供了一系列的submit方法,計算任務。ForkJoinPool默認的線程數經過Runtime.availableProcessors()得到,由於在計算密集型的任務中,得到多於處理性核心數的線程並不能得到更多性能提高。性能
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
doSubmit(task);
return task;
}
sumit方法返回了task自己,ForkJoinTask實現了Future接口,因此能夠經過它等待得到結果。
這個例子並行排序數組,不須要返回結果,因此繼承了RecursiveAction。
public class SortTask extends RecursiveAction { final long[] array; final int start; final int end; private int THRESHOLD = 100; //For demo only public SortTask(long[] array) { this.array = array; this.start = 0; this.end = array.length - 1; } public SortTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } protected void compute() { if (end - start < THRESHOLD) sequentiallySort(array, start, end); else { int pivot = partition(array, start, end); new SortTask(array, start, pivot - 1).fork(); new SortTask(array, pivot + 1, end).fork(); } } private int partition(long[] array, int start, int end) { long x = array[end]; int i = start - 1; for (int j = start; j < end; j++) { if (array[j] <= x) { i++; swap(array, i, j); } } swap(array, i + 1, end); return i + 1; } private void swap(long[] array, int i, int j) { if (i != j) { long temp = array[i]; array[i] = array[j]; array[j] = temp; } } private void sequentiallySort(long[] array, int lo, int hi) { Arrays.sort(array, lo, hi + 1); } }