

當前線程非FJ工做線程或者不屬於當前FJPool,則最終都是調用 addSubmission(ForkJoinTask<?>) 將任務加入FJPool的任務隊列,並分配一個FJ工做線程。

提交任務後,任務將被異步執行。它有2個實現,可將Runnable接口封裝成 ForkJoinTask<?> 使用。異步

public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();

public void execute(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<?> job;
        if (task instanceof ForkJoinTask<?>) // avoid re-wrap
            job = (ForkJoinTask<?>) task;
            job = ForkJoinTask.adapt(task, null);   /*對Runnable接口進行封裝*/

     * Unless terminating, forks task if within an ongoing FJ
     * computation in the current pool, else submits as external task.
    private <T> void forkOrSubmit(ForkJoinTask<T> task) {
        ForkJoinWorkerThread w;
        Thread t = Thread.currentThread();
        if (shutdown)
            throw new RejectedExecutionException();
        if ((t instanceof ForkJoinWorkerThread) &&      
            (w = (ForkJoinWorkerThread)t).pool == this)
 這裏與 ForkJoinTask.fork()相似,fork不判斷是否屬於當前FJPool*/
            addSubmission(task); /*不然,加到提交任務隊列,等待分配工做線程來執行*/

提交一個任務,其本質與execute相同,區別是execute無返回值,submit會將提交給FJPool的ForkJoinTask<?> 返回給調用者;
由於調用的都是 forkOrSubmit 方法,所以submit的4個實現的實質是同樣的。this

     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
    public <T> ForkJoinTask<T> submit(Callable<T> task) {  /*Callable的返回值爲泛型T,做爲封裝後的ForkJoinTask<T>的返回值*/
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<T> job = ForkJoinTask.adapt(task);
        return job;

     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
    public <T> ForkJoinTask<T> submit(Runnable task, T result) {  /*Runnable不支持返回值,須要使用單獨的參數指定返回值*/
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
        return job;


public <T> T invoke(ForkJoinTask<T> task) {
        Thread t = Thread.currentThread();
        if (task == null)
            throw new NullPointerException();
        if (shutdown)
            throw new RejectedExecutionException();
        if ((t instanceof ForkJoinWorkerThread) &&
            ((ForkJoinWorkerThread)t).pool == this)  /*若是是當前FJPool的工做線程,則執行task,task.invoke將調用ForkJoinTask的exec()抽象方法,調用compute()方法*/
            return task.invoke();  // bypass submit if in same pool
        else {
            addSubmission(task);   /*將任務提交到隊列,分配一個工做線程給它*/
            return task.join();

     * Enqueues the given task in the submissionQueue.  Same idea as
     * ForkJoinWorkerThread.pushTask except for use of submissionLock.
     * @param t the task
    private void addSubmission(ForkJoinTask<?> t) {
        final ReentrantLock lock = this.submissionLock;
        try {
            ForkJoinTask<?>[] q; int s, m;
            if ((q = submissionQueue) != null) {    // ignore if queue removed
                long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
                UNSAFE.putOrderedObject(q, u, t);
                queueTop = s + 1;
                if (s - queueBase == m)
        } finally {
        signalWork();  /*Wakes up or creare a worker*/

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks)

public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {


public static void invokeAll(ForkJoinTask<?>... tasks)

public void run() {
        Throwable exception = null;
        try {
  ;      /*調用pool的work接口,將當前工做線程對象傳入*/
        } catch (Throwable ex) {
            exception = ex;
        } finally {

final void work(ForkJoinWorkerThread w) {
        boolean swept = false;                // true on empty scans
        long c;
        while (!w.terminate && (int)(c = ctl) >= 0) {
            int a;                            // active count
            if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
                swept = scan(w, a);      /*進行scan,work-stealing algorithm 在這裏實現*/
            else if (tryAwaitWork(w, c))
                swept = false;

     * Scans for and, if found, executes one task. Scans start at a
     * random index of workers array, and randomly select the first
     * (2*#workers)-1 probes, and then, if all empty, resort to 2
     * circular sweeps, which is necessary to check quiescence. and
     * taking a submission only if no stealable tasks were found.  The
     * steal code inside the loop is a specialized form of
     * ForkJoinWorkerThread.deqTask, followed bookkeeping to support
     * helpJoinTask and signal propagation. The code for submission
     * queues is almost identical. On each steal, the worker completes
     * not only the task, but also all local tasks that this task may
     * have generated. On detecting staleness or contention when
     * trying to take a task, this method returns without finishing
     * sweep, which allows global state rechecks before retry.
     * @param w the worker
     * @param a the number of active workers
     * @return true if swept all queues without finding a task
    private boolean scan(ForkJoinWorkerThread w, int a) {
        int g = scanGuard; // mask 0 avoids useless scans if only one active
        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
        ForkJoinWorkerThread[] ws = workers;
        if (ws == null || ws.length <= m)         // staleness check
            return false;
        for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
            ForkJoinWorkerThread v = ws[k & m];          【從FJPool的workers取一個工做線程】
            if (v != null && (b = v.queueBase) != v.queueTop &&
                (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {     【獲取該工做線程的任務隊列】
                long u = (i << ASHIFT) + ABASE;
                if ((t = q[i]) != null && v.queueBase == b && 【從被偷的工做線程任務隊列出取出一個任務】
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    int d = (v.queueBase = b + 1) - v.queueTop;
                    v.stealHint = w.poolIndex;
                    if (d != 0)
                        signalWork();             // propagate if nonempty
                    w.execTask(t); 【用指派的工做線程,執行上述偷來的任務】
                r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
                return false;                     // store next seed
            else if (j < 0) {                     // xorshift
                r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
        if (scanGuard != g)                       // staleness check  【無效的檢查】
            return false;
        else {                                    // try to take submission
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; 【必然走到這裏,從FJPool的 submissionQueue 中獲取任務來執行】
            if ((b = queueBase) != queueTop &&
                (q = submissionQueue) != null &&
                (i = (q.length - 1) & b) >= 0) {
                long u = (i << ASHIFT) + ABASE;
                if ((t = q[i]) != null && queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    queueBase = b + 1;
                return false;
            return true;                         // all queues empty


三、ForkJoinTask.fork()   ForkJoinWorkerThread.pushTask(ForkJoinTask<?>)

   /**      * The work-stealing queue array. Size must be a power of two.      * Initialized when started (as oposed to when constructed), to      * improve memory locality.      */     ForkJoinTask<?>[] queue;
