使用 java8 lambda 表達式大半年了,一直都知道底層使用的是 Fork/Join 框架,今天終於有機會來學學 Fork/Join 框架了。java
Fork/Join 框架是 Java 7 提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架。面試
Fork/Join 的運行流程示意圖:
算法
好比,一個 1+2+3+...+100 的工做任務,咱們能夠把它 Fork 成 10 個子任務,分別計算這 10 個子任務的運行結果。最後再把 10 個子任務的結果 Join 起來,彙總成最後的結果。數組
爲了減小線程間的競爭,一般把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應。可是,有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其它線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。線程的這種執行方式,咱們稱之爲「工做竊取」算法。併發
實現 Fork/Join 框架的設計,大抵須要兩步:框架
首先咱們須要建立一個 ForkJoin 任務,把大任務分割成子任務,若是子任務不夠小,則繼續往下分,直到分割出的子任務足夠小。dom
在 Java 中咱們可使用 ForkJoinTask 類,它提供在任務中執行 fork() 和 join() 操做的機制,一般狀況下,咱們只須要繼承它的子類:異步
分割的子任務分別放在雙端隊列裏,而後啓動幾個線程分別從雙端隊列裏獲取任務執行。子任務執行完的結果都統一放在一個隊列裏,啓動一個線程從隊列裏拿數據,而後合併這些數據。ide
在 Java 中任務的執行須要經過 ForkJoinPool 來執行。性能
來一個阿里面試題:百萬級 Integer 數據量的一個 array 求和。
public class ArrayCountTask extends RecursiveTask<Long> { /** * 閾值 */ private static final Integer THRESHOLD = 10000; private Integer[] array; private Integer start; private Integer end; public ArrayCountTask(Integer[] array, Integer start, Integer end) { this.array = array; this.start = start; this.end = end; } @Override protected Long compute() { long sum = 0; // 最小子任務計算 if (end - start <= THRESHOLD) { for (int i = start; i < end; i++) { sum += array[i]; } } else { // 把大於閾值的任務繼續往下拆分,有點相似遞歸的思惟。 recursive 就是遞歸的意思。 int middle = (start + end) >>> 1; ArrayCountTask leftArrayCountTask = new ArrayCountTask(array, start, middle); ArrayCountTask rightArrayCountTask = new ArrayCountTask(array, middle, end); // 執行子任務 //leftArrayCountTask.fork(); //rightArrayCountTask.fork(); // invokeAll 方法使用 invokeAll(leftArrayCountTask, rightArrayCountTask); //等待子任務執行完,並獲得其結果 Long leftJoin = leftArrayCountTask.join(); Long rightJoin = rightArrayCountTask.join(); // 合併子任務的結果 sum = leftJoin + rightJoin; } return sum; } }
public static void main(String[] args) { // 1. 造一個 int 類型的百萬級別數組 Integer[] array = new Integer[150000000]; for (int i = 0; i < array.length; i++) { array[i] = new Random().nextInt(100); } // 2.普通方式計算結果 long start = System.currentTimeMillis(); long sum = 0; for (int i = 0; i < array.length; i++) { sum += array[i]; } long end = System.currentTimeMillis(); System.out.println("普通方式計算結果:" + sum + ",耗時:" + (end - start)); long start2 = System.currentTimeMillis(); // 3.fork/join 框架方式計算結果 ArrayCountTask arrayCountTask = new ArrayCountTask(array, 0, array.length); ForkJoinPool forkJoinPool = new ForkJoinPool(); sum = forkJoinPool.invoke(arrayCountTask); long end2 = System.currentTimeMillis(); System.out.println("fork/join 框架方式計算結果:" + sum + ",耗時:" + (end2 - start2)); // 結論: // 1. 電腦 i5-4300m,雙核四線程 // 2. 數組量少的時候,fork/join 框架要進行線程建立/切換的操做,性能不明顯。 // 3. 數組量超過 100000000,fork/join 框架的性能纔開始體現。 }
ForkJoinTask 與通常任務的主要區別在於它須要實現 compute 方法,在這個方法裏,首先須要判斷任務是否足夠小,若是足夠小就直接執行任務。若是不足夠小,就必須分割成兩個子任務,每一個子任務在調用 fork 方法時,又會進入 compute 方法,看看當前子任務是否須要繼續分割成子任務,若是不須要繼續分割,則執行當前子任務並返回結果。使用 join 方法會等待子任務執行完並獲得其結果。
在執行子任務時調用 fork 方法並非最佳的選擇,最佳的選擇是 invokeAll 方法。由於執行 compute() 方法的線程自己也是一個 worker 線程,當對兩個子任務調用 fork() 時,這個worker 線程就會把任務分配給另外兩個 worker,可是它本身卻停下來等待不幹活了!這樣就白白浪費了 Fork/Join 線程池中的一個 worker 線程,致使了4個子任務至少須要7個線程才能併發執行。
好比甲把 400 分紅兩個 200 後,fork() 寫法至關於甲把一個 200 分給乙,把另外一個 200 分給丙,而後,甲成了監工,不幹活,等乙和丙幹完了他直接彙報工做。乙和丙在把 200 分拆成兩個 100 的過程當中,他倆又成了監工,這樣,原本只須要 4 個工人的活,如今須要 7 個工人才能完成,其中有3個是不幹活的。
ForkJoinPool 由 ForkJoinTask 數組和 ForkJoinWorkerThread 數組組成。ForkJoinTask 數組負責將存放程序提交給 ForkJoinPool 的任務;而 ForkJoinWorkerThread 數組負責執行這些任務,ForkJoinWorkerThread 體現的就是「工做竊取」算法。
ForkJoinPool 使用 submit 或 invoke 提交的區別:invoke 同步執行,調用以後須要等待任務完成,才能執行後面的代碼;submit 是異步執行,只有在 Future 調用 get 的時候會阻塞。
ForkJoinPool 繼承自 AbstractExecutorService, 不是爲了替代 ExecutorService,而是它的補充,在某些應用場景下性能比 ExecutorService 更好。