在JDK中,提供了這樣一種功能:它可以將複雜的邏輯拆分紅一個個簡單的邏輯來並行執行,待每一個並行執行的邏輯執行完成後,再將各個結果進行彙總,得出最終的結果數據。有點像Hadoop中的MapReduce。java
ForkJoin是由JDK1.7以後提供的多線程併發處理框架。ForkJoin框架的基本思想是分而治之。什麼是分而治之?分而治之就是將一個複雜的計算,按照設定的閾值分解成多個計算,而後將各個計算結果進行彙總。相應的,ForkJoin將複雜的計算當作一個任務,而分解的多個計算則是當作一個個子任務來並行執行。算法
對於Java語言來講,生來就支持多線程併發編程,在併發編程領域也是在不斷髮展的。Java在其發展過程當中對併發編程的支持愈來愈完善也正好印證了這一點。編程
併發和並行在本質上仍是有所區別的。數組
併發指的是在同一時刻,只有一個線程可以獲取到CPU執行任務,而多個線程被快速的輪換執行,這就使得在宏觀上具備多個線程同時執行的效果,併發不是真正的同時執行,併發可使用下圖表示。微信
並行指的是不管什麼時候,多個線程都是在多個CPU核心上同時執行的,是真正的同時執行。多線程
把一個規模大的問題劃分爲規模較小的子問題,而後分而治之,最後合併子問題的解獲得原問題的解。併發
①分割原問題;框架
②求解子問題;ide
③合併子問題的解爲原問題的解。函數
咱們可使用以下僞代碼來表示這個步驟。
if(任務很小){ 直接計算獲得結果 }else{ 分拆成N個子任務 調用子任務的fork()進行計算 調用子任務的join()合併計算結果 }
在分治法中,子問題通常是相互獨立的,所以,常常經過遞歸調用算法來求解子問題。
Java 1.7 引入了一種新的併發框架—— Fork/Join Framework,主要用於實現「分而治之」的算法,特別是分治以後遞歸調用的函數。
ForkJoin框架的本質是一個用於並行執行任務的框架, 可以把一個大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務的計算結果。在Java中,ForkJoin框架與ThreadPool共存,並非要替換ThreadPool
其實,在Java 8中引入的並行流計算,內部就是採用的ForkJoinPool來實現的。例如,下面使用並行流實現打印數組元組的程序。
public class SumArray { public static void main(String[] args){ List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9); numberList.parallelStream().forEach(System.out::println); } }
這段代碼的背後就使用到了ForkJoinPool。
說到這裏,可能有讀者會問:可使用線程池的ThreadPoolExecutor來實現啊?爲何要使用ForkJoinPool啊?ForkJoinPool是個什麼鬼啊?! 接下來,咱們就來回答這個問題。
ForkJoin框架是從jdk1.7中引入的新特性,它同ThreadPoolExecutor同樣,也實現了Executor和ExecutorService接口。它使用了一個無限隊列來保存須要執行的任務,而線程的數量則是經過構造函數傳入,若是沒有向構造函數中傳入指定的線程數量,那麼當前計算機可用的CPU數量會被設置爲線程數量做爲默認值。
ForkJoinPool主要使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用好比快速排序算法。這裏的要點在於,ForkJoinPool可以使用相對較少的線程來處理大量的任務。好比要對1000萬個數據進行排序,那麼會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬數據的合併任務。以此類推,對於500萬的數據也會作出一樣的分割處理,到最後會設置一個閾值來規定當數據規模到多少時,中止這樣的分割處理。好比,當元素的數量小於10時,會中止分割,轉而使用插入排序對它們進行排序。那麼到最後,全部的任務加起來會有大概200萬+個。問題的關鍵在於,對於一個任務而言,只有當它全部的子任務完成以後,它纔可以被執行。
因此當使用ThreadPoolExecutor時,使用分治法會存在問題,由於ThreadPoolExecutor中的線程沒法向任務隊列中再添加一個任務並在等待該任務完成以後再繼續執行。而使用ForkJoinPool就可以解決這個問題,它就可以讓其中的線程建立新的任務,並掛起當前的任務,此時線程就可以從隊列中選擇子任務執行。
那麼使用ThreadPoolExecutor或者ForkJoinPool,性能上會有什麼差別呢?
首先,使用ForkJoinPool可以使用數量有限的線程來完成很是多的具備父子關係的任務,好比使用4個線程來完成超過200萬個任務。可是,使用ThreadPoolExecutor時,是不可能完成的,由於ThreadPoolExecutor中的Thread沒法選擇優先執行子任務,須要完成200萬個具備父子關係的任務時,也須要200萬個線程,很顯然這是不可行的,也是很不合理的!!
假如咱們須要作一個比較大的任務,咱們能夠把這個任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,因而把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應,好比A線程負責處理A隊列裏的任務。可是有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其餘線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
工做竊取算法的優勢:
充分利用線程進行並行計算,並減小了線程間的競爭。
工做竊取算法的缺點:
在某些狀況下仍是存在競爭,好比雙端隊列裏只有一個任務時。而且該算法會消耗更多的系統資源,好比建立多個線程和多個雙端隊列。
Fork/Join框架侷限性:
對於Fork/Join框架而言,當一個任務正在等待它使用Join操做建立的子任務結束時,執行這個任務的工做線程查找其餘未被執行的任務,並開始執行這些未被執行的任務,經過這種方式,線程充分利用它們的運行時間來提升應用程序的性能。爲了實現這個目標,Fork/Join框架執行的任務有一些侷限性。
(1)任務只能使用Fork和Join操做來進行同步機制,若是使用了其餘同步機制,則在同步操做時,工做線程就不能執行其餘任務了。好比,在Fork/Join框架中,使任務進行了睡眠,那麼,在睡眠期間內,正在執行這個任務的工做線程將不會執行其餘任務了。
(2)在Fork/Join框架中,所拆分的任務不該該去執行IO操做,好比:讀寫數據文件。
(3)任務不能拋出檢查異常,必須經過必要的代碼來出來這些異常。
ForkJoin框架中一些重要的類以下所示。
ForkJoinPool 框架中涉及的主要類以下所示。
1.ForkJoinPool類
實現了ForkJoin框架中的線程池,由類圖能夠看出,ForkJoinPool類實現了線程池的Executor接口。
咱們也能夠從下圖中看出ForkJoinPool的類圖關係。
其中,可使用Executors.newWorkStealPool()方法建立ForkJoinPool。
ForkJoinPool中提供了以下提交任務的方法。
public void execute(ForkJoinTask<?> task) public void execute(Runnable task) public <T> T invoke(ForkJoinTask<T> task) public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) public <T> ForkJoinTask<T> submit(Callable<T> task) public <T> ForkJoinTask<T> submit(Runnable task, T result) public ForkJoinTask<?> submit(Runnable task)
2.ForkJoinWorkerThread類
實現ForkJoin框架中的線程。
3.ForkJoinTask
ForkJoinTask封裝了數據及其相應的計算,而且支持細粒度的數據並行。ForkJoinTask比線程要輕量,ForkJoinPool中少許工做線程可以運行大量的ForkJoinTask。
ForkJoinTask類中主要包括兩個方法fork()和join(),分別實現任務的分拆與合併。
fork()方法相似於Thread.start(),可是它並不當即執行任務,而是將任務放入工做隊列中。跟Thread.join()方法不一樣,ForkJoinTask的join()方法並不簡單的阻塞線程,而是利用工做線程運行其餘任務,當一個工做線程中調用join(),它將處理其餘任務,直到注意到目標子任務已經完成。
咱們可使用下圖來表示這個過程。
ForkJoinTask有3個子類:
4.RecursiveTask
有返回結果的ForkJoinTask實現Callable。
5.RecursiveAction類
無返回結果的ForkJoinTask實現Runnable。
6.CountedCompleter
在任務完成執行後會觸發執行一個自定義的鉤子函數。
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; @Slf4j public class ForkJoinTaskExample extends RecursiveTask<Integer> { public static final int threshold = 2; private int start; private int end; public ForkJoinTaskExample(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; //若是任務足夠小就計算任務 boolean canCompute = (end - start) <= threshold; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { // 若是任務大於閾值,就分裂成兩個子任務計算 int middle = (start + end) / 2; ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle); ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end); // 執行子任務 leftTask.fork(); rightTask.fork(); // 等待任務執行結束合併其結果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); // 合併子任務 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkjoinPool = new ForkJoinPool(); //生成一個計算任務,計算1+2+3+4 ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100); //執行一個任務 Future<Integer> result = forkjoinPool.submit(task); try { log.info("result:{}", result.get()); } catch (Exception e) { log.error("exception", e); } } }
若是以爲文章對你有點幫助,請微信搜索並關注「 冰河技術 」微信公衆號,跟冰河學習高併發編程技術。
最後,附上併發編程須要掌握的核心技能知識圖,祝你們在學習併發編程時,少走彎路。