JUC 之Phaser

前言

在前面的幾篇文章中詳述了ForkJoin框架的若干組分,在相應的官方文檔中總會不時地提起"Phaser",一樣的,也提到Phaser能夠用於幫助運行在ForkJoinPool中的ForkJoinTask運行時保持有效的執行並行度(其實特指其餘task都在等待一個phase的前進時).node

熟悉JUC的朋友都知道它的大概組成部分包含:Containers(支持併發的容器),Synchronizers(同步器),Executors(線程池),BlockingQueue(阻塞隊列),Atomic(原子類),Lock and Condition(鎖).而Phaser和CyclicBarrier,Semaphore等同樣是一個同步器.算法

本文主要介紹Phaser的內部實現,粗略介紹使用,它的源碼相比於線程池較爲簡單,但最好能對比其餘同步器來了解,讀者最好擁有juc其餘同步器,原子類,部分ForkJoin框架的基礎.編程

同時,本文也會再次提到ForkJoinPool::managedBlock(blocker),以前在ForkJoinPool一文提到了實現和接口,而在CompletableFuture中見到了一個blocker的實現.併發

Phaser源碼

首先來看一些與Phaser狀態有關的簡單的常量.框架

//64位整數表示Phaser的狀態.
private volatile long state;

private static final int  MAX_PARTIES     = 0xffff;//最大parties,後16位表示.
private static final int  MAX_PHASE       = Integer.MAX_VALUE;//最大phase,最大整數值.
private static final int  PARTIES_SHIFT   = 16;//取parties使用的移位數,16
private static final int  PHASE_SHIFT     = 32;//取phase的移位數,32
private static final int  UNARRIVED_MASK  = 0xffff;      //未到的,取後16位.
private static final long PARTIES_MASK    = 0xffff0000L; //參加者,17-32位.
private static final long COUNTS_MASK     = 0xffffffffL; //數量,後32位.
private static final long TERMINATION_BIT = 1L << 63;//終止態,首位.

// 特殊值.
private static final int  ONE_ARRIVAL     = 1;
private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;//第1位和17位.顯然,它表示了一個ONE_ARRIVAL信息和PARTY信息.
private static final int  EMPTY           = 1;

//對一個state s計算unarrived的count,
private static int unarrivedOf(long s) {
    //直接取整數位,若是等於EMPTY(1)則返回0,不然取後16位.
    int counts = (int)s;
    return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}

//對一個state,取出parties信息,直接取state的17至32位.
private static int partiesOf(long s) {
    return (int)s >>> PARTIES_SHIFT;
}
//對於一個state,取出phase信息,直接取前32位.
private static int phaseOf(long s) {
    return (int)(s >>> PHASE_SHIFT);
}
//對於一個state,取出arrived信息
private static int arrivedOf(long s) {
    int counts = (int)s;
    //state的後32位等於1(EMPTY)返回0,不然返回parties(state的17至32位,參考上面的partiesOf方法)和UNARRIVED(state的後16位)的差.
    return (counts == EMPTY) ? 0 :
        (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}

上面都是一些常量,沒什麼可分析的,簡單來個總結.函數

Phaser用一個long型的state保存狀態信息.工具

state的前32位表示phase,後16位表示unarrivied,17至32位表示parties,parties減去unarrived即arrived.單元測試

下面咱們看一些成員變量和有關函數.測試

//this的父,能夠是null表示none
private final Phaser parent;

//phaser顯然是個樹的結果,root表明根,若是當前phaser不在樹內,則root==this
private final Phaser root;


//偶數隊列和奇數隊列.它們存放等待線程棧的頭,爲了減小當添加線程與釋放線程的競態,
//這裏使用了兩個隊列並互相切換,子phaser共享root的隊列以加快釋放.
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;

//決定某個phase的等待線程隊列.
private AtomicReference<QNode> queueFor(int phase) {
    //選擇隊列的方法,若是參數phase是偶數,使用evenQ,不然oddQ.
    return ((phase & 1) == 0) ? evenQ : oddQ;
}

//出現arrive事件時的邊界異常信息.
private String badArrive(long s) {
    return "Attempted arrival of unregistered party for " +
        stateToString(s);
}

//註冊時的邊界異常信息.
private String badRegister(long s) {
    return "Attempt to register more than " +
        MAX_PARTIES + " parties for " + stateToString(s);
}
//他們都用到的stateToString(s),計算參數s對應的phase,parties,arrived.
private String stateToString(long s) {
    return super.toString() +
        "[phase = " + phaseOf(s) +
        " parties = " + partiesOf(s) +
        " arrived = " + arrivedOf(s) + "]";
}

爲了便於理解,先來看隊列的實現.ui

//表示等待隊列的QNode,實現了ManagedBlocker
static final class QNode implements ForkJoinPool.ManagedBlocker {
    //存放所屬phaser
    final Phaser phaser;
    //所屬phase
    final int phase;
    //是否可擾動
    final boolean interruptible;
    //是否認時
    final boolean timed;
    //是否已擾動
    boolean wasInterrupted;
    //計時相關
    long nanos;
    final long deadline;
    //關聯線程,當是null時,取消等待.
    volatile Thread thread; 
    //下一個QNode
    QNode next;

    QNode(Phaser phaser, int phase, boolean interruptible,
          boolean timed, long nanos) {
        this.phaser = phaser;
        this.phase = phase;
        this.interruptible = interruptible;
        this.nanos = nanos;
        this.timed = timed;
        this.deadline = timed ? System.nanoTime() + nanos : 0L;
        //取當前線程.
        thread = Thread.currentThread();
    }
    //isReleasable方法
    public boolean isReleasable() {
        if (thread == null)
            //1.線程已置空(如2),返回true釋放.
            return true;
        if (phaser.getPhase() != phase) {
            //2.發現phaser所處的phase不是構建QNode時的phase了,就置線程爲空,返回true.
            thread = null;
            return true;
        }
        if (Thread.interrupted())
            //3.若是當前線程擾動了.
            wasInterrupted = true;
        if (wasInterrupted && interruptible) {
            //4.發現擾動標記,而且QNode配置爲可擾動,則置線程null並返回true
            thread = null;
            return true;
        }
        if (timed) {
            //5.定時邏輯,還有nanos,計算新的時長.
            if (nanos > 0L) {
                nanos = deadline - System.nanoTime();
            }
            if (nanos <= 0L) {
                //已經到時間,返回true,線程置空.
                thread = null;
                return true;
            }
        }
        return false;
    }
    //block邏輯
    public boolean block() {
        if (isReleasable())
            return true;
        else if (!timed)
            //不定時的park
            LockSupport.park(this);
        else if (nanos > 0L)
            //定時的狀況.
            LockSupport.parkNanos(this, nanos);
        //老規矩
        return isReleasable();
    }
}

前面介紹過CompletableFuture的Singnaller,以及ForkJoinPool中的managedBlock,這一塊的邏輯顯然得心應手.

很明顯,若是咱們在ForkJoinPool中使用它做爲blocker,並在相應的ForkJoinTask的exec或CountedCompleter的compute方法中使用ForkJoinPool::managedBlock(blocker),將每一個ForkJoinWorkerThread在阻塞前構建一個QNode進入Phaser的等待隊列(雖然尚未講到相關內容,可是Phaser顯然不用咱們直接操做內部類QNode),那麼它將依照上述邏輯進行補償,保障有效的並行度.

前面完成了承前啓後,預熱到此結束,開始看Phaser的核心方法.

//doArrive方法
//它是arrive和arriveAndDeregister方法的主要實現.手動調用這些方法能夠加速經過和最小化競態窗口期.
//參數表明要從當前state中減去的調整數值,它的單位依託於業務,當爲arrive時減去的單位爲ONE_ARRIVAL,
//當爲arriveAndDeregister時減去的單位爲ONE_DEREGISTER.
private int doArrive(int adjust) {
    final Phaser root = this.root;
    for (;;) {
        //1.變量s初始化,取決因而否當前Phaser是root.不是root將試圖從root同步滯後的state.
        long s = (root == this) ? state : reconcileState();
        //計算phase,前32位.
        int phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            //2.負數直接返回.說明原來的state首位就是1,前面的TERMINATE_BIT就是64位置1.
            return phase;
        //取count,後32位.
        int counts = (int)s;
        //計算unarrived,和前面同樣的邏輯.
        int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
        if (unarrived <= 0)//2.1
            //沒有unarrived了,說明不該該調用此方法,拋出異常,信息就是前面介紹過的badArrive
            throw new IllegalStateException(badArrive(s));
        //3.嘗試將state減去adjust數.
        if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
            //3.1cas成功後,unarrived餘1,則前進一個phase
            if (unarrived == 1) {
                //3.1.1取出parties做爲下一個state的基礎.
                long n = s & PARTIES_MASK;
                //3.1.2 下一個unarrived,數值上等於parties.
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                if (root == this) {
                    //3.1.3當前Phaser是root,onAdvance返回true,則加上終止信號.
                    if (onAdvance(phase, nextUnarrived))
                        n |= TERMINATION_BIT;
                    else if (nextUnarrived == 0)
                        //3.1.4 onAdvance返回false,而計算得出的nextUnarrived是0,即沒有parties,n加上一個empty(1)
                        n |= EMPTY;
                    else
                        //3.1.5nextUnArrived不是0,加到n上.
                        n |= nextUnarrived;
                    //3.1.6前面的流程完成了state的後32位(parties和unarrived),接下來處理前32位.
                    //限定在MAX_PHASE以內,對當前phase加1.
                    int nextPhase = (phase + 1) & MAX_PHASE;
                    //將nextPhase的值加到n的前32位.並用n去cas掉原來的state,由於有3處入口的cas,此處必定能成功
                    n |= (long)nextPhase << PHASE_SHIFT;
                    UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                    //更新到新的phase,喚醒等待的waiter.
                    releaseWaiters(phase);
                }
                //3.1.7當前Phaser不是root,當nextUnarrived計算得0時,像父傳遞解除註冊,參數ONE_DEREGISTER
                //會同時減去一個unarrived和一個parties.下輪循環正常應進入3.1.8
                else if (nextUnarrived == 0) { 
                    phase = parent.doArrive(ONE_DEREGISTER);
                    //完成傳遞後,將本身的state置empty.
                    UNSAFE.compareAndSwapLong(this, stateOffset,
                                              s, s | EMPTY);
                }
                else
                    //3.1.8,當前Phaser不是root,計算的nextUnarrived非0,像父傳遞一個arrive事件,減去一個unarrived.
                    phase = parent.doArrive(ONE_ARRIVAL);
            }
            //3.2返回當前phase,多是已進入3.1遞增的.僅有此處可退出循環.
            return phase;
        }
    }
}

關於該方法的執行流程,咱們結合幾個周邊方法一併分析,先來看註冊方法和onAdvance勾子.

//註冊和批量註冊.參數表明parties和unarrived字段的增長數,它必須大於0.
private int doRegister(int registrations) {
    // 1.用參數計算一個adjust,同時包含parties和arrive.
    long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
    final Phaser parent = this.parent;
    int phase;
    //循環嘗試更改.
    for (;;) {
        //2.存在parent,則用root的phase調整this的state.
        long s = (parent == null) ? state : reconcileState();
        //取出當前state中保存的counts,parties,unarrived信息.
        int counts = (int)s;
        int parties = counts >>> PARTIES_SHIFT;
        int unarrived = counts & UNARRIVED_MASK;
        if (registrations > MAX_PARTIES - parties)
            //要註冊的數量大於了餘量,拋出異常.
            throw new IllegalStateException(badRegister(s));
        //3.計算出phase
        phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            //phase爲負說明state爲負,即終止態,終止.
            break;
        //4.當前state表示的參與數非空的邏輯,當前註冊非首次註冊.
        if (counts != EMPTY) {           
            if (parent == null || reconcileState() == s) {
                //this是root或者從root同步的state不變,繼續執行,不然從新循環.
                if (unarrived == 0)  
                    //4.1本輪循環經過原state計算的unarrived爲0,說明應等待下一phase,使用root等待      
                    root.internalAwaitAdvance(phase, null);
                else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                   s, s + adjust))
                    //4.2本輪循環未發現應等待下一phase,嘗試原子更新,增長adjust到state上.
                    break;
            }
        }
        //5.當前不存在counts,且自身就是root,表明root的首次註冊.
        else if (parent == null) { 
            //5.1計算下一個state,由於沒有參與數,使用phase初始化前32位,並使用adjust作後32位.        
            long next = ((long)phase << PHASE_SHIFT) | adjust;
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                //5.2 cas成功,退出,不成功,下輪循環.
                break;
        }
        //6.是首次註冊,但也不是root的邏輯.表明非root的Phaser的首次註冊.
        else {
            //6.1對當前Phaser加鎖並double check,避免同時調用.加鎖失敗的線程將在後續進入2的邏輯.
            synchronized (this) {  
                //double check state未發生改變.   
                if (state == s) {
                    //6.2首先向父Phaser註冊1.
                    phase = parent.doRegister(1);
                    if (phase < 0)
                        //發現進入終止態,直接中止.
                        break;
                    
                    //6.3向父Phaser註冊成功,循環嘗試cas掉老的state,新state的算法同上,phase加adjust.
                    //在整個while循環中,再也不考慮phase進入終止態的狀況,由於這些操做處於同一個"事務"中,
                    //且因競態等緣由,若某次cas時計入了負數的phase,方法返回後也能夠及時發現.
                    while (!UNSAFE.compareAndSwapLong
                           (this, stateOffset, s,
                            ((long)phase << PHASE_SHIFT) | adjust)) {
                        //若是cas不成功,則讀取s爲新的state,計算新的phase並從新循環.
                        s = state;
                        phase = (int)(root.state >>> PHASE_SHIFT);
                        // assert (int)s == EMPTY;
                    }
                    //6.4cas成功後退出循環.
                    break;
                }
                //若是if(state==s)判斷失敗,說明有別的線程有當前線程進入synchronized塊前已經加鎖並執行了內部的邏輯且稍後釋放了鎖,
                //這樣當前線程加鎖成功,但if判斷失敗,它會當即釋放鎖並返回到2.
            }
        }

    }
    return phase;
}


//使用root的phase調整this的state,更新滯後的結果.這通常發生在root前進了phase可是
//子phaser尚未作到這一步,這種狀況下,子phaser必須完成這個前進的步驟,這一過程當中,phase將
//被置爲root的phase,unarrived則會重置爲parties,若parties爲0,則置爲EMPTY.返回結果state.
private long reconcileState() {
    final Phaser root = this.root;
    long s = state;
    //不是root才進行下一步.
    if (root != this) {
        int phase, p;
        //cas,phase採用root,parties不變,unarrived重置爲parties或EMPTY.
        while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
               (int)(s >>> PHASE_SHIFT) &&
                //phase滯後於root
                //嘗試cas.
               !UNSAFE.compareAndSwapLong
               (this, stateOffset, s,
                //肯定新state的前32位,使用root的phase.
                s = (((long)phase << PHASE_SHIFT) |
                    //新phase<0,後32位直接取this的state表示的counts.
                     ((phase < 0) ? (s & COUNTS_MASK) :
                    //phase有效,this的state表示的parties爲0,則後32位使用empty
                      (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
                        //不然,後32位使用parties.
                       ((s & PARTIES_MASK) | p))))))
            s = state;
    }
    return s;
}


//onAdvance勾子方法,參數爲當前phase和註冊的parties數.
//默認實現爲parties數爲0,方法返回true時,調用者會嘗試終止Phaser.(參考前面的doArrive).隨後調用isTerminated方法將返回true.
//執行此方法時拋出的運行時異常或Error將直接上拋給嘗試advance相應的phase的線程,這種狀況下不會發生phase的advance.
//方法的入參表示的是Phaser當前的state(未advance前),所以若在onAdvance方法中執行arrive,regist,waiting這三種操做的行爲是不肯定的也不可靠的.
//若是當前Phaser是一個級聯的成員,那麼onAdvance只會由root在每次advance時調用.
//方法的默認實現返回true的場景目前只能是通過數次arriveAndDeregister調用形成parties歸零的結果.咱們繼承Phaser能夠輕易地重寫此行爲,
//好比簡單粗暴地返回false,那麼將永遠容許新的註冊.
protected boolean onAdvance(int phase, int registeredParties) {
    return registeredParties == 0;
}

通過前面的代碼分析,已經對Phaser的核心函數doRegister,doArrive有了全面的瞭解.

二者都會在一開始同步root的phase,且若是出現落後root的狀況,同步了新的phase的同時,也會從新初始化unarrived,而且使用parties的值.

doArrive方法會每次調整unarrived數量(也可包含parties數量,若是使用瞭解除註冊),當Phaser調用自身的arrive/arriveAndDeregister時,會作出相應的減小,並根據是否爲root而決定向上遞歸.

Phaser減小自身unarrived信號(也可能同時有parties信號)後,若發現這已是最後一個unarrived信號,則進行接下來的判斷:

1.當前Phaser是root,advance並喚醒waiter.(重要的喚醒操做執行點,root一輪完成)

2.當前Phaser不是root,且它已經不具有繼續下一輪的條件(計算nextUnarrived爲0,即parties已經被arriveAndDeregister置0),則從父Phaser減小一個unarrived和parties.

3.當前Phaser不是root,但它仍具備parties,知足進行下一輪的條件(計算nextUnarrived不是0),則從父Phaser減小一個unarrived,但不減小parties.

顯然,子Phaser的最後一個unarrived的消失必定會形成父的unarrived減小,子Phaser不能繼續下一phase的register和arrive時,從父Phaser中卸載.

若不是本Phaser的最後一個unarrived信號,則直接結束,至關於只進行了上面的減小信號操做.

doRegister方法的邏輯大體相反,不一樣於doArrive,它的參數registrations同時做用於parties和unarrived,即兩個位上同時加上registrations參數.它的大體邏輯:

1.當前註冊並不是首次註冊,且出現unarrived==0,即本輪已經完成了arrive,那麼本輪將不能註冊,須要等待root更新到下輪.(這也是咱們碰到的第一個阻塞)

2.當前註冊並不是首次註冊,unarrived也不是0,則在本phase進行註冊,增長相應的parties和unarrived.

3.當前註冊是root的首次註冊,給root的state加上相應的parties和unarrived.

4.當前註冊是非root的首次註冊,加鎖(this),對本身的state加上相應的parties和unarrived(同上,以registrations爲單位),而對parent加上一個parties和unarrived單位.

很明顯,對於單Phaser的狀況很是好理解,每次減小unarrived數量(先不考慮減小parties),則最終致使Phaser自身進入下一個phase,而後從新初始化unarrived到下一輪,unarrived的新值是前一輪剩下的parties數量.

當咱們同時也嘗試減小parties數量,即解除parties的註冊,最終致使沒有parties,那麼Phaser將進入終止態.

整個過程當中,只要Phaser沒進入終止態,隨時能夠進行新的註冊,並增長parties和unarrived的數量.每一個arrive能夠減小unarrived的數量爲任何正整數,不必定是1.

對於多Phaser的狀況,有兩個特殊點:

1.對任意Phaser樹中的某一個Phaser調用註冊操做,會令自身加上相應參數個parties和unarrived單位,僅會在該Phaser第一次註冊時增長父Phaser(極端可能,僅從一個葉子節點第一個註冊的狀況下可一直遞歸到root)的parties數和unarrived數各1單位(不論參數是多少).

2.對任意Phaser樹中的某一個Phaser調用arrive操做,會令自身減去相應的參數個parties和unarrived單位,同時僅當本Phaser此時是最後一個unarrived時,會減去父Phaser的一個unarrived單位(當前子Phaser仍舊有parties能夠構建下一phase),或減去父Phaser一個Parties和unarrived單位.(極端狀況下,每一級都是最後一個unarrived時,減小葉子節點的最後一個unarrived會遞歸到root).

每新增一個子Phaser,父Phaser就會增長一個要完成觸發phase的advance前必需要等到arrive的單位;每個子Phaser中全部的arrive完成,父Phaser都將減小一個要等待advance所必需觸發的arrive.

目前沒有看到await方法,但能夠提早說明,等待操做徹底依賴於root是否完成本輪.也就是全部子Phaser都完成了同一輪(arrive打滿),才能讓父Phaser自己減去一個全部arrive單位,再觸發父Phaser本輪的完成,此時對任何已完成的Phaser進入註冊,都會進入上述的root.internalAwaitAdvance(phase, null)方法等待root進入下一phase.若是對已經完成全部arrive的Phaser繼續進行arrive操做,由於unarrived已是0,則會拋出異常.

因此對於使用子Phaser的場景,若是發生很巧妙的狀況,Phaser樹上當前子Phaser的arrive結束條件知足了,使得新來的註冊只能等待下一輪次,而其餘分支的子Phaser又恰恰不能完成本輪次,那麼新的phaser.doRegister方法將阻塞在此.

好在咱們使用Phaser可能會相似CyclicBarrier的使用方式,可對每一輪(phase)進行註冊並等待(也許只等一輪,那麼arrive就要帶上deregister),每一輪最後一個線程arrive了,就會中止全部線程的等待,讓全部線程繼續執行,同時開啓了下一輪次,這些線程此時又能夠不經註冊直接在新的輪次中進行等待,直到最後一個arrive了,再次喚醒全部線程並繼續執行,同時Phaser再前進一輪,如此往復.中間使用arrive並deregister的線程會從本輪起減小一個unarrive數量(由於parties也減小了,因此再下一輪初始化unarrive數量時也會減小一次).咱們可讓這些線程參與任意的輪次,但要注意的是,若是有線程中途不參加了,必定要解除註冊,不然由於每輪初始化時,要等待arrive的數量都是上一輪剩下的parties數量,有線程中止了執行,卻不減小parties數,那麼下輪全部等待的線程將永遠等不到phaser知足喚醒的條件.

上述的過程當中能夠明顯的看出,目前已介紹的兩個重要核心函數:註冊和arrive並無直接記錄和操做線程的操做,相應的操做在等待方法和喚醒方法中(前面提到過release),咱們稍後介紹.

如今假設一個特殊的使用場景,也能夠區別於CyclicBarrie和CountDownLatch的使用.仍是上面的例子,可是咱們準備的線程數與Phaser的parties數/unarrived數不一樣(通常前者要多些),會發生什麼事?

首先建立了Phaser,不指定最初parties數,並用每一個線程去註冊(我甚至能夠用一個線程去重複註冊,每次的參數registrations還能夠不一樣,註冊的做用並非將當前線程壓入隊列,而是爲本phase設置一個unarrive數量,以控制到達下個phase前必須有多少次arrive的發生),則parties數和unarrived的初值徹底與此有關,是一個依託於咱們隨意註冊而產生的隨意值.那麼假定咱們的線程數量大於這個parties數(假定調用註冊方法的線程和arrive及等待的線程無關),並令有的線程執行arrive(徹底能夠一次arrive減去多個信號量,甚至一個線程屢次arrive),有的線程執行await等待信號advance到下一個phase(一個線程在一個週期只能調用一次),有的線程執行了arrive也等待phase前進(這種狀況一個線程一週期也只能一次.其實這些分別對應了還未介紹的arrive,waitAdvance,arriveAndWaitAdvance等方法),單獨進行await操做的線程能夠是任意數量,執行arrive方法的線程加上執行arrive並wait的操做的線程和必須超過unarrived,這才能喚醒等待線程.

目前這些還比較抽象,等到咱們看過相應的幾個方法便了然了.

onAdvance的方法默認實現就是判斷本階段註冊的parties數量,若是已是0則說明沒有parties了,Phaser應該結束.可是咱們其實能夠從新實現,好比參數中同時傳入了當前的phase,我能夠規定上面的例子中phase最多隻有3輪次,那麼不論何時arrive,發現了當前phase已進入3輪,Phaser就被終止.固然,這一過程是由root執行的,可是子Phaser的phase會在每次註冊和arrive發生時同步root,所以本例中對於phase數的判斷能夠粗放到全部Phaser,對於parties數則只能做用於root(事實上調用onAdvance的必定是root).

接下來看全量構造方法和若干和上面有關的公有函數.

//初始化一個Phaser,指定parent,指定未到來的參與者數(unarrived parties),但這只是一個初值,
//當咱們在任什麼時候候調用註冊方法時,還會相應的增長.
public Phaser(Phaser parent, int parties) {
    if (parties >>> PARTIES_SHIFT != 0)
        //太大了,超過了後16位能表示的整數.
        throw new IllegalArgumentException("Illegal number of parties");
    //初始phase爲0.
    int phase = 0;
    this.parent = parent;
    if (parent != null) {
        //1.有parent的狀況,共享parent的root,隊列,並向parent中註冊1個parties和unarrived,
        //同時同步一次phase(表面上是同步了parent的,實際上前面已經看過,會同步root).
        final Phaser root = parent.root;
        this.root = root;
        this.evenQ = root.evenQ;
        this.oddQ = root.oddQ;
        if (parties != 0)
            phase = parent.doRegister(1);
    }
    else {
        //2.無parent的狀況,root就是this,並初始化奇偶等待隊列.它使用原子引用的形式存放一個QNode,而QNode咱們前面已介紹.
        this.root = this;
        this.evenQ = new AtomicReference<QNode>();
        this.oddQ = new AtomicReference<QNode>();
    }
    //統一初始化state,後32位的決定依託於parties,若是parties是0則給予EMPTY,直接無論高32位.
    //不爲0則給予phase設置爲前32位,parties設置parties位和unarrived位.
    this.state = (parties == 0) ? (long)EMPTY :
        ((long)phase << PHASE_SHIFT) |
        ((long)parties << PARTIES_SHIFT) |
        ((long)parties);
}


//註冊方法,就是調用doRegister,參數1.
//它會向this添加一個unarrived的party,若是正巧root正在進行advance,它須要等待下個phase.
//若是this有parent,且它以前沒有過註冊的parties,則首次註冊會觸發自身向parent的註冊.
//若是this已經終止了,那麼嘗試註冊將會無效並返回負值.若是註冊的數量大於了最大支持parties(後16位整數),
//會拋出IllegalStateException
public int register() {
    return doRegister(1);
}


//批量註冊指定的信號量,並返回最新的phase.規則基本同上.
public int bulkRegister(int parties) {
    if (parties < 0)
        throw new IllegalArgumentException();
    if (parties == 0)
        //參數0直接查詢最新的phase返回
        return getPhase();
    return doRegister(parties);
}


//arrive一個信號,不等待其餘arrive事件,返回最新phase(終止態爲負).
//當前Phaser的arrive事件已滿,則對parent來講也會觸發一個arrive.(若是有parent)
public int arrive() {
    return doArrive(ONE_ARRIVAL);
}


//arrive並解除一個註冊parties,也不阻塞等待其餘arrive.若是當前Phaser的解除註冊操做
//將parties減至0,且this有parent,這將致使parent也減小一個parties(本phaser解除在parent的註冊).
public int arriveAndDeregister() {
    return doArrive(ONE_DEREGISTER);
}

接下來要看上面已經作足了鋪墊的等待方法了,並結合前面的隊列一塊看.

//令當前線程"到達"此phaser並等待其餘parties,它等效於awaitAdvance(arrive()).
//注意,按照道格的註釋,若是你在一個未進行註冊(調用register)的線程裏調用此方法實際上是一個使用錯誤,
//可是從本方法和前面以及後面有關的方法來看,全部記錄線程的方法均只與arrive和等待有關,與註冊無關.
//所以Phaser自己沒法規避這種使用錯誤,咱們徹底可使用另外一個線程去註冊,而當前線程去arrive,將兩個動做分開.
//方法會返回arrive時最新的phase號.終止時會是負值.
public int arriveAndAwaitAdvance() {
    //記錄root,開始循環.
    final Phaser root = this.root;
    for (;;) {
        //1.預計算,首先同步state
        long s = (root == this) ? state : reconcileState();
        //計算phase
        int phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            //已終結直接返回最終phase.
            return phase;
        //計算counts,unarrived
        int counts = (int)s;
        int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
        if (unarrived <= 0)
            //已經沒有空餘的unarrived信號了,不能再調用arrive,拋出異常.
            throw new IllegalStateException(badArrive(s));
        //2.減餘arrive的有關邏輯.嘗試cas減去一個arrive
        if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                      s -= ONE_ARRIVAL)) {
            if (unarrived > 1)
                //2.1當前要減的信號不是本Phaser的最後一個信號量,調用root的等待方法.參數2是node,傳空.
                return root.internalAwaitAdvance(phase, null);
            if (root != this)
                //2.2當前要減的信號量是非root的Phaser的最後一個,遞歸給parent(雖然用了return,可是parent也可能在進入2.1後阻塞).
                return parent.arriveAndAwaitAdvance();
            //2.3當前要減的信號量是root的最後一個.
            //2.3.1準備計算下一個狀態,先取出state的parties信息.
            long n = s & PARTIES_MASK; 
            //計算nextUnarrived,它是如今的parties.
            int nextUnarrived = (int)n >>> PARTIES_SHIFT;
            //2.3.2前進phase邏輯.
            if (onAdvance(phase, nextUnarrived))
                //須要終止,給新state的計算基石n加上終止標記.
                n |= TERMINATION_BIT;
            else if (nextUnarrived == 0)
                //計算的nextUnarrived是0,即沒有parties,加上空標記位.
                n |= EMPTY;
            else
                //下一輪能正常進行,加上nextUnarrived位.
                n |= nextUnarrived;
            //2.3.3給n加上下一個phase.
            int nextPhase = (phase + 1) & MAX_PHASE;
            n |= (long)nextPhase << PHASE_SHIFT;
            if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                //用n進行cas不成功,將新的phase返回.
                //說明一下,由於方法執行到此前已經執行過2的入口cas,減去了最後一個unarrived,所以在2到此的過程當中如有新的註冊,
                //它內部會讀到0個unarrived,就會等待下一個phase(參考前面介紹過的註冊方法),所以cas失敗不會是由於2以後有新的註冊.
                //在整個arrive系列的方法中,最後一次arrive發生後,本Phaser不可能有其餘線程再去執行相似2處的減餘的狀況.
                //故出現這種狀況的緣由目前來看有二,一是還未介紹的強制關閉Phaser的方法,此時也會突兀地改掉state形成cas恰巧失敗,二是
                //出現一些用戶作出的奇葩行爲,好比重寫了其餘公有方法.咱們天然忽略第二種狀況,doug大神也是簡單註釋了一個"terminated".
                return (int)(state >>> PHASE_SHIFT); // terminated
            //cas成功,釋放等待隊列中的線程,返回下一個phase(由於在此過程當中的register會等到advance,此時的phase已是nextPhase了).
            releaseWaiters(phase);
            return nextPhase;
        }
        //3.減餘失敗說明出現競態,直接開啓下輪循環從新減餘.
    }
}


//等待當前Phaser從給定的phase前進結束,若是當前phase不等於給定的phase,或者Phaser已終止當即返回.
//1.傳入phase爲負,返回它自己.
//2.傳入的phase不是最新的phase,返回最新的.
//3.傳入了最新的phase,等待到advance並返回advance後的phase.
public int awaitAdvance(int phase) {
    final Phaser root = this.root;
    long s = (root == this) ? state : reconcileState();
    int p = (int)(s >>> PHASE_SHIFT);
    if (phase < 0)
        return phase;
    if (p == phase)
        //匹配成功,等root前進.參數node爲null
        return root.internalAwaitAdvance(phase, null);
    return p;
}


//參考前面的幾個方法,區別是可擾動.
public int awaitAdvanceInterruptibly(int phase)
    throws InterruptedException {
    final Phaser root = this.root;
    long s = (root == this) ? state : reconcileState();
    int p = (int)(s >>> PHASE_SHIFT);
    if (phase < 0)
        //1.參數phase小於0直接返回它自己.
        return phase;
    if (p == phase) {
        //2.參數phase匹配,回憶一個前面介紹的QNode,匹配當前Phaser和phase,配置爲可擾動且不計時.
        QNode node = new QNode(this, phase, true, false, 0L);
        //3.放入root的等待隊列阻塞.
        p = root.internalAwaitAdvance(phase, node);
        if (node.wasInterrupted)
            //4.等待結束,判斷是不是擾動形成的結束,前面介紹過QNode的相關邏輯,
            //它實現了ForkJoinPool.ManagedBlocker,所以在managedBlock方法進行時,
            //會循環調用問詢是否能release,當咱們配置了可擾動且擾動了,就會標記這個wasInterrupted,釋放線程引用並返回.
            //發現此種狀況拋出異常.
            //同時,當發現等待成功,也會結束,釋放線程引用並返回,但不帶有擾動標記.
            throw new InterruptedException();
    }
    //5.返回1處以前讀取的phase或3處獲得的最新phase值.
    return p;
}

//同上方法,但帶有計時.
public int awaitAdvanceInterruptibly(int phase,
                                     long timeout, TimeUnit unit)
    throws InterruptedException, TimeoutException {
    long nanos = unit.toNanos(timeout);
    final Phaser root = this.root;
    long s = (root == this) ? state : reconcileState();
    int p = (int)(s >>> PHASE_SHIFT);
    if (phase < 0)
        return phase;
    if (p == phase) {
        //不一樣於上面方法的地方,創建的QNode帶有計時和等待時長.
        QNode node = new QNode(this, phase, true, true, nanos);
        p = root.internalAwaitAdvance(phase, node);
        if (node.wasInterrupted)
            //被擾動的狀況.
            throw new InterruptedException();
        else if (p == phase)
            //時間到了phase沒有前進,超時.
            throw new TimeoutException();
    }
    return p;
}

前面的幾個核心方法粗略過完,補充一些重要內容.

首先在前面曾分析過有線程阻塞等待下一個phase的狀況,並無加上定時等待的考慮.在超時的狀況下,阻塞的線程可能會收到異常並退出.

創建QNode能夠限定是否認時和可擾動,這取決於咱們使用哪一個方法去await.

除最後一個線程arrive外,全部線程調用這些方法都會減小一個arrive並加入等待隊列,直到(1)配置了定時且超時,(2)當前是可擾動等待且使用了Thread.interrupt(),(3)最後一個線程使用上述方法或arrive方法,使得Phaser前進了一個輪次,internalWaitAdvance結束.其中(1)(2)均會遷成arrive線程拋出異常,只有(3)纔是正常的狀況.

QNode前面已介紹,它是一個blocker,須要調用ForkJoinPool::managedBlock纔會起做用(顯然root的internalAwaitAdvance必然與此方法有關聯).固然這個做用與任務是否運行在ForkJoinPool無關,若是等待phaser前進的線程是運行在ForkJoinPool中的ForkJoinWorkerThread,顯然會在internalAwaitAdvance期間進行補償.這一塊可參考前面的"CompletableFuture與響應式編程"和"ForkJoin框架之ForkJoinPool"兩篇文章.

另外,這些代碼也再次說明了root的做用: (1)對一切非root的Phaser進行等待都會用root的internalAwaitAdvance;(2)每次註冊或arrive必定會同步root的最新phase.

其中(1)也間接說明了爲何構建Phaser時只有root建立等待隊列,全部子Phaser共享.

上面還保留了一個疑問,提到了"強制關閉Phaser"形成arriveAndAwaitAdvance出現cas失敗的問題,doug大神直接註釋了一個terminated,咱們立刻來看這一塊,以及一些周邊的公共函數,加深理解,而後再來解決關於等待隊列最後的一些問題.

//強制關閉Phaser,讓Phaser進入終止態,可是這個過程不影響它已註冊的parties,若是此Phaser是
//一個Phaser樹中的成員,那麼全部phaser集中的Phaser都會關閉,若是它已經關閉,此方法無效.此方法能夠
//用於若干任務出現意料以外異常的狀況下的協調恢復.
public void forceTermination() {
    // Only need to change root state
    final Phaser root = this.root;
    long s;
    //已經是終止態直接忽略.
    while ((s = root.state) >= 0) {
        //直接嘗試給root的state加上終止位.顯然加上了它,子Phaser在註冊和arrive等方法同步回新的phase就是個負數,
        //所以更改root的phase爲負至關於判了全部Phaser的死刑.惟一須要解決的是已經阻塞在root.internalAwaitAdvandce的線程.
        if (UNSAFE.compareAndSwapLong(root, stateOffset,
                                      s, s | TERMINATION_BIT)) {
            // 加上終止位成功,前後喚醒偶數等待隊列和奇數等待隊列.
            releaseWaiters(0); // Waiters on evenQ
            releaseWaiters(1); // Waiters on oddQ
            //返回
            return;
        }
    }
}


//返回當前phase,直接用root的state去取.
public final int getPhase() {
    return (int)(root.state >>> PHASE_SHIFT);
}

//查詢註冊的parties數量.調用前面介紹過的partiesOf
public int getRegisteredParties() {
    return partiesOf(state);
}

//查詢已經arrived的parties數量.調用介紹過的arriveOf

public int getArrivedParties() {
    return arrivedOf(reconcileState());
}

//查詢未arrive的parties數量,調用前面介紹過的unarrivedOf
public int getUnarrivedParties() {
    return unarrivedOf(reconcileState());
}

//返回parent
public Phaser getParent() {
    return parent;
}

//返回root
public Phaser getRoot() {
    return root;
}

//判斷當前Phaser是否終止,直接取root的state是否爲負,可見,終止態徹底取決於root.
public boolean isTerminated() {
    return root.state < 0L;
}

這些方法都比較簡單,只有forceTermination須要再強調一翻,前面介紹arrayAndAwaitAdvance時曾提過在減去最後一個unarrived信號後去cas到下一個phase失敗的狀況,doug大神簡單註釋了一句terminated,直接返回了當前的phase(顯然只能是負),在周邊方法重重加鎖的前提下,那一次cas的失敗惟一一處就是強制關閉,由於它只改關閉標記位,至關於動了phase,而沒有動unarrived標記位和parties標記位.因此重寫Phaser的方法要謹慎,極可能不當心打破了這個封裝.

從上面的有關方法能夠看出,子Phaser的終止態嚴重依賴於root,目前能夠肯定的是root的phase一旦表現出終止態,全部新來的註冊,arrive,arrive並await將會當即返回,惟一須要關注的就是root被設置了終止標記後,正陷入等待的線程怎麼辦的問題.

咱們下面就來看Phaser的等待機制,這裏面又能見到道格大神很是有趣的玩法.

//工具方法,移除某個phase的等待者.
private void releaseWaiters(int phase) {
    QNode q;  //保存隊列中的隊首
    Thread t;  // 保存線程引用.
    //取隊列,用phase的奇偶決定,phase是偶數就取偶數隊列,不然取奇數隊列.而這個phase其實只用來取隊列了,後續的操做與它無關.
    AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
    //循環,找出全部phase不等於root的phase的(其實root是最大的,因此就是找出非最新phase加入進來的waiter QNode)
    while ((q = head.get()) != null &&
           q.phase != (int)(root.state >>> PHASE_SHIFT)) {
        //找出了,利用原子引用將head指向next.
        if (head.compareAndSet(q, q.next) &&
            (t = q.thread) != null) {
            //發現阻塞者,喚醒線程.回憶下前面實現blocker方法中的isReleaseble和block方法都有將線程置空的操做.(三種狀況,喚醒擾動超時都會置空)
            //可是那些方法並無將表明該阻塞線程的QNode移除隊列,所以可能會發現thread已是null(表明無阻塞者)的狀況,只須要移除隊列便可.
            q.thread = null;
            LockSupport.unpark(t);
        }
    }
}


//上面releaseWaiters方法的一個變種,但它只會處理遍歷過程當中位於頭部的元素,出現正常的等待節點就會當即返回.
//此方法在這一塊能夠有效的減小內存的佔用.退出時返回當前的phase.
private int abortWait(int phase) {
    //一樣,參數phase只是用來選擇要處理的隊列.
    AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
    for (;;) {
        Thread t;
        QNode q = head.get();
        //計算最新phase的值p
        int p = (int)(root.state >>> PHASE_SHIFT);
        if (q == null || ((t = q.thread) != null && q.phase == p))
            //1.出現q爲null表明整隊列元素已出隊,直接返回p;
            //或者在出隊過程當中head(q)記錄的線程引用還在,說明未超時或擾動,且是本phase的等待節點,終止循環並返回最新phase.
            return p;
        if (head.compareAndSet(q, q.next) && t != null) {
            //進入條件,參考1的條件,由於1會直接返回.故進入2的條件實際上是q非空且處於舊的phase.只有這種狀況才能夠出隊.
            //2.將q出隊,置空線程引用,釋放線程.
            q.thread = null;
            LockSupport.unpark(t);
        }
    }
}

//計算有效cpu,控制自旋.
private static final int NCPU = Runtime.getRuntime().availableProcessors();

//常量,每輪arrive等待的字旋數,取決於NCPU,小於2則取1,不小於2取2的8次冪.
static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;


//珊珊來遲的內部等待方法.它可能會一直阻塞到phase的advance發生(除非取消了等待).
//此方法僅限root調用.參數phase表示當前的phase,參數node表示等待節點,用於追蹤節點的擾動或超時.
//若是是null,表示是一次不可擾動的等待.返回值爲當前最新的phase.
private int internalAwaitAdvance(int phase, QNode node) {
    // 1.調用releaseWaiters,傳入參數phase的前一個phase,顯然這只是決定釋放哪個隊列.參數絕對實時準確的狀況下會先將老的隊列釋放掉.
    releaseWaiters(phase-1); 
    //節點入隊標記,入隊了就會變爲true
    boolean queued = false;   
    //記錄每一輪循環的unarrived數量,用於決定是否擴增自旋等待次數.
    int lastUnarrived = 0; 
    //自旋數,參考上面的計算邏輯.
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    //開啓循環,直到phase前進爲止或內部判斷已取消等待.
    while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
        //2.傳入node是null,即非可擾動的模式邏輯.只有非可擾動模式纔有自旋.
        if (node == null) {  
            //2.1每輪自讀進入都會嘗試計算新的unarrived,若是發現出現了變更(變大或者變小),
            //會將它保存到前面的lastUnarrived.        
            int unarrived = (int)s & UNARRIVED_MASK;
            if (unarrived != lastUnarrived &&
                (lastUnarrived = unarrived) < NCPU)
                //發現新變化的unarrived<NCPU,擴增自旋次數,繼續自旋.
                //unarrived的變化,若沒有大量新的parties註冊,會在自旋過程當中變小,反之大量加入註冊,大於了NCPU則放棄增長自旋次數.
                spins += SPINS_PER_ARRIVAL;
            //2.2,未發現本輪循環unarrived發生變化,或者增長了大量註冊,形成大於NCPU的邏輯,首先記錄此時的線程擾動狀態.
            boolean interrupted = Thread.interrupted();
            //2.3接2.2,若是發現了線程被擾動了,或者經若干次自旋減小次數,自旋次數並未能在2.1進行增長,直至減爲0,進入if.
            if (interrupted || --spins < 0) { // need node to record intr
                //2.4,知足2.3進入if的條件,再也不繼續自旋了,由於參數沒有提供node,此處初始化一個node,不定時,不可擾動,並保存擾動狀態.
                //下輪循環將沒法進入2.
                node = new QNode(this, phase, false, false, 0L);
                node.wasInterrupted = interrupted;
            }
        }
        //3.參數傳入了node,或者在2.4進入了node的初始化,每一輪循環到此都先判斷是否可釋放(若能夠,內部會置thread爲null).
        else if (node.isReleasable()) 
            //發現node所處的phase已經達到或者取消了,則break掉循環.
            break;
        //4.未能在非擾動模式下自旋解決(2)或提早發現node的擾動且未將node入隊的狀況下,將node入隊.
        else if (!queued) { 
            //選擇當前phase表明的隊列.
            AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
            QNode q = node.next = head.get();
            //這一行不起眼的if條件代碼真的是一個悄無聲息解決了一個大暗坑的地方,後面說.
            if ((q == null || q.phase == phase) &&
                (int)(state >>> PHASE_SHIFT) == phase) 
                //double check避免髒入隊,入隊條件是(1)無頭,(2)或者頭元素的phase等於參數phase(由於相鄰的兩個phase絕對不會入同一個隊).
                //知足(1)(2)的同時,還要知足(3),參數phase就是當前的state表示的phase(由於此方法只能root使用,故爲root表示的最新phase).
                //條件知足,入隊,取代原來的head,原來head表明的node成爲node的next.而條件不知足進入下一循環,極可能while條件就不知足了退出循環.
                queued = head.compareAndSet(q, node);
        }
        //5.已經在某一輪循環入隊了,使用ForkJoinPool的managedBlock管理block,其間可能會釋放線程引用.
        else {
            try {
                //5.1它內部也有循環,且會調用前面看到過的isReleasable和block實現,顯然它一旦結束(包含擾動),必定會形成下輪外循環終止於3處.
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                //5.2出現擾動異常catch住,並保存.下輪循環也會終止在3處.
                node.wasInterrupted = true;
            }
        }
    }
    //6.走出上面的while循環,多是root已經advance到下一個phase(2前的循環),也多是傳入node的狀況下出現了擾動或超時(5)形成(3)知足
    if (node != null) {
        //6.1node存在表明可能已經壓入隊列,結果要麼是已出現擾動或超時(方法結束後會拋出異常),要麼是已正常完成.
        //顯然,代碼執行到此處就要返回了,阻塞的線程會拋出異常結束(超時或擾動)或繼續執行(正常advance),
        //沒有必要去嘗試喚醒能執行出前面while循環到達6立刻要返回的線程.
        if (node.thread != null)
            //6.2取消node中的線程引用,避免外面的線程嘗試喚醒.
            node.thread = null;       // avoid need for unpark()
        if (node.wasInterrupted && !node.interruptible)
            //6.3若是node自己設置了不可被擾動,但5.2處判斷線程自己拋出了擾動異常,卻被catch住了,此處擾動本線程.
            Thread.currentThread().interrupt();
        if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
            //6.4發現phase並未前進.仍是參數傳入的pahse,說明必定是擾動或超時的結果,abortWait對本phase使用的隊列進行清理,
            //而清理的目標前面已論述過,是本隊列頭部開始的早於本phase的元素.(發現一個不知足條件的就中止了清理).
            return abortWait(phase); // possibly clean up on abort
    }
    //7.退出上面的while循環必定會到此幫助釋放早於最新階段的waiter.注意,是早於最新phase的,參數phase只是決定了選哪一個隊列(奇偶).
    //若是是6.4表明的那種擾動超時狀況,此處其實釋放的是舊的結果.被喚醒的線程其實通常是執行在5.1處阻塞的.當前線程能運行到此絕對不須要喚醒.
    releaseWaiters(phase);
    return p;
}

到此Phaser的代碼解析已完畢,咱們來分析關於隊列,等待和喚醒的問題.

1.Phaser維護了兩個"隊列",不論加入等待隊列仍是彈出等待隊列,都是從頭部進行,新加入的成員會成功隊列的新頭,原來的頭會成爲它的next,彈出時next成爲新頭.因此至關於一個對頭部的"後進先出",考慮官方起名和註釋,咱們依舊保持隊列這個稱呼.

2.喚醒時,會從隊列的頭部依次彈出node 的phase早於root的最新phase的node,.

3.等待時,入隊的node成爲新的頭.

4.當輪次增長時,會使用和本輪不一樣的隊列增長元素,同時也會喚醒本輪中等待的node.

由於喚醒和等待同時進行,且各自操做各自的隊列(不一樣的phase),所以彼此之間沒有競態(儘管一個是頭入一個是頭出),能夠說設計巧妙,下面咱們來腦洞大開,思考一個極端狀況.

咱們假設一種極端的phase切換場景,奇數phase大量等待入隊,偶數phase則迅速完成.假設當前phase對應的隊列是奇數對列,輪次提高完成後,它去釋放當前的隊列元素,結果未等這個釋放操做執行完畢,偶數隊列的輪次很快執行完,奇數隊列中積壓了成千上萬個node未能釋放,輪次卻又切回到了奇數隊列,會出現什麼事?

顯然奇數隊列若是一直保持這種極端場景,它會愈來愈龐大,逼近撐爆內存的同時,大量線程也會得不到釋放,甚至於老一輪的線程須要等待新一輪的線程去釋放.爲何老一輪的線程會去等待新一輪的線程釋放呢?

releaseWaiter的方法咱們已經看出,它只會釋放phase早於最新的node,此時最新壓入的元素屬於當前最新的phase,顯然不知足條件,那麼會形成奇數隊列中兩輪前壓入的元素不能獲得清除,兩輪前就在釋放當時積壓node的線程(那一輪最後一個arrive)發現不符合清理條件,就直接return並終止了,只能等待本輪最後一個arrive出現後繼續進行釋放.若是本輪最後一個arrive出現很晚,在下一輪依舊保持如此極端,往返數輪,確實會致使奇數隊列中積壓大量node,且第一輪就在等待該輪次結束的線程早就知足了釋放條件(升到了2輪),事實上多是第n輪才獲得釋放,這還符合Phaser的定義嗎?咱們使用它,就是要保證每一輪單獨使用,每一輪次達到條件,線程釋放並執行,下一輪次是下一輪次.

然而doug的代碼就是這個樣子,想遍各類極端,以爲可能找到了bug,那麼就須要仔細思考了.做者來簡述一下這個趟坑的分析過程.

這個問題確實已經獲得了極大的規避了,畢竟是個極端狀況.

1.線程的喚醒真的很快,儘管此處除了喚醒還包含了原子引用的更新(每次出隊都要cas).

2.若是沒有註冊,顯然就沒有arrive相關的狀況,儘管能夠單獨調用,但必須保證在arrive時傳入的數量此時已經註冊了,所以每一輪次(phase)中可能積壓等待喚醒的線程的操做必定是在註冊以後,可是咱們回憶一下,註冊方法的第一步就是要等待完成advance,並且傳給internalAwaitAdvance的node會是null,即不能擾動和超時,因此當本輪次阻塞了必定數量的線程後,若是不去arrive,也不考慮超時和擾動的狀況,那麼線程將一直阻塞.咱們不可能在輪次advance前進行註冊,也就不可能在advance以前進行新一phase的arrive.

3.當本輪次的最後一個arrive線程觸發了輪次的更新後,才能夠開啓註冊以及新輪次的arrive,可是此時使用了另外一個等待隊列,而觸發了輪次更新的上一輪的arrive線程將會當即進行前一個隊列中積壓的線程的喚醒操做.只有該喚醒操做足夠慢,且新的輪次極快就完成了的狀況,纔可能形成在原arrive線程未能及時釋放奇數隊列的狀況下,新一輪次再次向其中添加元素.

4.最重要的還在上面的internalAwaitAdvance方法,那一段被做者標上了入隊條件的註釋處,要想入隊,必須if ((q == null || q.phase == phase) &&加上後面的條件,而這兩個條件的限定已經很明顯,要想入隊,必須知足該等待隊列沒有元素或者隊首是本輪的元素,而該方法又是下一輪首次註冊時必須等待完成的,下一輪的arrive又必須發生在下一輪的首次註冊以後,所以根本不會出現本輪wait的線程還要等下一輪甚至下N輪的線程去釋放的極端狀況,哪怕真的去作一個極端測試:讓奇數輪大量積壓線程,讓偶數輪快速切換,而後測試第一輪壓入的線程究竟是不是本輪釋放的.(做者差點就要寫單元測試去作這個極端測試了!)

這一段不經意的if,一個小小的條件,若是不注意真的忽略了,小代碼大功效,誰能想到,這麼深的暗坑就這樣被規避了.

總結

前面已經詳述了Phaser的源碼以及若干趟坑辛路.其實已經沒什麼好總結的了,就在此順便對比常見同步器CyclicBarrier,CountDownLatch,Semaphore的特徵和實現.

從使用特徵上看:

1.CountDownLatch是一次性的,只能初始化決定parties數量,等待者能夠是多個,每次釋放都會減小一個信號量,直到歸0時爲止,最後一個釋放者將喚醒其餘等待的線程.它也不能繼續使用.

2.CyclicBarrier是可重用的,分代的,每一代之間彼此獨立,可是每一代的初始parties是相同的,不可在運行期內動態調整,每一代最後一個線程會去開啓一下代,並能夠在此時運行一個用戶指定的action,與此同時喚醒其餘線程繼續執行.它能夠在運行完一代後繼續被使用.而且它還支持重置.

3.Semaphore是一個資源量的典型,若是說CountDownLatch和CyclicBarrier或者Phaser都是等到"人夠了"再放行,Semaphore倒是起到限流的做用,它控制了有限的令牌數,這個數量不能夠動態地更改,在不能acquire到足夠的令牌數時,線程將阻塞,直到其餘線程釋放了足量的令牌數並喚醒它爲止.每個持有了令牌的線程均可以喚醒阻塞等待獲取的線程.

4.Phaser的功能上不一樣很明顯,首先它的參與者數量幾乎時刻可變(除了正在進入下一phase期間),隨時能夠增長減小parties數量,每一phase等待者能夠是多個,每一phase中,每一個能從internalAwaitAdvance方法中走出循環的線程均可以幫助喚醒,固然最終能進入喚醒操做仍是要歸功於最後一個arrive的線程(儘管它arrive後其餘線程醒來後也會幫助喚醒).Phaser的喚醒者不必定是參與者.

從實現來看:

1.CountDownLatch藉助了aqs來實現parties的釋放,它使用cas+park的方式,不使用Lock.

2.CyclicBarrier須要藉助重入鎖和condition,每個await的線程都要全局加鎖,阻塞時await在condition上.

3.Semaphore在實現上相似CountDownLatch,也是基於aqs,只不過它容許獲取和釋放,對state有增有減,總量不變.也是cas+park的方式阻塞,也不使用Lock

4.Phaser由於功能的要求,不基於AQS(它不能有構建時就固定的state,儘管能夠初始化一個state,但它必須支持改變),它依託於原子引用實現了一個內部的隊列,相應的等待/入隊/喚醒等操做經過cas自旋+park的方式,一樣不使用Lock.並利用雙隊列的方式規避了前一輪的釋放和後一輪的響醒的阻塞.

此外還有兩點結合前面的推理和自測驗證的結論:

1.Phaser中的每個phase是保證了可見性的,經做者自測,在任何使用Phaser的代碼中await先後,不會出現串phase讀出的亂序狀況(側面說明每一個phase不會依賴後一個或幾個phase的釋放).

2.Phaser須要對await的線程進行阻塞時,是將它打包成一個node(blocker),利用ForkJoinPool來block的.若是使用Phaser同步的任務是運行在ForkJoinPool中的,它將會利用到相應的補償機制,經做者自測,這將保證Phaser中block的每個任務必然獲得執行,每個阻塞的線程必然獲得釋放.

相關文章
相關標籤/搜索