Phaser是java7中引入的,是併發包中提供的線程同步輔助工具類,是個階段屏障, 全部parties線程需等待此階段屏障的全部線程都到達,才能進入下一階段屏障持續執行。 CyclicBarrier、CountDownLatch、Phaser 三個都是線程同步輔助工具類,同步輔助三劍客。CountDownLatch不能重用,CyclicBarrier、Phaser均可以重用,而且Phaser 更加靈活能夠在運行期間隨時加入(register)新的parties,也能夠在運行期間隨時退出(deregister),支持層級,根root Phaser,父Phaser把每一個子的 Phaser(每一個子的內部也有多個parties線程,也有多階段) 當作父Phaser的一個parties,父Phaser等待全部的parties都到達父的階段屏障, 即子Phaser的全部階段都執行完,即子Phaser都到達父的階段屏障。喚醒全部的子Phaser的parties線程繼續執行下一階段。 CyclicBarrier也能夠作爲階段屏障使用,每一個線程重複作爲CyclicBarrier的parties,可是沒辦法想Phaser那樣支持層級。 例如比賽,一個比賽分爲3個階段(phase): 初賽、複賽和決賽,規定全部運動員都完成上一個階段的比賽才能夠進行下一階段的比賽,而且比賽的過程當中容許退賽(deregister),這個場景就很適合Phaser,這個例子的話CyclicBarrier也能夠實現,若是是更復雜如奧運閉幕(父Phaser),需等待各類比賽結束,如跳遠(子Phaser,也有多個階段,初賽、複賽和決賽),田徑(子Phaser,也有多個階段,初賽、複賽和決賽) 等,Phaser就更加適合,你也能夠把重造輪子寫個CyclicBarrier子類,改形成相似Phaser支持層級的功能。CyclicBarrier能夠看個人另外一篇源碼分析 juejin.im/post/5d3bf8…,CountDownLatch能夠看個人另外一篇juejin.im/post/5d3593…。 java
/** * Phaser中的狀態,64位的屬性state不一樣位被用來存放不一樣的值,低16位存放unarrived,低32位中的高16位存放parties,高32位的低31位存放phase,最高位存放terminated,即Phaser是否關閉 * * unarrived -- 還沒到達階段屏障的參與者數量,能夠在運行期間增長,屬性state的低16位存放 (bits 0-15) * parties -- Phaser中的參與者,能夠在運行期間增長,低32位中的高16位存放parties,每次進入下一階段unarrived會被從新賦值爲parties值 (bits 16-31) * phase -- 記錄Phaser的當前階段,只有全部的參與者都到達階段屏障,才能進入下一階段屏蔽,高32位的低31位存放phase(bits 32-62) * terminated -- 屬性state最高位存放terminated,即Phaser是否關閉 (bit 63 / sign) */
private volatile long state;
//Phaser中的最大參與者值
private static final int MAX_PARTIES = 0xffff;
//Phaser中的最大階段值,屬性state的高32位的低31位存放phase記錄Phaser的當前階段的最大值private static final int MAX_PHASE = Integer.MAX_VALUE;
//屬性state的低32位中的高16位存放parties,即Phaser中的參與者,在下面獲取parties時,須要用到的移位操做值
private static final int PARTIES_SHIFT = 16;
//屬性state的高32位的低31位存放記錄Phaser的當前階段值,在下面獲取phase時,須要用到的移位操做值
private static final int PHASE_SHIFT = 32;
//屬性state的低32位中的低16位存放未抵達當前階段屏障的參與者個數,與UNARRIVED_MASK值&得未抵達當前階段屏障的參與者個數
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
//屬性state的低32位中的高16位存放parties,即Phaser中的參與者,與PARTIES_MASK值&得屏障的參與者個數
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
//屬性state的低32位中的低16位存放未抵達當前階段屏障的參與者個數,低32位中的高16位存放parties,即Phaser中的參與者,與COUNTS_MASK值&得unarrived和parties兩部分值
private static final long COUNTS_MASK = 0xffffffffL;
//屬性state最高位存放terminated,即Phaser是否關閉,若是想關閉屏蔽state屬性值或(|)上TERMINATION_BIT值
private static final long TERMINATION_BIT = 1L << 63;
//階段屏障的一個參與者到達,即unarrived值減1
private static final int ONE_ARRIVAL = 1;
//一個參與者,屬性state的低32位的高16位存放parties,只在ONE_DEREGISTER中使用到
private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
//屏障的一個參與者取消註冊,即一個參與者退出屏障,即屬性state值的低32位的高16位parties值減1,低16位unarrived值減1
private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
//若是在構造Phaser時,傳入進來的參與者個數parties等於0,屬性state的初始值
private static final int EMPTY = 1;
//Phaser支持層級,父Phaser把每一個子的Phaser(每一個子的內部也有多個parties線程,也有多階段)當作父Phaser的一個parties,父Phaser等待全部的parties都到達父的階段屏障, 即子Phaser的全部階段都執行完,即子Phaser都到達父的階段屏障。喚醒全部的子Phaser的parties線程繼續執行下一階段
private final Phaser parent;
//不論是有層級仍是無層級的Phaser的根Phaser root都是同一個,沒有層級是自身,有層級就是父的root
private final Phaser root;
//偶鏈表,Phaser屏障是有多個階段,爲了防止競爭,偶數的階段採用偶鏈表,一個參與者在到達階段屏障時,還有其餘參與者還未到達,自旋一段時間,其他參與者還未到達,將其封裝成節點加入鏈表中
private final AtomicReference<QNode> evenQ;
//奇鏈表
private final AtomicReference<QNode> oddQ;
//得到可用的處理器個數
private static final int NCPU = Runtime.getRuntime().availableProcessors();
//一個參與者在到達階段屏障時,還有其餘參與者還未到達,參與者線程不是直接進入等待狀態而是先自旋一段時間,自旋值根據處理器個數
static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
//UnSafe的使用,若是不清楚的話,能夠看https://juejin.im/user/5bd8718051882528382d8728/shares中的UnSafe介紹
private static final sun.misc.Unsafe UNSAFE;
//屬性state的相對偏移量,相對Phaser實例的起始內存位置的相對偏移量,定義成靜態的緣由是,屬性的相對實例的偏移量都是相等的
private static final long stateOffset;
static {
try {
//獲取UnSafe實例
UNSAFE = sun.misc.Unsafe.getUnsafe();
//Phaser的Class信息
Class<?> k = Phaser.class;
//使用UnSafe實例獲取Phaser類的屬性state的相對偏移量
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
} catch (Exception e) {
throw new Error(e);
}
}複製代碼
//一個參與者在到達階段屏障時,還有其餘參與者還未到達,自旋一段時間,其他參與者還未到達,將其封裝成節點QNode加入鏈表中
static final class QNode implements ForkJoinPool.ManagedBlocker {
//Phaser實例,需拿到state的階段值phaser,因爲state用volatile修飾,java內存模型加上happen-before,保證state的寫對後續的讀可見
final Phaser phaser;
//當前節點所處的屏障Phaser階段
final int phase;
//當前等待節點線程是否可被中斷
final boolean interruptible;
//當前等待節點線程是否超時等待其餘未到達階段屏障參與者線程到達階段屏障
final boolean timed;
//當前等待節點線程在等待的過程當中是否被中斷
boolean wasInterrupted;
//當前等待線程若是支持超時等待,等待的超時時間,單位納秒
long nanos;
//當前等待節點等待的超時的時間點
final long deadline;
//當前等待節點的線程,可能爲空,當前節點被取消等待
volatile Thread thread; // nulled to cancel wait
//因爲QNode是個鏈表,當前等待節點的下一等待節點
QNode next;
//QNode構造函數,傳入phaser實例,phase當前等待節點所處的階段,interruptible當前節點線程是否可被中斷,timed當前等待節點線程是否超時等待,nanos當前等待線程若是支持超時等待,等待的超時時間
QNode(Phaser phaser, int phase, boolean interruptible,
boolean timed, long nanos) {
this.phaser = phaser;
this.phase = phase;
this.interruptible = interruptible;
this.nanos = nanos;
this.timed = timed;
this.deadline = timed ? System.nanoTime() + nanos : 0L;
thread = Thread.currentThread();
}
//當前等待節點是否被釋放
public boolean isReleasable() {
//若是等待節點的線程爲空,代表當前等待節點被釋放
if (thread == null)
//返回true,代表當前等待節點被釋放
return true;
//若是當前等待節點的屏障階段和當前的phaser的階段不一致,代表當前等待節點被釋放
if (phaser.getPhase() != phase) {
//將等待節點對應的線程置爲空
thread = null;
//返回true,代表當前等待節點被釋放
return true;
}
//若是節點對應的線程被中斷
if (Thread.interrupted())
//wasInterrupted置爲true,代表節點對應線程被中斷
wasInterrupted = true;
//若是節點對應線程被中斷,而且當前節點支持線程可中斷
if (wasInterrupted && interruptible) {
//將等待節點對應的線程置爲空
thread = null;
//返回true,代表當前等待節點被釋放
return true;
}
//當前等待節點線程超時等待其餘未到達階段屏障參與者線程到達階段屏障
if (timed) {
//屬性nanos超時時間大於0
if (nanos > 0L) {
//節點的超時時間點減去當前時間,得到新的超時時間
nanos = deadline - System.nanoTime();
}
//若是屬性nanos超時時間小於0,代表當前等待節點線程等待其他參與者線程到達階段屏障超時
if (nanos <= 0L) {
//將等待節點對應的線程置爲空
thread = null;
//返回true,代表當前等待節點被釋放
return true;
}
}
//返回false,代表當前等待節點還未被釋放
return false;
}
//阻塞節點線程,線程被喚醒時,判斷節點是否被釋放
public boolean block() {
//調用上面的isReleasable判斷節點是否被釋放
if (isReleasable())
//返回true,節點被釋放
return true;
//若是節點線程不支持超時等待其他參與者線程到達階段屏障,LockSupport.park阻塞節點線程,直到調用unpark喚醒
else if (!timed)
//調用LockSupport阻塞節點線程
LockSupport.park(this);
//若是節點支持超時等待其他參與者線程到達階段屏障,而且超時時間大於0
else if (nanos > 0L)
//調用LockSupport.parkNanos超時的等待其他參與者線程到達階段屏障,直到超時,或者調用unpark喚醒
LockSupport.parkNanos(this, nanos);
//調用上面的isReleasable判斷節點是否被釋放
return isReleasable();
}
}複製代碼
/** * 建立新的屏障Phaser實例,無參構造Phaser實例,沒有註冊parties參與者,而且沒有父Phaser,初始屏障階段數字0 */
public Phaser() {
//調用下面Phaser(Phaser parent, int parties)構造函數,父Phaser爲null,parties參與者初始化爲0
this(null, 0);
}
/** * 建立新的屏障Phaser實例,註冊parties參與者數parties,沒有父Phaser,初始屏障階段數字0 * * @param parties 初始參與者數 */
public Phaser(int parties) {
//調用下面Phaser(Phaser parent, int parties)構造函數,父Phaser爲null,parties參與者初始化爲parties
this(null, parties);
}
/** * 建立新的屏障Phaser實例,沒有註冊parties參與者,傳入父Phaser,初始屏障階段數字0 * * @param parent 父Phaser */
public Phaser(Phaser parent) {
//調用下面Phaser(Phaser parent, int parties)構造函數,父Phaser爲parent,parties參與者初始化爲0
this(parent, 0);
}
/** * 建立新的屏障Phaser實例,註冊parties參與者,傳入父Phaser,初始屏障階段數字0 */
public Phaser(Phaser parent, int parties) {
//若是傳入parties是負數,或者值超過MAX_PARTIES,都會拋出IllegalArgumentException異常
if (parties >>> PARTIES_SHIFT != 0)
//拋出IllegalArgumentException異常
throw new IllegalArgumentException("Illegal number of parties");
//初始屏障階段爲0
int phase = 0;
//屬性值parent賦值爲傳入進來的父Phaser parent
this.parent = parent;
//若是傳入的父Phaser不爲空,代表當前Phaser實例支持層級
if (parent != null) {
//獲取父的root phaser
final Phaser root = parent.root;
//將當前Phaser實例根root phaser賦值爲父Phaser的root phaser
this.root = root;
//將當前Phaser實例偶鏈表賦值爲根root phaser的偶鏈表
this.evenQ = root.evenQ;
//將當前Phaser實例奇鏈表賦值爲根root phaser的奇鏈表
this.oddQ = root.oddQ;
//註冊parties參與者不等於0,代表有註冊parties參與者
if (parties != 0)
//父Phaser把每一個子的 Phaser(每一個子的內部也有多個parties線程,也有多階段)當作父Phaser的一個parties,爲此須要調用父的doRegister方法將當前子Phaser當作一個parties註冊到父Phaser中,doRegister方法在下面進行介紹
phase = parent.doRegister(1);
}
//若是傳入的父Phaser爲空,代表當前Phaser實例不支持層級
else {
//將當前Phaser實例根root phaser賦值爲自身
this.root = this;
//初始化一個新的偶鏈表
this.evenQ = new AtomicReference<QNode>();
//初始化一個新的奇鏈表
this.oddQ = new AtomicReference<QNode>();
}
//初始化屬性state,若是沒有註冊的parties參與者,將屬性state值賦值爲EMPTY
this.state = (parties == 0) ? (long)EMPTY :
//不然將phaser左移32位,或(|)上 ,將parties左移16位,或上parties值,即unarrived初始爲parties
((long)phase << PHASE_SHIFT) | ((long)parties << PARTIES_SHIFT) | ((long)parties);
}複製代碼
/** * 在Phaser運行期間註冊一個參與者 */
public int register() {
//調用下面介紹的doRegister方法註冊一個參與者到Phaser實例中
return doRegister(1);
}
//@param registrations 註冊參與者數目
private int doRegister(int registrations) {
// adjustment to state unarrived
//因爲註冊參與者,須要調整parties和unarrived的值,爲此registrations需左移16位獲得parties值,或(|)上registrations(unarrived值)
long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
//當前Phaser實例獲取父Phaser實例
final Phaser parent = this.parent;
//記錄當前屏障處於第幾個階段
int phase;
//循環,直到註冊參與者成功
for (;;) {
//若是當前Phaser實例沒有父Phaser,獲取當前Phaser實例的屬性state值(terminated、phase、parties、unarrived),不然調用reconcileState()方法,reconcileState是調整當前Phaser實例的屬性state中的phase值與root的一致
long s = (parent == null) ? state : reconcileState();
//獲取state屬性值的低32位,低32位的高16位存放parties值,低16位存放unarrived值
int counts = (int)s;
//counts高16位存放parties值,低16位存放unarrived值,無符號右移16位獲取到parties值
int parties = counts >>> PARTIES_SHIFT;
//counts低16位存放unarrived值,counts與(&)上UNARRIVED_MASK(0xffff),獲取到低16位存放unarrived值
int unarrived = counts & UNARRIVED_MASK;
//傳入進來要註冊參與者數目是否大於Phaser實例容許註冊的參與者數目,即傳入進來要註冊的參與者數目加上已經註冊的參與者數目大於Phaser實例容許的最大參與者數目MAX_PARTIES
if (registrations > MAX_PARTIES - parties)
//傳入進來要註冊參與者數目大於Phaser實例容許註冊的參與者數目,拋出IllegalStateException異常,badRegister方法看下面介紹
throw new IllegalStateException(badRegister(s));
//屬性state高32位的低31位存放phase(Phaser實例的階段值),屬性state值無符號右移32位獲取到phase值
phase = (int)(s >>> PHASE_SHIFT);
//若是phase值小於0,表示屬性state的最高位爲1,屬性state最高位存放terminated,即Phaser是否關閉,若是terminated爲1,Phaser關閉
if (phase < 0)
//若是Phaser關閉,直接退出循環
break;
//若是counts(高16位存放parties值,低16位存放unarrived值)不等於EMPTY,即當前Phaser實例不是註冊第一個參與者
if (counts != EMPTY) { // not 1st registration
//若是當前Phaser實例不爲空,或者屬性state值沒有改變
if (parent == null || reconcileState() == s) {
//若是當前階段屏障Phaser實例的unarrived等於0,代表當前全部參與者都到達階段屏障,只是還沒使用UnSafe的compareAndSwapLong將unarrived更新成parties,即還沒使用CAS將屬性state值更新成新的state值,調用internalAwaitAdvance方法先自旋一段等待Phaser實例的進入到下一階段,而後將當前註冊的參與者註冊到下一階段中,不然的話進入等待狀態直到Phaser實例進入到下一階段,internalAwaitAdvance方法能夠看下面的介紹
if (unarrived == 0) // wait out advance
//使用根root Phaser實例調用下面介紹的internalAwaitAdvance方法,傳入當前Phaser實例階段和節點爲null
root.internalAwaitAdvance(phase, null);
//若是當前Phaser實例的unarrived不等於0,使用UnSafe的cas將傳入進來的參與者數目加到屬性state中
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
s, s + adjust))
//退出當前循環
break;
}
}
//若是counts等於EMPTY,而且當前Phaser實例沒有父Phaser,代表還沒註冊參與者,當前註冊的參與者是第一批
else if (parent == null) { // 1st root registration
//使用phase(階段值)移位32位或(|)上adjust(高16位存放parties值,低16位存放unarrived值)的調整值
long next = ((long)phase << PHASE_SHIFT) | adjust;
//使用UnSafe的cas將屬性state值更新成新的值next
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
//退出循環
break;
}
//若是counts等於EMPTY,而且當前Phaser實例有父Phaser,代表還沒註冊參與者,當前註冊的參與者是第一批,當前Phaser實例作爲父Phaser實例的一個parties
else {
//加鎖,由於存在併發操做,當前Phaser實例作爲鎖對象
synchronized (this) { // 1st sub registration
//可能多個線程同時進入到else中,爲此須要從新檢查下屬性state值,由於當前Phaser實例只能被當作一次父Phaser的一個parties參與者
if (state == s) { // recheck under lock
//將當前Phaser實例作爲父Phaser實例的一個parties參與者
phase = parent.doRegister(1);
//若是父Phaser的階段值phase小於0,直接退出循環
if (phase < 0)
//退出循環
break;
// finish registration whenever parent registration
// succeeded, even when racing with termination,
// since these are part of the same "transaction".
//循環直到cas將註冊進來的參與者註冊到當前Phaser實例
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
((long)phase << PHASE_SHIFT) | adjust)) {
//使用CAS更新當前Phaser實例的屬性state爲新的值失敗,從新獲取新的屬性state值
s = state;
//從新從屬性state值中獲取到phase值,從新循環
phase = (int)(root.state >>> PHASE_SHIFT);
// assert (int)s == EMPTY;
}
//退出循環
break;
}
}
}
}
//返回當前Phaser實例的phase階段值
return phase;
}
//調整當前Phaser實例的屬性state
private long reconcileState() {
//獲取當前實例Phaser的根root Phaser實例
final Phaser root = this.root;
//獲取當前實例Phaser的屬性state值
long s = state;
//若是當前實例Phaser有層級,即root Phaser不是自身實例
if (root != this) {
int phase, p;
// CAS to root phase with current parties, tripping unarrived
//若是當前Phaser實例的階段值phase和根root phase的階段值不一致,使用CAS將當前Phaser實例屬性state替換成新的state值,即當前Phaser實例phase階段值和root phaser的階段值一致
while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
(int)(s >>> PHASE_SHIFT) &&
!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
s = (((long)phase << PHASE_SHIFT) |
((phase < 0) ? (s & COUNTS_MASK) :
(((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
((s & PARTIES_MASK) | p))))))
//從新獲取當前Phaser實例的屬性state值
s = state;
}
//返回調整完的當前Phaser實例的屬性state值
return s;
}
//傳入進來要註冊參與者數目大於Phaser實例容許註冊的參與者數目
private String badRegister(long s) {
return "Attempt to register more than " +
MAX_PARTIES + " parties for " + stateToString(s);
}
/** * 在Phaser實例運行期間批量註冊參與者 */
public int bulkRegister(int parties) {
//若是批量註冊參與者數目小於0,拋出IllegalArgumentException異常
if (parties < 0)
throw new IllegalArgumentException();
//若是批量註冊參與者數目等於0
if (parties == 0)
//返回根root Phaser實例的階段值,即根root Phaser實例的phase值
return getPhase();
//調用上面介紹的doRegister方法註冊批量參與者到Phaser實例中
return doRegister(parties);
}
/** * 獲取Phaser實例的階段值,若是不支持層級root就是自身Phaser實例 */
public final int getPhase() {
//root Phaser實例state屬性高32位的低31位存放phase,爲此須要右移32位獲得phase值
return (int)(root.state >>> PHASE_SHIFT);
}
複製代碼
/** * 一個參與者到達階段屏障 */
public int arrive() {
//調用下面介紹的doArrive方法,將屬性state中的unarrived作減1操做,若是是最後一個到達還需喚醒其餘等待其餘參與者都到達階段屏障的參與者線程
return doArrive(ONE_ARRIVAL);
}
//傳入adjust調整值,多是ONE_ARRIVAL或者ONE_DEREGISTER
private int doArrive(int adjust) {
//獲取根root Phaser實例
final Phaser root = this.root;
//循環,直到屬性state中的unarrived值作減1操做成功,若是adjust是ONE_DEREGISTER,直到屬性state中的unarrived和parties都作減1操做成功
for (;;) {
//若是根root Phaser實例就是自身Phaser實例,即當前Phaser實例沒有父Phaser,獲取當前Phaser實例的屬性state值(terminated、phase、parties、unarrived),不然調用reconcileState()方法,reconcileState是調整當前Phaser實例的屬性state中的phase值與root的一致
long s = (root == this) ? state : reconcileState();
//屬性state高32位的低31位存放phase(Phaser實例的階段值),屬性state值無符號右移32位獲取到phase值
int phase = (int)(s >>> PHASE_SHIFT);
//若是phase值小於0,表示屬性state的最高位爲1,屬性state最高位存放terminated,即Phaser是否關閉,若是terminated爲1,Phaser關閉
if (phase < 0)
//若是Phaser關閉,直接返回phase
return phase;
//獲取state屬性值的低32位,低32位的高16位存放parties值,低16位存放unarrived值
int counts = (int)s;
//若是counts(高16位存放parties值,低16位存放unarrived值)等於EMPTY,即當前Phaser實例還未註冊參與者,不然的話counts & UNARRIVED_MASK獲取到低16位存放unarrived值
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
//若是unarrived小於等於0,代表還沒註冊一個參與者,就有參與者到達階段屏障,拋出IllegalStateException異常,badArrive看下面介紹
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
//使用CAS更新當前Phaser實例的屬性state值,即屬性state值更改成減去調整值的新值
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
//若是當前參與者是最後一個到達階段屏障
if (unarrived == 1) {
//獲取到屬性state的parties值,作爲unarrived的值
long n = s & PARTIES_MASK; // base of next state
//將parties值左移16位作爲下一階段的unarrived的值
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
//若是當前Phaser實例沒有層級,即根root Phaser實例爲自身Phaser實例
if (root == this) {
//調用onAdvance方法,判斷當前Phaser實例是否須要關閉,即當前Phaser實例階段是否都執行完,這個onAdvance方法應該被子類重寫,內部能夠自定義各個不一樣階段執行的方法,好比switch{0:階段0,1:階段1,2:階段2},若是返回true代表當前Phaser實例須要關閉
if (onAdvance(phase, nextUnarrived))
//若是onAdvance返回true將屬性state值的最高位置爲1
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
//若是下一階段的unarrived的值爲0,將屬性state值的低32位置爲EMPTY
n |= EMPTY;
else
//不然的話,將屬性state值的低16位unarrived置爲nextUnarrived
n |= nextUnarrived;
//將phase階段值加1作爲下一階段值
int nextPhase = (phase + 1) & MAX_PHASE;
//從新計算屬性state值,將下一階段值左移32位,或(|)上低32位的parties值和unarrived值
n |= (long)nextPhase << PHASE_SHIFT;
//使用UnSafe的cas更新當前Phaser實例的屬性state
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
//喚醒全部其餘等待其餘參與者都到達階段屏障的參與者線程,releaseWaiters方法看下面介紹
releaseWaiters(phase);
}
//當前Phaser實例有層級,即有父Phaser,若是nextUnarrived等於0,即當前Phaser實例沒有參與者
else if (nextUnarrived == 0) { // propagate deregistration
//因爲當前Phaser實例作爲父Phaser實例的一個parties參與者,當前Phaser實例沒有參與者,需將當前Phaser實例作爲參與者從父Phaser實例中註銷掉
phase = parent.doArrive(ONE_DEREGISTER);
//使用UnSafe的cas將屬性state值的低32位置爲EMPTY
UNSAFE.compareAndSwapLong(this, stateOffset,
s, s | EMPTY);
}
//當前Phaser實例有層級,即有父Phaser,而且nextUnarrived不等於0,即當前Phaser實例有參與者,因爲當前Phaser實例作爲父Phaser實例的一個parties參與者,爲此當前Phaser實例的一個階段屏障參與者都到達,當前Phaser實例作爲父Phaser實例的一個參與者到達
else
//當前Phaser實例作爲父Phaser的一個參與者到達
phase = parent.doArrive(ONE_ARRIVAL);
}
//返回當前所處的階段值
return phase;
}
}
}
//若是unarrived小於等於0,代表還沒註冊一個參與者,就有參與者到達階段屏障
private String badArrive(long s) {
return "Attempted arrival of unregistered party for " +
stateToString(s);
}
//控制Phaser實例是否繼續執行下一階段,能夠自定義子類繼承Phaser控制Phaser須要執行幾個階段,不然registeredParties不等於0,Phaser會一直執行下去
protected boolean onAdvance(int phase, int registeredParties) {
//若是registeredParties等於0,Phaser實例須要關閉
return registeredParties == 0;
}
/** * 喚醒全部其餘等待其餘參與者都到達階段屏障的節點(參與者)線程 * * @param phase Phaser實例階段值 */
private void releaseWaiters(int phase) {
//等待節點鏈表的第一個節點元素
QNode q; // first element of queue
//節點對應的線程
Thread t; // its thread
//根據傳入進來的階段值獲取操做的是偶鏈表仍是奇鏈表
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
//循環的獲取頭節點,頭節點不爲空而且節點的階段值和根的階段值不相等,代表全部的參與者都到達傳入進來的階段屏障
while ((q = head.get()) != null &&
q.phase != (int)(root.state >>> PHASE_SHIFT)) {
//CAS將頭節點設置爲頭節點的下一節點,若是頭節點設置爲下一節點成功,而且頭節點的線程不爲空
if (head.compareAndSet(q, q.next) &&
(t = q.thread) != null) {
//將頭節點的線程置爲空
q.thread = null;
//喚醒頭節點線程,從新循環,直到鏈表爲空或節點的階段值和根的階段值相等
LockSupport.unpark(t);
}
}
}
/** * 一個參與者到達階段屏障而且從屏障中註銷移除,屬性state中的parties和unarrived值都須要減1,若是是最後一個到達還需喚醒其餘等待其餘參與者都到達階段屏障的參與者線程,若是一個參與者從屏障移除,而且屏障沒有其餘參與者即屬性state值的parties等於0,將屬性state的低32位置成EMPTY */
public int arriveAndDeregister() {
//調用上面介紹的doArrive方法,將屬性state中的unarrived作減1操做,若是是最後一個到達還需喚醒其餘等待其餘參與者都到達階段屏障的參與者線程,而且屏障沒有其餘參與者即屬性state值的parties等於0,將屬性state的低32位置成EMPTY
return doArrive(ONE_DEREGISTER);
}
複製代碼
/** * 一個參與者到達階段屏障而且等待其餘參與者到達階段屏障 */
public int arriveAndAwaitAdvance() {
//獲取根root Phaser實例
final Phaser root = this.root;
//循環直到其餘參與者都到達階段屏障
for (;;) {
//若是根root Phaser實例就是自身Phaser實例,即當前Phaser實例不支持層級沒有父Phaser,獲取當前Phaser實例的屬性state值(terminated、phase、parties、unarrived),不然調用reconcileState()方法,reconcileState是調整當前Phaser實例的屬性state中的phase值與root的一致
long s = (root == this) ? state : reconcileState();
//屬性state高32位的低31位存放phase(Phaser實例的階段值),屬性state值無符號右移32位獲取到phase值
int phase = (int)(s >>> PHASE_SHIFT);
//若是phase值小於0,表示屬性state的最高位爲1,屬性state最高位存放terminated,即Phaser是否關閉,若是terminated爲1,Phaser關閉
if (phase < 0)
//若是Phaser關閉,直接返回phase
return phase;
//獲取state屬性值的低32位,低32位的高16位存放parties值,低16位存放unarrived值
int counts = (int)s;
//若是counts(高16位存放parties值,低16位存放unarrived值)等於EMPTY,即當前Phaser實例還未註冊參與者,不然的話counts & UNARRIVED_MASK獲取到低16位存放unarrived值
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
//若是unarrived小於等於0,代表還沒註冊一個參與者,就有參與者到達階段屏障,拋出IllegalStateException異常,badArrive看上面介紹
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
//使用UnSafe的cas將屬性state更新成新的值,即屬性state中的unarrived值-1,若是更新失敗從新循環
if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
s -= ONE_ARRIVAL)) {
//若是unarrived大於1,代表還有其餘參與者還未到達階段屏障
if (unarrived > 1)
//根root Phser實例(若是沒有層級root就是自身Phaser實例)調用下面的internalAwaitAdvance方法,讓當前線程先自旋一段時間,若是自旋一段時間其他參與者還未到達階段屏障,封裝成節點,加入到等待鏈表中,等待被喚醒,能夠看下面對此方法的介紹
return root.internalAwaitAdvance(phase, null);
//若是根root Phaser不是自身Phaser實例,代表當前Phaser實例支持層級,當前Phaser實例作爲父Phaser實例的一個參與者註冊進去,爲此當前Phaser實例(每一個子的內部也有多個parties線程,也有多階段))的一個階段執行完成,作爲父Phaser的一個參與者,代表當前參與者到達父的一個階段屏障
if (root != this)
//父Phaser把每一個子的Phaser(每一個子的內部也有多個parties線程,也有多階段)當作父Phaser的一個parties參與者,爲此當子Phaser實例的一個階段執行完成代表子Phaser實例參與者也到達父Phaser實例的一個階段
return parent.arriveAndAwaitAdvance();
//獲取到屬性state的parties值,作爲unarrived的值
long n = s & PARTIES_MASK; // base of next state
//將parties值左移16位作爲下一階段的unarrived的值
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
//調用onAdvance方法,判斷當前Phaser實例是否須要關閉,即當前Phaser實例階段是否都執行完,這個onAdvance方法應該被子類重寫,內部能夠自定義各個不一樣階段執行的方法,好比switch{0:階段0,1:階段1,2:階段2},若是返回true代表當前Phaser實例須要關閉
if (onAdvance(phase, nextUnarrived))
//若是onAdvance返回true將屬性state值的最高位置爲1,關閉Phaser實例
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
//若是下一階段的unarrived的值爲0,將屬性state值的低32位置爲EMPTY
n |= EMPTY;
else
//不然的話,將屬性state值的低16位unarrived置爲nextUnarrived
n |= nextUnarrived;
//將phase階段值加1作爲下一階段值
int nextPhase = (phase + 1) & MAX_PHASE;
//從新計算屬性state值,將下一階段值左移32位,或(|)上低32位的parties值和unarrived值
n |= (long)nextPhase << PHASE_SHIFT;
//使用UnSafe的cas更新當前Phaser實例的屬性state,若是更新失敗,代表有其餘線程調用關閉Phaser實例,由於CAS更新屬性state值只有最後一個參與者到達階段屏障纔會執行
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
return (int)(state >>> PHASE_SHIFT); // terminated
//喚醒全部其餘等待其餘參與者都到達階段屏障的參與者線程,releaseWaiters方法看上面介紹
releaseWaiters(phase);
//返回下一階段值
return nextPhase;
}
}
}
//讓當前線程先自旋一段時間,若是自旋一段時間其他參與者還未到達階段屏障,封裝成節點,加入到等待鏈表中,等待被喚醒
private int internalAwaitAdvance(int phase, QNode node) {
// assert root == this;
////喚醒全部其餘等待其餘參與者都到達傳入階段的上一階段屏障的參與者線程
releaseWaiters(phase-1); // ensure old queue clean
//節點是否加入到鏈表的標誌位,true爲節點已加入到鏈表中
boolean queued = false; // true when node is enqueued
//記錄最近的unarrived值,即最近還有幾個參與者還未到達階段屏障值
int lastUnarrived = 0; // to increase spins upon change
//線程自旋的次數
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
//循環,直到傳入進來的階段值和當前Phaser實例的階段值不相等,代表最後一個參與者已經到達階段屏障,其他等待參與者線程需被喚醒
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
//若是傳入的節點爲空
if (node == null) { // spinning in noninterruptible mode
//從屬性state中獲取到低16位的unarrived值
int unarrived = (int)s & UNARRIVED_MASK;
//若是unarrived值和lastUnarrived值不相等,代表又有參與者到達階段屏障
if (unarrived != lastUnarrived &&
//若是unarrived小於NCPU,即還未到達的階段屏障的參與者數目小於可用的處理器個數
(lastUnarrived = unarrived) < NCPU)
//把自旋值再加上SPINS_PER_ARRIVAL
spins += SPINS_PER_ARRIVAL;
//獲取當前線程是否被中斷
boolean interrupted = Thread.interrupted();
//若是線程被中斷,或者已經自旋完
if (interrupted || --spins < 0) { // need node to record intr
//建立新的節點,QNode能夠看上面的介紹
node = new QNode(this, phase, false, false, 0L);
//將線程是否被中斷標誌位賦值給節點
node.wasInterrupted = interrupted;
}
}
//判斷節點是否被釋放,如節點線程被中斷,或者等待其餘參與者到達超時,節點的isReleasable方法能夠看上面介紹
else if (node.isReleasable()) // done or aborted
//若是節點被釋放,退出循環
break;
//若是節點還未加入到鏈表中,將新建節點加入到鏈表中
else if (!queued) { // push onto queue
//根據傳入進來的階段值獲取操做的是偶鏈表仍是奇鏈表
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
//獲取頭節點,作爲新建節點的下一節點,新建節點作爲鏈表新的頭節點
QNode q = node.next = head.get();
//若是頭節點爲空空鏈表,或者節點的階段值和傳入進來的階段值相等,而且傳入進來的階段值和當前Phaser實例的階段值相等
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
//使用CAS將新建節點作爲頭節點
queued = head.compareAndSet(q, node);
}
else {
try {
//使用ForkJoinPool的managedBlock方法讓節點node對應的線程進入等待狀態
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
//節點線程在等待其他參與者到達階段屏障的過程當中被其餘線程中斷,將節點標識線程是否被中斷wasInterrupted 置爲true
node.wasInterrupted = true;
}
}
}
//若是節點不爲空
if (node != null) {
//若是節點線程不爲空,代表線程不是被正常喚醒,有多是被中斷喚醒
if (node.thread != null)
//將當前節點線程置爲空
node.thread = null; // avoid need for unpark()
//節點線程在等待其他參與者到達階段屏障的過程當中被其餘線程中斷,而且節點不支持線程中斷,因爲線程在被其餘線程中斷喚醒,拋出InterruptedException異常,線程的中斷標誌位被重置
if (node.wasInterrupted && !node.interruptible)
//線程在被其餘線程中斷喚醒,拋出InterruptedException異常,線程的中斷標誌位被重置,爲此當前線程須要再次調用interrupt()保留中斷標誌位
Thread.currentThread().interrupt();
//若是線程不是被最後一個參與者到達階段屏障喚醒,即當前Phaser實例的階段值和傳入的階段值相等
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
//將喚醒和當前Phaser實例階段值不相等的節點,abortWait看下面方法的介紹
return abortWait(phase); // possibly clean up on abort
}
//喚醒全部其餘等待其餘參與者都到達階段屏障的參與者線程,releaseWaiters方法看上面介紹
releaseWaiters(phase);
//返回傳入的階段值
return p;
}
//ForkJoinPool的managedBlock方法,阻塞線程,直到線程被喚醒
public static void managedBlock(ManagedBlocker blocker) throws InterruptedException {
ForkJoinPool p;
ForkJoinWorkerThread wt;
Thread t = Thread.currentThread();
//if這部分等分析ForkJoinPool再解析
if ((t instanceof ForkJoinWorkerThread) &&
(p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
WorkQueue w = wt.workQueue;
while (!blocker.isReleasable()) {
if (p.tryCompensate(w)) {
try {
do {} while (!blocker.isReleasable() &&
!blocker.block());
} finally {
U.getAndAddLong(p, CTL, AC_UNIT);
}
break;
}
}
}
else {
//循環調用節點QNode的isReleasable判斷節點是否被釋放,QNode的block方法讓線程進入等待狀態,不清楚的能夠看QNode內部的這兩個方法介紹
do {} while (!blocker.isReleasable() &&
!blocker.block());
}
}
//傳入階段值,喚醒和當前Phaser實例階段值不相等的節點
private int abortWait(int phase) {
//根據傳入進來的階段值獲取操做的是偶鏈表仍是奇鏈表
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
//循環,直到喚醒全部等待在傳入進來的phase階段值的節點線程
for (;;) {
Thread t;
//獲取鏈表頭節點
QNode q = head.get();
//root Phaser實例state屬性高32位的低31位存放phase,爲此須要右移32位獲得phase值
int p = (int)(root.state >>> PHASE_SHIFT);
//若是鏈表中沒有等待節點,或者頭節點線程不爲空和節點階段值和當前Phaser實例階段值相等,直接返回+
if (q == null || ((t = q.thread) != null && q.phase == p))
return p;
if (head.compareAndSet(q, q.next) && t != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
}
public int awaitAdvance(int phase) {
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase)
return root.internalAwaitAdvance(phase, null);
return p;
}
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException {
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase) {
QNode node = new QNode(this, phase, true, false, 0L);
p = root.internalAwaitAdvance(phase, node);
if (node.wasInterrupted)
throw new InterruptedException();
}
return p;
}
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
long nanos = unit.toNanos(timeout);
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase) {
QNode node = new QNode(this, phase, true, true, nanos);
p = root.internalAwaitAdvance(phase, node);
if (node.wasInterrupted)
throw new InterruptedException();
else if (p == phase)
throw new TimeoutException();
}
return p;
}複製代碼
public void forceTermination() {
// Only need to change root state
final Phaser root = this.root;
long s;
while ((s = root.state) >= 0) {
if (UNSAFE.compareAndSwapLong(root, stateOffset,
s, s | TERMINATION_BIT)) {
// signal all threads
releaseWaiters(0); // Waiters on evenQ
releaseWaiters(1); // Waiters on oddQ
return;
}
}
}複製代碼
public final int getPhase() {
return (int)(root.state >>> PHASE_SHIFT);
}
public int getRegisteredParties() {
return partiesOf(state);
}
public int getArrivedParties() {
return arrivedOf(reconcileState());
}
public int getUnarrivedParties() {
return unarrivedOf(reconcileState());
}
public Phaser getParent() {
return parent;
}
public Phaser getRoot() {
return root;
}
public boolean isTerminated() {
return root.state < 0L;
}
public String toString() {
return stateToString(reconcileState());
}複製代碼