Phaser是JDK7新添加的線程同步輔助類,做用同CyclicBarrier,CountDownLatch相似,可是使用起來更加靈活:函數
1. Parties是動態的。this
2. Phaser支持樹狀結構,即Phaser能夠有一個父Phaser。spa
Phaser的構造函數涉及到兩個參數:父Phaser和初始的parties,所以提供了4個構造函數:線程
public Phaser(); public Phaser(int parties); public Phaser(Phaser parent); public Phaser(Phaser parent, int parties);
由於Phaser的特點在在於動態的parties,所以首先來看動態更新parties是如何實現的。code
Phaser提供了兩個方法:register和bulkRegister,前者會添加一個須要同步的線程,後者會添加parties個須要同步的線程。對象
public int register() { return doRegister(1); } // 增長了參數的檢查 public int bulkRegister(int parties) { if (parties < 0) throw new IllegalArgumentException(); if (parties == 0) return getPhase(); return doRegister(parties); }
兩個方法都調用了doRegister方法,所以接下來就來看看doRegister方法。blog
在分析doRegister以前先來講說Phaser的成員變量:state,它存儲了Phaser的狀態信息:ci
private volatile long state;
1. state的最高位是一個標誌位,1表示Phaser的線程同步已經結束,0表示線程同步正在進行get
2. state的低32位中,低16位表示沒有到達的線程數量,高16位表示Parties值同步
3. state的高32位除了最高位以外的其餘31位表示的Phaser的phase,能夠理解爲第多少次同步(從0開始計算)。
介紹完了state,來看方法doRegister:
private int doRegister(int registrations) { // 把registrations值同時加到parties值和還未達到的線程數量中去 long adj = ((long)registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; for (;;) { long s = state; int counts = (int)s; int parties = counts >>> PARTIES_SHIFT; int unarrived = counts & UNARRIVED_MASK; // 超過了容許的最大parties if (registrations > MAX_PARTIES - parties) throw new IllegalStateException(badRegister(s)); // 最高位爲1,表示Phaser的線程同步已經結束 else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0) break; // Phaser中的parties不是0 else if (counts != EMPTY) { // 若是當前Phaser沒有父Phaser,或者若是有父Phaser, // 刷新本身的state值,若是刷新後的state沒有變化。 // 這裏刷新子Phaser的緣由在於,會出現父Phaser已經進入下一個phase // 而子Phaser卻沒有及時進入下一個phase的延遲現象 if (parent == null || reconcileState() == s) { // 若是全部線程都到達了,等待Phaser進入下一次同步開始 if (unarrived == 0) root.internalAwaitAdvance(phase, null); // 更新state成功,跳出循環完成註冊 else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj)) break; } } // 第一次註冊,且不是子Phaser else if (parent == null) { // 更新當前Phaser的state值成功則完成註冊 long next = ((long)phase << PHASE_SHIFT) | adj; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) break; } // 第一次註冊到子Phaser else { // 鎖定當前Phaser對象 synchronized (this) { // 再次檢查state值,確保沒有被更新 if (state == s) { // 註冊到父Phaser中去 parent.doRegister(1); do { // 獲取當前phase值 phase = (int)(root.state >>> PHASE_SHIFT); } while (!UNSAFE.compareAndSwapLong (this, stateOffset, state, ((long)phase << PHASE_SHIFT) | adj));// 更新當前Phaser的state值 break; } } } } return phase; }
看完了註冊,那麼來看同步操做的arrive,這裏也涉及到兩個方法:arrive和arriveAndDeregister,前者會等待其餘線程的到達,後者則會馬上返回:
public int arrive() { return doArrive(false); } public int arriveAndDeregister() { return doArrive(true); }
兩個方法都調用了doArrive方法,區別在於參數一個是false,一個是true。那麼來看doArrive:
private int doArrive(boolean deregister) { // arrive須要把未到達的線程數減去1, // deregister爲true,須要把parties值也減去1 int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL; final Phaser root = this.root; for (;;) { // 若是是有父Phaser,首先刷新本身的state long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); int counts = (int)s; int unarrived = (counts & UNARRIVED_MASK) - 1; // 最高位爲1,表示同步已經結束,返回phase值 if (phase < 0) return phase; // 若是parties爲0或者在這次arrive以前全部線程到達 else if (counts == EMPTY || unarrived < 0) { // 對於非子Phaser來講,上述狀況的arrive確定是非法的 // 對於子Phaser首先刷新一下狀態再作檢查 if (root == this || reconcileState() == s) throw new IllegalStateException(badArrive(s)); } // 正常狀況下,首先更新state else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) { // 全部線程都已經到達 if (unarrived == 0) { // 計算parties做爲下一個phase的未到達的parties long n = s & PARTIES_MASK; int nextUnarrived = (int)n >>> PARTIES_SHIFT; // 調用父Phaser的doArrive if (root != this) // 若是下一個phase的未到達的parties爲0,則須要向 // 父Phaser取消註冊 return parent.doArrive(nextUnarrived == 0); // 正在進入下一個Phase,默認的實現是nextUnarrived爲0 // 表示正在進入下一個Phase,由於下一個phase的parties // 爲0,須要等待parties不爲0 if (onAdvance(phase, nextUnarrived)) // 正在等待下一個phase,設置狀態爲終止 n |= TERMINATION_BIT; else if (nextUnarrived == 0) // 下一個phase的parties爲0,更新未到達的parties的值 n |= EMPTY; else // 更新下一個phase的未到達的parties的值 n |= nextUnarrived; // phase值加1 n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT; // 更新state值 UNSAFE.compareAndSwapLong(this, stateOffset, s, n); // 喚醒等待的線程 releaseWaiters(phase); } return phase; } } }
關於arrive還有一個方法:arriveAndAwaitAdvance。這個方法會等到下一個phase開始再返回,相等於doArrive方法添加了awaitAdvance方法的功能。基本邏輯和上面說的doArrive方法相似:
public int arriveAndAwaitAdvance() { final Phaser root = this.root; for (;;) { long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); int counts = (int)s; int unarrived = (counts & UNARRIVED_MASK) - 1; if (phase < 0) return phase; else if (counts == EMPTY || unarrived < 0) { // 對於非子Phaser來講,由於能夠等待下一個phase, // 因此不是非法arrive if (reconcileState() == s) throw new IllegalStateException(badArrive(s)); } else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= ONE_ARRIVAL)) { // 還有其餘線程沒有達到,就會等待直到下一個phase開始 if (unarrived != 0) return root.internalAwaitAdvance(phase, null); if (root != this) return parent.arriveAndAwaitAdvance(); long n = s & PARTIES_MASK; // base of next state int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; else if (nextUnarrived == 0) n |= EMPTY; else n |= nextUnarrived; int nextPhase = (phase + 1) & MAX_PHASE; n |= (long)nextPhase << PHASE_SHIFT; if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n)) return (int)(state >>> PHASE_SHIFT); releaseWaiters(phase); return nextPhase; } } }
這一部分主要講了Phaser的動態更新parties以及線程的arrive,下一部分將會分析線程等待的實現。