併發編程之Phaser原理與應用

點贊再看,養成習慣,公衆號搜一搜【一角錢技術】關注更多原創技術文章。本文 GitHub org_hejianhui/JavaStudy 已收錄,有個人系列文章。java

前言

JDK5中引入了CyclicBarrier和CountDownLatch這兩個併發控制類,而JDK7中引入的Phaser按照官方的說法是提供了一個功能相似可是更加靈活的實現。接下來咱們帶着幾個問題來研究一下Phaser與(CountDownLath、CyclicBarrier)到底有哪些相似,同時帶來了哪些靈活性?node

  1. Phaser 是什麼?
  2. Phaser 具備哪些特性?
  3. Phaser相對於 CyclicBarrier 和 CountDownLatch的優點?

CyclicBarrier和CountDownLatch

CyclicBarrier介紹

在使用CyclicBarrier時,須要建立一個CyclicBarrier對象,構造函數須要一個整數做爲參數,這個參數是一個「目標」,在CyclicBarrier對象建立後,內部會有一個計數器,初始值爲0,CyclicBarrier對象的await方法每被調用一次,這個計數器就會加1,一旦這個計數器的值達到設定的「目標」,全部被CyclicBarrier.await阻塞住的線程都會繼續執行。這個目標是固定的,一旦設定便不能修改。git

舉一個例子,假設有5我的爬香山,他們要爬到山頂,等到5我的到齊了在同時出發下山,那麼咱們要在山頂設定一個「目標」,同時還有一個計數器,這個目標就是5,每個人到山頂後,這我的就要等待,同時計數器加1,等到5我的到齊了,也就是計數器達到了這個「目標」,全部等待的人就開始下山了。 更多內容請閱讀《併發編程之CyclicBarrier原理與使用》github

CountDownLathch介紹

使用CountDownLatch時,須要建立一個CountDownLatch對象,構造函數也須要一個整數做爲參數,能夠把這個參數想象成一個倒計時器,CountDownLatch對象自己是一個發令槍,全部調用CountDownLatch.await方法的線程都會等待發令槍的指令,一旦倒計時器爲0,這些線程同時開始執行,而CountDownLatch.countDown方法就是爲倒計時器減1。web

更多內容請閱讀《併發編程之CountDownLatch原理與使用》編程

對比分析

CyclicBarrier和CountDownLatch的共同點都是有一個目標和一個計數器,等到計數器達到目標後,全部阻塞的線程都將繼續執行。它們的不一樣點是CyclicBarrier.await在等待的同時還修改計數器,而CountDownLatch.await只負責等待,CountDownLatch.countDown才修改計數器。markdown

CountDownLatch和CyclicBarrier都可以實現線程之間的等待,只不過它們側重點不一樣:併發

  • CountDownLatch通常用於一個或多個線程,等待其餘線程執行完任務後,再才執行;
  • CyclicBarrier通常用於一組線程互相等待至某個狀態,而後這一組線程再同時執行;
  • CountDownLatch 是一次性的,CyclicBarrier 是可循環利用的;
  • CountDownLathch是一個計數器,線程完成一個記錄一個,計數器遞減,只能用一次。以下圖:

  • CyclicBarrier的計數器更像一個閥門,須要全部線程都到達,而後繼續執行,計數器遞減,提供reset功能,能夠屢次使用。以下圖:

Phaser是什麼?

Phaser,翻譯爲移相器(階段),它適用於這樣一種場景,一個大任務能夠分爲多個階段完成,且每一個階段的任務能夠多個線程併發執行,可是必須上一個階段的任務都完成了才能夠執行下一個階段的任務。ide

這種場景雖然使用CyclicBarrier 或者 CountDownLatch 也能夠實現,可是要複雜的多,首先,具體須要多少個階段是可能變的,其次,每一個階段的任務數也可能會變的。相比於CyclicBarrier 和 CountDownLath ,Phaser更加靈活更加方便。svg

Phaser使用方法

Phaser同時包含CyclicBarrier和CountDownLatch兩個類的功能。

  • Phaser的arrive方法將將計數器加1,awaitAdvance將線程阻塞,直到計數器達到目標,這兩個方法與CountDownLatch的countDown和await方法相對應;
  • Phaser的arriveAndAwaitAdvance方法將計數器加1的同時將線程阻塞,直到計數器達到目標後繼續執行,這個方法對應CyclicBarrier的await方法。

除了包含以上兩個類的功能外,Phaser還提供了更大的靈活性。CyclicBarrier和CountdownLatch在構造函數指定目標後就沒法修改,而Phaser提供了register和deregister方法能夠對目標進行動態修改。

下面看一個最簡單的使用案例:

package com.niuh.tools;

import java.util.concurrent.Phaser;

/** * <p> * Phaser示例 * </p> */
public class PhaserRunner {
    // 定義每一個階段須要執行3個小任務
    public static final int PARTIES = 3;
    // 定義須要4個階段完成的大任務
    public static final int PHASES = 4;

    public static void main(String[] args) {

        Phaser phaser = new Phaser(PARTIES) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("=======phase: " + phase + " finished=============");
                return super.onAdvance(phase, registeredParties);
            }
        };

        for (int i = 0; i < PARTIES; i++) {
            new Thread(() -> {
                for (int j = 0; j < PHASES; j++) {
                    System.out.println(String.format("%s: phase: %d", Thread.currentThread().getName(), j));
                    phaser.arriveAndAwaitAdvance();
                }
            }, "Thread " + i).start();
        }
    }
}
複製代碼

這裏咱們定義個須要4個階段完成的大任務,每一個階段須要3個小任務,針對這些小任務,咱們分別起3個線程來執行這些小任務,查看輸出結果爲:

Thread 2: phase: 0
Thread 0: phase: 0
Thread 1: phase: 0
=======phase: 0 finished=============
Thread 2: phase: 1
Thread 1: phase: 1
Thread 0: phase: 1
=======phase: 1 finished=============
Thread 1: phase: 2
Thread 2: phase: 2
Thread 0: phase: 2
=======phase: 2 finished=============
Thread 1: phase: 3
Thread 0: phase: 3
Thread 2: phase: 3
=======phase: 3 finished=============
複製代碼

能夠看到,每一個階段都是三個線程都完成來才進入下一個階段。這是怎麼實現的呢?

Phaser原理猜想

結合AQS的原理,大概猜想一下Phaser的實現原理:

  • 首先,須要存儲當前階段phase、當前階段的任務數(參與者)parties、未完成參與者的數量,這三個變量咱們能夠放在一個變量state中存儲。
  • 其次,須要一個隊列存儲先完成的參與者,當最後一個參與者完成任務時,須要喚醒隊列中的參與者。

結合上面的案例帶入:初始時當前階段爲0,參與者爲3個,未完成參與者數爲3;

  • 第一個線程執行到 phaser.arriveAndAwaitAdvance(); 時進入隊列;
  • 第二個線程執行到 phaser.arriveAndAwaitadvance(); 時進入隊列;
  • 第三個線程執行到 phaser.arriveAndAwaitadvance(); 時先執行這個階段的總結 onAdvance(), 再喚醒簽名兩個線程繼續執行下一個階段的任務。

基於這樣的一個思路,總體能說的通,至因而不是這樣?讓咱們一塊兒來看源碼吧。

Phaser源碼分析

主要API

  1. register(),增長一個參與者,須要同時增長parties和unarrived兩個數值,也就是state中的16位和低16位
  2. onAdvance(int phase, int registeredParties),當前階段全部線程完成時,會調用OnAdvance()
  3. bulkRegister(int parties),指定參與者數目註冊到Phaser中,同時增長parties和unarrived兩個數值
  4. arrive(),做用使parties值加1,而且不在屏障處等待,直接運行下面的代碼
  5. awaitAdvance(int phase),若是傳入的參數與當前階段一致,這個方法會將當前線程置於休眠,直到這個階段的參與者都完成運行。若是傳入的階段參數與當前階段不一致,當即返回
  6. arriveAndAwaitAdvance(),當前線程當前階段執行完畢,等待其它線程完成當前階段
  7. arriveAndDeregister(),當一個線程調用來此方法時,parties將減1,而且通知這個線程已經完成來當前預警,不會參加到下一個階段中,所以Phaser對象在開始下一個階段時不會等待這個線程。
  8. awaitAdvanceInterruptibly(int phase),這個方法跟awaitAdvance(int phase)同樣,不一樣之處是,若是這個方法中休眠的線程被中斷,它將拋出InterruptedException異常。
  9. getPhase(),當前階段
  10. getRegisteredParties(),總數
  11. getArrivedParties(),到達總數
  12. getUnarrivedParties(),未到達總數

內部類QNode

QNode用來跟蹤當前線程的信息的。QNode被組織成單向鏈表的形式。用來管理是否阻塞或者被中斷。

QNode繼承自ForkJoinPool.ManagedBlocker。ForkJoinPool來管理是否阻塞和中斷狀態。這裏只須要重寫isReleasableblock

  • isReleaseable用於判斷是否釋放當前節點。
  • block用於阻塞。
static final class QNode implements ForkJoinPool.ManagedBlocker {
        final Phaser phaser;
        final int phase;
        final boolean interruptible;
        final boolean timed;
        boolean wasInterrupted;
        long nanos;
        final long deadline;
        volatile Thread thread; // nulled to cancel wait
        QNode next;

        QNode(Phaser phaser, int phase, boolean interruptible,
              boolean timed, long nanos) {
            this.phaser = phaser;
            this.phase = phase;
            this.interruptible = interruptible;
            this.nanos = nanos;
            this.timed = timed;
            this.deadline = timed ? System.nanoTime() + nanos : 0L;
            thread = Thread.currentThread();
        }

        public boolean isReleasable() {
            if (thread == null)
                return true;
            if (phaser.getPhase() != phase) {
                thread = null;
                return true;
            }
            if (Thread.interrupted())
                wasInterrupted = true;
            if (wasInterrupted && interruptible) {
                thread = null;
                return true;
            }
            if (timed) {
                if (nanos > 0L) {
                    nanos = deadline - System.nanoTime();
                }
                if (nanos <= 0L) {
                    thread = null;
                    return true;
                }
            }
            return false;
        }

        public boolean block() {
            if (isReleasable())
                return true;
            else if (!timed)
                LockSupport.park(this);
            else if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }
    }
複製代碼

總體代碼比較簡單。要注意的是在isReleasable中使用了thread=null來使得避免解鎖任務。使用方法相似於internalAwaitAdvance中的用法。先完成的參與者放入隊列中的節點,這裏咱們只須要關注 threadnext 兩個屬性便可,很明顯這是一個單鏈表,存儲這入隊的線程。

主要屬性

/* * unarrived -- 尚未抵達屏障的參與者的個數 (bits 0-15) * parties -- 須要等待的參與者的個數 (bits 16-31) * phase -- 屏障所處的階段 (bits 32-62) * terminated -- 屏障的結束標記 (bit 63 / sign) */
// 狀態變量,用於存儲當前階段phase、參與者數parties、未完成的參與者數unarrived_count 
private volatile long state;
// 最多能夠有多少個參與者,即每一個階段最多有多少個任務
private static final int  MAX_PARTIES     = 0xffff;
// 最多能夠有多少階段
private static final int  MAX_PHASE       = Integer.MAX_VALUE;
// 參與者數量的偏移量
private static final int  PARTIES_SHIFT   = 16;
// 當前階段的偏移量
private static final int  PHASE_SHIFT     = 32;
// 未完成的參與者數的掩碼,低16位
private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
// 參與者數,中間16位
private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
// counts的掩碼,counts等於參與者數和未完成的參與者數的 '|' 操做
private static final long COUNTS_MASK     = 0xffffffffL;
private static final long TERMINATION_BIT = 1L << 63;

// 一次一個參與者完成
private static final int  ONE_ARRIVAL     = 1;
// 增長減小參與者時使用
private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
// 減小參與者時使用
private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;
// 沒有參與者使用
private static final int  EMPTY           = 1;
// 用於求未完成參與者數量
private static int unarrivedOf(long s) {
	int counts = (int)s;
    return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}
// 用於求參與者數量(中間16位),注意int的爲止
private static int partiesOf(long s) {
	return (int)s >>> PARTIES_SHIFT;
}
// 用於求階段數(高32位),注意int的位置
private static int phaseOf(long s) {
	return (int)(s >>> PHASE_SHIFT);
}
// 已完成參與者數量
private static int arrivedOf(long s) {
	int counts = (int)s;
	return (counts == EMPTY) ? 0 :
		(counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}

/** * The parent of this phaser, or null if none */
private final Phaser parent;

/** * The root of phaser tree. Equals this if not in a tree. */
private final Phaser root;

// 用於存儲已經=完成參與者所在的線程,根據當前階段的奇偶性選擇不一樣的隊列
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;
複製代碼

主要屬性位 stateevenQoddQ

  • state,volatile的long來表示狀態變量,高32位存儲當前階段phase,中間16位存儲參與者的數量,低16位存儲未完成參與者的數量。

  • unarrived -- 尚未抵達屏障的參與者的個數 (bits 0-15)
  • parties -- 須要等待的參與者的個數 (bits 16-31)
  • phase -- 屏障所處的階段 (bits 32-62)
  • terminated -- 屏障的結束標記 (bit 63 / sign)

若是是空狀態,也就是沒有子階段註冊的初始階段。這裏用一個EMPTY狀態表示,也就是0個子階段和一個未到達階段。

全部的狀態變化都是經過CAS操做執行的,惟一例外是註冊一個子相移器(sub-Phaser),用於構成樹的,也就是Phaser的父Phaser非空。這個子相移器的分階段是經過一個內置鎖來設置。

  • evenQ 和 oddQ,是根據phaser的奇偶狀態來設置的,用來存儲等待的線程。爲了不競爭,這裏使用了Phaser的數值奇偶來存儲,此外對於子相移器,它與其根相移器使用同一個evenQ或者oddQ,以加速釋放。

構造方法

public Phaser() {
	this(null, 0);
}

public Phaser(int parties) {
	this(null, parties);
}

public Phaser(Phaser parent) {
	this(parent, 0);
}

public Phaser(Phaser parent, int parties) {
	if (parties >>> PARTIES_SHIFT != 0)
		throw new IllegalArgumentException("Illegal number of parties");
    int phase = 0;
    this.parent = parent;
    if (parent != null) { // 父phaser不爲空
    	final Phaser root = parent.root;
        this.root = root; // 指向root phaser
        this.evenQ = root.evenQ; // 兩個棧,整個phaser鏈只有一份
        this.oddQ = root.oddQ;
        if (parties != 0)
        	phase = parent.doRegister(1); // 向父phaser註冊當前線程
    }
    else {
    	this.root = this; // 不然,本身是root phaser
        this.evenQ = new AtomicReference<QNode>(); // 負責建立兩個棧(QNode鏈)
        this.oddQ = new AtomicReference<QNode>();
    }
    // 狀態變量state的存儲分爲三段
    this.state = (parties == 0) ? (long)EMPTY :
            ((long)phase << PHASE_SHIFT) |
            ((long)parties << PARTIES_SHIFT) |
            ((long)parties);
}
複製代碼

構造函數中還有一個parent和root,這是用來構造多層級階段的,用於構成樹的。

重點仍是仍是看state的賦值方式,高32位存儲當前階段phase,中間16位存儲參與者的數量,低16位存儲未完成參與者的數量。

主要方法

下面咱們一塊兒來看看幾個主要方法的源碼,重點是三個private的核心方法:doArrive、doRegister、reconcileState

register方法

增長一個參與者,須要同時增長parties和unarrived兩個數值,也就是state中的16位和低16位(中間16位存儲參與者的數量,低16位存儲未完成參與者的數量)

public int register() {
	return doRegister(1);
}
複製代碼

這裏主要調用的是doRegister方法,咱們往下看。

doRegister方法

private int doRegister(int registrations) {
    // adjustment to state
    // state應該加的值,注意這裏是至關於同時增長parties和unarrived
    long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; //計算出須要調整的量
    final Phaser parent = this.parent; //查看可能存在的相移器
    int phase;
    for (;;) {
        // state的值
        long s = (parent == null) ? state : reconcileState(); // reconcileState()方法是調整當前phaser的狀態與root的一致
        // state的低32未,也就是parties和unarrived的值
        int counts = (int)s;
        // parties的值
        int parties = counts >>> PARTIES_SHIFT;
        // unarrived的值
        int unarrived = counts & UNARRIVED_MASK;
        // 檢查是否溢出
        if (registrations > MAX_PARTIES - parties) //若是須要註冊的數量超過運行註冊的最大值,則拋出異常狀態異常
            throw new IllegalStateException(badRegister(s));
		// 當前階段phase
        phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0) //若是當前狀態爲終止狀態則跳出循環直接退出
            break;
        // 不是第一個參與者
        if (counts != EMPTY) {          // not 1st registration //若是當前狀態不是第一次註冊線程
            if (parent == null || reconcileState() == s) { //若是當相移器的父相移器爲空,則直接信息CAS,若是當前相移器部位空則調用reconcileState處理,這個稍後再看。reconcileState這裏主要爲了防止出現同步性錯誤。
                // unarrived等於0說明當前階段正在執行onAdvance()方法,等待其執行完畢
                if (unarrived == 0)             // wait out advance
                    root.internalAwaitAdvance(phase, null);
                // 不然就修改state的值,增長adjust,若是成功就跳出循環
                else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                   s, s + adjust))
                    break;
            }
        }
        // 是第一個參與者,當前狀態是第一次註冊。若是若是當前相移器沒有父相移器。則直接進行CAS
        else if (parent == null) {          // 1st root registration
            // 計算state的值
            long next = ((long)phase << PHASE_SHIFT) | adjust;
            // 修改state的值,若是成功就跳出循環
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
            	break;
            }
        else { // 若是當前是第一次設置,而且該相移器被組織在一個樹中則須要考慮一下,則須要使用內置鎖來進如
            // 多層級階段的處理方式
            synchronized (this) {               // 1st sub registration
                if (state == s) {               // recheck under lock 這裏有可能發生競爭。因此這裏還須要檢查一下,若是失敗則需退出同步區從新嘗試進入。
                    phase = parent.doRegister(1); // 調用其父相移器的註冊方法
                    if (phase < 0)
                        break;
                    // finish registration whenever parent registration
                    // succeeded, even when racing with termination,
                    // since these are part of the same "transaction".
                    while (!UNSAFE.compareAndSwapLong
                           (this, stateOffset, s,
                            ((long)phase << PHASE_SHIFT) | adjust)) {
                        s = state;
                        phase = (int)(root.state >>> PHASE_SHIFT);
                        // assert (int)s == EMPTY;
                    }
                    break;
                }
            }
        }
    }
    return phase;
}
複製代碼

增長一個參與者的整體的邏輯爲:

  • 增長一個參與者,須要同時增長parties和unarrived兩個數值,也就是state中的16位和低16位;
  • 若是是第一個參與者,則嘗試原子更新state的值,若是成功了就退出;
  • 若是不是第一個參與者,則檢查是否是在執行onAdvance() , 若是是等待onAdvance() 執行完成,若是不然嘗試原子更新state的值,直到成功退出;
  • 等待onAdvance() 完成是採用先自旋後進入隊列排隊的方式等待,減小線程上下文切換;

這裏有一個須要重點看一下的方法即reconcileState,下面咱們分析下。

reconcileState方法

這個方法主要是爲了處理在樹構造中可能存在的相位延遲問題。好比有時候當父相移器已經步進了,可是其子相移器並無步進。這很正常。這時候須要使得子相移器的未到達子階段爲0。(或者子階段數爲0,則從新設置未註冊的空狀態)。然而這個方法也會致使也有可能會有一些浮動的子相移器想要設置未到達子階段數量純粹只是爲了遇上當前線程,這樣的狀況下會調用這個方法。這時候計數不會受到影響。

private long reconcileState() {
    final Phaser root = this.root;
    long s = state;
    if (root != this) {
        int phase, u, p;
        // CAS root phase with current parties; possibly trip unarrived
        //下面這個while語句比較的麻煩。實際上就是幹了一件事,當子相移器和父相移器的階段不一樣的時候從新設置當前相移器的狀態。
        while ((phase = (int)(root.state >>> PHASE_SHIFT)) != (int)(s >>> PHASE_SHIFT) &&
               !UNSAFE.compareAndSwapLong
               (this, stateOffset, s,
                s = (((long)phase << PHASE_SHIFT) |
                     (s & PARTIES_MASK) |
                     ((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY :
                      (u = (int)s & UNARRIVED_MASK) == 0 ? p : u))))
            s = state;
    }
    return s;
}
複製代碼

arriveAndAwaitAdvance()方法

當前線程當前階段執行完畢,等待其餘線程完成當前階段。 若是當前線程是該階段最後一個到達的,則當前線程會執行onAdvance()方法,並喚醒其它線程進入下一個階段。

public int arriveAndAwaitAdvance() {
        // Specialization of doArrive+awaitAdvance eliminating some reads/paths
        final Phaser root = this.root;
        for (;;) {
            // state的值
            long s = (root == this) ? state : reconcileState();
            // 當前階段
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                return phase;
            // parties 和 unarrived的值
            int counts = (int)s;
            // unarrived的值(state的低16位)
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            // 修改state的值
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                          s -= ONE_ARRIVAL)) {
                // 若是不是最後一個到達的,則調用internalAwaitAdvance()方法自旋或進入隊列等待
                if (unarrived > 1)
                    // 這裏是直接返回了,internalAwaitAdvance()方法的源碼見register()方法解析
                    return root.internalAwaitAdvance(phase, null);
                // 到這裏說明是最後一個到達的參與者
                if (root != this)
                    return parent.arriveAndAwaitAdvance();
                // n 只保留了state中parties的部分,也就是中16位
                long n = s & PARTIES_MASK;  // base of next state
                // parties的值,即下一次須要到達的參與者數量
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                // 執行onAdvance()方法,返回true表示下一階段參與者數量爲0了,也就是結束了
                if (onAdvance(phase, nextUnarrived))
                    n |= TERMINATION_BIT;
                else if (nextUnarrived == 0)
                    n |= EMPTY;
                else
                    n |= nextUnarrived; // n加上unarrived的值
                // 下階段等待當前階段加1
                int nextPhase = (phase + 1) & MAX_PHASE;
                // n 加上下一個階段的值
                n |= (long)nextPhase << PHASE_SHIFT;
                // 修改state的值爲n
                if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                    return (int)(state >>> PHASE_SHIFT); // terminated
                // 喚醒其它參與者並進入下一個階段
                releaseWaiters(phase);
                // 返回下一階段的值
                return nextPhase;
            }
        }
    }

複製代碼

arriveAndAwaitAdvance的大體邏輯爲:

  • 修改state中unarrived部分的值減1;
  • 若是不是最後一個到達,則調用internalAwaitAdvance() 方法自旋或排隊等待;
  • 若是是最後一個到達的,則調用onAdvance() 方法,而後修改state的值爲下一階段對應的值,並喚醒其它等待的線程;
  • 返回下一階段俄值。

internalAwaitAdvance方法

internalAwaitAdvance方法。實際上Phaser中阻塞都是經過這個語句實現的。這個語句必須經過根相移器調用。換句話說全部的阻塞都是在根相移器阻塞的。

輸入參數中phase是須要阻塞的階段。node是用來跟蹤可能中斷的阻塞節點。

// 等待onAdvance()方法執行完畢
// 原理是先自旋必定次數,若是進入下一個階段,這個方法直接返回了,
// 若是自旋必定次數尚未進入下一個階段,則當前線程入隊列,等待onAdvance()執行完成喚醒
private int internalAwaitAdvance(int phase, QNode node) {
    // assert root == this;
    // 保證隊列爲空
    releaseWaiters(phase-1);      // ensure old queue clean
    boolean queued = false;       // true when node is enqueued
    int lastUnarrived = 0;        // to increase spins upon change
    // 自旋的次數
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    // 檢查當前階段是否變化,若是變化了說明進入下一個階段了,這時候就沒有必要自旋了
    while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
        // 若是node爲空,註冊的時候傳入的爲空
        if (node == null) {           // spinning in noninterruptible mode
            // 未完成的參與者數量
            int unarrived = (int)s & UNARRIVED_MASK;
            // unarrived 有變化,增長自旋次數
            if (unarrived != lastUnarrived &&
                (lastUnarrived = unarrived) < NCPU)
                spins += SPINS_PER_ARRIVAL;
            boolean interrupted = Thread.interrupted();
            // 自旋次數萬了,則新建一個節點
            if (interrupted || --spins < 0) { // need node to record intr
                node = new QNode(this, phase, false, false, 0L);
                node.wasInterrupted = interrupted;
            }
        } else if (node.isReleasable()) // done or aborted
            break;
        else if (!queued) {           // push onto queue
            // 節點入隊列
            AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
            QNode q = node.next = head.get();
            if ((q == null || q.phase == phase) &&
                (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                queued = head.compareAndSet(q, node);
        } else {
            try {
                // 當前線程進入阻塞狀態,跟調用LockSupport.park()同樣,等待被喚醒。
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                node.wasInterrupted = true;
            }
        }
  	}
	// 到這裏說明節點所在線程已經被喚醒了
    if (node != null) {
        // 置空節點中的線程
        if (node.thread != null)
        node.thread = null;       // 被喚醒後,置空thread引用,避免再次unpark
        if (node.wasInterrupted && !node.interruptible) // 不可中斷模式下,傳遞中斷
            Thread.currentThread().interrupt();
        if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
            return abortWait(phase); // 依舊沒有進入到下一個狀態,清除那些因爲超時或中斷再也不等待下一階段的結點
    }
    // 喚醒阻塞的線程
    releaseWaiters(phase);
    return p;
}
複製代碼

doArrive方法

doArrive是用來完成任務完成後到達的操做的

private int doArrive(boolean deregister) {
    int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;//經過傳入參數判斷有哪些參數須要減1。
    final Phaser root = this.root;
    for (;;) {
        long s = (root == this) ? state : reconcileState();//獲取當前狀態,以及並解析當前參數。
        int phase = (int)(s >>> PHASE_SHIFT);
        int counts = (int)s;
        int unarrived = (counts & UNARRIVED_MASK) - 1;
        if (phase < 0)//phase爲負說明出現特殊狀況則將phase返回。
            return phase;
        else if (counts == EMPTY || unarrived < 0) {//若是狀態爲空或者未到達線程爲負,則邏輯上不該該存在線程到達,
            if (root == this || reconcileState() == s)//若是root爲this則說明狀態出錯拋出異常,可是若是該相移器還有父相移器,則還有可能出現相位傳播的延遲,這裏交給reconcileState來判斷,若是依然出現非法狀態則拋出異常。reconcileState後面會說到。
                throw new IllegalStateException(badArrive(s));
        }
        else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {//完成條件判斷後,嘗試CAS設置當前狀態。
            if (unarrived == 0) {//若是當前到達是該階段最後一個到達的程序則須要進入下一個階段。
                long n = s & PARTIES_MASK;  // base of next state//保留子階段數值。
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;//設置下一個階段你的數值。
                if (root != this)//若是當前phaser有根節點則調用父節點的根節點。
                    return parent.doArrive(nextUnarrived == 0);
                if (onAdvance(phase, nextUnarrived))//判斷是否能夠補進當前節點,實際上這個函數判斷是就是nextUnarrived是不是0若是是0則不該該補進,若是不該該補進則返回真,這時候就將phaser終止。這裏之因此還專門用一個onAdvance其實是提供一個hook方法,爲後續的實現提供方便。
                    n |= TERMINATION_BIT;
                else if (nextUnarrived == 0)//若是不該該終止,並且nextUnarrived又爲0,則須要專門設置一個空狀態。理由以前說過。
                    n |= EMPTY;
                else//固然更廣泛的狀況下仍是隻是設置一下下一個階段未到達線程數量。
                    n |= nextUnarrived;
                n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;//構造一個新的state變量。並使用CAS的方式去設置他。
                UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                releaseWaiters(phase);//釋放全部等的節點。
            }
            return phase;//返回phase數字
        }
    }
}
複製代碼

此方法,與arriveAndAwaitAdvance()相似,但不阻塞,可能會有註銷操做。

Phaser原理總結

如上圖所示,phaser,支持phaser樹(圖中,簡化爲phaser鏈表模式,獨子單傳,後文也稱phaser鏈)模式,分攤併發的壓力。每一個phaser結點的father指針指向前一個phaser結點,最前頭的結點成爲root結點,其father指針指向null, 每個結點的root指針指向root結點,root結點的root指針指向它本身。

只有root結點的evenQ和oddQ分別指向兩個QNode鏈表。每一個QNode結點包含有phaser和thread等關鍵屬性,其中,thread指向當前線程,phaser指向當前線程所註冊的phaser。

這兩個鏈表裏的線程所對應的phase(階段)要麼都爲奇數,要麼都爲偶數,相鄰階段的兩組線程必定在不一樣的鏈表裏面,這樣在新老階段更迭時,操做的是不一樣的鏈表,不會錯亂。整個phaser鏈,共用這兩個QNode鏈。

並且,線程也只會在root結點上被封裝進QNode結點入棧(QNode鏈,入棧,FIFO,後文有時也叫入隊,不影響功能),每一個phaser在初始時(被第一個線程註冊時)以當前線程向其父phaser註冊的方式與其父phaser創建聯繫,當此phaser上的線程都到達了,再以當前線程(最後一個抵達的線程)通知其父phaser,本身這邊OK了,每一個phaser都以一樣的方式通知其父phaser,最後到達root phaser,開始喚醒睡在棧裏(QNode鏈表)的線程,準備進入下一階段。

phaser的關鍵屬性state,是一個64位的long類型數據,劃分爲4個域:

  • unarrived -- 尚未抵達屏障的參與者的個數 (bits 0-15)
  • parties -- 須要等待的參與者的個數 (bits 16-31)
  • phase -- 屏障所處的階段 (bits 32-62)
  • terminated -- 屏障的結束標記 (bit 63 / sign)

特別地,初始時,state的值爲1,稱爲EMPTY,也便是unarrived = 1,其他都爲0,這是一個標記,表示此phaser尚未線程來註冊過,EMPTY = 1,而不是0,是由於0有特殊的含義,可能表示全部的線程都到達屏障了,此時unarrived也爲0(而不是初始狀態),正常來說,parties表示總的註冊的線程的個數,大於等於unarrived,初始時,parties = 0,而unarrived = 1,更易於辨別。

總結

Phaser

  1. Phaser適用於多階段多任務的場景,每一個階段的任務均可以控制的很細;
  2. Phaser內部使用state變量及隊列實現整個邏輯;
  3. state的高32位存儲當前階段phase,中16位存儲當前階段參與者(任務)的數量parties,低16位存儲未完成參與者的數量unarrived;
  4. 隊列會根據當前階段的奇偶性選擇不一樣的隊列;
  5. 當不是最後一個參與者到達時,會自旋或者進入隊列排隊來等待全部參與者完成任務;
  6. 當最後一個參與者完成任務時,會喚醒隊列中的線程並進入下一階段。

Phaser相對於CyclicBarrier和CountDownLatch的優點?

優點主要有兩點:

  1. Phaser能夠完成多階段,而一個CyclicBarrier 或者CountDownLatch通常只能控制一到兩個階段的任務;
  2. Phaser每一個階段的任務數量能夠控制,而一個CyclicBarrier 或者 CountDownLatch任務數量一旦肯定不可修改。

多階段協同,示意圖以下:

參考

PS:以上代碼提交在 Githubgithub.com/Niuh-Study/…

PS:這裏有一個技術交流羣(扣扣羣:1158819530),方便你們一塊兒交流,持續學習,共同進步,有須要的能夠加一下。

文章持續更新,能夠公衆號搜一搜「 一角錢技術 」第一時間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經收錄,歡迎 Star。

相關文章
相關標籤/搜索