多線程編程學習七( Fork/Join 框架).

1、介紹

使用 java8 lambda 表達式大半年了,一直都知道底層使用的是 Fork/Join 框架,今天終於有機會來學學 Fork/Join 框架了。java

Fork/Join 框架是 Java 7 提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架。面試

Fork/Join 的運行流程示意圖:
算法

好比,一個 1+2+3+...+100 的工做任務,咱們能夠把它 Fork 成 10 個子任務,分別計算這 10 個子任務的運行結果。最後再把 10 個子任務的結果 Join 起來,彙總成最後的結果。數組

爲了減小線程間的競爭,一般把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應。可是,有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其它線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。線程的這種執行方式,咱們稱之爲「工做竊取」算法。併發

2、設計

實現 Fork/Join 框架的設計,大抵須要兩步:框架

1. 分割任務

首先咱們須要建立一個 ForkJoin 任務,把大任務分割成子任務,若是子任務不夠小,則繼續往下分,直到分割出的子任務足夠小。dom

在 Java 中咱們可使用 ForkJoinTask 類,它提供在任務中執行 fork() 和 join() 操做的機制,一般狀況下,咱們只須要繼承它的子類:異步

  • RecursiveAction — 用於沒有返回結果的任務
  • RecursiveTask — 用於有返回結果的任務

2. 任務執行並返回結果

分割的子任務分別放在雙端隊列裏,而後啓動幾個線程分別從雙端隊列裏獲取任務執行。子任務執行完的結果都統一放在一個隊列裏,啓動一個線程從隊列裏拿數據,而後合併這些數據。ide

在 Java 中任務的執行須要經過 ForkJoinPool 來執行。性能

3、示例

來一個阿里面試題:百萬級 Integer 數據量的一個 array 求和。

public class ArrayCountTask extends RecursiveTask<Long> {
    /**
     * 閾值
     */
    private static final Integer THRESHOLD = 10000;

    private Integer[] array;
    private Integer start;
    private Integer end;

    public ArrayCountTask(Integer[] array, Integer start, Integer end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        // 最小子任務計算
        if (end - start <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
        } else {
            // 把大於閾值的任務繼續往下拆分,有點相似遞歸的思惟。 recursive 就是遞歸的意思。
            int middle = (start + end) >>> 1;
            ArrayCountTask leftArrayCountTask = new ArrayCountTask(array, start, middle);
            ArrayCountTask rightArrayCountTask = new ArrayCountTask(array, middle, end);
            // 執行子任務
            //leftArrayCountTask.fork();
            //rightArrayCountTask.fork();

            // invokeAll 方法使用
            invokeAll(leftArrayCountTask, rightArrayCountTask);

            //等待子任務執行完,並獲得其結果
            Long leftJoin = leftArrayCountTask.join();
            Long rightJoin = rightArrayCountTask.join();
            // 合併子任務的結果
            sum = leftJoin + rightJoin;
        }

        return sum;
    }
}
public static void main(String[] args) {
        // 1. 造一個 int 類型的百萬級別數組
        Integer[] array = new Integer[150000000];
        for (int i = 0; i < array.length; i++) {
            array[i] = new Random().nextInt(100);
        }
        // 2.普通方式計算結果
        long start = System.currentTimeMillis();
        long sum = 0;
        for (int i = 0; i < array.length; i++) {
            sum += array[i];
        }
        long end = System.currentTimeMillis();
        System.out.println("普通方式計算結果:" + sum + ",耗時:" + (end - start));
        long start2 = System.currentTimeMillis();
        // 3.fork/join 框架方式計算結果
        ArrayCountTask arrayCountTask = new ArrayCountTask(array, 0, array.length);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        sum = forkJoinPool.invoke(arrayCountTask);
        long end2 = System.currentTimeMillis();
        System.out.println("fork/join 框架方式計算結果:" + sum + ",耗時:" + (end2 - start2));

        // 結論:
        // 1. 電腦 i5-4300m,雙核四線程
        // 2. 數組量少的時候,fork/join 框架要進行線程建立/切換的操做,性能不明顯。
        // 3. 數組量超過 100000000,fork/join 框架的性能纔開始體現。

    }

ForkJoinTask 與通常任務的主要區別在於它須要實現 compute 方法,在這個方法裏,首先須要判斷任務是否足夠小,若是足夠小就直接執行任務。若是不足夠小,就必須分割成兩個子任務,每一個子任務在調用 fork 方法時,又會進入 compute 方法,看看當前子任務是否須要繼續分割成子任務,若是不須要繼續分割,則執行當前子任務並返回結果。使用 join 方法會等待子任務執行完並獲得其結果。

在執行子任務時調用 fork 方法並非最佳的選擇,最佳的選擇是 invokeAll 方法。由於執行 compute() 方法的線程自己也是一個 worker 線程,當對兩個子任務調用 fork() 時,這個worker 線程就會把任務分配給另外兩個 worker,可是它本身卻停下來等待不幹活了!這樣就白白浪費了 Fork/Join 線程池中的一個 worker 線程,致使了4個子任務至少須要7個線程才能併發執行。

好比甲把 400 分紅兩個 200 後,fork() 寫法至關於甲把一個 200 分給乙,把另外一個 200 分給丙,而後,甲成了監工,不幹活,等乙和丙幹完了他直接彙報工做。乙和丙在把 200 分拆成兩個 100 的過程當中,他倆又成了監工,這樣,原本只須要 4 個工人的活,如今須要 7 個工人才能完成,其中有3個是不幹活的。

 

ForkJoinPool 由 ForkJoinTask 數組和 ForkJoinWorkerThread 數組組成。ForkJoinTask 數組負責將存放程序提交給 ForkJoinPool 的任務;而 ForkJoinWorkerThread 數組負責執行這些任務,ForkJoinWorkerThread 體現的就是「工做竊取」算法。

  • 當咱們調用 ForkJoinTask 的 fork 方法時,程序會調用 ForkJoinWorkerThread 的 pushTask 方法異步地執行這個任務,而後當即返回結果。
  • 當咱們調用 ForkJoinTask 的 join 方法時,程序會阻塞當前線程並等待獲取結果。

ForkJoinPool 使用 submit 或 invoke 提交的區別:invoke 同步執行,調用以後須要等待任務完成,才能執行後面的代碼;submit 是異步執行,只有在 Future 調用 get 的時候會阻塞。

ForkJoinPool 繼承自 AbstractExecutorService, 不是爲了替代 ExecutorService,而是它的補充,在某些應用場景下性能比 ExecutorService 更好。

相關文章
相關標籤/搜索