Java 多線程中的任務分解機制-ForkJoinPool,以及CompletableFuture

ForkJoinPool的優點在於,能夠充分利用多cpu,多核cpu的優點,把一個任務拆分紅多個「小任務」,把多個「小任務」放到多個處理器核心上並行執行;當多個「小任務」執行完成以後,再將這些執行結果合併起來便可。html

Java7 提供了ForkJoinPool來支持將一個任務拆分紅多個「小任務」並行計算,再把多個「小任務」的結果合併成總的計算結果。java

ForkJoinPool是ExecutorService的實現類,所以是一種特殊的線程池。算法

使用方法:建立了ForkJoinPool實例以後,就能夠調用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)方法來執行指定任務了。編程

其中ForkJoinTask表明一個能夠並行、合併的任務。ForkJoinTask是一個抽象類,它還有兩個抽象子類:RecusiveAction和RecusiveTask其中RecusiveTask表明有返回值的任務,而RecusiveAction表明沒有返回值的任務設計模式

Code:多線程

RecusiveAction實現方法:dom

package com.qhong.thread.ForkJoinPoolDemo; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; public class ForkJoinPoolDemo extends RecursiveAction { private static final long serialVersionUID = 1L; //定義一個分解任務的閾值——50,即一個任務最多承擔50個工做量
    private int THRESHOLD=50; //任務量
    private int task_Num=0; ForkJoinPoolDemo(int Num){ this.task_Num=Num; } public static void main (String[] args) throws Exception { //建立一個支持分解任務的線程池ForkJoinPool
        ForkJoinPool pool=new ForkJoinPool(); ForkJoinPoolDemo task=new ForkJoinPoolDemo(120); pool.submit(task); pool.awaitTermination(20, TimeUnit.SECONDS);//等待20s,觀察結果
 pool.shutdown(); } /** * @author qhong * @param * @return * @date 2018/4/18 17:13 * @description 實現recursiveAction中抽象方法 */ @Override protected void compute() { if(task_Num<=THRESHOLD){ System.out.println(Thread.currentThread().getName()+"承擔了"+task_Num+"份工做"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }else{ //隨機解成兩個任務
            Random m=new Random(); int x=m.nextInt(50); ForkJoinPoolDemo left=new ForkJoinPoolDemo(x); ForkJoinPoolDemo right=new ForkJoinPoolDemo(task_Num-x); left.fork(); right.fork(); } } }

Output:異步

ForkJoinPool-1-worker-1承擔了6份工做 ForkJoinPool-1-worker-2承擔了2份工做 ForkJoinPool-1-worker-3承擔了30份工做 ForkJoinPool-1-worker-0承擔了9份工做 ForkJoinPool-1-worker-1承擔了46份工做 ForkJoinPool-1-worker-2承擔了17份工做 ForkJoinPool-1-worker-0承擔了0份工做 ForkJoinPool-1-worker-3承擔了10份工做

RecusiveTask的具體實現:ide

package com.qhong.thread.ForkJoinPoolDemo; import java.util.Arrays; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; import java.util.stream.LongStream; /** * @author qhong * @date 2018/4/18 16:14 * @description **/
public class ForkJoinCalculator implements Calculator { private ForkJoinPool pool; public ForkJoinCalculator() { // 也可使用公用的 ForkJoinPool: // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool(); } public static void main(String[] args) { ForkJoinCalculator forkJoinCalculator=new ForkJoinCalculator(); long[] numbers=LongStream.range(1,20).toArray(); System.out.println(Arrays.toString(numbers)); long result=forkJoinCalculator.sumUp(numbers); System.out.println("result:"+result); } private static class SumTask extends RecursiveTask<Long> { private long[] numbers; private int from; private int to; public SumTask(long[] numbers, int from, int to) { this.numbers = numbers; this.from = from; this.to = to; } @Override protected Long compute() { // 當須要計算的數字小於6時,直接計算結果
            if (to - from < 4) { long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } System.out.println(String.format("currentThread:%s,total:%s,from:%s,to:%s",Thread.currentThread().getName(),total,from,to)); return total; // 不然,把任務一分爲二,遞歸計算
            } else { int middle = (from + to) / 2; SumTask taskLeft = new SumTask(numbers, from, middle); SumTask taskRight = new SumTask(numbers, middle+1, to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } } } @Override public long sumUp(long[] numbers) { return pool.invoke(new SumTask(numbers, 0, numbers.length-1)); } }

Output:函數式編程

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19] currentThread:ForkJoinPool-1-worker-2,total:6,from:0,to:2 currentThread:ForkJoinPool-1-worker-3,total:36,from:10,to:12 currentThread:ForkJoinPool-1-worker-2,total:9,from:3,to:4 currentThread:ForkJoinPool-1-worker-3,total:29,from:13,to:14 currentThread:ForkJoinPool-1-worker-2,total:21,from:5,to:7 currentThread:ForkJoinPool-1-worker-3,total:70,from:15,to:18 currentThread:ForkJoinPool-1-worker-2,total:19,from:8,to:9 result:190

分析:

根據上面的示例代碼,能夠看出 fork() 和 join() 是 Fork/Join Framework 「魔法」的關鍵。咱們能夠根據函數名假設一下 fork() 和 join() 的做用:

  • fork():開啓一個新線程(或是重用線程池內的空閒線程),將任務交給該線程處理。
  • join():等待該任務的處理線程處理完畢,得到返回值。

並非每一個 fork() 都會促成一個新線程被建立,而每一個 join() 也不是必定會形成線程被阻塞。

Fork/Join Framework 的實現算法並非那麼「顯然」,而是一個更加複雜的算法——這個算法的名字就叫作 work stealing 算法

  • ForkJoinPool 的每一個工做線程都維護着一個工做隊列WorkQueue),這是一個雙端隊列(Deque),裏面存放的對象是任務ForkJoinTask)。
  • 每一個工做線程在運行中產生新的任務(一般是由於調用了 fork())時,會放入工做隊列的隊尾,而且工做線程在處理本身的工做隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。
  • 每一個工做線程在處理本身的工做隊列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool 的任務,或是來自於其餘工做線程的工做隊列),竊取的任務位於其餘線程的工做隊列的隊首,也就是說工做線程在竊取其餘工做線程的任務時,使用的是 FIFO 方式。
  • 在遇到 join() 時,若是須要 join 的任務還沒有完成,則會先處理其餘任務,並等待其完成。
  • 在既沒有本身的任務,也沒有能夠竊取的任務時,進入休眠。

fork() 作的工做只有一件事,既是把任務推入當前工做線程的工做隊列裏

join() 的工做則複雜得多,也是 join() 可使得線程免於被阻塞的緣由——不像同名的 Thread.join()

  1. 檢查調用 join() 的線程是不是 ForkJoinThread 線程。若是不是(例如 main 線程),則阻塞當前線程,等待任務完成。若是是,則不阻塞。
  2. 查看任務的完成狀態,若是已經完成,直接返回結果。
  3. 若是任務還沒有完成,但處於本身的工做隊列內,則完成它。
  4. 若是任務已經被其餘的工做線程偷走,則竊取這個小偷的工做隊列內的任務(以 FIFO 方式),執行,以期幫助它早日完成欲 join 的任務。
  5. 若是偷走任務的小偷也已經把本身的任務所有作完,正在等待須要 join 的任務時,則找到小偷的小偷,幫助它完成它的任務。
  6. 遞歸地執行第5步。

所謂work-stealing模式,即每一個工做線程都會有本身的任務隊列。當工做線程完成了本身全部的工做後,就會去「偷」別的工做線程的任務。

假如咱們須要作一個比較大的任務,咱們能夠把這個任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,因而把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應,好比A線程負責處理A隊列裏的任務。可是有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其餘線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

submit

其實除了前面介紹過的每一個工做線程本身擁有的工做隊列之外,ForkJoinPool 自身也擁有工做隊列,這些工做隊列的做用是用來接收由外部線程(非 ForkJoinThread 線程)提交過來的任務,而這些工做隊列被稱爲 submitting queue 。

submit() 和 fork() 其實沒有本質區別,只是提交對象變成了 submitting queue 而已(還有一些同步,初始化的操做)。submitting queue 和其餘 work queue 同樣,是工做線程」竊取「的對象,所以當其中的任務被一個工做線程成功竊取時,就意味着提交的任務真正開始進入執行階段。

 

ForkJoinPool與ThreadPoolExecutor區別:

1.ForkJoinPool中的每一個線程都會有一個隊列,而ThreadPoolExecutor只有一個隊列,並根據queue類型不一樣,細分出各類線程池

2.ForkJoinPool可以使用數量有限的線程來完成很是多的具備父子關係的任務,ThreadPoolExecutor中根本沒有什麼父子關係任務

3.ForkJoinPool在使用過程當中,會建立大量的子任務,會進行大量的gc,可是ThreadPoolExecutor不須要,所以單線程(或者任務分配平均)

4.ForkJoinPool在多任務,且任務分配不均是有優點,可是在單線程或者任務分配均勻的狀況下,效率沒有ThreadPoolExecutor高,畢竟要進行大量gc子任務

 

ForkJoinPool在多線程狀況下,可以實現工做竊取(Work Stealing),在該線程池的每一個線程中會維護一個隊列來存放須要被執行的任務。當線程自身隊列中的任務都執行完畢後,它會從別的線程中拿到未被執行的任務並幫助它執行。

ThreadPoolExecutor由於它其中的線程並不會關注每一個任務之間任務量的差別。當執行任務量最小的任務的線程執行完畢後,它就會處於空閒的狀態(Idle),等待任務量最大的任務執行完畢。

所以多任務在多線程中分配不均時,ForkJoinPool效率高。

 

stream中應用ForkJoinPool

Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s]\n", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName()));

parallelStream讓部分Java代碼自動地以並行的方式執行

最後:

有一點要注意,就是手動設置ForkJoinPool的線程數量時,實際線程數爲設置的線程數+1,由於還有一個main主線程

即便將ForkJoinPool的通用線程池的線程數量設置爲1,實際上也會有2個工做線程。所以線程數爲1的ForkJoinPool通用線程池和線程數爲2的ThreadPoolExecutor是等價的。

與ForkJoinPool對應的是CompletableFuture

Future以及相關使用方法提供了異步執行任務的能力,可是對於結果的獲取倒是很不方便,只能經過阻塞或者輪詢的方式獲得任務的結果。

阻塞的方式顯然和咱們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,並且也不能及時地獲得計算結果

CompletableFuture就是利用觀察者設計模式當計算結果完成及時通知監聽者

在Java 8中, 新增長了一個包含50個方法左右的類: CompletableFuture,提供了很是強大的Future的擴展功能,能夠幫助咱們簡化異步編程的複雜性,提供了函數式編程的能力,能夠經過回調的方式處理計算結果,而且提供了轉換和組合CompletableFuture的方法。

具體講解連接:http://colobu.com/2016/02/29/Java-CompletableFuture/

http://colobu.com/2018/03/12/20-Examples-of-Using-Java%E2%80%99s-CompletableFuture/

 

 

http://www.cnblogs.com/lixuwu/p/7979480.html#undefined

http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/

https://www.jianshu.com/p/8d7e3cc892cf

https://blog.csdn.net/dm_vincent/article/details/39505977

相關文章
相關標籤/搜索