在ForkJoinPool中,有invoke,submit,execute三個接口能夠提交任務運行,這三個接口有什麼區別?分別適用什麼場景?less
綜述:
當前線程是FJ工做線程且屬於當前FJPool,execute,submit方法會調用forkOrSubmit將task加入到工做線程的任務隊列,而invoke直接調用FJTask的exec()方法;
當前線程非FJ工做線程或者不屬於當前FJPool,則最終都是調用 addSubmission(ForkJoinTask<?>) 將任務加入FJPool的任務隊列,並分配一個FJ工做線程。
其中,submit,invoke有返回值(返回指派的任務的計算結果),而execute沒有返回值。dom
1、execute
提交任務後,任務將被異步執行。它有2個實現,可將Runnable接口封裝成 ForkJoinTask<?> 使用。異步
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
forkOrSubmit(task);
}ide
public void execute(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null); /*對Runnable接口進行封裝*/
forkOrSubmit(job);
}oop
/**
* Unless terminating, forks task if within an ongoing FJ
* computation in the current pool, else submits as external task.
*/
private <T> void forkOrSubmit(ForkJoinTask<T> task) {
ForkJoinWorkerThread w;
Thread t = Thread.currentThread();
if (shutdown)
throw new RejectedExecutionException();
if ((t instanceof ForkJoinWorkerThread) &&
(w = (ForkJoinWorkerThread)t).pool == this)
w.pushTask(task);
/*若是當前調用該方法的線程是FJ工做線程,且屬於當前FJPool,則將任務加入工做線程的執行隊列;
這裏與 ForkJoinTask.fork()相似,fork不判斷是否屬於當前FJPool*/
else
addSubmission(task); /*不然,加到提交任務隊列,等待分配工做線程來執行*/
}ui
2、submit
提交一個任務,其本質與execute相同,區別是execute無返回值,submit會將提交給FJPool的ForkJoinTask<?> 返回給調用者;
由於調用的都是 forkOrSubmit 方法,所以submit的4個實現的實質是同樣的。this
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Callable<T> task) { /*Callable的返回值爲泛型T,做爲封裝後的ForkJoinTask<T>的返回值*/
if (task == null)
throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task);
forkOrSubmit(job);
return job;
}idea
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Runnable task, T result) { /*Runnable不支持返回值,須要使用單獨的參數指定返回值*/
if (task == null)
throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
forkOrSubmit(job);
return job;
}.net
3、invoke線程
public <T> T invoke(ForkJoinTask<T> task) {
Thread t = Thread.currentThread();
if (task == null)
throw new NullPointerException();
if (shutdown)
throw new RejectedExecutionException();
if ((t instanceof ForkJoinWorkerThread) &&
((ForkJoinWorkerThread)t).pool == this) /*若是是當前FJPool的工做線程,則執行task,task.invoke將調用ForkJoinTask的exec()抽象方法,調用compute()方法*/
return task.invoke(); // bypass submit if in same pool
else {
addSubmission(task); /*將任務提交到隊列,分配一個工做線程給它*/
return task.join();
}
}
/**
* Enqueues the given task in the submissionQueue. Same idea as
* ForkJoinWorkerThread.pushTask except for use of submissionLock.
*
* @param t the task
*/
private void addSubmission(ForkJoinTask<?> t) {
final ReentrantLock lock = this.submissionLock;
lock.lock();
try {
ForkJoinTask<?>[] q; int s, m;
if ((q = submissionQueue) != null) { // ignore if queue removed
long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1;
if (s - queueBase == m)
growSubmissionQueue();
}
} finally {
lock.unlock();
}
signalWork(); /*Wakes up or creare a worker*/
}
4、其它接口
ForkJoinPool:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
【輸入一組Callable對象,執行後返回一組Future<?>】
ForkJoinTask:
public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks)
【輸入一組ForkJoinTask<?>對象,返回一組一樣的對象】
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
t2.fork();
t1.invoke();
t2.join();
}
【將t2加入任務隊列,當即執行t1,並等待t2執行完畢】
public static void invokeAll(ForkJoinTask<?>... tasks)
【可變長入參個數,將任務加入到隊列,待執行(異步)】
問題:
一、工做線程建立/分配後,如何獲取待執行的任務?
public void run() {
Throwable exception = null;
try {
onStart();
pool.work(this); /*調用pool的work接口,將當前工做線程對象傳入*/
} catch (Throwable ex) {
exception = ex;
} finally {
onTermination(exception);
}
}
final void work(ForkJoinWorkerThread w) {
boolean swept = false; // true on empty scans
long c;
while (!w.terminate && (int)(c = ctl) >= 0) {
int a; // active count
if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
swept = scan(w, a); /*進行scan,work-stealing algorithm 在這裏實現*/
else if (tryAwaitWork(w, c))
swept = false;
}
}
/**
* Scans for and, if found, executes one task. Scans start at a
* random index of workers array, and randomly select the first
* (2*#workers)-1 probes, and then, if all empty, resort to 2
* circular sweeps, which is necessary to check quiescence. and
* taking a submission only if no stealable tasks were found. The
* steal code inside the loop is a specialized form of
* ForkJoinWorkerThread.deqTask, followed bookkeeping to support
* helpJoinTask and signal propagation. The code for submission
* queues is almost identical. On each steal, the worker completes
* not only the task, but also all local tasks that this task may
* have generated. On detecting staleness or contention when
* trying to take a task, this method returns without finishing
* sweep, which allows global state rechecks before retry.
*
* @param w the worker
* @param a the number of active workers
* @return true if swept all queues without finding a task
*/
private boolean scan(ForkJoinWorkerThread w, int a) {
int g = scanGuard; // mask 0 avoids useless scans if only one active
int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
ForkJoinWorkerThread[] ws = workers;
if (ws == null || ws.length <= m) // staleness check
return false;
for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
ForkJoinWorkerThread v = ws[k & m]; 【從FJPool的workers取一個工做線程】
if (v != null && (b = v.queueBase) != v.queueTop &&
(q = v.queue) != null && (i = (q.length - 1) & b) >= 0) { 【獲取該工做線程的任務隊列】
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && v.queueBase == b && 【從被偷的工做線程任務隊列出取出一個任務】
UNSAFE.compareAndSwapObject(q, u, t, null)) {
int d = (v.queueBase = b + 1) - v.queueTop;
v.stealHint = w.poolIndex;
if (d != 0)
signalWork(); // propagate if nonempty
w.execTask(t); 【用指派的工做線程,執行上述偷來的任務】
}
r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
return false; // store next seed
}
else if (j < 0) { // xorshift
r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
}
else
++k;
}
if (scanGuard != g) // staleness check 【無效的檢查】
return false;
else { // try to take submission
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; 【必然走到這裏,從FJPool的 submissionQueue 中獲取任務來執行】
if ((b = queueBase) != queueTop &&
(q = submissionQueue) != null &&
(i = (q.length - 1) & b) >= 0) {
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueBase = b + 1;
w.execTask(t);
}
return false;
}
return true; // all queues empty
}
}
二、工做線程的任務隊列,與FJPool的任務隊列之間有什麼聯繫?
【同上】
三、ForkJoinTask.fork() ForkJoinWorkerThread.pushTask(ForkJoinTask<?>)
fork調用了後者,將調用fork的任務對象,加入到當前工做線程的任務列表。
/** * The work-stealing queue array. Size must be a power of two. * Initialized when started (as oposed to when constructed), to * improve memory locality. */ ForkJoinTask<?>[] queue;