Java多線程進階(二二)—— J.U.C之synchronizer框架:Phaser

123.jpg

本文首發於一世流雲專欄: https://segmentfault.com/blog...

1、Phaser簡介

PhaserJDK1.7開始引入的一個同步工具類,適用於一些須要分階段的任務的處理。它的功能與 CyclicBarrierCountDownLatch有些相似,相似於一個多階段的柵欄,而且功能更強大,咱們來比較下這三者的功能:java

同步器 做用
CountDownLatch 倒數計數器,初始時設定計數器值,線程能夠在計數器上等待,當計數器值歸0後,全部等待的線程繼續執行
CyclicBarrier 循環柵欄,初始時設定參與線程數,當線程到達柵欄後,會等待其它線程的到達,當到達柵欄的總數知足指定數後,全部等待的線程繼續執行
Phaser 多階段柵欄,能夠在初始時設定參與線程數,也能夠中途註冊/註銷參與者,當到達的參與者數量知足柵欄設定的數量後,會進行階段升級(advance)

Phaser中有一些比較重要的概念,理解了這些概念才能理解Phaser的功能。node

phase(階段)

咱們知道,在CyclicBarrier中,只有一個柵欄,線程在到達柵欄後會等待其它線程的到達。segmentfault

Phaser也有柵欄,在Phaser中,柵欄的名稱叫作phase(階段),在任意時間點,Phaser只處於某一個phase(階段),初始階段爲0,最大達到Integerr.MAX_VALUE,而後再次歸零。當全部parties參與者都到達後,phase值會遞增。併發

若是看過以前關於CyclicBarrier的文章,就會知道,Phaser中的phase(階段)這個概念其實和CyclicBarrier中的Generation很類似,只不過Generation沒有計數。框架

parties(參與者)

parties(參與者)其實就是CyclicBarrier中的參與線程的概念。ide

CyclicBarrier中的參與者在初始構造指定後就不能變動,而Phaser既能夠在初始構造時指定參與者的數量,也能夠中途經過registerbulkRegisterarriveAndDeregister等方法註冊/註銷參與者。函數

arrive(到達) / advance(進階)

Phaser註冊完parties(參與者)以後,參與者的初始狀態是unarrived的,當參與者到達(arrive)當前階段(phase)後,狀態就會變成arrived。當階段的到達參與者數知足條件後(註冊的數量等於到達的數量),階段就會發生進階(advance)——也就是phase值+1。高併發

clipboard.png

Termination(終止)

表明當前Phaser對象達到終止狀態,有點相似於CyclicBarrier中的柵欄被破壞的概念。工具

Tiering(分層)

Phaser支持分層(Tiering) —— 一種樹形結構,經過構造函數能夠指定當前待構造的Phaser對象的父結點。之因此引入Tiering,是由於當一個Phaser有大量參與者(parties)的時候,內部的同步操做會使性能急劇降低,而分層能夠下降競爭,從而減少因同步致使的額外開銷。性能

在一個分層Phasers的樹結構中,註冊和撤銷子Phaser或父Phaser是自動被管理的。當一個Phaser的參與者(parties)數量變成0時,若是有該Phaser有父結點,就會將它從父結點中溢移除。

關於Phaser的分層,後續咱們在講Phaser原理時會進一步討論。

2、Phaser示例

爲了更好的理解Phaser的功能,咱們來看幾個示例:

示例一

經過Phaser控制多個線程的執行時機:有時候咱們但願全部線程到達指定點後再同時開始執行,咱們能夠利用 CyclicBarrierCountDownLatch來實現,這裏給出使用Phaser的版本。
public class PhaserTest1 {
    public static void main(String[] args) {
        Phaser phaser = new Phaser();
        for (int i = 0; i < 10; i++) {
            phaser.register();                  // 註冊各個參與者線程
       new Thread(new Task(phaser), "Thread-" + i).start();
        }
    }
}

class Task implements Runnable {
    private final Phaser phaser;

    Task(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        int i = phaser.arriveAndAwaitAdvance();     // 等待其它參與者線程到達
     // do something
        System.out.println(Thread.currentThread().getName() + ": 執行完任務,當前phase =" + i + "");
    }
}

輸出:

Thread-8: 執行完任務,當前phase =1
Thread-4: 執行完任務,當前phase =1
Thread-3: 執行完任務,當前phase =1
Thread-0: 執行完任務,當前phase =1
Thread-5: 執行完任務,當前phase =1
Thread-6: 執行完任務,當前phase =1
Thread-7: 執行完任務,當前phase =1
Thread-9: 執行完任務,當前phase =1
Thread-1: 執行完任務,當前phase =1
Thread-2: 執行完任務,當前phase =1

以上示例中,建立了10個線程,並經過register方法註冊Phaser的參與者數量爲10。當某個線程調用arriveAndAwaitAdvance方法後,arrive數量會加1,若是數量沒有知足總數(參與者數量10),當前線程就是一直等待,當最後一個線程到達後,全部線程都會繼續往下執行。

注意: arriveAndAwaitAdvance方法是不響應中斷的,也就是說即便當前線程被中斷, arriveAndAwaitAdvance方法也不會返回或拋出異常,而是繼續等待。若是但願可以響應中斷,能夠參考 awaitAdvanceInterruptibly方法。

示例二

經過Phaser實現開關。在之前講 CountDownLatch時,咱們給出過以 CountDownLatch實現開關的示例,也就是說,咱們但願一些外部條件獲得知足後,而後打開開關,線程才能繼續執行,咱們看下如何用 Phaser來實現此功能。
public class PhaserTest2 {

    public static void main(String[] args) throws IOException {
        Phaser phaser = new Phaser(1);       // 註冊主線程,當外部條件知足時,由主線程打開開關
        for (int i = 0; i < 10; i++) {
            phaser.register();                      // 註冊各個參與者線程
            new Thread(new Task2(phaser), "Thread-" + i).start();
        }

        // 外部條件:等待用戶輸入命令
        System.out.println("Press ENTER to continue");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        reader.readLine();

        // 打開開關
        phaser.arriveAndDeregister();
        System.out.println("主線程打開了開關");
    }
}

class Task2 implements Runnable {
    private final Phaser phaser;

    Task2(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        int i = phaser.arriveAndAwaitAdvance();     // 等待其它參與者線程到達

        // do something
        System.out.println(Thread.currentThread().getName() + ": 執行完任務,當前phase =" + i + "");
    }
}

輸出:

主線程打開了開關
Thread-7: 執行完任務,當前phase =1
Thread-4: 執行完任務,當前phase =1
Thread-3: 執行完任務,當前phase =1
Thread-1: 執行完任務,當前phase =1
Thread-0: 執行完任務,當前phase =1
Thread-9: 執行完任務,當前phase =1
Thread-8: 執行完任務,當前phase =1
Thread-2: 執行完任務,當前phase =1
Thread-5: 執行完任務,當前phase =1
Thread-6: 執行完任務,當前phase =1

以上示例中,只有當用戶按下回車以後,任務才真正開始執行。這裏主線程Main至關於一個協調者,用來控制開關打開的時機,arriveAndDeregister方法不會阻塞,該方法會將到達數加1,同時減小一個參與者數量,最終返回線程到達時的phase值。

示例三

經過 Phaser控制任務的執行輪數
public class PhaserTest3 {
    public static void main(String[] args) throws IOException {

        int repeats = 3;    // 指定任務最多執行的次數

        Phaser phaser = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
                return phase + 1 >= repeats  || registeredParties == 0;
            }
        };

        for (int i = 0; i < 10; i++) {
            phaser.register();                      // 註冊各個參與者線程
       new Thread(new Task3(phaser), "Thread-" + i).start();
        }
    }
}

class Task3 implements Runnable {
    private final Phaser phaser;

    Task3(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        while (!phaser.isTerminated()) {   //只要Phaser沒有終止, 各個線程的任務就會一直執行
            int i = phaser.arriveAndAwaitAdvance();     // 等待其它參與者線程到達
            // do something
            System.out.println(Thread.currentThread().getName() + ": 執行完任務");
        }
    }
}

輸出:

---------------PHASE[0],Parties[5] ---------------
Thread-4: 執行完任務
Thread-1: 執行完任務
Thread-2: 執行完任務
Thread-3: 執行完任務
Thread-0: 執行完任務
---------------PHASE[1],Parties[5] ---------------
Thread-0: 執行完任務
Thread-3: 執行完任務
Thread-1: 執行完任務
Thread-4: 執行完任務
Thread-2: 執行完任務
---------------PHASE[2],Parties[5] ---------------
Thread-2: 執行完任務
Thread-4: 執行完任務
Thread-1: 執行完任務
Thread-0: 執行完任務
Thread-3: 執行完任務

以上示例中,咱們在建立Phaser對象時,覆寫了onAdvance方法,這個方法相似於CyclicBarrier中的barrierAction任務。
也就是說,當最後一個參與者到達時,會觸發onAdvance方法,入參phase表示到達時的phase值,registeredParties表示到達時的參與者數量,返回true表示須要終止Phaser。

咱們經過phase + 1 >= repeats ,來控制階段(phase)數的上限爲2(從0開始計),最終控制了每一個線程的執行任務次數爲repeats次。

示例四

Phaser支持分層功能,咱們先來考慮下如何用利用Phaser的分層來實現高併發時的優化,在 示例三中,咱們其實建立了10個任務,而後10個線程共用一個Phaser對象,以下圖:

clipboard.png

若是任務數繼續增大,那麼同步產生的開銷會很是大,利用Phaser分層的功能,咱們能夠限定每一個Phaser對象的最大使用線程(任務數),以下圖:
clipboard.png

能夠看到,上述Phasers其實構成了一顆多叉樹,若是任務數繼續增多,還能夠將Phaser的葉子結點繼續分裂,而後將分裂出的子結點供工做線程使用。

public class PhaserTest4 {
    private static final int TASKS_PER_PHASER = 4;      // 每一個Phaser對象對應的工做線程(任務)數

    public static void main(String[] args) throws IOException {

        int repeats = 3;    // 指定任務最多執行的次數
        Phaser phaser = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
                return phase + 1 >= repeats || registeredParties == 0;
            }
        };

        Tasker[] taskers = new Tasker[10];
        build(taskers, 0, taskers.length, phaser);       // 根據任務數,爲每一個任務分配Phaser對象

        for (int i = 0; i < taskers.length; i++) {          // 執行任務
            Thread thread = new Thread(taskers[i]);
            thread.start();
        }
    }

    private static void build(Tasker[] taskers, int lo, int hi, Phaser phaser) {
        if (hi - lo > TASKS_PER_PHASER) {
            for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
                int j = Math.min(i + TASKS_PER_PHASER, hi);
                build(taskers, i, j, new Phaser(phaser));
            }
        } else {
            for (int i = lo; i < hi; ++i)
                taskers[i] = new Tasker(i, phaser);
        }

    }
}

class Task4 implements Runnable {
    private final Phaser phaser;

    Task4(Phaser phaser) {
        this.phaser = phaser;
        this.phaser.register();
    }

    @Override
    public void run() {
        while (!phaser.isTerminated()) {   //只要Phaser沒有終止, 各個線程的任務就會一直執行
            int i = phaser.arriveAndAwaitAdvance();     // 等待其它參與者線程到達
            // do something
            System.out.println(Thread.currentThread().getName() + ": 執行完任務");
        }
    }
}
輸出:
---------------PHASE[0],Parties[3] ---------------
Thread-9: 執行完任務
Thread-6: 執行完任務
Thread-5: 執行完任務
Thread-4: 執行完任務
Thread-1: 執行完任務
Thread-0: 執行完任務
Thread-7: 執行完任務
Thread-8: 執行完任務
Thread-2: 執行完任務
Thread-3: 執行完任務
---------------PHASE[1],Parties[3] ---------------
Thread-3: 執行完任務
Thread-7: 執行完任務
Thread-0: 執行完任務
Thread-1: 執行完任務
Thread-5: 執行完任務
Thread-8: 執行完任務
Thread-2: 執行完任務
Thread-9: 執行完任務
Thread-6: 執行完任務
Thread-4: 執行完任務
---------------PHASE[2],Parties[3] ---------------
Thread-4: 執行完任務
Thread-2: 執行完任務
Thread-8: 執行完任務
Thread-0: 執行完任務
Thread-3: 執行完任務
Thread-9: 執行完任務
Thread-6: 執行完任務
Thread-7: 執行完任務
Thread-1: 執行完任務
Thread-5: 執行完任務

3、Phaser原理

Phaser是本系列至今爲止,內部結構最爲複雜的同步器之一。在開始深刻Phaser原理以前,咱們有必要先來說講Phaser的內部組織結構和它的設計思想。

Phaser的內部結構

以前咱們說過,Phaser支持樹形結構,在示例四中,也給出了一個經過分層提升併發性和程序執行效率的例子。一個複雜分層結構的Phaser樹的內部結構以下圖所示:
clipboard.png

上面圖中的幾點關鍵點:

  1. 樹的根結點root連接着兩個「無鎖棧」——Treiber Stack,用於保存等待線程(好比當線程等待Phaser進入下一階段時,會根據當前階段的奇偶性,把本身掛到某個棧中),全部Phaser對象都共享這兩個棧。
  2. 當首次將某個Phaser結點連接到樹中時,會同時向該結點的父結點註冊一個參與者。
爲何須要向父結點註冊參與者?

首先咱們要明白對於Phaser來講,何時會發生躍遷(advance)進入下一階段?

廢話,固然是等它全部參與者都到達的時候。那麼它所等待的參與者都包含那幾類呢?

①對於一個孤立的Phaser結點(也能夠當作是隻有一個根結點的樹)
其等待的參與者,就是顯式註冊的參與者,這也是最多見的狀況。
好比下圖,若是有10個Task共用這個Phaser,那等待的參與者數就是10,當10個線程都到達後,Phaser就會躍遷至下一階段。
clipboard.png

對於一個非孤立的Phaser葉子結點,好比下圖中標綠的葉子結點
這種狀況和①同樣,子Phaser1和子Phaser2等待的參與者數是4,子Phaser3等待的參與者數是2。
clipboard.png

對於一個非孤立非葉子的Phaser結點,好比上圖中標藍色的結點
這是最特殊的一種狀況,這也是Phaser同步器關於分層的主要設計思路。
這種狀況,結點所等待的參與者數目包含兩部分:

  1. 直接顯式註冊的參與者(經過構造器或register方法)。——等於0
  2. 子結點的數目。——等於3

也就是說在上圖中,當左一的子Phaser1的4個參與者都到達後,它會通知父結點Phaser,本身的狀態已經OK了,這時Phaser會認爲子Phaser1已經準備就緒,會將本身的到達者數量加1,同理,當子Phaser2子Phaser3的全部參與者分別到達後,它們也會依次通知Phaser,只有當Phaser(根結點)的到達者數量爲3時,纔會釋放「無鎖棧」中等待着的線程,並將階段數phase增長1。

這是一種層層遞歸的設計, 只要當根結點的全部參與者都到達後(也就是到達參數者數等於其子結點數),全部等待線程纔會放行,柵欄纔會進入下一階段。

瞭解了上面這些,咱們再來看Phaser的源碼。

同步狀態定義

Phaser使用一個long類型來保存同步狀態值State,並按位劃分不一樣區域的含義,經過掩碼和位運算進行賦值和操做:
clipboard.png

棧結點定義

「無鎖棧」——Treiber Stack,保存在Phaser樹的根結點中,其他全部Phaser子結點共享這兩個棧:
clipboard.png

結點的定義很是簡單,內部保存了線程信息和Phsaer對象信息:
clipboard.png

注意:ForkJoinPool.ManagedBlocker是當棧包含ForkJoinWorkerThread類型的QNode阻塞的時候,ForkJoinPool內部會增長一個工做線程來保證並行度,後續講ForkJoin框架時咱們會進行分析。

Phaser的構造器

Phaser一共有4個構造器,能夠看到,最終其實都是調用了Phaser(Phaser parent, int parties)這個構造器。
clipboard.png

Phaser(Phaser parent, int parties)的內部實現以下,關鍵就是給當前的Phaser對象指定父結點時,若是當前Phaser的參與者不爲0,須要向父Phaser註冊一個參與者(表明當前結點自己):
clipboard.png

註冊參與者

Phaser提供了兩個註冊參與者的方法:

  • register:註冊單個參與者
  • bulkRegister:批量註冊參與者

clipboard.png

這兩個方法都很簡單,內部調用了doRegister方法:

/**
 * 註冊指定數目{#registrations}的參與者
 */
private int doRegister(int registrations) {
    // 首先計算註冊後當前State要調整的值adjust
    long adjust = ((long) registrations << PARTIES_SHIFT) | registrations;
    final Phaser parent = this.parent;
    int phase;
    for (; ; ) {
        long s = (parent == null) ? state : reconcileState();   // reconcileState()調整當前Phaser的State與root一致
        int counts = (int) s;
        int parties = counts >>> PARTIES_SHIFT;                 // 參與者數目
        int unarrived = counts & UNARRIVED_MASK;                // 未到達的數目
        if (registrations > MAX_PARTIES - parties)
            throw new IllegalStateException(badRegister(s));
        phase = (int) (s >>> PHASE_SHIFT);                      // 當前Phaser所處的階段phase
        if (phase < 0)
            break;
        if (counts != EMPTY) {                                  // CASE1: 當前Phaser已經註冊過參與者
            if (parent == null || reconcileState() == s) {
                if (unarrived == 0)    // 參與者已所有到達柵欄, 當前Phaser正在Advance, 須要阻塞等待這一過程完成
                    root.internalAwaitAdvance(phase, null);
                else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust))    // 不然,直接更新State
                    break;
            }
        } else if (parent == null) {                            // CASE2: 當前Phaser未註冊過參與者(第一次註冊),且沒有父結點
            long next = ((long) phase << PHASE_SHIFT) | adjust;
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))  // CAS更新當前Phaser的State值
                break;
        } else {                                                // CASE3: 當前Phaser未註冊過參與者(第一次註冊),且有父結點
            synchronized (this) {
                if (state == s) {
                    phase = parent.doRegister(1);   // 向父結點註冊一個參與者
                    if (phase < 0)
                        break;
                    while (!UNSAFE.compareAndSwapLong(this, stateOffset, s,
                            ((long) phase << PHASE_SHIFT) | adjust)) {
                        s = state;
                        phase = (int) (root.state >>> PHASE_SHIFT);
                    }
                    break;
                }
            }
        }
    }
    return phase;
}

doRegister方法用來給當前Phaser對象註冊參與者,主要有三個分支:
①當前Phaser已經註冊過參與者
若是參與者已經所有到達柵欄,則當前線程須要阻塞等待(由於此時phase正在變化,增長1到下一個phase),不然直接更新State。

②當前Phaser未註冊過參與者(第一次註冊),且沒有父結點
這種狀況最簡單,直接更新當前Phaser的State值。

③當前Phaser未註冊過參與者(第一次註冊),且有父結點
說明當前Phaser是新加入的葉子結點,須要向父結點註冊自身,同時更新自身的State值。

注意: reconcileState方法比較特殊,由於當出現樹形結構時,根結點首先進行phase的更新,因此須要顯式同步,使當前結點和根結點保持一致。
clipboard.png

另外,阻塞等待調用的是internalAwaitAdvance方法,其實就是根據當前階段phase,將線程包裝成結點加入到root結點所指向的某個「無鎖棧」中:

/**
 * internalAwaitAdvance的主要邏輯就是:當前參與者線程等待Phaser進入下一個階段(就是phase值變化).
 * @return 返回新的階段
 */
private int internalAwaitAdvance(int phase, QNode node) {
    // assert root == this;
    releaseWaiters(phase - 1);       // 清空不用的Treiber Stack(奇偶Stack交替使用)
    boolean queued = false;                 // 入隊標識
    int lastUnarrived = 0;
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    while ((p = (int) ((s = state) >>> PHASE_SHIFT)) == phase) {
        if (node == null) {           // spinning in noninterruptible mode
            int unarrived = (int) s & UNARRIVED_MASK;
            if (unarrived != lastUnarrived &&
                    (lastUnarrived = unarrived) < NCPU)
                spins += SPINS_PER_ARRIVAL;
            boolean interrupted = Thread.interrupted();
            if (interrupted || --spins < 0) { // need node to record intr
                node = new QNode(this, phase, false, false, 0L);
                node.wasInterrupted = interrupted;
            }
        } else if (node.isReleasable()) // done or aborted
            break;
        else if (!queued) {           // 將結點壓入棧頂
            AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
            QNode q = node.next = head.get();
            if ((q == null || q.phase == phase) &&
                    (int) (state >>> PHASE_SHIFT) == phase) // avoid stale enq
                queued = head.compareAndSet(q, node);
        } else {
            try {
                // 阻塞等待
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                node.wasInterrupted = true;
            }
        }
    }
 
    if (node != null) {
        if (node.thread != null)
            node.thread = null;       // avoid need for unpark()
        if (node.wasInterrupted && !node.interruptible)
            Thread.currentThread().interrupt();
        if (p == phase && (p = (int) (state >>> PHASE_SHIFT)) == phase)
            return abortWait(phase); // possibly clean up on abort
    }
    releaseWaiters(phase);
    return p;
}

參與者到達並等待

clipboard.png

arriveAndAwaitAdvance的主要邏輯以下:
首先將同步狀態值State中的未到達參與者數量減1,而後判斷未到達參與者數量是否爲0?

若是不爲0,則阻塞當前線程,以等待其餘參與者到來;

若是爲0,說明當前線程是最後一個參與者,若是有父結點則對父結點遞歸調用該方法。(由於只有根結點的未到達參與者數目爲0時),纔會進階phase。

4、Phaser類/接口聲明

類聲明

clipboard.png

構造器聲明

clipboard.png

接口聲明

clipboard.png
clipboard.png

相關文章
相關標籤/搜索