ForkJoin框架之CountedCompleter,工做線程及並行流

前言

在前面的文章"ForkJoin框架之ForkJoinTask"中梳理了ForkJoin框架的簡要運行格架和異常處理流程,顯然要理解ForkJoin框架的調度,包含工做竊取等思想,須要去ForkJoinPool中瞭解,而對於ForkJoinTask的拓展和使用則須要瞭解它的一些子類,前文中偶爾會提到ForkJoinTask的一個子類:CountedCompleter,直譯爲計數的完成器.java

前文也說過,JAVA8的並行流其實就是基於了ForkJoin框架實現,所以並行流其實就在使用咱們前面提到的工做竊取和分治思想.爲了方便對於ForkJoinTask的理解,本文將詳述CountedCompleter(同時在ForkJoinPool中也須要了解它),以及前文提到的工做線程ForkJoinWorkerThread,並簡單看一看並行流.算法

CountedCompleter源碼

根據doug的註釋,CoutedCompleter是一個特殊的ForkJoinTask,它會在觸發完成動做時,檢查有沒有掛起action,若沒有則執行一個完成動做.這個概念有些抽象,必須結合源碼和源碼做者給出的示例加以理解,一樣的,理解了它,也就理解了CountedCompleter的擴展類的實現方式,從而能閱讀懂有關的源碼(如並行流中涉及到運行集拆分,結果合併,運算調度等源碼).編程

它也是一個抽象類,基於ForkJoinTask的exec函數進行了若干擴展.api

public abstract class CountedCompleter<T> extends ForkJoinTask<T> 

//任務的完成者,很明顯這是一個全局的棧結構(暫時這麼理解吧,其實也不太嚴格).
final CountedCompleter<?> completer;
//重要字段,表明完成前掛起的任務數量,用volatile修飾.
volatile int pending;
//帶有completer的構造器.
protected CountedCompleter(CountedCompleter<?> completer) {
    this.completer = completer;
}
//不帶completer的構造器
protected CountedCompleter() {
    this.completer = null;
}
//抽象的compute方法,它是相似ForkJoinTask的擴展方式.
public abstract void compute();
//重寫的exec方法
protected final boolean exec() {
    //直接調用compute方法並返回false.回到ForkJoinTask類中的doExec方法,能夠看到
    //調用了exec後若獲得true值,將會執行setCompletion(NORMAL)動做.且該動做將在首次喚醒等待結果的線程.
    //此處return了false,將不去執行上述操做.詳情參考上篇文章.
    compute();
    return false;
}

以上是CountedCompleter的簽名,字段,構造器和核心的抽象方法compute,其實整個CountedCompleter就是在圍着這點東西轉,首先看一看與ForkJoinTask的結合.數組

顯然,CountedCompleter簡單重寫了ForkJoinTask的exec方法簡單調用抽象的compute方法並返回false,當出現異常時,流程不變,但當compute方式正常完成的狀況,將不可能進行父類後續的設置完成和喚醒操做.所以它必須由CountedCompleter自定義的完成.安全

而CountedCompleter也確實暴露了一些公有函數,可是調用的時機卻要用戶繼承它以後決定.咱們先來繼續一些輔助源碼並理解Completer的設計理念,稍後再來看它的完成方法.數據結構

//onCompletion勾子方法,默認空實現.
//CountedCompleter在tryComplete方法中會在符合完成的第一個條件(無掛起任務)的狀況下執行它.
//complete方法也會對它有無條件地調用.
//關於這兩個方法稍後詳述.
//它的實現取決於要實現的操做,並行流中的一些ops會在此處進行一些中間結果處理,好比結果集的合併(reduce操做).
public void onCompletion(CountedCompleter<?> caller) {
}

//重寫ForkJoinTask中的方法.上篇源碼分享文章中提過,在ForkJoinTask的setExceptionalCompletion會調用internalPropagateException
//傳遞異常,並且是個空實現,而在CountedCompleter中實現了該方法,並在內部調用onExceptionalCompletion
void internalPropagateException(Throwable ex) {
    CountedCompleter<?> a = this, s = a;
    //循環判斷每個task是否要傳遞異常給它的completer
    //無方法體的while循環.道格大神的代碼神蹟.
    while (a.onExceptionalCompletion(ex, s) &&
            //要傳遞給completer且具有completer且completer還不是完成態(正常或非正常)
           (a = (s = a).completer) != null && a.status >= 0 &&
            //則令completer去記錄異常完成,若記錄成功則進入下一輪循環.
           a.recordExceptionalCompletion(ex) == EXCEPTIONAL)
        ;
    //由於onExceptionalCompletion固定返回true,若沒有中間完成的任務,直到最後一個completer,也就是root,
    //root不具有completer,將中斷循環.
}

//異常完成勾子方法.
//按上一節的概念,當ForkJoinTask執行出錯,即exec->compute出錯時,最終會調到此勾子.或當手動completeExceptionally或cancel時.
public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
    //直接返回true,顯然也是一個供擴展的方法.返回true表明異常應該傳遞給this的completer.
    return true;
}

//返回completer
public final CountedCompleter<?> getCompleter() {
    return completer;
}

//返回掛起任務數量.
public final int getPendingCount() {
    return pending;
}

//設置掛起任務數量
public final void setPendingCount(int count) {
    pending = count;
}

//原子地爲掛起任務數量添加delta
public final void addToPendingCount(int delta) {
    U.getAndAddInt(this, PENDING, delta);
}

   //原子地將當前掛起任務數量從expected更改到count
public final boolean compareAndSetPendingCount(int expected, int count) {
    return U.compareAndSwapInt(this, PENDING, expected, count);
}

//將當前任務的掛起數量原子減至0.
public final int decrementPendingCountUnlessZero() {
    int c;
    do {} while ((c = pending) != 0 &&
                 !U.compareAndSwapInt(this, PENDING, c, c - 1));
    return c;
}

//返回root completer.邏輯很簡單.
public final CountedCompleter<?> getRoot() {
    CountedCompleter<?> a = this, p;
    while ((p = a.completer) != null)
        a = p;
    return a;
}

以上是幾個工具函數,邏輯也很簡單,僅有一處可能留有疑問:完成態/異常態是如何傳遞的.多線程

如今你們應該理解爲何ForkJoinTask要將internalPropagateException置爲空實現了,顯然,對於不一樣方式的實現,確實須要不一樣的傳遞行爲.CountedCompleter保存了一個相似"棧結構"的任務鏈,雖然提早講到棧底即爲root任務(固然root在底部仍是頂部自己不重要),顯然任何一個子任務出現了問題,與它關聯的父任務的行爲顯然要有一個明確的由子類定義的規則.app

咱們看到在重寫的internalPropagateException方法中,不停地判斷當前任務是否要將異常信號傳遞給鏈上的下一個任務(on方法始終返回true,不要緊咱們能夠在子類中重寫),而後讓未完成的completer去記錄同一個異常ex.框架

那麼問題來了,只要completer已完成過(正常完成過異常完成或取消),顯然while循環中斷,completer和它的後續completer將不會被處理(1).一樣,若傳遞異常的任務自己就是另外一個或幾個任務的completer,它的異常信息顯然不會反向傳遞(2).

對於問題(1),顯然若是後續的completer已出現過異常,必然也會走一遍一樣的邏輯,傳遞給後面的completer,若是它正常完成,也必然要有相應向後傳遞的行爲,不然沒法解決(1),咱們接下來即論述相關方法.

對於問題(2),顯然問題(1)中描述的狀況與此有所交集,若是咱們創建了一個CountedCompleter任務,並在compute方法中大肆fork子任務入隊,fork以後不等子任務完成,也不獲取子任務的執行結果,直接將父任務setCompletion或者setExceptionalCompletion,子任務仍是會繼續執行的.

爲了便於理解,咱們繼續來看與任務的完成有關的方法.

//嘗試完成根任務或減小棧鏈下游的某一個completer的掛起數(包含它自身).
public final void tryComplete() {
    //1.初始用a保存this,後續爲當前操做任務,用s保存a.
    CountedCompleter<?> a = this, s = a;
    for (int c;;) {
        //2.第一次進入或在6形成競態的某一次循環中,a(this或this的completer鏈中的某一個)的的掛起任務數爲0,表明它掛起的任務都完成了.
        if ((c = a.pending) == 0) {
            //3.a的勾子方法,若已經運行過4,且判斷條件爲假未能到5並在下一次循環從新回到3的狀況,a!=s且a是s的completer,
            //在對onCompletion重寫時,能夠根據this與參數是否相等進行判斷,如並行流聚合時能夠根據這個條件進行結果集的合併.
            a.onCompletion(s);
            //4.將a指向本身的completer,s指向原來的a.
            if ((a = (s = a).completer) == null) {
                //5.原來a的completer不存在,即a不是root,不須要再傳遞了,讓root進行quietlyComplete並返回.
                //此時說明整條鏈上的competer掛起任務所有是0.
                s.quietlyComplete();
                return;
            }
            //隱藏的7.當原a的completer存在(a不是root)的狀況,繼續對該complter判斷掛起任務數或嘗試減1,對下一個元素開啓下一輪循環.
        }
        //6.對this的completer棧的某一次循環時發現了掛起任務數不爲0的,則對該completer的掛起數減1,
        //表示它掛起的任務完成了一個,並返回.若在此時剛好出現了競態,另外一條鏈上的任務搶先減一,則當前
        //的a要進入下一循環,它可能會在2處判斷經過,進入到鏈上的下一個completer的傳播邏輯.
        else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
            return;
    }
}

//基本等效於tryComplete,只是不執行onCompletion,tryComplete會在判斷鏈上某個completer的掛起任務數是0當即執行onCompletion.
public final void propagateCompletion() {
    CountedCompleter<?> a = this, s = a;
    for (int c;;) {
        if ((c = a.pending) == 0) {
            if ((a = (s = a).completer) == null) {
                s.quietlyComplete();
                return;
            }
        }
        else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
            return;
    }
}


//complete方法,邏輯簡單,絲絕不考慮掛起數,直接執行當前task的幾個完成方法,並嘗試對completer進行tryComplete.
//它不改變本身的掛起任務數,但會讓completer對棧上的其餘completer或自身嘗試減小掛起數或完成root.
public void complete(T rawResult) {
    CountedCompleter<?> p;
    setRawResult(rawResult);//使用參數設置爲當前任務的結果,儘管它爲空方法.
    onCompletion(this);//直接調用onCompletion勾子.
    quietlyComplete();//安靜地將status置爲NORMAL.
    if ((p = completer) != null)
        //本身不改變自身掛起數,也不嘗試完成root,但讓completer嘗試去向下執行這些操做.
        p.tryComplete();
}

//沒辦法單獨理解這個方法名.官方註釋是和nextComplete放置在循環中使用.
public final CountedCompleter<?> firstComplete() {
    for (int c;;) {
        if ((c = pending) == 0)
            //1.當前task沒有掛起任務數,則返回它.
            return this;
        else if (U.compareAndSwapInt(this, PENDING, c, c - 1))
            //2.不然嘗試減小一個掛起任務數並返回null.但當出現競態時,可能致使未能進入2而在下一次循環進入1.
            return null;
    }
}

//結合前面的firstComplete互相理解,它會對當前任務判斷是否有completer,有則對該completer進行firstComplete,
//不然將當前任務安靜完成並返回null.
//故結果只能返回null或completer
public final CountedCompleter<?> nextComplete() {
    CountedCompleter<?> p;
    if ((p = completer) != null)
        //有completer且completer已無掛起任務數,則返回completer,
        //有completer且completer有掛起任務數,則嘗試對該任務數減一併返回null.出現競態則可能返回該completer.
        return p.firstComplete();
    else {
        //無completer,安靜完成當前任務並返回null.
        quietlyComplete();
        return null;
    }
}

//等同於getRoot().quietlyComplete()
public final void quietlyCompleteRoot() {
    for (CountedCompleter<?> a = this, p;;) {
        if ((p = a.completer) == null) {
            a.quietlyComplete();
            return;
        }
        a = p;
    }
}


//若是當前任務未完成,嘗試去出棧執行,並處理至多給定數量的其餘未處理任務,且對這些未處理任務
//來講,當前任務處於它們的完成路徑上(即這些任務是completer棧鏈的前置任務),實現特殊的工做竊取.
public final void helpComplete(int maxTasks) {
    Thread t; ForkJoinWorkerThread wt;
    if (maxTasks > 0 && status >= 0) {
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            //當前線程是ForkJoinWorkerThread,嘗試執行當前任務並嘗試從線程的工做隊列中嘗試幫助前置任務執行.
            (wt = (ForkJoinWorkerThread)t).pool.
                helpComplete(wt.workQueue, this, maxTasks);
        else
            //使用common池的externalHelpComplete方法.
            ForkJoinPool.common.externalHelpComplete(this, maxTasks);
    }
}

上一段代碼整體邏輯不難,有如下幾點總結:

1.顯然tryComplete方法在調用後的最終結果只有兩個:本身或completer鏈前方的某一個completer的掛起任務數減1(1),本身或completer鏈前方某一個completer(root)的quietlyComplete被執行(2).簡單來講,就是讓root進行quietlyComplete(鏈上每個掛起任務數都是0)或讓鏈上的某一個completer減小一個掛起任務.

2.tryComplete方法只會對root進行quietlyComplete,進而setComplete(NORMAL),對於鏈上的其餘任務,最多會幫助掛起數減一,而不會把它們置爲完成態,可是線程池在執行任務時,或者直接對一個鏈上的completer進行invoke,doExec甚至get等操做時,這些方法會將該中間completer進行setComplete.

3.每個CountedCompleter均可能有本身的completer棧鏈,每個CountedCompleter也能夠位於其餘CountedCompleter的棧鏈上且上游不惟一而下游惟一一(倒樹形),任何一條棧鏈只能有一個root,root的completer爲null.

4.從tryComplete方法來看正常運行狀況下的規則,每個CountedCompleter的tryComplete只能向前影響到鏈上的另外一個completer,由於實現數量的增長方法有好幾處,用戶在實現時,隨時可能將一些completer的數量設置成任意的數,故能夠出現前面tryComplete註釋中隱藏的7的狀況,即存在一個completer,它的下一個completer的掛起數是0,它卻能將下下個completer安靜完成或將其掛起數減一,即跨無掛起數節點傳遞.

5.前面列出的helpComplete方法是CountedCompleter的特殊工做竊取方法(或者也不能叫做竊取,由於非common池狀況竊取的是本身線程的任務,common池則依賴於一個探測值),具體的竊取細節在ForkJoinPool中,將在後面的文章中論述,但簡單的邏輯已經在註釋中描述清楚,把它歸到這一塊,也是由於它與前面描述的邏輯有所糾葛.124提到了tryComplete的向前影響結果,而在實際的應用中,咱們可能會有各類各樣的情景,ForkJoin框架沒法阻止咱們對ForkJoinTask的exec函數進行任意式的擴展,也沒法阻止咱們對CountedCompleter的compute任意擴展,那麼如何在咱們任意拓展的情景下保持效率和健壯?好比下面這個使用場景:

a.創建一種ForkJoinTask,直接繼承CountedCompleter並重寫compute方法,則它能夠運行在ForkJoinPool中.

b.咱們接下來在compute方法中屢次根據計算結果集的大小進行拆分並遞歸fork子任務入池,父任務成爲子任務的completer,同時compute方法自身也負責不可拆分的計算邏輯,並在自身這一塊計算結束後,可能等待全部fork入池的子任務結束,也可能不等待子任務,直接結束父任務,讓線程空出來作其餘的事.

c.全部子任務結束後,使用一個合併函數合併子任務的結果集和自身的結果,並做爲最終的結果.而後tryComplete(若是b中使用了join,或者判斷當前任務是root).

顯然,b中fork出的子任務,也一樣要執行bc的邏輯.那麼可能出現這樣的狀況:

不一樣的父任務子任務在ForkJoinPool最初始壓入當前工做線程的隊列中,但隨時可能被其餘工做線程甚至外部線程偷去執行.

父任務搶先搶得運行資源,運行完本身計算的部分,而入池的子任務及子孫任務有大量未完成.

難道父任務的執行線程就這樣乾等?在前一篇文章中說過,ForkJoin框架適宜多計算,輕io,輕阻塞的狀況,且自己就是爲了不線程忙的忙死餓的餓死,所以每一個任務等待子任務執行結束是不可取的,這或許也是爲何有了ForkJoinTask,卻還要有CountedCompleter的緣由之一吧.

若咱們在任何每個任務中只是單純地將該分出去的子任務fork入池並執行本身那一部分,並不讓當前線程join子任務呢?(事實上不join子任務剛好能夠將當前線程的資源騰出來作其餘的事)

因此,除了前面5中提到的若干種(124)向前影響completer棧鏈的掛起數或root的完成態,還須要一個能向棧鏈後方有所影響的操做,好比幫助子任務的完成,畢竟子任務也是b中fork出來且由本身入隊的.

helpComplete方法就能夠作到這一點,它在ForkJoinPool中,它僅應在當前任務未完成時使用,首先它會嘗試將當前任務從出隊列並執行(ForkJoinPool::popCC及成功後續doExec,LIFO),出隊失敗則表示正在被執行甚至被偷去執行.出隊這一步以後,再嘗試本身的線程工做隊列中找出本身的子孫任務(FIFO)並進行執行(ForkJoinPool::pollAndExecCC).

而若執行完某個父任務的工做線程必然會調用tryComplete等有關方法,將自身或棧鏈後方的某一個completer的掛起數減一,甚至由於一些不合理的api使用(如直接更改了後方某個任務的掛起數量)而直接終止了root,將root任務標記成完成態.(注意前面強調的"運行完本身計算的部分",這就是否認本句話的關鍵了,前面也說明"helpComplete僅在當前任務未完成時使用",顯然,完成了本身負責的計算內容並不表明當前任務完成了,由於它的子任務尚未完成,所以它不會調用tryComplete,而且能夠去幫助子任務)

同時,執行完父任務負責的計算內容的任務線程也會去找它棧鏈後方的其餘任務,按照b的邏輯,這將是它的子任務,幫助它們完成,每完成一個子任務(子任務無子任務,再也不help的狀況),會進行tryComplete傳遞一次.

餘下的方法很簡單.

//重寫自ForkJoinTask的結果,前文也說過CountedCompleter也不維護result,返回null.
//但並行流或者一些其餘並行操做能夠實現此結果,好比ConcurrentHashMap中支持的map reduce操做.
public T getRawResult() { return null; }

//同上,默認空,一些子類會有特別的實現.
protected void setRawResult(T t) { }

顯然,completer棧鏈上的全部任務是能夠並行執行的,且每個完成均可以向後tryComplete一次,並在其後能夠幫助前面的任務完成,而咱們若實現上述兩個方法,徹底能夠將自身運算的結果設置進去,在root被安靜完成後,ForkJoinTask將能夠get到結果(或join也將返回結果),可在此時合併計算結果,有些結果顯然是能夠並行的.

一些操做,好比find類型,任何一個子任務完成了find,就能夠直接讓root結束,而後直接讓整條棧鏈上的任務cancelIgnoringExceptions.

一些須要聚合每個任務結果的操做,好比reduce類型,須要每一個父任務根據子任務的結果去reduce,它的父任務再根據他和兄弟任務的結果reduce,最終合併到root.顯然,mapper由子任務實現,reducer由父任務實現.

一些接近find或reduce類型(或者說find的變種),好比filter,每個任務都會有結果,這個結果多是本身負責的原集中的一部分子集,也可能就是個空集,父任務合併每一個子任務的結果集,直到root.

排序類型的操做,如使用歸併排序,顯然每一個父任務便是divider也是merger,分解出的每一個子集交給子任務去計算,父任務再去負責merge.

......

以上是ForkJoinTask的抽象子類CountedCompleter的源碼分析,接下來咱們繼續分析工做線程.

ForkJoinWorkerThread源碼

只要對java的線程結構稍有了解,ForkJoinWorkerThread的源碼十分簡單,且前面提過,ForkJoinTask被聲稱是一個輕量於普通線程和Future的實體,而它在ForkJoinPool中的運行載體即是ForkJoinWorkerThread,這個輕量究竟體如今何處?

//類簽名,直接繼承自Thread
public class ForkJoinWorkerThread extends Thread {
//每一個ForkJoinWorkerThread都只能屬於一個線程池,且保存該池的引用.
final ForkJoinPool pool; 
//每一個ForkJoinWorkerThread都有一個工做隊列, 顯然隊列中的任務就是該線程幹活的最小單位了.它也是工做竊取機制的核心.             
final ForkJoinPool.WorkQueue workQueue; 

//構造函數,建立時指定線程池.
protected ForkJoinWorkerThread(ForkJoinPool pool) {
    // 線程名稱
    super("aForkJoinWorkerThread");
    this.pool = pool;
    //將工做線程註冊到ForkJoinPool後會返回一個工做隊列,供當前線程使用和供其餘線程偷取.
    this.workQueue = pool.registerWorker(this);
}

//帶線程組的構造器
ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
                     AccessControlContext acc) {
    super(threadGroup, null, "aForkJoinWorkerThread");
    //inheritedAccessControlContext是從Thread繼承下來的,字面意思是繼承的訪問控制上下文,設置爲acc.
    U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);
    //註冊入池以前,清除掉本地化信息
    eraseThreadLocals(); 
    this.pool = pool;
    this.workQueue = pool.registerWorker(this);
}

//返回註冊的池.

public ForkJoinPool getPool() {
    return pool;
}

//返回當前線程工做隊列在池中的索引,每一個隊列都會維護一個在池中的索引.
public int getPoolIndex() {
    return workQueue.getPoolIndex();
}

/**
 * Initializes internal state after construction but before
 * processing any tasks. If you override this method, you must
 * invoke {@code super.onStart()} at the beginning of the method.
 * Initialization requires care: Most fields must have legal
 * default values, to ensure that attempted accesses from other
 * threads work correctly even before this thread starts
 * processing tasks.
 */
//空函數,可交給子類實現,按照官方註釋,它的做用是在構造以後(這個構造不是指new出線程對象,
//而是在run方法已進入的時候,說明"構造"是指線程已經完成了建立可以正常運行),處理任務以前.
protected void onStart() {
}


//工做線程終止時的勾子方法,負責執行一些有關的清理操做.可是若要重寫它,必須在方法的
//最後調用super.onTermination.參數exception是形成該線程終止的異常.如果正常結束,
//則它是null.
protected void onTermination(Throwable exception) {
}

//核心方法.
public void run() {
    //doug在這一塊標註"只運行一次",查看ForkJoinPool的源碼,
    //ForkJoinPool中會有一個WorkQueue的數組,在取消線程的註冊後,
    //本線程關聯的WorkQueue會從該數組移除,但WorkQueue中的array不會置空.
    if (workQueue.array == null) {
        Throwable exception = null;
        try {
            //前面說過的預先操做
            onStart();
            //用線程池的runWorker方法執行,傳入隊列.
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            //發生異常,中斷前記錄下來
            exception = ex;
        } finally {
            try {
                //將記錄下來的異常調用勾子方法.
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    //執行勾子方法自己出現了異常,記錄下來
                    exception = ex;
            } finally {
                //調用線程池的解除註冊方法,會將本線程的WorkQueue從數組中移除,同時使用上述異常.
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

//擦除本地變量.把當前線程的兩個ThreadLocalMap所有置空
final void eraseThreadLocals() {
    U.putObject(this, THREADLOCALS, null);
    U.putObject(this, INHERITABLETHREADLOCALS, null);
}

//每正常運行完一次頂級task,就調用一次它.這個頂級任務自帶易誤解天性,其實能夠理解爲每一次從隊列取出的任務.
void afterTopLevelExec() {
}




//自帶子類.它不具有任何特殊權限,也不是用戶定義的任何線程組的成員,每次運行完一個頂級任務,
//則擦除本地化變量.
static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
   //自已建立默認線程組.
    private static final ThreadGroup innocuousThreadGroup =
        createThreadGroup();
    //訪問控制上下文支持權限.
    private static final AccessControlContext INNOCUOUS_ACC =
        new AccessControlContext(
            new ProtectionDomain[] {
                new ProtectionDomain(null, null)
            });
    //構造函數.
    InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
        super(pool, innocuousThreadGroup, INNOCUOUS_ACC);
    }

    @Override 
    void afterTopLevelExec() {
        //在每一次從隊列取出的"頂級"任務運行後即擦除本地化變量.
        eraseThreadLocals();
    }

    @Override 
    public ClassLoader getContextClassLoader() {
        //若是獲取線程上下文類加載器,永遠直接返回系統類加載器.
        return ClassLoader.getSystemClassLoader();
    }

    //嘗試對未捕獲異常處理器的設置,忽略.
    @Override 
    public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }

    //禁止直接設置線程的上下文類加載器.
    @Override 
    public void setContextClassLoader(ClassLoader cl) {
        throw new SecurityException("setContextClassLoader");
    }

    
    //建立一個以頂級線程組爲父的線程組.
    private static ThreadGroup createThreadGroup() {
        try {
            sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            Class<?> gk = ThreadGroup.class;
            long tg = u.objectFieldOffset(tk.getDeclaredField("group"));
            long gp = u.objectFieldOffset(gk.getDeclaredField("parent"));
            //當前線程的所屬組.
            ThreadGroup group = (ThreadGroup)
                u.getObject(Thread.currentThread(), tg);
            //循環條件,當前線程的所屬組不是null
            while (group != null) {
                //不停地循環向上取parent
                ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
                if (parent == null)
                    //發現無parent的線程組,說明是系統頂級線程組,用它當parent建立一個"無害"線程組返回.
                    return new ThreadGroup(group,
                                           "InnocuousForkJoinWorkerThreadGroup");
                //有parent,把它賦給group開啓下一輪循環.
                group = parent;
            }
        } catch (Exception e) {
            //有異經常使用Error包裝拋出.
            throw new Error(e);
        }
        //不能return就拋出Error.
        throw new Error("Cannot create ThreadGroup");
    }
}

以上是工做線程的代碼,粗略總結一下它和普通線程的區別.

首先,它內部會維護一個工做隊列,用它來實現任務調度和竊取.

其次,它提供了一些擴展,如每次頂層任務運行結束,清理ThreadLocal,這也是一種保護機制,避免同線程的本地化數據隨之污染.但粗略去看ForkJoinPool的代碼,發現它只是在每次從隊列取出並運行完一個任務後清除,並稱這個爲"頂級循環",這倒也沒錯,但這個任務並不能稱之爲頂級任務,由於這裏的任務類型是ForkJoinTask,不必定是CountedCompleter等明顯標識了依賴關係的子類,因此父任務和子任務被塞進一個隊列,即便未被竊取,只由當前線程執行,兩次的本地化數據也是不一樣的.

不過若是咱們在ForkJoinTask的exec方法中加入本地化,或在CountedCompleter中加入本地化,顯然每個在今生成的子任務都會在相應的線程執行doExec時設置這些屬性,並在執行結束後清除.

最後官方提供的默認子類,以及一些線程組,優先級,權限等做者也未深刻研究,可是咱們構建線程池的時候有一個參數就是"線程工廠",瞭解下它或許能對後續的ForkJoinPool源碼閱讀有所幫助.

接下來簡述一個官方提供的案例,並以此聊一聊並行流.

官方案例

第一節論述了CountedCompleter,顯然它做爲一個抽象類,只是定義了某一些環節,以及一些環節的子環節的組合過程,而具體的實現與使用它定義的api則由用戶實現,它的源碼中並沒有使用(固然也能夠看一些子類,但比較複雜),在CountedCompleter的源碼註釋中,道格大神提供了若干案例,這裏舉出兩個來簡要說明一下前面論述過的使用方式,也能夠爲下一節論述官方提供的子類(並行流api中)提供閱讀基礎.

第一個是並行的可竊取的分治查找算法.

@Test
public void testDivideSearch(){
    Integer[] array = new Integer[10000000];
    for(int i = 0; i < array.length; i++){
        array[i] = i+1;
    }
    AtomicReference<Integer> result = new AtomicReference<>();
    Integer find = new Searcher<>(null, array, result, 0,
            array.length - 1,this::match).invoke();
    LOGGER.info("查找結束,任務返回:{},result:{}",find,result.get());

}

static class Searcher<E> extends CountedCompleter<E> {

    final E[] array; final AtomicReference<E> result; final int lo, hi;
    final Function<E,Boolean> matcher;

    Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result,
             int lo, int hi,Function<E,Boolean> matcher){
        super(p);
        this.array = array;
        this.result = result;
        this.lo = lo;
        this.hi = hi;
        this.matcher = matcher;
    }
    @Override
    public void compute() {
        int l = this.lo;int h = this.hi;
        while(result.get() == null && h >= l){

            if(h - l >=2){
                int mid = (l + h)>>>1;
                //添加掛起任務數量,這樣當出現tryComplete時能夠觸發root的結束(未查到)
                addToPendingCount(1);
                new Searcher<E>(this,array,result,mid,h,matcher).fork();
                h = mid;
            }else{
                E x = array[l];
                if(matcher.apply(x) &&  result.compareAndSet(null,x)){
                    super.quietlyCompleteRoot();
                }
                break;
            }
        }
        //當前未有任何一個線程查到結果,當前任務也完成了子集查找,減小一個掛起數量,若掛起數已減至0則終止.
        if(null == result.get())
            tryComplete();
    }

}

private boolean match(Integer x) {
    return x > 2000000 &&  x%2 ==0 && x%3 == 0 && x%5 ==0 && x %7 ==0;
}

該案例的邏輯很簡單,給定一個很是大的數組,充分利用本機的資源去查找知足一個條件的元素.爲了方便,在具體的查找數據上選定了整型,查找的條件也很是簡單.

在該案例中,會對結果進行分治,首先分治出足夠多的子任務,剩下的不需再分的父任務由當前線程完成,子任務則壓入工做隊列,其餘空閒的線程就會來偷取子任務並執行.當有任務一個子任務查找到相應的數字後,即將它存放到result,並安靜地完成根任務.

此時整個任務鏈處在一個很是尷尬的狀況:查找到結果的子任務將root設置爲完成,而整條鏈上的非root任務均未完成.但因循環條件不知足,退出了循環.此時查到result已有值,並不執行最後的tryComplete,執行結束,任務的status依舊爲未完成,是否有重複執行的問題?

答案是沒有問題,由於ForkJoinTask絕對會在ForkJoinPool中調度(哪怕是common池),在common池中,任務執行前必須出隊,儘管compute方法在本例中沒有將這些任務設置爲完成,但任務不會被二次執行.可見,上一章中費大力介紹的status字段也有無用的時候.

可是除了root任務須要使用到獲取結果的功能,須要保證status是負數,它產生的子孫任務還有什麼用呢?全部compute方法會由於循環停止而結束,此後的這些任務不存在任何外部引用,會被gc清理,即便存在外部引用,用它去獲取子孫任務的執行狀況或result也沒有任何意義.

顯然這個案例解決了至少兩個疑問,一是怎麼實現一個保存result的ForkJoinTask,二是ForkJoin框架如何在查找方面大幅提高性能,很明顯,相比單線程遍歷的辦法,此例多線程查詢,且任何一個子任務在並行條件下完成了查詢,整個大任務都可以終止.

第二個是傳說中的map reduce.大數據中常使用此概念(跨節點).

在並行流中,map能夠表明非阻斷操做,reduce能夠表明阻斷操做,可是reduce一樣能夠並行地執行.

道格在註釋上給出了兩個map reduce案例,咱們只看第一個,它也是後續並行流一節咱們要看的例子比較相近的解法.方法二有些繞,較難理解,但也優雅.

@Test
public void testMapReduce() {
    Integer[] array = {1, 2, 3};
    //方法一.
    Integer result = new MapRed<>(null, array, (a)->a+2, (a,b)->a+b,  0,array.length).invoke();
    LOGGER.info("方法一result:{}",result);
    //方法二我就不抄了,就在官方註釋上.
    result = new MapReducer<>(null, array, (a) -> a + 1
            , (a, b) -> a + b, 0, array.length, null).invoke();
    LOGGER.info("方法二result:{}", result);

}


/**
 * 第一種map reduce方式,很好理解.
 * @param <E>
 */
private class MapRed<E> extends CountedCompleter<E> {
    final E[] array;
    final MyMapper<E> mapper;
    final MyReducer<E> reducer;
    final int lo, hi;
    MapRed<E> sibling;//兄弟節點的引用
    E result;

    MapRed(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
               MyReducer<E> reducer, int lo, int hi) {
        super(p);
        this.array = array;
        this.mapper = mapper;
        this.reducer = reducer;
        this.lo = lo;
        this.hi = hi;
    }

    public void compute() {
        if (hi - lo >= 2) {
            int mid = (lo + hi) >>> 1;
            MapRed<E> left = new MapRed(this, array, mapper, reducer, lo, mid);
            MapRed<E> right = new MapRed(this, array, mapper, reducer, mid, hi);
            left.sibling = right;
            right.sibling = left;
            //只掛起右任務
            setPendingCount(1);
            right.fork();
            //直接運算左任務.
            left.compute();     
        } else {
            if (hi > lo)
                result = mapper.apply(array[lo]);
            //它會依次調用onCompletion.而且是本身調本身或completer調子,
            //且只有左右兩個子後完成的能調成功(父任務的掛起數達到0).
            tryComplete();
        }
    }

    public void onCompletion(CountedCompleter<?> caller) {
        //忽略本身調本身.
        if (caller != this) {
            //參數是子任務.
            MapRed<E> child = (MapRed<E>) caller;
            MapRed<E> sib = child.sibling;
            //設置父的result.
            if (sib == null || sib.result == null)
                result = child.result;
            else
                result = reducer.apply(child.result, sib.result);
        }
    }

    public E getRawResult() {
        return result;
    }
}
//mapper和reducer簡單的不能再簡單.
@FunctionalInterface
private static interface MyMapper<E> {
    E apply(E e);
}
@FunctionalInterface
private static interface MyReducer<E> {
    E apply(E a, E b);
}

上面的邏輯也很簡單,首先就是對任務的分解,簡單的將任務分爲左和右,左直接由父任務執行(可能再分),右則入池,全部子任務直到不能再分(葉子任務)以map爲result,每一個葉子任務完成後會調用tryComplete.

這個動做會觸發一系列的completer棧元素的掛起數降低或完成,顯然,若是把completer理解爲一個普通樹(這是做者不多見到的非二叉樹的狀況,儘管這個例子寫成了二叉樹,咱們徹底能夠在compute中將父任務一分爲多,而不是限2個),從葉子節點開始,每一個葉子節點完成(result是mapper的結果)會嘗試onCompletion並減小父節點的掛起任務數,但只有同父節點的最後一個兄弟節點能夠進入onCompletion設置父節點的結果,而且因爲這個設置過程的前提是父節點符合掛起任務數爲0,所以符合循環繼續的條件,葉子節點的動做會繼續向上判斷父節點的父節點,直到root爲止.假設線程數量足夠,保證每一個子任務都有一個線程處理,那麼深度每上一層,就會有一半(非二叉樹的狀況每一個父節點只能有一個經過)的執行葉子節點任務的線程因不符合某個任務的掛起數量爲0的條件而退出,這樣逐級傳導,最後到root調用它最後一個子節點的onCompletion,使用reducer進行合併.

本例中進行結果合併的寫法(onCompletion)只適合二叉樹,有興趣的讀者能夠看看道格在註釋中給出的第二種寫法,幾叉均可以.並且該實現很優雅,並未寫onCompletion函數,可是寫法真心夠繞的.

並行流簡述

在JAVA8中支持了lamda表達式的同時,也支持了函數式編程,由此出現了一種新型的計算方式:流式計算,也出現了一種讓包括做者在內不少人興奮不已的編程方式:響應式編程.

流式計算的核心在於Stream api,流有不少分類,好比並行流和串行流,這點能夠顧名思義,一樣的,流中的每個操做均可以劃分類型,好比阻斷操做和非阻斷操做.

java中實現並行流就是基於這些操做,CountedCompleter的一些子類就是這些操做的類型,顯然,如在前一篇文章所說,使用了並行流,就是使用了ForkJoin框架.

當咱們使用下面的代碼,會發生什麼操做?

Stream.of(1,2,3,4,5).parallel().map(x -> x + 1).reduce((a, b) -> a + b).get();

//map只是將動做簡單地記了下來,包裝起來,等到阻斷操做時纔會真正執行. 位於ReferencePipeline
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);//非空檢查
    //返回一個無狀態操做.
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            //典型的適配器模式.將action一概封裝爲Sink.
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}
//阻斷操做reduce位於 ReferencePipeline
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
    return evaluate(ReduceOps.makeRef(accumulator));
}
//AbstractPipeline
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
//TerminalOp阻斷操做接口的默認方法
default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
                                  Spliterator<P_IN> spliterator) {
    if (Tripwire.ENABLED)
        Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
    return evaluateSequential(helper, spliterator);
}
//看ReduceOps 它返回了一內部類ReduceTask
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                     Spliterator<P_IN> spliterator) {
        return new ReduceTask<>(this, helper, spliterator).invoke().get();
    }
//內部類ReduceTask間接繼承自CountedCompleter
private static final class ReduceTask<P_IN, P_OUT, R,
                                      S extends AccumulatingSink<P_OUT, R, S>>
        extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
    private final ReduceOp<P_OUT, R, S> op;

    ReduceTask(ReduceOp<P_OUT, R, S> op,
               PipelineHelper<P_OUT> helper,
               Spliterator<P_IN> spliterator) {
        super(helper, spliterator);
        this.op = op;
    }

    ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
               Spliterator<P_IN> spliterator) {
        super(parent, spliterator);
        this.op = parent.op;
    }
    //老外起的名子,造小孩. 
    @Override
    protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
        //和上面的例子很是類似的代碼,只是封裝更好.
        return new ReduceTask<>(this, spliterator);
    }

    @Override
    protected S doLeaf() {
        //葉子節點作這個.
        return helper.wrapAndCopyInto(op.makeSink(), spliterator);
    }

    //重寫了前面提過的onCompletion函數
    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        if (!isLeaf()) {
            //不是葉子節點.這條件,和前面我們分析的多麼匹配.
            //計算左結果
            S leftResult = leftChild.getLocalResult();
            //聯合右結果.
            leftResult.combine(rightChild.getLocalResult());
            //聯合完的結果就是當前completer的結果.
            setLocalResult(leftResult);
        }
        // 直接父類是AbstractTask,它會對父,左右子幫助gc.
        super.onCompletion(caller);
    }
}
//AbstractTask幫助gc
public void onCompletion(CountedCompleter<?> caller) {
    spliterator = null;
    leftChild = rightChild = null;
}
//更多實現細節自閱...

顯然,並行流(至少我舉的這個例子)是基於ForkJoin框架的.分治的思想與前面道格的例子類似,只是更加優雅和封裝更好.有了前面的基礎,若要詳細熟悉並行流原理,須要進一步瞭解的只有他們的繼承樹,分割聚合組件等邊角料,核心的調度思想已經再也不是困難.

回到問題,當咱們使用並行流時發生了什麼?首先是非阻斷操做時,與串行流狀況一樣,也是先將action封裝成適配器,僅在阻斷操做發生時的調度不一樣,並行流在阻斷操做下使用ForkJoin框架進行調度,任務的分割則使用它的Splitor,結果的合併也有它的Combiner.其餘的流程與上面的案例無異.

後語

1.CountedCompleter使用普通樹的結構存放動做,可是它又是另類的樹,由於子節點能找到父節點,父節點卻找不到子節點,而只知道子節點表明的動做未執行的數量,所以或許從訪問方式的角度來看仍是用棧來理解更好.在這裏樹既是數據結構,也是一個另類的操做棧.只從一個completer往下看,它是個棧,但從父節點的角度來說,它是一個訪問不到子節點的普通樹(或許咱們不該該強行爲它套上一個數據結構,否則總以爲不三不四,可是用樹這個形狀便於理解).每一個節點會存放掛起任務數量,每一個節點的任務完成未必會設置它本身的完成態,但會嘗試將completer父元素棧(或者樹的一條線)上的每一個任務掛起數量減一或將根節點安靜置爲完成態.關於具體的理解和代碼實現,以及如何保存一個任務的運行結果,能夠參考前面案例的章節,也能夠以此爲基礎去看並行流的源碼,但也要相應的理解並行流爲了便捷實現而提供的各類分割合併組件.

2.ForkJoinWorkerThread是運行在ForkJoinPool中的主要線程,它內部維護了一個工做任務隊列,並存放了該隊列在線程池中的間接索引.藉此實現任務的竊取,避免過於空閒等待,任務fork會直接push到該隊列,第一次擴容時,纔給該隊列初始化任務數組,當線程從池中卸載時,不會清除掉該數組,這樣線程沒法再次啓動.線程的啓動有一些勾子,官方提供的線程工廠有兩個,一個直接建立ForkJoinWorkerThread,另外一個建立它的子類

InnocuousForkJoinWorkerThread,它除了一些安全策略外,最大的區別在於ForkJoinWorkerThread在註冊入池前進行本地化數據的清理,而它則每次完成一個主任務處理就清理一次.

3.並行流是ForkJoin框架的一個典型應用,JAVA8 Stream api中的並行流定義了大量的以CountedCompleter爲基礎的操做.利用分割/合併和周邊組件實現了基於ForkJoin框架的並行計算調度.

相關文章
相關標籤/搜索