在前面的幾篇文章中詳述了ForkJoin框架的若干組分,在相應的官方文檔中總會不時地提起"Phaser",一樣的,也提到Phaser能夠用於幫助運行在ForkJoinPool中的ForkJoinTask運行時保持有效的執行並行度(其實特指其餘task都在等待一個phase的前進時).node
熟悉JUC的朋友都知道它的大概組成部分包含:Containers(支持併發的容器),Synchronizers(同步器),Executors(線程池),BlockingQueue(阻塞隊列),Atomic(原子類),Lock and Condition(鎖).而Phaser和CyclicBarrier,Semaphore等同樣是一個同步器.算法
本文主要介紹Phaser的內部實現,粗略介紹使用,它的源碼相比於線程池較爲簡單,但最好能對比其餘同步器來了解,讀者最好擁有juc其餘同步器,原子類,部分ForkJoin框架的基礎.編程
同時,本文也會再次提到ForkJoinPool::managedBlock(blocker),以前在ForkJoinPool一文提到了實現和接口,而在CompletableFuture中見到了一個blocker的實現.併發
首先來看一些與Phaser狀態有關的簡單的常量.框架
//64位整數表示Phaser的狀態. private volatile long state; private static final int MAX_PARTIES = 0xffff;//最大parties,後16位表示. private static final int MAX_PHASE = Integer.MAX_VALUE;//最大phase,最大整數值. private static final int PARTIES_SHIFT = 16;//取parties使用的移位數,16 private static final int PHASE_SHIFT = 32;//取phase的移位數,32 private static final int UNARRIVED_MASK = 0xffff; //未到的,取後16位. private static final long PARTIES_MASK = 0xffff0000L; //參加者,17-32位. private static final long COUNTS_MASK = 0xffffffffL; //數量,後32位. private static final long TERMINATION_BIT = 1L << 63;//終止態,首位. // 特殊值. private static final int ONE_ARRIVAL = 1; private static final int ONE_PARTY = 1 << PARTIES_SHIFT; private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;//第1位和17位.顯然,它表示了一個ONE_ARRIVAL信息和PARTY信息. private static final int EMPTY = 1; //對一個state s計算unarrived的count, private static int unarrivedOf(long s) { //直接取整數位,若是等於EMPTY(1)則返回0,不然取後16位. int counts = (int)s; return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); } //對一個state,取出parties信息,直接取state的17至32位. private static int partiesOf(long s) { return (int)s >>> PARTIES_SHIFT; } //對於一個state,取出phase信息,直接取前32位. private static int phaseOf(long s) { return (int)(s >>> PHASE_SHIFT); } //對於一個state,取出arrived信息 private static int arrivedOf(long s) { int counts = (int)s; //state的後32位等於1(EMPTY)返回0,不然返回parties(state的17至32位,參考上面的partiesOf方法)和UNARRIVED(state的後16位)的差. return (counts == EMPTY) ? 0 : (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK); }
上面都是一些常量,沒什麼可分析的,簡單來個總結.函數
Phaser用一個long型的state保存狀態信息.工具
state的前32位表示phase,後16位表示unarrivied,17至32位表示parties,parties減去unarrived即arrived.單元測試
下面咱們看一些成員變量和有關函數.測試
//this的父,能夠是null表示none private final Phaser parent; //phaser顯然是個樹的結果,root表明根,若是當前phaser不在樹內,則root==this private final Phaser root; //偶數隊列和奇數隊列.它們存放等待線程棧的頭,爲了減小當添加線程與釋放線程的競態, //這裏使用了兩個隊列並互相切換,子phaser共享root的隊列以加快釋放. private final AtomicReference<QNode> evenQ; private final AtomicReference<QNode> oddQ; //決定某個phase的等待線程隊列. private AtomicReference<QNode> queueFor(int phase) { //選擇隊列的方法,若是參數phase是偶數,使用evenQ,不然oddQ. return ((phase & 1) == 0) ? evenQ : oddQ; } //出現arrive事件時的邊界異常信息. private String badArrive(long s) { return "Attempted arrival of unregistered party for " + stateToString(s); } //註冊時的邊界異常信息. private String badRegister(long s) { return "Attempt to register more than " + MAX_PARTIES + " parties for " + stateToString(s); } //他們都用到的stateToString(s),計算參數s對應的phase,parties,arrived. private String stateToString(long s) { return super.toString() + "[phase = " + phaseOf(s) + " parties = " + partiesOf(s) + " arrived = " + arrivedOf(s) + "]"; }
爲了便於理解,先來看隊列的實現.ui
//表示等待隊列的QNode,實現了ManagedBlocker static final class QNode implements ForkJoinPool.ManagedBlocker { //存放所屬phaser final Phaser phaser; //所屬phase final int phase; //是否可擾動 final boolean interruptible; //是否認時 final boolean timed; //是否已擾動 boolean wasInterrupted; //計時相關 long nanos; final long deadline; //關聯線程,當是null時,取消等待. volatile Thread thread; //下一個QNode QNode next; QNode(Phaser phaser, int phase, boolean interruptible, boolean timed, long nanos) { this.phaser = phaser; this.phase = phase; this.interruptible = interruptible; this.nanos = nanos; this.timed = timed; this.deadline = timed ? System.nanoTime() + nanos : 0L; //取當前線程. thread = Thread.currentThread(); } //isReleasable方法 public boolean isReleasable() { if (thread == null) //1.線程已置空(如2),返回true釋放. return true; if (phaser.getPhase() != phase) { //2.發現phaser所處的phase不是構建QNode時的phase了,就置線程爲空,返回true. thread = null; return true; } if (Thread.interrupted()) //3.若是當前線程擾動了. wasInterrupted = true; if (wasInterrupted && interruptible) { //4.發現擾動標記,而且QNode配置爲可擾動,則置線程null並返回true thread = null; return true; } if (timed) { //5.定時邏輯,還有nanos,計算新的時長. if (nanos > 0L) { nanos = deadline - System.nanoTime(); } if (nanos <= 0L) { //已經到時間,返回true,線程置空. thread = null; return true; } } return false; } //block邏輯 public boolean block() { if (isReleasable()) return true; else if (!timed) //不定時的park LockSupport.park(this); else if (nanos > 0L) //定時的狀況. LockSupport.parkNanos(this, nanos); //老規矩 return isReleasable(); } }
前面介紹過CompletableFuture的Singnaller,以及ForkJoinPool中的managedBlock,這一塊的邏輯顯然得心應手.
很明顯,若是咱們在ForkJoinPool中使用它做爲blocker,並在相應的ForkJoinTask的exec或CountedCompleter的compute方法中使用ForkJoinPool::managedBlock(blocker),將每一個ForkJoinWorkerThread在阻塞前構建一個QNode進入Phaser的等待隊列(雖然尚未講到相關內容,可是Phaser顯然不用咱們直接操做內部類QNode),那麼它將依照上述邏輯進行補償,保障有效的並行度.
前面完成了承前啓後,預熱到此結束,開始看Phaser的核心方法.
//doArrive方法 //它是arrive和arriveAndDeregister方法的主要實現.手動調用這些方法能夠加速經過和最小化競態窗口期. //參數表明要從當前state中減去的調整數值,它的單位依託於業務,當爲arrive時減去的單位爲ONE_ARRIVAL, //當爲arriveAndDeregister時減去的單位爲ONE_DEREGISTER. private int doArrive(int adjust) { final Phaser root = this.root; for (;;) { //1.變量s初始化,取決因而否當前Phaser是root.不是root將試圖從root同步滯後的state. long s = (root == this) ? state : reconcileState(); //計算phase,前32位. int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) //2.負數直接返回.說明原來的state首位就是1,前面的TERMINATE_BIT就是64位置1. return phase; //取count,後32位. int counts = (int)s; //計算unarrived,和前面同樣的邏輯. int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); if (unarrived <= 0)//2.1 //沒有unarrived了,說明不該該調用此方法,拋出異常,信息就是前面介紹過的badArrive throw new IllegalStateException(badArrive(s)); //3.嘗試將state減去adjust數. if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) { //3.1cas成功後,unarrived餘1,則前進一個phase if (unarrived == 1) { //3.1.1取出parties做爲下一個state的基礎. long n = s & PARTIES_MASK; //3.1.2 下一個unarrived,數值上等於parties. int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (root == this) { //3.1.3當前Phaser是root,onAdvance返回true,則加上終止信號. if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; else if (nextUnarrived == 0) //3.1.4 onAdvance返回false,而計算得出的nextUnarrived是0,即沒有parties,n加上一個empty(1) n |= EMPTY; else //3.1.5nextUnArrived不是0,加到n上. n |= nextUnarrived; //3.1.6前面的流程完成了state的後32位(parties和unarrived),接下來處理前32位. //限定在MAX_PHASE以內,對當前phase加1. int nextPhase = (phase + 1) & MAX_PHASE; //將nextPhase的值加到n的前32位.並用n去cas掉原來的state,由於有3處入口的cas,此處必定能成功 n |= (long)nextPhase << PHASE_SHIFT; UNSAFE.compareAndSwapLong(this, stateOffset, s, n); //更新到新的phase,喚醒等待的waiter. releaseWaiters(phase); } //3.1.7當前Phaser不是root,當nextUnarrived計算得0時,像父傳遞解除註冊,參數ONE_DEREGISTER //會同時減去一個unarrived和一個parties.下輪循環正常應進入3.1.8 else if (nextUnarrived == 0) { phase = parent.doArrive(ONE_DEREGISTER); //完成傳遞後,將本身的state置empty. UNSAFE.compareAndSwapLong(this, stateOffset, s, s | EMPTY); } else //3.1.8,當前Phaser不是root,計算的nextUnarrived非0,像父傳遞一個arrive事件,減去一個unarrived. phase = parent.doArrive(ONE_ARRIVAL); } //3.2返回當前phase,多是已進入3.1遞增的.僅有此處可退出循環. return phase; } } }
關於該方法的執行流程,咱們結合幾個周邊方法一併分析,先來看註冊方法和onAdvance勾子.
//註冊和批量註冊.參數表明parties和unarrived字段的增長數,它必須大於0. private int doRegister(int registrations) { // 1.用參數計算一個adjust,同時包含parties和arrive. long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; //循環嘗試更改. for (;;) { //2.存在parent,則用root的phase調整this的state. long s = (parent == null) ? state : reconcileState(); //取出當前state中保存的counts,parties,unarrived信息. int counts = (int)s; int parties = counts >>> PARTIES_SHIFT; int unarrived = counts & UNARRIVED_MASK; if (registrations > MAX_PARTIES - parties) //要註冊的數量大於了餘量,拋出異常. throw new IllegalStateException(badRegister(s)); //3.計算出phase phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) //phase爲負說明state爲負,即終止態,終止. break; //4.當前state表示的參與數非空的邏輯,當前註冊非首次註冊. if (counts != EMPTY) { if (parent == null || reconcileState() == s) { //this是root或者從root同步的state不變,繼續執行,不然從新循環. if (unarrived == 0) //4.1本輪循環經過原state計算的unarrived爲0,說明應等待下一phase,使用root等待 root.internalAwaitAdvance(phase, null); else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) //4.2本輪循環未發現應等待下一phase,嘗試原子更新,增長adjust到state上. break; } } //5.當前不存在counts,且自身就是root,表明root的首次註冊. else if (parent == null) { //5.1計算下一個state,由於沒有參與數,使用phase初始化前32位,並使用adjust作後32位. long next = ((long)phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) //5.2 cas成功,退出,不成功,下輪循環. break; } //6.是首次註冊,但也不是root的邏輯.表明非root的Phaser的首次註冊. else { //6.1對當前Phaser加鎖並double check,避免同時調用.加鎖失敗的線程將在後續進入2的邏輯. synchronized (this) { //double check state未發生改變. if (state == s) { //6.2首先向父Phaser註冊1. phase = parent.doRegister(1); if (phase < 0) //發現進入終止態,直接中止. break; //6.3向父Phaser註冊成功,循環嘗試cas掉老的state,新state的算法同上,phase加adjust. //在整個while循環中,再也不考慮phase進入終止態的狀況,由於這些操做處於同一個"事務"中, //且因競態等緣由,若某次cas時計入了負數的phase,方法返回後也能夠及時發現. while (!UNSAFE.compareAndSwapLong (this, stateOffset, s, ((long)phase << PHASE_SHIFT) | adjust)) { //若是cas不成功,則讀取s爲新的state,計算新的phase並從新循環. s = state; phase = (int)(root.state >>> PHASE_SHIFT); // assert (int)s == EMPTY; } //6.4cas成功後退出循環. break; } //若是if(state==s)判斷失敗,說明有別的線程有當前線程進入synchronized塊前已經加鎖並執行了內部的邏輯且稍後釋放了鎖, //這樣當前線程加鎖成功,但if判斷失敗,它會當即釋放鎖並返回到2. } } } return phase; } //使用root的phase調整this的state,更新滯後的結果.這通常發生在root前進了phase可是 //子phaser尚未作到這一步,這種狀況下,子phaser必須完成這個前進的步驟,這一過程當中,phase將 //被置爲root的phase,unarrived則會重置爲parties,若parties爲0,則置爲EMPTY.返回結果state. private long reconcileState() { final Phaser root = this.root; long s = state; //不是root才進行下一步. if (root != this) { int phase, p; //cas,phase採用root,parties不變,unarrived重置爲parties或EMPTY. while ((phase = (int)(root.state >>> PHASE_SHIFT)) != (int)(s >>> PHASE_SHIFT) && //phase滯後於root //嘗試cas. !UNSAFE.compareAndSwapLong (this, stateOffset, s, //肯定新state的前32位,使用root的phase. s = (((long)phase << PHASE_SHIFT) | //新phase<0,後32位直接取this的state表示的counts. ((phase < 0) ? (s & COUNTS_MASK) : //phase有效,this的state表示的parties爲0,則後32位使用empty (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY : //不然,後32位使用parties. ((s & PARTIES_MASK) | p)))))) s = state; } return s; } //onAdvance勾子方法,參數爲當前phase和註冊的parties數. //默認實現爲parties數爲0,方法返回true時,調用者會嘗試終止Phaser.(參考前面的doArrive).隨後調用isTerminated方法將返回true. //執行此方法時拋出的運行時異常或Error將直接上拋給嘗試advance相應的phase的線程,這種狀況下不會發生phase的advance. //方法的入參表示的是Phaser當前的state(未advance前),所以若在onAdvance方法中執行arrive,regist,waiting這三種操做的行爲是不肯定的也不可靠的. //若是當前Phaser是一個級聯的成員,那麼onAdvance只會由root在每次advance時調用. //方法的默認實現返回true的場景目前只能是通過數次arriveAndDeregister調用形成parties歸零的結果.咱們繼承Phaser能夠輕易地重寫此行爲, //好比簡單粗暴地返回false,那麼將永遠容許新的註冊. protected boolean onAdvance(int phase, int registeredParties) { return registeredParties == 0; }
通過前面的代碼分析,已經對Phaser的核心函數doRegister,doArrive有了全面的瞭解.
二者都會在一開始同步root的phase,且若是出現落後root的狀況,同步了新的phase的同時,也會從新初始化unarrived,而且使用parties的值.
doArrive方法會每次調整unarrived數量(也可包含parties數量,若是使用瞭解除註冊),當Phaser調用自身的arrive/arriveAndDeregister時,會作出相應的減小,並根據是否爲root而決定向上遞歸.
Phaser減小自身unarrived信號(也可能同時有parties信號)後,若發現這已是最後一個unarrived信號,則進行接下來的判斷:
1.當前Phaser是root,advance並喚醒waiter.(重要的喚醒操做執行點,root一輪完成)
2.當前Phaser不是root,且它已經不具有繼續下一輪的條件(計算nextUnarrived爲0,即parties已經被arriveAndDeregister置0),則從父Phaser減小一個unarrived和parties.
3.當前Phaser不是root,但它仍具備parties,知足進行下一輪的條件(計算nextUnarrived不是0),則從父Phaser減小一個unarrived,但不減小parties.
顯然,子Phaser的最後一個unarrived的消失必定會形成父的unarrived減小,子Phaser不能繼續下一phase的register和arrive時,從父Phaser中卸載.
若不是本Phaser的最後一個unarrived信號,則直接結束,至關於只進行了上面的減小信號操做.
doRegister方法的邏輯大體相反,不一樣於doArrive,它的參數registrations同時做用於parties和unarrived,即兩個位上同時加上registrations參數.它的大體邏輯:
1.當前註冊並不是首次註冊,且出現unarrived==0,即本輪已經完成了arrive,那麼本輪將不能註冊,須要等待root更新到下輪.(這也是咱們碰到的第一個阻塞)
2.當前註冊並不是首次註冊,unarrived也不是0,則在本phase進行註冊,增長相應的parties和unarrived.
3.當前註冊是root的首次註冊,給root的state加上相應的parties和unarrived.
4.當前註冊是非root的首次註冊,加鎖(this),對本身的state加上相應的parties和unarrived(同上,以registrations爲單位),而對parent加上一個parties和unarrived單位.
很明顯,對於單Phaser的狀況很是好理解,每次減小unarrived數量(先不考慮減小parties),則最終致使Phaser自身進入下一個phase,而後從新初始化unarrived到下一輪,unarrived的新值是前一輪剩下的parties數量.
當咱們同時也嘗試減小parties數量,即解除parties的註冊,最終致使沒有parties,那麼Phaser將進入終止態.
整個過程當中,只要Phaser沒進入終止態,隨時能夠進行新的註冊,並增長parties和unarrived的數量.每一個arrive能夠減小unarrived的數量爲任何正整數,不必定是1.
對於多Phaser的狀況,有兩個特殊點:
1.對任意Phaser樹中的某一個Phaser調用註冊操做,會令自身加上相應參數個parties和unarrived單位,僅會在該Phaser第一次註冊時增長父Phaser(極端可能,僅從一個葉子節點第一個註冊的狀況下可一直遞歸到root)的parties數和unarrived數各1單位(不論參數是多少).
2.對任意Phaser樹中的某一個Phaser調用arrive操做,會令自身減去相應的參數個parties和unarrived單位,同時僅當本Phaser此時是最後一個unarrived時,會減去父Phaser的一個unarrived單位(當前子Phaser仍舊有parties能夠構建下一phase),或減去父Phaser一個Parties和unarrived單位.(極端狀況下,每一級都是最後一個unarrived時,減小葉子節點的最後一個unarrived會遞歸到root).
每新增一個子Phaser,父Phaser就會增長一個要完成觸發phase的advance前必需要等到arrive的單位;每個子Phaser中全部的arrive完成,父Phaser都將減小一個要等待advance所必需觸發的arrive.
目前沒有看到await方法,但能夠提早說明,等待操做徹底依賴於root是否完成本輪.也就是全部子Phaser都完成了同一輪(arrive打滿),才能讓父Phaser自己減去一個全部arrive單位,再觸發父Phaser本輪的完成,此時對任何已完成的Phaser進入註冊,都會進入上述的root.internalAwaitAdvance(phase, null)方法等待root進入下一phase.若是對已經完成全部arrive的Phaser繼續進行arrive操做,由於unarrived已是0,則會拋出異常.
因此對於使用子Phaser的場景,若是發生很巧妙的狀況,Phaser樹上當前子Phaser的arrive結束條件知足了,使得新來的註冊只能等待下一輪次,而其餘分支的子Phaser又恰恰不能完成本輪次,那麼新的phaser.doRegister方法將阻塞在此.
好在咱們使用Phaser可能會相似CyclicBarrier的使用方式,可對每一輪(phase)進行註冊並等待(也許只等一輪,那麼arrive就要帶上deregister),每一輪最後一個線程arrive了,就會中止全部線程的等待,讓全部線程繼續執行,同時開啓了下一輪次,這些線程此時又能夠不經註冊直接在新的輪次中進行等待,直到最後一個arrive了,再次喚醒全部線程並繼續執行,同時Phaser再前進一輪,如此往復.中間使用arrive並deregister的線程會從本輪起減小一個unarrive數量(由於parties也減小了,因此再下一輪初始化unarrive數量時也會減小一次).咱們可讓這些線程參與任意的輪次,但要注意的是,若是有線程中途不參加了,必定要解除註冊,不然由於每輪初始化時,要等待arrive的數量都是上一輪剩下的parties數量,有線程中止了執行,卻不減小parties數,那麼下輪全部等待的線程將永遠等不到phaser知足喚醒的條件.
上述的過程當中能夠明顯的看出,目前已介紹的兩個重要核心函數:註冊和arrive並無直接記錄和操做線程的操做,相應的操做在等待方法和喚醒方法中(前面提到過release),咱們稍後介紹.
如今假設一個特殊的使用場景,也能夠區別於CyclicBarrie和CountDownLatch的使用.仍是上面的例子,可是咱們準備的線程數與Phaser的parties數/unarrived數不一樣(通常前者要多些),會發生什麼事?
首先建立了Phaser,不指定最初parties數,並用每一個線程去註冊(我甚至能夠用一個線程去重複註冊,每次的參數registrations還能夠不一樣,註冊的做用並非將當前線程壓入隊列,而是爲本phase設置一個unarrive數量,以控制到達下個phase前必須有多少次arrive的發生),則parties數和unarrived的初值徹底與此有關,是一個依託於咱們隨意註冊而產生的隨意值.那麼假定咱們的線程數量大於這個parties數(假定調用註冊方法的線程和arrive及等待的線程無關),並令有的線程執行arrive(徹底能夠一次arrive減去多個信號量,甚至一個線程屢次arrive),有的線程執行await等待信號advance到下一個phase(一個線程在一個週期只能調用一次),有的線程執行了arrive也等待phase前進(這種狀況一個線程一週期也只能一次.其實這些分別對應了還未介紹的arrive,waitAdvance,arriveAndWaitAdvance等方法),單獨進行await操做的線程能夠是任意數量,執行arrive方法的線程加上執行arrive並wait的操做的線程和必須超過unarrived,這才能喚醒等待線程.
目前這些還比較抽象,等到咱們看過相應的幾個方法便了然了.
onAdvance的方法默認實現就是判斷本階段註冊的parties數量,若是已是0則說明沒有parties了,Phaser應該結束.可是咱們其實能夠從新實現,好比參數中同時傳入了當前的phase,我能夠規定上面的例子中phase最多隻有3輪次,那麼不論何時arrive,發現了當前phase已進入3輪,Phaser就被終止.固然,這一過程是由root執行的,可是子Phaser的phase會在每次註冊和arrive發生時同步root,所以本例中對於phase數的判斷能夠粗放到全部Phaser,對於parties數則只能做用於root(事實上調用onAdvance的必定是root).
接下來看全量構造方法和若干和上面有關的公有函數.
//初始化一個Phaser,指定parent,指定未到來的參與者數(unarrived parties),但這只是一個初值, //當咱們在任什麼時候候調用註冊方法時,還會相應的增長. public Phaser(Phaser parent, int parties) { if (parties >>> PARTIES_SHIFT != 0) //太大了,超過了後16位能表示的整數. throw new IllegalArgumentException("Illegal number of parties"); //初始phase爲0. int phase = 0; this.parent = parent; if (parent != null) { //1.有parent的狀況,共享parent的root,隊列,並向parent中註冊1個parties和unarrived, //同時同步一次phase(表面上是同步了parent的,實際上前面已經看過,會同步root). final Phaser root = parent.root; this.root = root; this.evenQ = root.evenQ; this.oddQ = root.oddQ; if (parties != 0) phase = parent.doRegister(1); } else { //2.無parent的狀況,root就是this,並初始化奇偶等待隊列.它使用原子引用的形式存放一個QNode,而QNode咱們前面已介紹. this.root = this; this.evenQ = new AtomicReference<QNode>(); this.oddQ = new AtomicReference<QNode>(); } //統一初始化state,後32位的決定依託於parties,若是parties是0則給予EMPTY,直接無論高32位. //不爲0則給予phase設置爲前32位,parties設置parties位和unarrived位. this.state = (parties == 0) ? (long)EMPTY : ((long)phase << PHASE_SHIFT) | ((long)parties << PARTIES_SHIFT) | ((long)parties); } //註冊方法,就是調用doRegister,參數1. //它會向this添加一個unarrived的party,若是正巧root正在進行advance,它須要等待下個phase. //若是this有parent,且它以前沒有過註冊的parties,則首次註冊會觸發自身向parent的註冊. //若是this已經終止了,那麼嘗試註冊將會無效並返回負值.若是註冊的數量大於了最大支持parties(後16位整數), //會拋出IllegalStateException public int register() { return doRegister(1); } //批量註冊指定的信號量,並返回最新的phase.規則基本同上. public int bulkRegister(int parties) { if (parties < 0) throw new IllegalArgumentException(); if (parties == 0) //參數0直接查詢最新的phase返回 return getPhase(); return doRegister(parties); } //arrive一個信號,不等待其餘arrive事件,返回最新phase(終止態爲負). //當前Phaser的arrive事件已滿,則對parent來講也會觸發一個arrive.(若是有parent) public int arrive() { return doArrive(ONE_ARRIVAL); } //arrive並解除一個註冊parties,也不阻塞等待其餘arrive.若是當前Phaser的解除註冊操做 //將parties減至0,且this有parent,這將致使parent也減小一個parties(本phaser解除在parent的註冊). public int arriveAndDeregister() { return doArrive(ONE_DEREGISTER); }
接下來要看上面已經作足了鋪墊的等待方法了,並結合前面的隊列一塊看.
//令當前線程"到達"此phaser並等待其餘parties,它等效於awaitAdvance(arrive()). //注意,按照道格的註釋,若是你在一個未進行註冊(調用register)的線程裏調用此方法實際上是一個使用錯誤, //可是從本方法和前面以及後面有關的方法來看,全部記錄線程的方法均只與arrive和等待有關,與註冊無關. //所以Phaser自己沒法規避這種使用錯誤,咱們徹底可使用另外一個線程去註冊,而當前線程去arrive,將兩個動做分開. //方法會返回arrive時最新的phase號.終止時會是負值. public int arriveAndAwaitAdvance() { //記錄root,開始循環. final Phaser root = this.root; for (;;) { //1.預計算,首先同步state long s = (root == this) ? state : reconcileState(); //計算phase int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) //已終結直接返回最終phase. return phase; //計算counts,unarrived int counts = (int)s; int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); if (unarrived <= 0) //已經沒有空餘的unarrived信號了,不能再調用arrive,拋出異常. throw new IllegalStateException(badArrive(s)); //2.減餘arrive的有關邏輯.嘗試cas減去一個arrive if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= ONE_ARRIVAL)) { if (unarrived > 1) //2.1當前要減的信號不是本Phaser的最後一個信號量,調用root的等待方法.參數2是node,傳空. return root.internalAwaitAdvance(phase, null); if (root != this) //2.2當前要減的信號量是非root的Phaser的最後一個,遞歸給parent(雖然用了return,可是parent也可能在進入2.1後阻塞). return parent.arriveAndAwaitAdvance(); //2.3當前要減的信號量是root的最後一個. //2.3.1準備計算下一個狀態,先取出state的parties信息. long n = s & PARTIES_MASK; //計算nextUnarrived,它是如今的parties. int nextUnarrived = (int)n >>> PARTIES_SHIFT; //2.3.2前進phase邏輯. if (onAdvance(phase, nextUnarrived)) //須要終止,給新state的計算基石n加上終止標記. n |= TERMINATION_BIT; else if (nextUnarrived == 0) //計算的nextUnarrived是0,即沒有parties,加上空標記位. n |= EMPTY; else //下一輪能正常進行,加上nextUnarrived位. n |= nextUnarrived; //2.3.3給n加上下一個phase. int nextPhase = (phase + 1) & MAX_PHASE; n |= (long)nextPhase << PHASE_SHIFT; if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n)) //用n進行cas不成功,將新的phase返回. //說明一下,由於方法執行到此前已經執行過2的入口cas,減去了最後一個unarrived,所以在2到此的過程當中如有新的註冊, //它內部會讀到0個unarrived,就會等待下一個phase(參考前面介紹過的註冊方法),所以cas失敗不會是由於2以後有新的註冊. //在整個arrive系列的方法中,最後一次arrive發生後,本Phaser不可能有其餘線程再去執行相似2處的減餘的狀況. //故出現這種狀況的緣由目前來看有二,一是還未介紹的強制關閉Phaser的方法,此時也會突兀地改掉state形成cas恰巧失敗,二是 //出現一些用戶作出的奇葩行爲,好比重寫了其餘公有方法.咱們天然忽略第二種狀況,doug大神也是簡單註釋了一個"terminated". return (int)(state >>> PHASE_SHIFT); // terminated //cas成功,釋放等待隊列中的線程,返回下一個phase(由於在此過程當中的register會等到advance,此時的phase已是nextPhase了). releaseWaiters(phase); return nextPhase; } //3.減餘失敗說明出現競態,直接開啓下輪循環從新減餘. } } //等待當前Phaser從給定的phase前進結束,若是當前phase不等於給定的phase,或者Phaser已終止當即返回. //1.傳入phase爲負,返回它自己. //2.傳入的phase不是最新的phase,返回最新的. //3.傳入了最新的phase,等待到advance並返回advance後的phase. public int awaitAdvance(int phase) { final Phaser root = this.root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; if (p == phase) //匹配成功,等root前進.參數node爲null return root.internalAwaitAdvance(phase, null); return p; } //參考前面的幾個方法,區別是可擾動. public int awaitAdvanceInterruptibly(int phase) throws InterruptedException { final Phaser root = this.root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); if (phase < 0) //1.參數phase小於0直接返回它自己. return phase; if (p == phase) { //2.參數phase匹配,回憶一個前面介紹的QNode,匹配當前Phaser和phase,配置爲可擾動且不計時. QNode node = new QNode(this, phase, true, false, 0L); //3.放入root的等待隊列阻塞. p = root.internalAwaitAdvance(phase, node); if (node.wasInterrupted) //4.等待結束,判斷是不是擾動形成的結束,前面介紹過QNode的相關邏輯, //它實現了ForkJoinPool.ManagedBlocker,所以在managedBlock方法進行時, //會循環調用問詢是否能release,當咱們配置了可擾動且擾動了,就會標記這個wasInterrupted,釋放線程引用並返回. //發現此種狀況拋出異常. //同時,當發現等待成功,也會結束,釋放線程引用並返回,但不帶有擾動標記. throw new InterruptedException(); } //5.返回1處以前讀取的phase或3處獲得的最新phase值. return p; } //同上方法,但帶有計時. public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { long nanos = unit.toNanos(timeout); final Phaser root = this.root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; if (p == phase) { //不一樣於上面方法的地方,創建的QNode帶有計時和等待時長. QNode node = new QNode(this, phase, true, true, nanos); p = root.internalAwaitAdvance(phase, node); if (node.wasInterrupted) //被擾動的狀況. throw new InterruptedException(); else if (p == phase) //時間到了phase沒有前進,超時. throw new TimeoutException(); } return p; }
前面的幾個核心方法粗略過完,補充一些重要內容.
首先在前面曾分析過有線程阻塞等待下一個phase的狀況,並無加上定時等待的考慮.在超時的狀況下,阻塞的線程可能會收到異常並退出.
創建QNode能夠限定是否認時和可擾動,這取決於咱們使用哪一個方法去await.
除最後一個線程arrive外,全部線程調用這些方法都會減小一個arrive並加入等待隊列,直到(1)配置了定時且超時,(2)當前是可擾動等待且使用了Thread.interrupt(),(3)最後一個線程使用上述方法或arrive方法,使得Phaser前進了一個輪次,internalWaitAdvance結束.其中(1)(2)均會遷成arrive線程拋出異常,只有(3)纔是正常的狀況.
QNode前面已介紹,它是一個blocker,須要調用ForkJoinPool::managedBlock纔會起做用(顯然root的internalAwaitAdvance必然與此方法有關聯).固然這個做用與任務是否運行在ForkJoinPool無關,若是等待phaser前進的線程是運行在ForkJoinPool中的ForkJoinWorkerThread,顯然會在internalAwaitAdvance期間進行補償.這一塊可參考前面的"CompletableFuture與響應式編程"和"ForkJoin框架之ForkJoinPool"兩篇文章.
另外,這些代碼也再次說明了root的做用: (1)對一切非root的Phaser進行等待都會用root的internalAwaitAdvance;(2)每次註冊或arrive必定會同步root的最新phase.
其中(1)也間接說明了爲何構建Phaser時只有root建立等待隊列,全部子Phaser共享.
上面還保留了一個疑問,提到了"強制關閉Phaser"形成arriveAndAwaitAdvance出現cas失敗的問題,doug大神直接註釋了一個terminated,咱們立刻來看這一塊,以及一些周邊的公共函數,加深理解,而後再來解決關於等待隊列最後的一些問題.
//強制關閉Phaser,讓Phaser進入終止態,可是這個過程不影響它已註冊的parties,若是此Phaser是 //一個Phaser樹中的成員,那麼全部phaser集中的Phaser都會關閉,若是它已經關閉,此方法無效.此方法能夠 //用於若干任務出現意料以外異常的狀況下的協調恢復. public void forceTermination() { // Only need to change root state final Phaser root = this.root; long s; //已經是終止態直接忽略. while ((s = root.state) >= 0) { //直接嘗試給root的state加上終止位.顯然加上了它,子Phaser在註冊和arrive等方法同步回新的phase就是個負數, //所以更改root的phase爲負至關於判了全部Phaser的死刑.惟一須要解決的是已經阻塞在root.internalAwaitAdvandce的線程. if (UNSAFE.compareAndSwapLong(root, stateOffset, s, s | TERMINATION_BIT)) { // 加上終止位成功,前後喚醒偶數等待隊列和奇數等待隊列. releaseWaiters(0); // Waiters on evenQ releaseWaiters(1); // Waiters on oddQ //返回 return; } } } //返回當前phase,直接用root的state去取. public final int getPhase() { return (int)(root.state >>> PHASE_SHIFT); } //查詢註冊的parties數量.調用前面介紹過的partiesOf public int getRegisteredParties() { return partiesOf(state); }
//查詢已經arrived的parties數量.調用介紹過的arriveOf
public int getArrivedParties() { return arrivedOf(reconcileState()); } //查詢未arrive的parties數量,調用前面介紹過的unarrivedOf public int getUnarrivedParties() { return unarrivedOf(reconcileState()); } //返回parent public Phaser getParent() { return parent; } //返回root public Phaser getRoot() { return root; } //判斷當前Phaser是否終止,直接取root的state是否爲負,可見,終止態徹底取決於root. public boolean isTerminated() { return root.state < 0L; }
這些方法都比較簡單,只有forceTermination須要再強調一翻,前面介紹arrayAndAwaitAdvance時曾提過在減去最後一個unarrived信號後去cas到下一個phase失敗的狀況,doug大神簡單註釋了一句terminated,直接返回了當前的phase(顯然只能是負),在周邊方法重重加鎖的前提下,那一次cas的失敗惟一一處就是強制關閉,由於它只改關閉標記位,至關於動了phase,而沒有動unarrived標記位和parties標記位.因此重寫Phaser的方法要謹慎,極可能不當心打破了這個封裝.
從上面的有關方法能夠看出,子Phaser的終止態嚴重依賴於root,目前能夠肯定的是root的phase一旦表現出終止態,全部新來的註冊,arrive,arrive並await將會當即返回,惟一須要關注的就是root被設置了終止標記後,正陷入等待的線程怎麼辦的問題.
咱們下面就來看Phaser的等待機制,這裏面又能見到道格大神很是有趣的玩法.
//工具方法,移除某個phase的等待者. private void releaseWaiters(int phase) { QNode q; //保存隊列中的隊首 Thread t; // 保存線程引用. //取隊列,用phase的奇偶決定,phase是偶數就取偶數隊列,不然取奇數隊列.而這個phase其實只用來取隊列了,後續的操做與它無關. AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; //循環,找出全部phase不等於root的phase的(其實root是最大的,因此就是找出非最新phase加入進來的waiter QNode) while ((q = head.get()) != null && q.phase != (int)(root.state >>> PHASE_SHIFT)) { //找出了,利用原子引用將head指向next. if (head.compareAndSet(q, q.next) && (t = q.thread) != null) { //發現阻塞者,喚醒線程.回憶下前面實現blocker方法中的isReleaseble和block方法都有將線程置空的操做.(三種狀況,喚醒擾動超時都會置空) //可是那些方法並無將表明該阻塞線程的QNode移除隊列,所以可能會發現thread已是null(表明無阻塞者)的狀況,只須要移除隊列便可. q.thread = null; LockSupport.unpark(t); } } } //上面releaseWaiters方法的一個變種,但它只會處理遍歷過程當中位於頭部的元素,出現正常的等待節點就會當即返回. //此方法在這一塊能夠有效的減小內存的佔用.退出時返回當前的phase. private int abortWait(int phase) { //一樣,參數phase只是用來選擇要處理的隊列. AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; for (;;) { Thread t; QNode q = head.get(); //計算最新phase的值p int p = (int)(root.state >>> PHASE_SHIFT); if (q == null || ((t = q.thread) != null && q.phase == p)) //1.出現q爲null表明整隊列元素已出隊,直接返回p; //或者在出隊過程當中head(q)記錄的線程引用還在,說明未超時或擾動,且是本phase的等待節點,終止循環並返回最新phase. return p; if (head.compareAndSet(q, q.next) && t != null) { //進入條件,參考1的條件,由於1會直接返回.故進入2的條件實際上是q非空且處於舊的phase.只有這種狀況才能夠出隊. //2.將q出隊,置空線程引用,釋放線程. q.thread = null; LockSupport.unpark(t); } } } //計算有效cpu,控制自旋. private static final int NCPU = Runtime.getRuntime().availableProcessors(); //常量,每輪arrive等待的字旋數,取決於NCPU,小於2則取1,不小於2取2的8次冪. static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8; //珊珊來遲的內部等待方法.它可能會一直阻塞到phase的advance發生(除非取消了等待). //此方法僅限root調用.參數phase表示當前的phase,參數node表示等待節點,用於追蹤節點的擾動或超時. //若是是null,表示是一次不可擾動的等待.返回值爲當前最新的phase. private int internalAwaitAdvance(int phase, QNode node) { // 1.調用releaseWaiters,傳入參數phase的前一個phase,顯然這只是決定釋放哪個隊列.參數絕對實時準確的狀況下會先將老的隊列釋放掉. releaseWaiters(phase-1); //節點入隊標記,入隊了就會變爲true boolean queued = false; //記錄每一輪循環的unarrived數量,用於決定是否擴增自旋等待次數. int lastUnarrived = 0; //自旋數,參考上面的計算邏輯. int spins = SPINS_PER_ARRIVAL; long s; int p; //開啓循環,直到phase前進爲止或內部判斷已取消等待. while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) { //2.傳入node是null,即非可擾動的模式邏輯.只有非可擾動模式纔有自旋. if (node == null) { //2.1每輪自讀進入都會嘗試計算新的unarrived,若是發現出現了變更(變大或者變小), //會將它保存到前面的lastUnarrived. int unarrived = (int)s & UNARRIVED_MASK; if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU) //發現新變化的unarrived<NCPU,擴增自旋次數,繼續自旋. //unarrived的變化,若沒有大量新的parties註冊,會在自旋過程當中變小,反之大量加入註冊,大於了NCPU則放棄增長自旋次數. spins += SPINS_PER_ARRIVAL; //2.2,未發現本輪循環unarrived發生變化,或者增長了大量註冊,形成大於NCPU的邏輯,首先記錄此時的線程擾動狀態. boolean interrupted = Thread.interrupted(); //2.3接2.2,若是發現了線程被擾動了,或者經若干次自旋減小次數,自旋次數並未能在2.1進行增長,直至減爲0,進入if. if (interrupted || --spins < 0) { // need node to record intr //2.4,知足2.3進入if的條件,再也不繼續自旋了,由於參數沒有提供node,此處初始化一個node,不定時,不可擾動,並保存擾動狀態. //下輪循環將沒法進入2. node = new QNode(this, phase, false, false, 0L); node.wasInterrupted = interrupted; } } //3.參數傳入了node,或者在2.4進入了node的初始化,每一輪循環到此都先判斷是否可釋放(若能夠,內部會置thread爲null). else if (node.isReleasable()) //發現node所處的phase已經達到或者取消了,則break掉循環. break; //4.未能在非擾動模式下自旋解決(2)或提早發現node的擾動且未將node入隊的狀況下,將node入隊. else if (!queued) { //選擇當前phase表明的隊列. AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; QNode q = node.next = head.get(); //這一行不起眼的if條件代碼真的是一個悄無聲息解決了一個大暗坑的地方,後面說. if ((q == null || q.phase == phase) && (int)(state >>> PHASE_SHIFT) == phase) //double check避免髒入隊,入隊條件是(1)無頭,(2)或者頭元素的phase等於參數phase(由於相鄰的兩個phase絕對不會入同一個隊). //知足(1)(2)的同時,還要知足(3),參數phase就是當前的state表示的phase(由於此方法只能root使用,故爲root表示的最新phase). //條件知足,入隊,取代原來的head,原來head表明的node成爲node的next.而條件不知足進入下一循環,極可能while條件就不知足了退出循環. queued = head.compareAndSet(q, node); } //5.已經在某一輪循環入隊了,使用ForkJoinPool的managedBlock管理block,其間可能會釋放線程引用. else { try { //5.1它內部也有循環,且會調用前面看到過的isReleasable和block實現,顯然它一旦結束(包含擾動),必定會形成下輪外循環終止於3處. ForkJoinPool.managedBlock(node); } catch (InterruptedException ie) { //5.2出現擾動異常catch住,並保存.下輪循環也會終止在3處. node.wasInterrupted = true; } } } //6.走出上面的while循環,多是root已經advance到下一個phase(2前的循環),也多是傳入node的狀況下出現了擾動或超時(5)形成(3)知足 if (node != null) { //6.1node存在表明可能已經壓入隊列,結果要麼是已出現擾動或超時(方法結束後會拋出異常),要麼是已正常完成. //顯然,代碼執行到此處就要返回了,阻塞的線程會拋出異常結束(超時或擾動)或繼續執行(正常advance), //沒有必要去嘗試喚醒能執行出前面while循環到達6立刻要返回的線程. if (node.thread != null) //6.2取消node中的線程引用,避免外面的線程嘗試喚醒. node.thread = null; // avoid need for unpark() if (node.wasInterrupted && !node.interruptible) //6.3若是node自己設置了不可被擾動,但5.2處判斷線程自己拋出了擾動異常,卻被catch住了,此處擾動本線程. Thread.currentThread().interrupt(); if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase) //6.4發現phase並未前進.仍是參數傳入的pahse,說明必定是擾動或超時的結果,abortWait對本phase使用的隊列進行清理, //而清理的目標前面已論述過,是本隊列頭部開始的早於本phase的元素.(發現一個不知足條件的就中止了清理). return abortWait(phase); // possibly clean up on abort } //7.退出上面的while循環必定會到此幫助釋放早於最新階段的waiter.注意,是早於最新phase的,參數phase只是決定了選哪一個隊列(奇偶). //若是是6.4表明的那種擾動超時狀況,此處其實釋放的是舊的結果.被喚醒的線程其實通常是執行在5.1處阻塞的.當前線程能運行到此絕對不須要喚醒. releaseWaiters(phase); return p; }
到此Phaser的代碼解析已完畢,咱們來分析關於隊列,等待和喚醒的問題.
1.Phaser維護了兩個"隊列",不論加入等待隊列仍是彈出等待隊列,都是從頭部進行,新加入的成員會成功隊列的新頭,原來的頭會成爲它的next,彈出時next成爲新頭.因此至關於一個對頭部的"後進先出",考慮官方起名和註釋,咱們依舊保持隊列這個稱呼.
2.喚醒時,會從隊列的頭部依次彈出node 的phase早於root的最新phase的node,.
3.等待時,入隊的node成爲新的頭.
4.當輪次增長時,會使用和本輪不一樣的隊列增長元素,同時也會喚醒本輪中等待的node.
由於喚醒和等待同時進行,且各自操做各自的隊列(不一樣的phase),所以彼此之間沒有競態(儘管一個是頭入一個是頭出),能夠說設計巧妙,下面咱們來腦洞大開,思考一個極端狀況.
咱們假設一種極端的phase切換場景,奇數phase大量等待入隊,偶數phase則迅速完成.假設當前phase對應的隊列是奇數對列,輪次提高完成後,它去釋放當前的隊列元素,結果未等這個釋放操做執行完畢,偶數隊列的輪次很快執行完,奇數隊列中積壓了成千上萬個node未能釋放,輪次卻又切回到了奇數隊列,會出現什麼事?
顯然奇數隊列若是一直保持這種極端場景,它會愈來愈龐大,逼近撐爆內存的同時,大量線程也會得不到釋放,甚至於老一輪的線程須要等待新一輪的線程去釋放.爲何老一輪的線程會去等待新一輪的線程釋放呢?
releaseWaiter的方法咱們已經看出,它只會釋放phase早於最新的node,此時最新壓入的元素屬於當前最新的phase,顯然不知足條件,那麼會形成奇數隊列中兩輪前壓入的元素不能獲得清除,兩輪前就在釋放當時積壓node的線程(那一輪最後一個arrive)發現不符合清理條件,就直接return並終止了,只能等待本輪最後一個arrive出現後繼續進行釋放.若是本輪最後一個arrive出現很晚,在下一輪依舊保持如此極端,往返數輪,確實會致使奇數隊列中積壓大量node,且第一輪就在等待該輪次結束的線程早就知足了釋放條件(升到了2輪),事實上多是第n輪才獲得釋放,這還符合Phaser的定義嗎?咱們使用它,就是要保證每一輪單獨使用,每一輪次達到條件,線程釋放並執行,下一輪次是下一輪次.
然而doug的代碼就是這個樣子,想遍各類極端,以爲可能找到了bug,那麼就須要仔細思考了.做者來簡述一下這個趟坑的分析過程.
這個問題確實已經獲得了極大的規避了,畢竟是個極端狀況.
1.線程的喚醒真的很快,儘管此處除了喚醒還包含了原子引用的更新(每次出隊都要cas).
2.若是沒有註冊,顯然就沒有arrive相關的狀況,儘管能夠單獨調用,但必須保證在arrive時傳入的數量此時已經註冊了,所以每一輪次(phase)中可能積壓等待喚醒的線程的操做必定是在註冊以後,可是咱們回憶一下,註冊方法的第一步就是要等待完成advance,並且傳給internalAwaitAdvance的node會是null,即不能擾動和超時,因此當本輪次阻塞了必定數量的線程後,若是不去arrive,也不考慮超時和擾動的狀況,那麼線程將一直阻塞.咱們不可能在輪次advance前進行註冊,也就不可能在advance以前進行新一phase的arrive.
3.當本輪次的最後一個arrive線程觸發了輪次的更新後,才能夠開啓註冊以及新輪次的arrive,可是此時使用了另外一個等待隊列,而觸發了輪次更新的上一輪的arrive線程將會當即進行前一個隊列中積壓的線程的喚醒操做.只有該喚醒操做足夠慢,且新的輪次極快就完成了的狀況,纔可能形成在原arrive線程未能及時釋放奇數隊列的狀況下,新一輪次再次向其中添加元素.
4.最重要的還在上面的internalAwaitAdvance方法,那一段被做者標上了入隊條件的註釋處,要想入隊,必須if ((q == null || q.phase == phase) &&加上後面的條件,而這兩個條件的限定已經很明顯,要想入隊,必須知足該等待隊列沒有元素或者隊首是本輪的元素,而該方法又是下一輪首次註冊時必須等待完成的,下一輪的arrive又必須發生在下一輪的首次註冊以後,所以根本不會出現本輪wait的線程還要等下一輪甚至下N輪的線程去釋放的極端狀況,哪怕真的去作一個極端測試:讓奇數輪大量積壓線程,讓偶數輪快速切換,而後測試第一輪壓入的線程究竟是不是本輪釋放的.(做者差點就要寫單元測試去作這個極端測試了!)
這一段不經意的if,一個小小的條件,若是不注意真的忽略了,小代碼大功效,誰能想到,這麼深的暗坑就這樣被規避了.
前面已經詳述了Phaser的源碼以及若干趟坑辛路.其實已經沒什麼好總結的了,就在此順便對比常見同步器CyclicBarrier,CountDownLatch,Semaphore的特徵和實現.
從使用特徵上看:
1.CountDownLatch是一次性的,只能初始化決定parties數量,等待者能夠是多個,每次釋放都會減小一個信號量,直到歸0時爲止,最後一個釋放者將喚醒其餘等待的線程.它也不能繼續使用.
2.CyclicBarrier是可重用的,分代的,每一代之間彼此獨立,可是每一代的初始parties是相同的,不可在運行期內動態調整,每一代最後一個線程會去開啓一下代,並能夠在此時運行一個用戶指定的action,與此同時喚醒其餘線程繼續執行.它能夠在運行完一代後繼續被使用.而且它還支持重置.
3.Semaphore是一個資源量的典型,若是說CountDownLatch和CyclicBarrier或者Phaser都是等到"人夠了"再放行,Semaphore倒是起到限流的做用,它控制了有限的令牌數,這個數量不能夠動態地更改,在不能acquire到足夠的令牌數時,線程將阻塞,直到其餘線程釋放了足量的令牌數並喚醒它爲止.每個持有了令牌的線程均可以喚醒阻塞等待獲取的線程.
4.Phaser的功能上不一樣很明顯,首先它的參與者數量幾乎時刻可變(除了正在進入下一phase期間),隨時能夠增長減小parties數量,每一phase等待者能夠是多個,每一phase中,每一個能從internalAwaitAdvance方法中走出循環的線程均可以幫助喚醒,固然最終能進入喚醒操做仍是要歸功於最後一個arrive的線程(儘管它arrive後其餘線程醒來後也會幫助喚醒).Phaser的喚醒者不必定是參與者.
從實現來看:
1.CountDownLatch藉助了aqs來實現parties的釋放,它使用cas+park的方式,不使用Lock.
2.CyclicBarrier須要藉助重入鎖和condition,每個await的線程都要全局加鎖,阻塞時await在condition上.
3.Semaphore在實現上相似CountDownLatch,也是基於aqs,只不過它容許獲取和釋放,對state有增有減,總量不變.也是cas+park的方式阻塞,也不使用Lock
4.Phaser由於功能的要求,不基於AQS(它不能有構建時就固定的state,儘管能夠初始化一個state,但它必須支持改變),它依託於原子引用實現了一個內部的隊列,相應的等待/入隊/喚醒等操做經過cas自旋+park的方式,一樣不使用Lock.並利用雙隊列的方式規避了前一輪的釋放和後一輪的響醒的阻塞.
此外還有兩點結合前面的推理和自測驗證的結論:
1.Phaser中的每個phase是保證了可見性的,經做者自測,在任何使用Phaser的代碼中await先後,不會出現串phase讀出的亂序狀況(側面說明每一個phase不會依賴後一個或幾個phase的釋放).
2.Phaser須要對await的線程進行阻塞時,是將它打包成一個node(blocker),利用ForkJoinPool來block的.若是使用Phaser同步的任務是運行在ForkJoinPool中的,它將會利用到相應的補償機制,經做者自測,這將保證Phaser中block的每個任務必然獲得執行,每個阻塞的線程必然獲得釋放.