Fork/Join 框架詳解(基於 JDK 8)

概述

Fork 就是把一個大任務切分爲若干個子任務並行地執行,Join 就是合併這些子任務的執行結果,最後獲得這個大任務的結果。Fork/Join 框架使用的是工做竊取算法。java

工做竊取算法

工做竊取算法是指某個線程從其餘隊列裏竊取任務來執行。對於一個比較大的任務,能夠把它分割爲若干個互不依賴的子任務,爲了減小線程間的競爭,把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應。可是,有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務須要處理,因而它就去其餘線程的隊列裏竊取一個任務來執行。因爲此時它們訪問同一個隊列,爲了減少競爭,一般會使用雙端隊列。被竊取任務的線程永遠從雙端隊列的頭部獲取任務,竊取任務的線程永遠從雙端隊列的尾部獲取任務。算法

工做竊取算法的優缺點

優勢:充分利用線程進行並行計算,減小了線程間的競爭。
缺點:雙端隊列只存在一個任務時會致使競爭,會消耗更多的系統資源,由於須要建立多個線程和多個雙端隊列。編程

Fork/Join 框架的異常處理

ForkJoinTask 在執行的時候可能拋出異常,但沒有辦法在主線程中直接捕獲異常,因此 ForkJoinTask 提供了 isCompletedAbnormally() 方法檢查任務是否已經拋出異常或已經被取消。getException() 方法返回 Throwable 對象,若是任務被取消了則返回 CancellationException,若是任務沒有完成或者沒有拋出異常則返回 null數組

Fork/Join 框架的實現原理

fork() 方法的實現原理

當調用 ForkJoinTask 的 fork() 方法時,程序會調用 ForkJoinPool.WorkQueuepush() 方法異步地執行這個任務,而後當即返回結果。代碼以下:併發

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

push() 方法把當前任務存放在一個 ForkJoinTask 數組隊列裏,而後再調用 ForkJoinPoolsignalWork() 方法喚醒或建立一個工做線程來執行任務。代碼以下:框架

final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
                p.signalWork(p.workQueues, this);
        }
        else if (n >= m)
            growArray();
    }
}

join() 方法的實現原理

當調用 ForkJoinTask 的 join() 方法時,程序會調用 doJoin() 方法,經過 doJoin() 方法來判斷返回什麼結果異步

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}
private void reportException(int s) {
    if (s == CANCELLED)
        throw new CancellationException();
    if (s == EXCEPTIONAL)
        rethrow(getThrowableException());
}
public abstract V getRawResult();

實例代碼:ide

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2;
    private int start;
    private int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) { // 若是任務足夠小,就計算任務
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else { // 若是任務大於閾值,分裂成兩個子任務執行
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);

            // 執行子任務
            leftTask.fork();
            rightTask.fork();

            // 等待子任務執行完,並獲得其結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合併子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask countTask = new CountTask(1, 100);
        peekNextLocalTask();
        Future<Integer> result = forkJoinPool.submit(countTask);
        try {
            if (countTask.isCompletedAbnormally()) {
                System.out.println(countTask.getException());
            }
            System.out.println(result.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

參考資料
Java 併發編程的藝術this

相關文章
相關標籤/搜索