Java7中的ForkJoin併發框架初探(中)——JDK中實現簡要分析

根據前文描述的Doug Lea的理論基礎,在JDK1.7中已經給出了Fork Join的實現。在Java SE 7的API中,多了ForkJoinTask、ForkJoinPool、ForkJoinWorkerThread、RecursiveAction、RecursiveTask這樣5個類。本文就對JDK1.7中增長這5個工具類實現作簡要分析。java

0. JDK中ForkJoin實現概述數組

在JavaSE7的API和JDK1.7中,分別集成了支持ForkJoin的五個類:併發

  • ForkJoinPool 實現ForkJoin的線程池
  • ForkJoinWorkerThread  實現ForkJoin的線程
  • ForkJoinTask<V> 一個描述ForkJoin的抽象類
  • RecursiveAction 無返回結果的ForkJoinTask實現
  • RecursiveTask<V> 有返回結果的ForkJoinTask實現

ForkJoinPool維護了多個線程構成的數組,維護了任務提交隊列,給出了多個線程之間工做竊取的實現。給出了任務類型適配,和提交任務邏輯的實現。須要和線程緊密配合。框架

而ForkJoinWorkerThread則繼承了java.lang.Thread類,維護了線程本身的隊列,同一個任務fork()操做原則上會添加到同一個線程隊列中。而這個線程類須要和ForkJoinPool緊密合做,有指向對應ForkJoinPool對象的引用。less

ForkJoinTask則實現了Future接口,除了對接口的實現外,主要是fork()和join()操做。注意,貌似fork()只有ForkJoinWorkerThread 中才能執行。dom

兩個子類RecursiveAction和RecursiveTask則實現比較簡單,區別就在於返回值的處理不一樣。工具

1. ForkJoinPoolthis

ForkJoinPool是實現了 Fork Join 的線程池。看JDK源碼咱們知道ForkJoinPool是extends AbstractExecutorService的,也就是說間接地實現了Executor和ExecutorService接口。實際上也就意味着ForkJoinPool是繼ThreadPoolExecutor後的又一個Executor(Service)的具體實現。線程

1.1. 構建初始化code

咱們先看ForkJoinPool的構造方法,一共有3個重載的實現。有一個單參數的默認實現,一般咱們使用這個就足夠了,這最終會以默認的參數調用3參數的構造方法。咱們再來看3個參數的構造方法實現。其中:

  • int parallelism 第一個參數是並行度,這個參數簡介影響着(會額外作一些運算)這個ForkJoinPool的ForkJoinWorkerThread 線程數。默認狀況下,這個參數是任務運行環境的處理器個數,好比系統提供的處理器數目爲4,初始化線程池會開啓16個線程。
  • ForkJoinWorkerThreadFactory factory 這個是ForkJoinPool構建新線程ForkJoinWorkerThread 對象的工廠,相似於ThreadPoolExecutor中用到的ThreadFactory。
  • Thread.UncaughtExceptionHandler handler 這個前面併發的文章頁提到過,是線程異常處理器,這裏很少說了。

1.2. 任務提交

前面已經提到,ForkJoinPool也是Executor(Service)的實現,那麼execute()和submit()這樣向ThreadPoolExecutor提交任務的方法對於ForkJoinPool來講也是同樣有效的。

須要說明的是,除了增長支持ForkJoinTask對象參數的重載實現外,還在Runnable和Callable參數的方法中對原始的Runnable和Callable對象作了到ForkJoinTask的適配,使用的分別是ForkJoinTask的靜態內部類AdaptedRunnable和AdaptedCallable的對象。而這兩個類型參數對應的方法最終都會調用ForkJoinTask參數的方法:

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {

    if (task == null)

        throw new NullPointerException();

    forkOrSubmit(task);

    return task;

}

咱們接下來再看下任務提交中被調用到的forkOrSubmit()方法:

private <T> void forkOrSubmit(ForkJoinTask<T> task) {
    ForkJoinWorkerThread w;
    Thread t = Thread.currentThread();
    if (shutdown)
        throw new RejectedExecutionException();
    if ((t instanceof ForkJoinWorkerThread) &&
        (w = (ForkJoinWorkerThread)t).pool == this)
        w.pushTask(task);
    else
        addSubmission(task);
}

邏輯很容易理解,先判斷ForkJoinPool的狀態,若已中止,則拋異常返回。以後若是當前線程是ForkJoinWorkerThread類型的,則將任務追加到ForkJoinWorkerThread對象中維護的隊列上,不然將新的任務放入ForkJoinPool的提交隊列中,並通知線程工做。

1.3. 線程的啓動和工做

前面已經強調過,ForkJoinPool和ForkJoinWorkerThread是緊密相關,耦合在一塊兒的。Thread的start()會調用run(),而ForkJoinWorkerThread類重寫了run()方法,會調用對應的線程池ForkJoinPool對象的work()方法。

咱們來看一下work()方法的實現。

final void work(ForkJoinWorkerThread w) {
    boolean swept = false;                // true on empty scans
    long c;
    while (!w.terminate && (int)(c = ctl) >= 0) {
        int a;                            // active count
        if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
            swept = scan(w, a);
        else if (tryAwaitWork(w, c))
            swept = false;
    }
}

裏面主要是一個while循環體,只要當前的線程和線程池不是處於終止狀態,則這個循環一直執行。執行的內容則是這樣的,若是可以根據scan()方法獲得任務,並執行,不然進入阻塞狀態。

咱們來看一下scan()方法的實現。

private boolean scan(ForkJoinWorkerThread w, int a) {
    int g = scanGuard; // mask 0 avoids useless scans if only one active
    int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
    ForkJoinWorkerThread[] ws = workers;
    if (ws == null || ws.length <= m)         // staleness check
        return false;
    for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
        ForkJoinWorkerThread v = ws[k & m];
        if (v != null && (b = v.queueBase) != v.queueTop &&
            (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
            long u = (i << ASHIFT) + ABASE;
            if ((t = q[i]) != null && v.queueBase == b &&
                UNSAFE.compareAndSwapObject(q, u, t, null)) {
                int d = (v.queueBase = b + 1) - v.queueTop;
                v.stealHint = w.poolIndex;
                if (d != 0)
                    signalWork();             // propagate if nonempty
                w.execTask(t);
            }
            r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
            return false;                     // store next seed
        }
        else if (j < 0) {                     // xorshift
            r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
        }
        else
            ++k;
    }
    if (scanGuard != g)                       // staleness check
        return false;
    else {                                    // try to take submission
        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
        if ((b = queueBase) != queueTop &&
            (q = submissionQueue) != null &&
            (i = (q.length - 1) & b) >= 0) {
            long u = (i << ASHIFT) + ABASE;
            if ((t = q[i]) != null && queueBase == b &&
                UNSAFE.compareAndSwapObject(q, u, t, null)) {
                queueBase = b + 1;
                w.execTask(t);
            }
            return false;
        }
        return true;                         // all queues empty
    }
}

看起來很複雜,實際的原理則很簡單,就是先嚐試作任務竊取( Work Stealing ),若是不知足條件則到提交隊列中獲取任務。而ForkJoinWorkerThread線程自己也維護了線程內fork和join任務操做獲得的隊列,結合起來,整體執行任務的順序就是:

  • 線程會先執行ForkJoinWorkerThread對象內維護的任務隊列中的任務,即ForkJoinWorkerThread的execTask()方法中的循環實現。一般是LIFO,即去最新的任務。也有特殊狀況,這個根據變量locallyFifo的值來判斷。
  • 以後會嘗試作任務竊取,嘗試從其餘線程中獲取任務
  • 任務竊取條件不知足時,到提交隊列中獲取提交的任務

1.4. ForkJoinPool的其它屬性

除了上述提到的操做,ForkJoin中還維護了

  • 線程數組和提交任務的隊列,這是最基本的
  • 操做相關的鎖和條件對象
  • volatile long ctl; 等線程池ForkJoinPool狀態的屬性
  • static final Random workerSeedGenerator; 等和任務竊取策略相關的一系列屬性
  •  private volatile long stealCount; 等數據統計相關屬性

等數據屬性。

2. ForkJoinWorkerThread

ForkJoinWorkerThread擴展於Thread類,但提供了不少支持ForkJoin的特性。

上文在介紹ForkJoinPool的時候已經對這個類作了不少描述,也強調過線程類ForkJoinWorkerThread和ForkJoinPool相互依賴,放在一塊兒纔有意義。實際上,還要提到描述Fork Join任務的類ForkJoinTask。

除了上面提到的之外,對於ForkJoinWorkerThread這個類,再稍微提一下這樣幾個點:

  • ForkJoinTask<?>[] queue; 這是維護和ForkJoin相關的(子)任務隊列,還有queueTop和queueBase屬性,分別標記隊列的尾部和頭部
  • final ForkJoinPool pool; 指向線程池的引用,須要注意的是,這個屬性被final修飾
  • 和ForkJoinTask的fork()和join()方法相關的方法——pushTask()和unpushTask(),分別負責在當前ForkJoinWorkerThread對象維護的隊列中新增和取回任務
  • 其它與狀態和統計相關的屬性

3. ForkJoinTask及兩個抽象子類

ForkJoinTask是ForkJoin框架中的主體,是ForkJoin中任務的體現。這個類實現了Future和Serializable接口。除了Futrue接口要知足的方法外,我想有這樣3個方法是有必要知道的,分別是fork()、join()和exec()。

對於fork(),這個也許你們都很熟悉了,在這裏也就是分解出子任務的執行。這個在實現上很簡單那,就是在當前線程ForkJoinWorkerThread對象維護的隊列中加入新的子任務。實現以下:

public final ForkJoinTask fork() {
    ((ForkJoinWorkerThread) Thread.currentThread())
        .pushTask(this);
    return this;
}

須要注意的是fork()方法的調用是在當前線程對象爲ForkJoinWorkerThread的條件下。

咱們再來看看對應的join()實現:

public final V join() {
    if (doJoin() != NORMAL)
        return reportResult();
    else
        return getRawResult();
}

顯然,它有調用了doJoin()方法,咱們再來深刻了解下。

private int doJoin() {
    Thread t; ForkJoinWorkerThread w; int s; boolean completed;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
        if ((s = status) < 0)
            return s;
        if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                return setCompletion(NORMAL);
        }
        return w.joinTask(this);
    }
    else
        return externalAwaitDone();
}

大概的邏輯是這樣的,在當前線程對象爲ForkJoinWorkerThread的條件下,從隊列中取回當前任務ForkJoinTask對象,並嘗試在調用線程對其直接執行,不然當前線程調用wait()阻塞等待。更深刻的理解可續繼續查閱源碼。

最後,咱們再來看看exec()方法,這個是在ForkJoinTask中是沒有給出實現的。

在JDK中,有ForkJoinTask的兩個抽象子類RecursiveAction和RecursiveTask,他們分別給出了exec()的實現,這也是這兩個子類主要作的事情,其實是調用了各自的compute()方法,而在RecursiveAction和RecursiveTask中compute()又是未給出實現的。

實際上,compute()方法就是Fork Join要執行的內容,是Fork Join任務的實質,須要開發者給出。

而RecursiveAction和RecursiveTask就是方便開發者使用Fork Join的,RecursiveAction和RecursiveTask這兩個類的區別僅僅是返回結果的狀況不一樣。而這個compute()方法就是留給開發者繼承擴展使用的。這個會在下篇文章詳細講述。

相關文章
相關標籤/搜索