Java Fork/Join 框架

簡介

從JDK1.7開始,Java提供Fork/Join框架用於並行執行任務,它的思想就是講一個大任務分割成若干小任務,最終彙總每一個小任務的結果獲得這個大任務的結果。html

這種思想和MapReduce很像(input --> split --> map --> reduce --> output)java

主要有兩步:算法

  • 第1、任務切分;
  • 第2、結果合併

它的模型大體是這樣的:線程池中的每一個線程都有本身的工做隊列(PS:這一點和ThreadPoolExecutor不一樣,ThreadPoolExecutor是全部線程公用一個工做隊列,全部線程都從這個工做隊列中取任務),當本身隊列中的任務都完成之後,會從其它線程的工做隊列中偷一個任務執行,這樣能夠充分利用資源。api

工做竊取(work-stealing)

工做竊取(work-stealing)算法是指某個線程從其餘隊列裏竊取任務來執行。工做竊取的運行流程圖以下:數組

那麼爲何須要使用工做竊取算法呢?app

假如咱們須要作一個比較大的任務,咱們能夠把這個任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,因而把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應,好比A線程負責處理A隊列裏的任務。可是有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其餘線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。框架

工做竊取算法的優勢是充分利用線程進行並行計算,並減小了線程間的競爭,其缺點是在某些狀況下仍是存在競爭,好比雙端隊列裏只有一個任務時。而且消耗了更多的系統資源,好比建立多個線程和多個雙端隊列。dom

API介紹

ForkJoinPool

An ExecutorService for running ForkJoinTasks.異步

A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.async

ForkJoinPool與其它的ExecutorService區別主要在於它使用「工做竊取」:線程池中的全部線程都企圖找到並執行提交給線程池的任務。當大量的任務產生子任務的時候,或者同時當有許多小任務被提交到線程池中的時候,這種處理是很是高效的。特別的,當在構造方法中設置asyncMode爲true的時候這種處理更加高效。

ForkJoinTask

ForkJoinTask表明運行在ForkJoinPool中的任務。

主要方法:

  • fork()    在當前線程運行的線程池中安排一個異步執行。簡單的理解就是再建立一個子任務。
  • join()    當任務完成的時候返回計算結果。
  • invoke()    開始執行任務,若是必要,等待計算完成。

子類:

  • RecursiveAction    一個遞歸無結果的ForkJoinTask(沒有返回值)
  • RecursiveTask    一個遞歸有結果的ForkJoinTask(有返回值)

ForkJoinWorkerThread

A thread managed by a ForkJoinPool, which executes ForkJoinTasks.

ForkJoinWorkerThread表明ForkJoinPool線程池中的一個執行任務的線程。

類圖

    

代碼分析

接下來,簡略的看一下關鍵代碼來加深對Fork/Join的理解。

ForkJoinPool

WorkQueue是一個ForkJoinPool中的內部類,它是線程池中線程的工做隊列的一個封裝,支持任務竊取。

什麼叫線程的任務竊取呢?就是說你和你的一個夥伴一塊兒吃水果,你的那份吃完了,他那份沒吃完,那你就偷偷的拿了他的一些水果吃了。存在執行2個任務的子線程,這裏要講成存在A,B兩個個WorkQueue在執行任務,A的任務執行完了,B的任務沒執行完,那麼A的WorkQueue就從B的WorkQueue的ForkJoinTask數組中拿走了一部分尾部的任務來執行,能夠合理的提升運行和計算效率。

submit()

能夠看到:

  1. 一樣是提交任務,submit會返回ForkJoinTask,而execute不會
  2. 任務提交給線程池之後,會將這個任務加入到當前提交者的任務隊列中。

前面咱們說過,每一個線程都有一個WorkQueue,而WorkQueue中有執行任務的線程(ForkJoinWorkerThread owner),還有這個線程須要處理的任務(ForkJoinTask<?>[] array)。那麼這個新提交的任務就是加到array中。

ForkJoinWorkerThread

從代碼中咱們能夠清楚地看到,ForkJoinWorkThread持有ForkJoinPool和ForkJoinPool.WorkQueue的引用,以代表該線程屬於哪一個線程池,它的工做隊列是哪一個

ForkJoinTask

fork()

能夠看到,若是是ForkJoinWorkerThread運行過程當中fork(),則直接加入到它的工做隊列中,不然,從新提交任務。

join()和invoke()

能夠看到它們都會等待計算完成

圖形化處理過程

下面盜兩張圖

 

使用示例

批量發送消息

1 package com.cjs.boot.demo;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 import java.util.concurrent.ForkJoinPool;
 6 import java.util.concurrent.RecursiveAction;
 7 import java.util.concurrent.TimeUnit;
 8 
 9 public class ForkJoinPoolDemo {
10 
11     class SendMsgTask extends RecursiveAction {
12 
13         private final int THRESHOLD = 10;
14 
15         private int start;
16         private int end;
17         private List<String> list;
18 
19         public SendMsgTask(int start, int end, List<String> list) {
20             this.start = start;
21             this.end = end;
22             this.list = list;
23         }
24 
25         @Override
26         protected void compute() {
27 
28             if ((end - start) <= THRESHOLD) {
29                 for (int i = start; i < end; i++) {
30                     System.out.println(Thread.currentThread().getName() + ": " + list.get(i));
31                 }
32             }else {
33                 int middle = (start + end) / 2;
34                 invokeAll(new SendMsgTask(start, middle, list), new SendMsgTask(middle, end, list));
35             }
36 
37         }
38 
39     }
40 
41     public static void main(String[] args) throws InterruptedException {
42         List<String> list = new ArrayList<>();
43         for (int i = 0; i < 123; i++) {
44             list.add(String.valueOf(i+1));
45         }
46 
47         ForkJoinPool pool = new ForkJoinPool();
48         pool.submit(new ForkJoinPoolDemo().new SendMsgTask(0, list.size(), list));
49         pool.awaitTermination(10, TimeUnit.SECONDS);
50         pool.shutdown();
51     }
52 
53 }

求和

1 package com.cjs.boot.demo;
 2 
 3 import java.util.concurrent.ExecutionException;
 4 import java.util.concurrent.ForkJoinPool;
 5 import java.util.concurrent.ForkJoinTask;
 6 import java.util.concurrent.RecursiveTask;
 7 
 8 public class ForkJoinTaskDemo {
 9 
10     private class SumTask extends RecursiveTask<Integer> {
11 
12         private static final int THRESHOLD = 20;
13 
14         private int arr[];
15         private int start;
16         private int end;
17 
18         public SumTask(int[] arr, int start, int end) {
19             this.arr = arr;
20             this.start = start;
21             this.end = end;
22         }
23 
24         /**
25          * 小計
26          */
27         private Integer subtotal() {
28             Integer sum = 0;
29             for (int i = start; i < end; i++) {
30                 sum += arr[i];
31             }
32             System.out.println(Thread.currentThread().getName() + ": ∑(" + start + "~" + end + ")=" + sum);
33             return sum;
34         }
35 
36         @Override
37         protected Integer compute() {
38 
39             if ((end - start) <= THRESHOLD) {
40                 return subtotal();
41             }else {
42                 int middle = (start + end) / 2;
43                 SumTask left = new SumTask(arr, start, middle);
44                 SumTask right = new SumTask(arr, middle, end);
45                 left.fork();
46                 right.fork();
47 
48                 return left.join() + right.join();
49             }
50         }
51     }
52 
53     public static void main(String[] args) throws ExecutionException, InterruptedException {
54         int[] arr = new int[100];
55         for (int i = 0; i < 100; i++) {
56             arr[i] = i + 1;
57         }
58 
59         ForkJoinPool pool = new ForkJoinPool();
60         ForkJoinTask<Integer> result = pool.submit(new ForkJoinTaskDemo().new SumTask(arr, 0, arr.length));
61         System.out.println("最終計算結果: " + result.invoke());
62         pool.shutdown();
63     }
64 
65 }
ForkJoinPool.commonPool-worker-2: ∑(50~62)=678
ForkJoinPool.commonPool-worker-2: ∑(62~75)=897
ForkJoinPool.commonPool-worker-2: ∑(75~87)=978
ForkJoinPool.commonPool-worker-2: ∑(87~100)=1222
ForkJoinPool-1-worker-1: ∑(0~12)=78
ForkJoinPool-1-worker-1: ∑(12~25)=247
ForkJoinPool-1-worker-1: ∑(25~37)=378
ForkJoinPool-1-worker-1: ∑(37~50)=572
ForkJoinPool-1-worker-2: ∑(75~87)=978
ForkJoinPool-1-worker-3: ∑(50~62)=678
ForkJoinPool-1-worker-5: ∑(62~75)=897
ForkJoinPool.commonPool-worker-7: ∑(0~12)=78
ForkJoinPool.commonPool-worker-3: ∑(37~50)=572
ForkJoinPool-1-worker-4: ∑(87~100)=1222
ForkJoinPool.commonPool-worker-2: ∑(25~37)=378
ForkJoinPool.commonPool-worker-5: ∑(12~25)=247
最終計算結果: 5050

api文檔中的兩個示例

1 package com.cjs.boot.demo;
 2 
 3 import java.util.Arrays;
 4 import java.util.concurrent.*;
 5 
 6 public class RecursiveActionDemo {
 7 
 8     private static class SortTask extends RecursiveAction {
 9 
10         static final int THRESHOLD = 100;
11 
12         final long[] array;
13         final int lo, hi;
14 
15         public SortTask(long[] array, int lo, int hi) {
16             this.array = array;
17             this.lo = lo;
18             this.hi = hi;
19         }
20 
21         public SortTask(long[] array) {
22             this(array, 0, array.length);
23         }
24 
25         public void sortSequentially(int lo, int hi) {
26             Arrays.sort(array, lo, hi);
27         }
28 
29         public void merge(int lo, int mid, int hi) {
30             long[] buf = Arrays.copyOfRange(array, lo, mid);
31             for (int i = 0, j = lo, k = mid; i < buf.length; j++) {
32                 array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++];
33             }
34         }
35 
36         @Override
37         protected void compute() {
38             if (hi - lo < THRESHOLD) {
39                 sortSequentially(lo, hi);
40             }else {
41                 int mid = (lo + hi) >>> 1;
42                 invokeAll(new SortTask(array, lo, mid), new SortTask(array, mid, hi));
43                 merge(lo, mid, hi);
44             }
45         }
46     }
47 
48     public static void main(String[] args) throws ExecutionException, InterruptedException {
49         long[] array = new long[120];
50         for (int i = 0; i < array.length; i++) {
51             array[i] = (long) (Math.random() * 1000);
52         }
53         System.out.println(Arrays.toString(array));
54 
55         ForkJoinPool pool = new ForkJoinPool();
56         pool.submit(new SortTask(array));
57         pool.awaitTermination(5, TimeUnit.SECONDS);
58         pool.shutdown();
59 
60     }
61 
62 }
1 package com.cjs.boot.demo;
 2 
 3 import java.util.concurrent.*;
 4 
 5 public class RecursiveTaskDemo {
 6 
 7     private static class Fibonacci extends RecursiveTask<Integer> {
 8 
 9         final int n;
10 
11         public Fibonacci(int n) {
12             this.n = n;
13         }
14 
15         @Override
16         protected Integer compute() {
17             if (n <= 1) {
18                 return n;
19             }else {
20                 Fibonacci f1 = new Fibonacci(n - 1);
21                 f1.fork();
22                 Fibonacci f2 = new Fibonacci(n - 1);
23                 return f2.compute() + f1.join();
24             }
25         }
26     }
27 
28     public static void main(String[] args) throws InterruptedException, ExecutionException {
29         ForkJoinPool pool = new ForkJoinPool();
30         Future<Integer> future = pool.submit(new Fibonacci(10));
31         System.out.println(future.get());
32         pool.shutdown();
33     }
34 
35 }

參考

http://gee.cs.oswego.edu/dl/papers/fj.pdf

http://ifeve.com/talk-concurrency-forkjoin/

https://www.cnblogs.com/senlinyang/p/7885964.html

https://blog.csdn.net/u012403290/article/details/70917810

相關文章
相關標籤/搜索