Fork/join主要是Java7提供的一個並行執行任務的框架,Fork就是把一個大任務切分爲諾乾子任務並行的執行,Join就是合併這些子任務的執行結果,最後獲得大任務的結果。數組
若是1+2+3+……+10000 能夠分割成10個子任務,每一個子任務分別對1000個數進行求和,最終彙總這10個子任務的結果。bash
需求:計算1+2+3+4框架
使用Fork、Join框架首先要考慮到時如何分割任務,若是但願每一個子任務最多執行兩個數的相加,那麼咱們設置分割的閾值是2,因爲4個數字相加,因此這個框架會把這個任務fork成兩個子任務,子任務一負責計算1+2,子任務二負責計算3+4,而後再join兩個子任務的結果,由於是有結果的任務,因此必須繼承RecursiveTask,具體實現代碼以下:異步
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//若是任務足夠小就計算任務
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
//若是任務大於閾值,就分裂成兩個子任務計算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//執行子任務
leftTask.fork();
rightTask.fork();
//等待子任務執行完,並獲得其結果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
//合併子任務
sum = leftResult + rightResult;
}
return sum;
}
}複製代碼
public static void main(String[] strs) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
//生成一個計算任務,負責計算1+2+3+4
CountTask task = new CountTask(1, 4);
//執行一個任務
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
if(task.isCompletedAbnormally()){
System.out.println(task.getException());
}
}複製代碼
經過這個例子,咱們進一步瞭解了ForkJoinTask,ForkJoinTask與通常任務的主要區別在於它須要實現compute方法,在這個方法裏,首先須要判斷任務是否足夠小,若是足夠小就直接執行任務,若是不足夠小,就必須分割成兩個子任務,每一個子任務在調用fork方法時,又會進入compute方法,看看當前子任務是否須要繼續分割成子任務,若是不須要繼續分割,則執行當前子任務並返回結果,使用join方法會等待子任務執行完並獲得其結果。ide
ForkJoinTask在執行任務的時候可能會拋出異常,可是咱們沒辦法在主線程裏直接捕獲異常,因此ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經被取消了,而且能夠經過ForkJoinTask的getException方法捕獲異常ui
ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責將存放程序提交給ForkJoinPool的任務,而ForkJoinWorkerThread數組負責執行這些任務。this
(1).ForkJoinTask的fork方法實現原理spa
當咱們調用ForkJoinTask的fork方法時,程序會調用ForkJoinWorkerThread的pushTask方法異步的執行這個任務,而後當即返回結果,代碼以下:線程
public final ForkJoinTask<V> fork() {
((ForkJoinWorkerThread) Thread.currentThread())
.pushTask(this);
return this;
}複製代碼
pushTask方法把當前任務存放在ForkJoinTask數組隊列裏。而後再調用ForkJoinPool的signalWork()方法喚醒或建立一個工做線程來執行任務,代碼以下:code
final void pushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q; int s, m;
if ((q = queue) != null) { // ignore if queue removed
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1; // or use putOrderedInt
if ((s -= queueBase) <= 2)
pool.signalWork();
else if (s == m)
growQueue();
}
}複製代碼
(2).ForkJoinTask的join方法實現原理
Join方法的主要做用是阻塞當前線程並等待獲取結果。讓咱們一塊兒看看ForkJoinTask的join方法的實現,代碼以下:
public final V join() {
if (doJoin() != NORMAL)
return reportResult();
else
return getRawResult();
}複製代碼
private V reportResult() {
int s; Throwable ex;
if ((s = status) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
UNSAFE.throwException(ex);
return getRawResult();
}複製代碼
首先,它調用了doJoin方法,經過doJoin()方法獲得當前任務的狀態來判斷返回什麼結果,任務狀態有4種:已完成(NORMAL)、被取消(CANCELLED)、信號(SIGNAL)和出現異常(EXCEPTIONAL).
①:若是任務狀態是已完成,則直接返回任務結果
②:若是任務狀態是被取消,則直接拋出CancellationException
③:若是任務狀態是拋出異常,則直接拋出對應的異常
doJoin方法代碼以下:
private int doJoin() {
Thread t; ForkJoinWorkerThread w; int s; boolean completed;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
if ((s = status) < 0)
return s;
if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
return setCompletion(NORMAL);
}
return w.joinTask(this);
}
else
return externalAwaitDone();
}複製代碼
在doJoin()方法裏,首先經過查看任務的狀態,看任務是否已經執行完成,若是執行完成,則直接返回任務狀態,若是沒有執行完,則從任務數組裏取出任務並執行,若是任務順利執行完成,則設置任務狀態爲NORMAL,若是出現異常,則記錄異常,並將任務狀態設置爲EXCEPTION.