ForkJoin_invoke_submit_execute

在ForkJoinPool中,有invoke,submit,execute三個接口能夠提交任務運行,這三個接口有什麼區別?分別適用什麼場景?less

綜述:
當前線程是FJ工做線程且屬於當前FJPool,execute,submit方法會調用forkOrSubmit將task加入到工做線程的任務隊列,而invoke直接調用FJTask的exec()方法;
當前線程非FJ工做線程或者不屬於當前FJPool,則最終都是調用 addSubmission(ForkJoinTask<?>) 將任務加入FJPool的任務隊列,並分配一個FJ工做線程。
其中,submit,invoke有返回值(返回指派的任務的計算結果),而execute沒有返回值。dom

1、execute
提交任務後,任務將被異步執行。它有2個實現,可將Runnable接口封裝成 ForkJoinTask<?> 使用。異步

public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        forkOrSubmit(task);
    }ide

public void execute(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<?> job;
        if (task instanceof ForkJoinTask<?>) // avoid re-wrap
            job = (ForkJoinTask<?>) task;
        else
            job = ForkJoinTask.adapt(task, null);   /*對Runnable接口進行封裝*/
        forkOrSubmit(job);
    }oop

/**
     * Unless terminating, forks task if within an ongoing FJ
     * computation in the current pool, else submits as external task.
     */
    private <T> void forkOrSubmit(ForkJoinTask<T> task) {
        ForkJoinWorkerThread w;
        Thread t = Thread.currentThread();
        if (shutdown)
            throw new RejectedExecutionException();
        if ((t instanceof ForkJoinWorkerThread) &&      
            (w = (ForkJoinWorkerThread)t).pool == this)
            w.pushTask(task); 
/*若是當前調用該方法的線程是FJ工做線程,且屬於當前FJPool,則將任務加入工做線程的執行隊列;
 這裏與 ForkJoinTask.fork()相似,fork不判斷是否屬於當前FJPool*/
        else
            addSubmission(task); /*不然,加到提交任務隊列,等待分配工做線程來執行*/
    }ui


2、submit
提交一個任務,其本質與execute相同,區別是execute無返回值,submit會將提交給FJPool的ForkJoinTask<?> 返回給調用者;
由於調用的都是 forkOrSubmit 方法,所以submit的4個實現的實質是同樣的。this

/**
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(Callable<T> task) {  /*Callable的返回值爲泛型T,做爲封裝後的ForkJoinTask<T>的返回值*/
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<T> job = ForkJoinTask.adapt(task);
        forkOrSubmit(job);
        return job;
    }idea


    /**
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(Runnable task, T result) {  /*Runnable不支持返回值,須要使用單獨的參數指定返回值*/
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
        forkOrSubmit(job);
        return job;
    }.net

3、invoke線程

public <T> T invoke(ForkJoinTask<T> task) {
        Thread t = Thread.currentThread();
        if (task == null)
            throw new NullPointerException();
        if (shutdown)
            throw new RejectedExecutionException();
        if ((t instanceof ForkJoinWorkerThread) &&
            ((ForkJoinWorkerThread)t).pool == this)  /*若是是當前FJPool的工做線程,則執行task,task.invoke將調用ForkJoinTask的exec()抽象方法,調用compute()方法*/
            return task.invoke();  // bypass submit if in same pool
        else {
            addSubmission(task);   /*將任務提交到隊列,分配一個工做線程給它*/
            return task.join();
        }
    }


/**
     * Enqueues the given task in the submissionQueue.  Same idea as
     * ForkJoinWorkerThread.pushTask except for use of submissionLock.
     *
     * @param t the task
     */
    private void addSubmission(ForkJoinTask<?> t) {
        final ReentrantLock lock = this.submissionLock;
        lock.lock();
        try {
            ForkJoinTask<?>[] q; int s, m;
            if ((q = submissionQueue) != null) {    // ignore if queue removed
                long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
                UNSAFE.putOrderedObject(q, u, t);
                queueTop = s + 1;
                if (s - queueBase == m)
                    growSubmissionQueue();
            }
        } finally {
            lock.unlock();
        }
        signalWork();  /*Wakes up or creare a worker*/
    }

4、其它接口
ForkJoinPool:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
【輸入一組Callable對象,執行後返回一組Future<?>】

ForkJoinTask:
public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks)
【輸入一組ForkJoinTask<?>對象,返回一組一樣的對象】

public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
        t2.fork();
        t1.invoke();
        t2.join();

【將t2加入任務隊列,當即執行t1,並等待t2執行完畢】

public static void invokeAll(ForkJoinTask<?>... tasks)
【可變長入參個數,將任務加入到隊列,待執行(異步)】

問題:
一、工做線程建立/分配後,如何獲取待執行的任務?
public void run() {
        Throwable exception = null;
        try {
            onStart();
            pool.work(this);      /*調用pool的work接口,將當前工做線程對象傳入*/
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            onTermination(exception);
        }
    }

final void work(ForkJoinWorkerThread w) {
        boolean swept = false;                // true on empty scans
        long c;
        while (!w.terminate && (int)(c = ctl) >= 0) {
            int a;                            // active count
            if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
                swept = scan(w, a);      /*進行scan,work-stealing algorithm 在這裏實現*/
            else if (tryAwaitWork(w, c))
                swept = false;
        }
    }


/**
     * Scans for and, if found, executes one task. Scans start at a
     * random index of workers array, and randomly select the first
     * (2*#workers)-1 probes, and then, if all empty, resort to 2
     * circular sweeps, which is necessary to check quiescence. and
     * taking a submission only if no stealable tasks were found.  The
     * steal code inside the loop is a specialized form of
     * ForkJoinWorkerThread.deqTask, followed bookkeeping to support
     * helpJoinTask and signal propagation. The code for submission
     * queues is almost identical. On each steal, the worker completes
     * not only the task, but also all local tasks that this task may
     * have generated. On detecting staleness or contention when
     * trying to take a task, this method returns without finishing
     * sweep, which allows global state rechecks before retry.
     *
     * @param w the worker
     * @param a the number of active workers
     * @return true if swept all queues without finding a task
     */
    private boolean scan(ForkJoinWorkerThread w, int a) {
        int g = scanGuard; // mask 0 avoids useless scans if only one active
        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
        ForkJoinWorkerThread[] ws = workers;
        if (ws == null || ws.length <= m)         // staleness check
            return false;
        for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
            ForkJoinWorkerThread v = ws[k & m];          【從FJPool的workers取一個工做線程】
            if (v != null && (b = v.queueBase) != v.queueTop &&
                (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {     【獲取該工做線程的任務隊列】
                long u = (i << ASHIFT) + ABASE;
                if ((t = q[i]) != null && v.queueBase == b && 【從被偷的工做線程任務隊列出取出一個任務】
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    int d = (v.queueBase = b + 1) - v.queueTop;
                    v.stealHint = w.poolIndex;
                    if (d != 0)
                        signalWork();             // propagate if nonempty
                    w.execTask(t); 【用指派的工做線程,執行上述偷來的任務】
                }
                r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
                return false;                     // store next seed
            }
            else if (j < 0) {                     // xorshift
                r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
            }
            else
                ++k;
        }
        if (scanGuard != g)                       // staleness check  【無效的檢查】
            return false;
        else {                                    // try to take submission
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; 【必然走到這裏,從FJPool的 submissionQueue 中獲取任務來執行】
            if ((b = queueBase) != queueTop &&
                (q = submissionQueue) != null &&
                (i = (q.length - 1) & b) >= 0) {
                long u = (i << ASHIFT) + ABASE;
                if ((t = q[i]) != null && queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    queueBase = b + 1;
                    w.execTask(t);
                }
                return false;
            }
            return true;                         // all queues empty
        }
    }

二、工做線程的任務隊列,與FJPool的任務隊列之間有什麼聯繫?
【同上】


三、ForkJoinTask.fork()   ForkJoinWorkerThread.pushTask(ForkJoinTask<?>)
fork調用了後者,將調用fork的任務對象,加入到當前工做線程的任務列表。

   /**      * The work-stealing queue array. Size must be a power of two.      * Initialized when started (as oposed to when constructed), to      * improve memory locality.      */     ForkJoinTask<?>[] queue;

本站公眾號
   歡迎關注本站公眾號,獲取更多信息