「分而治之」 一直是一個有效的處理大量數據的方法。著名的 MapReduce 也是採起了分而治之的思想。簡單來講,就是若是你要處理1000個數據,可是你並不具有處理1000個數據的能力,那麼你能夠只處理其中的10個,而後,分階段處理100次,將100次的結果進行合成,那就是最終想要的對原始的1000個數據的處理結果。java
Fork 一詞的原始含義是吃飯用的叉子,也有分叉的意思。在Linux 平臺中,函數 fork()用來建立子進程,使得系統進程能夠多一個執行分支。在 Java 中也沿用了相似的命名方式。併發
而 Join() 的含義和 Thread 類的 join 相似,表示等待。也就是使用 fork() 後系統多了一個執行分支(線程),因此須要等待這個執行分支執行完畢,纔有可能獲得最終的結果,所以 join 就是表示等待。框架
在實際使用中,若是毫無顧忌的使用 fork 開啓線程進行處理,那麼頗有可能致使系統開啓過多的線程而嚴重影響性能。因此,在JDK中,給出一個 ForkJoinPool 線程池,對於 fork() 方法並不急着開啓線程,而是提交給 ForkJoiinPool 線程池進行處理,以節省系統資源。ide
因爲線程池的優化,提交的任務和線程數量並非一對一的關係。在絕大多數狀況下,一個物理線程其實是須要處理多個邏輯任務的。所以,每一個線程必然須要擁有一個任務隊列。所以,在實際執行過程當中,可能遇到這麼一種狀況:線程A已經把本身的任務都處理完了,而線程B還有一堆任務等着處理,此時,線程A就會「幫助」 線程B,從線程 B的任務隊列中拿一個任務來處理,儘量的達到平衡。值得注意的是:當線程試圖幫助別人時,老是從任務隊列的底部開始拿數據,而線程試圖執行本身的任務時,則從相反的頂部開始拿。所以這種行爲也十分有利於避免數據競爭。函數
咱們看看線程池 ForkJoinPool 的一個接口:高併發
/** * Submits a ForkJoinTask for execution. * * @param task the task to submit * @param <T> the type of the task's result * @return the task * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; }
你能夠向 ForkJoinPool 線程池提交一個 ForkJoinTask 任務。所謂 ForkJoinTask 任務就是支持 fork () 分解以及 join()等待的任務。 ForkJoinTask 有兩個重要的子類,RecursiveAction 和 RecursiveTask。他們粉筆表示沒有返回值的任務和能夠攜帶返回值的任務。有點像 Rannable 和 Callable。性能
下面來要給簡單的例子展現 Fork/Join 框架的使用。這裏用來計算求和。優化
/** * Fork/Join 核心思想:分而治之 * * 著名的 MapReduce 也是這個思想。將任務進行分解,而後合併全部的結果。 * */ public class CountTask extends RecursiveTask<Long> { /** * 閥值 */ static final int THRESHOLD = 10000; long start; long end; public CountTask(long start, long end) { this.start = start; this.end = end; } /** * 有返回值的 * @return */ @Override protected Long compute() { long sum = 0; // 當閥值小於10000則不分解了 boolean canCompute = (end - start) < THRESHOLD; if (canCompute) { for (long i = start; i <= end; i++) { sum += i; } } else { // 2000 long step = (start + end) / 100; ArrayList<CountTask> subTasks = new ArrayList<>(); long pos = start; for (int i = 0; i < 100; i++) { long lastOne = pos + step; if (lastOne > end) { lastOne = end; } //0-2000 個計算任務 * 100 CountTask subTask = new CountTask(pos, lastOne); pos += step + 1; subTasks.add(subTask); subTask.fork();// fork } for (CountTask t : subTasks) { sum += t.join(); } } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(0, 200000L); // 將一個大的任務提交到池中 ForkJoinTask<Long> result = forkJoinPool.submit(task); long res = 0; try { // 等待運算結果 res = result.get(); System.out.println("sum = " + res); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
因爲計算求和必須須要返回值,所以咱們選擇了 RecursiveTask 做爲任務的模型。首先咱們構造了一個大任務,提交給線程池,線程池會返回一個攜帶結果的任務,經過 get 方法能夠獲得最終結果。若是執行 get 方法時任務沒有結束,那麼主線程就會在 get 方法等待。this
再看看 CountTask 的實現,首先 CountTask 繼承自 RecursiveTask ,能夠攜帶返回值,這裏的返回值類型設置爲 long,定義一個 THRESHOLD 設置了任務分解的規模,也就是若是須要求和的總數大於 THRESHOLD 個,那麼任務就須要再次分解,不然就直接執行。 每次分解時,簡單的將原有任務劃分紅100個規模相等的小任務,並使用 fork() 提交子任務。以後,等待全部的子任務結束,並將結果再次求和。線程
再使用 ForkJoin的時候注意:若是任務的劃分層次很深,一直得不到返回,那麼可能出現兩種狀況: 第一,系統內的線程數量愈來愈多,致使性能嚴重降低。第二,函數的調用層次變的很深,最終致使棧溢出。
此外,ForkJoin 線程池使用一個無鎖的棧來管理空閒線程,若是一個工做線程暫時取不到可用的任務,則可能會被掛起,掛起的線程將會被壓入由線程池維護的棧中,待未來有任務可用時,再從棧中喚醒這些線程。
本文來源自 《Java 高併發程序設計》,沒有什麼本身的看法。由於使用場景太少了。不過仍是能夠看看源碼來漲漲姿式的。嘿嘿。
good luck !!!!