java.util.concurrent public class ForkJoinPool extends AbstractExecutorService public abstract class ForkJoinTask<V> implements Future<V>, Serializable public class ForkJoinWorkerThread extends Thread
Fork/Join技術是分治算法(Divide-and-Conquer)的並行實現,它是一項能夠得到良好的並行性能的簡單且高效的設計技術。 ForkJoinPool的優點在於,能夠充分利用多cpu,多核cpu的優點,把一個任務拆分紅多個「小任務」, 把多個「小任務」放到多個處理器核心上並行執行;當多個「小任務」執行完成以後,再將這些執行結果合併起來便可。 JDK用來執行Fork/Join任務的工做線程池默認大小等於CPU核心數。在一個4核CPU上,最多能夠同時執行4個子任務。 目的是爲了幫助咱們更好地利用多處理器帶來的好處,使用全部可用的運算能力來提高應用的性能。 1. Fork/Join框架主要由ForkJoinPool、ForkJoinWorkerThread和ForkJoinTask來實現 2. ForkJoinPool中只能夠運行ForkJoinTask類型的任務 (在實際使用中,也能夠接收Runnable/Callable任務,但在真正運行時,也會把這些任務封裝成ForkJoinTask類型的任務) 3. ForkJoinTask表示一個任務, ForkJoinTask的子類中有RecursiveAction和RecursiveTask RecursiveAction無返回結果,RecursiveTask有返回結果 工做中,通常重寫RecursiveAction或RecursiveTask的compute(),完成計算或者能夠進行任務拆分。 4. 調用ForkJoinTask的fork()的方法,可讓其餘空閒的線程執行這個ForkJoinTask 調用ForkJoinTask的join()的方法,將多個小任務的結果進行彙總。 5. ForkJoinWorkerThread是運行ForkJoinTask任務的工做線程
ForkJoinPool
中,線程池中每一個工做線程(ForkJoinWorkerThread)都對應一個任務隊列(WorkQueue),
工做線程優先處理來自自身隊列的任務(LIFO),而後以FIFO的順序隨機竊取其餘隊列中的任務。java
ForkJoinPool
並行的實現了分治算法(Divide-and-Conquer):把任務遞歸的拆分爲各個子任務,這樣能夠更好的利用系統資源算法
ForkJoinPool
中的任務分爲兩種:框架
外部任務(external/submissions task)提交三種方式: execute()是直接向池提交一個任務來異步執行,無返回結果; invoke()會一直阻塞到任務執行完成返回計算結果 submit()也是異步執行,可是會返回提交的任務,在適當的時候可經過task.get()獲取執行結果,實現同步到主線程 子任務(Worker task)提交: 由任務的fork()方法完成。 任務被分割(fork)以後調用了ForkJoinPool.WorkQueue.push()方法直接把任務放到隊列中等待被執行。 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) { int s1, s2; t2.fork(); if (((s1 = t1.doInvoke()) & ABNORMAL) != 0) t1.reportException(s1); if (((s2 = t2.doJoin()) & ABNORMAL) != 0) t2.reportException(s2); }
對於fork/join模式,假如pool裏面線程數量是固定的,那麼調用子任務的fork方法至關於A先分工給B和C, 而後A當監工不幹活,B和C去完成A交代的任務。 因此上面的模式至關於浪費了一個線程。 若是使用invokeAll至關於A分工給B和C後,A和B和C都去完成工做。這樣縮短了執行的時間。 /** * 測試 ForkJoinPool 線程池的使用 */ public class task { /** * 測試使用 ForkJoinPool 無返回值的任務執行 */ public static void noResultTask() throws Exception { ForkJoinPool pool = new ForkJoinPool(); pool.submit(new PrintTask(1, 200)); pool.shutdown(); } } /** * 無返回值的打印任務 */ class PrintTask extends RecursiveAction { private static final long serialVersionUID = 1L; private static final int THRESHOLD = 50; private int start; private int end; public PrintTask(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { //當 結束值 比 起始值 大於 50 時,按數值區間平均拆分爲兩個任務;不然直接打印該區間的值 if (end - start < THRESHOLD) { for (int i = start; i <= end; i++) { System.out.println(Thread.currentThread().getName() + ", i = " + i); } } else { int middle = (start + end) / 2; //分紅兩個子任務 PrintTask firstTask = new PrintTask(start, middle); PrintTask secondTask = new PrintTask(middle + 1, end); //任務拆分 //firstTask.fork(); //secondTask.fork(); invokeAll(firstTask,secondTask); } } } public class task { /** * 測試使用 ForkJoinPool 有返回值的任務執行,對結果進行合併。計算 1 到 200 的累加和 */ public static void hasResultTask() throws Exception { //線程池 ForkJoinPool pool = new ForkJoinPool(); //提交任務 ForkJoinTask<Integer> task = pool.submit(new CalculateTask(1, 200)); //獲得結果 int result = task.get(); //關閉線程 pool.shutdown(); } } /** * 有返回值的計算任務 */ class CalculateTask extends RecursiveTask<Integer> { private static final long serialVersionUID = 1L; private static final int THRESHOLD = 50; private int start; private int end; public CalculateTask(int start, int end) { super(); this.start = start; this.end = end; } @Override protected Integer compute() { //當 結束值 比 起始值 大於 50 時,按數值區間平均拆分爲兩個任務,進行兩個任務的累加值彙總 //不然直接計算累加值 if (end - start <= THRESHOLD) { int result = 0; for (int i = start; i <= end; i++) { result += i; } return result; } else { int middle = (start + end) / 2; CalculateTask firstTask = new CalculateTask(start, middle); CalculateTask secondTask = new CalculateTask(middle + 1, end); //任務拆分 invokeAll(firstTask,secondTask); //任務合併 return firstTask.join() + secondTask.join(); } } }