Fork 就是把一個大任務切分爲若干個子任務並行地執行,Join 就是合併這些子任務的執行結果,最後獲得這個大任務的結果。Fork/Join 框架使用的是工做竊取算法。java
工做竊取算法是指某個線程從其餘隊列裏竊取任務來執行。對於一個比較大的任務,能夠把它分割爲若干個互不依賴的子任務,爲了減小線程間的競爭,把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應。可是,有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務須要處理,因而它就去其餘線程的隊列裏竊取一個任務來執行。因爲此時它們訪問同一個隊列,爲了減少競爭,一般會使用雙端隊列。被竊取任務的線程永遠從雙端隊列的頭部獲取任務,竊取任務的線程永遠從雙端隊列的尾部獲取任務。算法
優勢:充分利用線程進行並行計算,減小了線程間的競爭。
缺點:雙端隊列只存在一個任務時會致使競爭,會消耗更多的系統資源,由於須要建立多個線程和多個雙端隊列。編程
ForkJoinTask 在執行的時候可能拋出異常,但沒有辦法在主線程中直接捕獲異常,因此 ForkJoinTask 提供了 isCompletedAbnormally()
方法檢查任務是否已經拋出異常或已經被取消。getException()
方法返回 Throwable
對象,若是任務被取消了則返回 CancellationException
,若是任務沒有完成或者沒有拋出異常則返回 null
。數組
當調用 ForkJoinTask 的 fork() 方法時,程序會調用 ForkJoinPool.WorkQueue
的 push()
方法異步地執行這個任務,而後當即返回結果。代碼以下:併發
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }
push() 方法把當前任務存放在一個 ForkJoinTask 數組隊列裏,而後再調用 ForkJoinPool
的 signalWork()
方法喚醒或建立一個工做線程來執行任務。代碼以下:框架
final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } }
當調用 ForkJoinTask 的 join() 方法時,程序會調用 doJoin()
方法,經過 doJoin() 方法來判斷返回什麼結果異步
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } private void reportException(int s) { if (s == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL) rethrow(getThrowableException()); } public abstract V getRawResult();
實例代碼:ide
import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class CountTask extends RecursiveTask<Integer> { private static final int THRESHOLD = 2; private int start; private int end; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= THRESHOLD; if (canCompute) { // 若是任務足夠小,就計算任務 for (int i = start; i <= end; i++) { sum += i; } } else { // 若是任務大於閾值,分裂成兩個子任務執行 int middle = (start + end) / 2; CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle + 1, end); // 執行子任務 leftTask.fork(); rightTask.fork(); // 等待子任務執行完,並獲得其結果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); // 合併子任務 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask countTask = new CountTask(1, 100); peekNextLocalTask(); Future<Integer> result = forkJoinPool.submit(countTask); try { if (countTask.isCompletedAbnormally()) { System.out.println(countTask.getException()); } System.out.println(result.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
參考資料
Java 併發編程的藝術this