本文首發於一世流雲專欄: https://segmentfault.com/blog...
Phaser
是JDK1.7開始引入的一個同步工具類,適用於一些須要分階段的任務的處理。它的功能與 CyclicBarrier和CountDownLatch有些相似,相似於一個多階段的柵欄,而且功能更強大,咱們來比較下這三者的功能:java
同步器 | 做用 |
---|---|
CountDownLatch | 倒數計數器,初始時設定計數器值,線程能夠在計數器上等待,當計數器值歸0後,全部等待的線程繼續執行 |
CyclicBarrier | 循環柵欄,初始時設定參與線程數,當線程到達柵欄後,會等待其它線程的到達,當到達柵欄的總數知足指定數後,全部等待的線程繼續執行 |
Phaser | 多階段柵欄,能夠在初始時設定參與線程數,也能夠中途註冊/註銷參與者,當到達的參與者數量知足柵欄設定的數量後,會進行階段升級(advance) |
Phaser中有一些比較重要的概念,理解了這些概念才能理解Phaser的功能。node
咱們知道,在CyclicBarrier中,只有一個柵欄,線程在到達柵欄後會等待其它線程的到達。segmentfault
Phaser也有柵欄,在Phaser中,柵欄的名稱叫作phase(階段),在任意時間點,Phaser只處於某一個phase(階段),初始階段爲0,最大達到Integerr.MAX_VALUE
,而後再次歸零。當全部parties參與者都到達後,phase值會遞增。併發
若是看過以前關於CyclicBarrier的文章,就會知道,Phaser中的phase(階段)這個概念其實和CyclicBarrier中的Generation很類似,只不過Generation沒有計數。框架
parties(參與者)其實就是CyclicBarrier中的參與線程的概念。ide
CyclicBarrier中的參與者在初始構造指定後就不能變動,而Phaser既能夠在初始構造時指定參與者的數量,也能夠中途經過register
、bulkRegister
、arriveAndDeregister
等方法註冊/註銷參與者。函數
Phaser註冊完parties(參與者)以後,參與者的初始狀態是unarrived的,當參與者到達(arrive)當前階段(phase)後,狀態就會變成arrived。當階段的到達參與者數知足條件後(註冊的數量等於到達的數量),階段就會發生進階(advance)——也就是phase值+1。高併發
表明當前Phaser對象達到終止狀態,有點相似於CyclicBarrier中的柵欄被破壞的概念。工具
Phaser支持分層(Tiering) —— 一種樹形結構,經過構造函數能夠指定當前待構造的Phaser對象的父結點。之因此引入Tiering,是由於當一個Phaser有大量參與者(parties)的時候,內部的同步操做會使性能急劇降低,而分層能夠下降競爭,從而減少因同步致使的額外開銷。性能
在一個分層Phasers的樹結構中,註冊和撤銷子Phaser或父Phaser是自動被管理的。當一個Phaser的參與者(parties)數量變成0時,若是有該Phaser有父結點,就會將它從父結點中溢移除。
關於Phaser的分層,後續咱們在講Phaser原理時會進一步討論。
爲了更好的理解Phaser的功能,咱們來看幾個示例:
經過Phaser控制多個線程的執行時機:有時候咱們但願全部線程到達指定點後再同時開始執行,咱們能夠利用 CyclicBarrier或 CountDownLatch來實現,這裏給出使用Phaser的版本。
public class PhaserTest1 { public static void main(String[] args) { Phaser phaser = new Phaser(); for (int i = 0; i < 10; i++) { phaser.register(); // 註冊各個參與者線程 new Thread(new Task(phaser), "Thread-" + i).start(); } } } class Task implements Runnable { private final Phaser phaser; Task(Phaser phaser) { this.phaser = phaser; } @Override public void run() { int i = phaser.arriveAndAwaitAdvance(); // 等待其它參與者線程到達 // do something System.out.println(Thread.currentThread().getName() + ": 執行完任務,當前phase =" + i + ""); } }
輸出:
Thread-8: 執行完任務,當前phase =1 Thread-4: 執行完任務,當前phase =1 Thread-3: 執行完任務,當前phase =1 Thread-0: 執行完任務,當前phase =1 Thread-5: 執行完任務,當前phase =1 Thread-6: 執行完任務,當前phase =1 Thread-7: 執行完任務,當前phase =1 Thread-9: 執行完任務,當前phase =1 Thread-1: 執行完任務,當前phase =1 Thread-2: 執行完任務,當前phase =1
以上示例中,建立了10個線程,並經過register
方法註冊Phaser的參與者數量爲10。當某個線程調用arriveAndAwaitAdvance
方法後,arrive數量會加1,若是數量沒有知足總數(參與者數量10),當前線程就是一直等待,當最後一個線程到達後,全部線程都會繼續往下執行。
注意:arriveAndAwaitAdvance
方法是不響應中斷的,也就是說即便當前線程被中斷, arriveAndAwaitAdvance方法也不會返回或拋出異常,而是繼續等待。若是但願可以響應中斷,能夠參考awaitAdvanceInterruptibly
方法。
經過Phaser實現開關。在之前講 CountDownLatch時,咱們給出過以 CountDownLatch實現開關的示例,也就是說,咱們但願一些外部條件獲得知足後,而後打開開關,線程才能繼續執行,咱們看下如何用 Phaser來實現此功能。
public class PhaserTest2 { public static void main(String[] args) throws IOException { Phaser phaser = new Phaser(1); // 註冊主線程,當外部條件知足時,由主線程打開開關 for (int i = 0; i < 10; i++) { phaser.register(); // 註冊各個參與者線程 new Thread(new Task2(phaser), "Thread-" + i).start(); } // 外部條件:等待用戶輸入命令 System.out.println("Press ENTER to continue"); BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); reader.readLine(); // 打開開關 phaser.arriveAndDeregister(); System.out.println("主線程打開了開關"); } } class Task2 implements Runnable { private final Phaser phaser; Task2(Phaser phaser) { this.phaser = phaser; } @Override public void run() { int i = phaser.arriveAndAwaitAdvance(); // 等待其它參與者線程到達 // do something System.out.println(Thread.currentThread().getName() + ": 執行完任務,當前phase =" + i + ""); } }
輸出:
主線程打開了開關 Thread-7: 執行完任務,當前phase =1 Thread-4: 執行完任務,當前phase =1 Thread-3: 執行完任務,當前phase =1 Thread-1: 執行完任務,當前phase =1 Thread-0: 執行完任務,當前phase =1 Thread-9: 執行完任務,當前phase =1 Thread-8: 執行完任務,當前phase =1 Thread-2: 執行完任務,當前phase =1 Thread-5: 執行完任務,當前phase =1 Thread-6: 執行完任務,當前phase =1
以上示例中,只有當用戶按下回車以後,任務才真正開始執行。這裏主線程Main至關於一個協調者,用來控制開關打開的時機,arriveAndDeregister
方法不會阻塞,該方法會將到達數加1,同時減小一個參與者數量,最終返回線程到達時的phase值。
經過 Phaser控制任務的執行輪數
public class PhaserTest3 { public static void main(String[] args) throws IOException { int repeats = 3; // 指定任務最多執行的次數 Phaser phaser = new Phaser() { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------"); return phase + 1 >= repeats || registeredParties == 0; } }; for (int i = 0; i < 10; i++) { phaser.register(); // 註冊各個參與者線程 new Thread(new Task3(phaser), "Thread-" + i).start(); } } } class Task3 implements Runnable { private final Phaser phaser; Task3(Phaser phaser) { this.phaser = phaser; } @Override public void run() { while (!phaser.isTerminated()) { //只要Phaser沒有終止, 各個線程的任務就會一直執行 int i = phaser.arriveAndAwaitAdvance(); // 等待其它參與者線程到達 // do something System.out.println(Thread.currentThread().getName() + ": 執行完任務"); } } }
輸出:
---------------PHASE[0],Parties[5] --------------- Thread-4: 執行完任務 Thread-1: 執行完任務 Thread-2: 執行完任務 Thread-3: 執行完任務 Thread-0: 執行完任務 ---------------PHASE[1],Parties[5] --------------- Thread-0: 執行完任務 Thread-3: 執行完任務 Thread-1: 執行完任務 Thread-4: 執行完任務 Thread-2: 執行完任務 ---------------PHASE[2],Parties[5] --------------- Thread-2: 執行完任務 Thread-4: 執行完任務 Thread-1: 執行完任務 Thread-0: 執行完任務 Thread-3: 執行完任務
以上示例中,咱們在建立Phaser對象時,覆寫了onAdvance
方法,這個方法相似於CyclicBarrier中的barrierAction
任務。
也就是說,當最後一個參與者到達時,會觸發onAdvance
方法,入參phase表示到達時的phase值,registeredParties表示到達時的參與者數量,返回true表示須要終止Phaser。
咱們經過phase + 1 >= repeats
,來控制階段(phase)數的上限爲2(從0開始計),最終控制了每一個線程的執行任務次數爲repeats次。
Phaser支持分層功能,咱們先來考慮下如何用利用Phaser的分層來實現高併發時的優化,在 示例三中,咱們其實建立了10個任務,而後10個線程共用一個Phaser對象,以下圖:
若是任務數繼續增大,那麼同步產生的開銷會很是大,利用Phaser分層的功能,咱們能夠限定每一個Phaser對象的最大使用線程(任務數),以下圖:
能夠看到,上述Phasers其實構成了一顆多叉樹,若是任務數繼續增多,還能夠將Phaser的葉子結點繼續分裂,而後將分裂出的子結點供工做線程使用。
public class PhaserTest4 { private static final int TASKS_PER_PHASER = 4; // 每一個Phaser對象對應的工做線程(任務)數 public static void main(String[] args) throws IOException { int repeats = 3; // 指定任務最多執行的次數 Phaser phaser = new Phaser() { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------"); return phase + 1 >= repeats || registeredParties == 0; } }; Tasker[] taskers = new Tasker[10]; build(taskers, 0, taskers.length, phaser); // 根據任務數,爲每一個任務分配Phaser對象 for (int i = 0; i < taskers.length; i++) { // 執行任務 Thread thread = new Thread(taskers[i]); thread.start(); } } private static void build(Tasker[] taskers, int lo, int hi, Phaser phaser) { if (hi - lo > TASKS_PER_PHASER) { for (int i = lo; i < hi; i += TASKS_PER_PHASER) { int j = Math.min(i + TASKS_PER_PHASER, hi); build(taskers, i, j, new Phaser(phaser)); } } else { for (int i = lo; i < hi; ++i) taskers[i] = new Tasker(i, phaser); } } } class Task4 implements Runnable { private final Phaser phaser; Task4(Phaser phaser) { this.phaser = phaser; this.phaser.register(); } @Override public void run() { while (!phaser.isTerminated()) { //只要Phaser沒有終止, 各個線程的任務就會一直執行 int i = phaser.arriveAndAwaitAdvance(); // 等待其它參與者線程到達 // do something System.out.println(Thread.currentThread().getName() + ": 執行完任務"); } } }
輸出: ---------------PHASE[0],Parties[3] --------------- Thread-9: 執行完任務 Thread-6: 執行完任務 Thread-5: 執行完任務 Thread-4: 執行完任務 Thread-1: 執行完任務 Thread-0: 執行完任務 Thread-7: 執行完任務 Thread-8: 執行完任務 Thread-2: 執行完任務 Thread-3: 執行完任務 ---------------PHASE[1],Parties[3] --------------- Thread-3: 執行完任務 Thread-7: 執行完任務 Thread-0: 執行完任務 Thread-1: 執行完任務 Thread-5: 執行完任務 Thread-8: 執行完任務 Thread-2: 執行完任務 Thread-9: 執行完任務 Thread-6: 執行完任務 Thread-4: 執行完任務 ---------------PHASE[2],Parties[3] --------------- Thread-4: 執行完任務 Thread-2: 執行完任務 Thread-8: 執行完任務 Thread-0: 執行完任務 Thread-3: 執行完任務 Thread-9: 執行完任務 Thread-6: 執行完任務 Thread-7: 執行完任務 Thread-1: 執行完任務 Thread-5: 執行完任務
Phaser是本系列至今爲止,內部結構最爲複雜的同步器之一。在開始深刻Phaser原理以前,咱們有必要先來說講Phaser的內部組織結構和它的設計思想。
以前咱們說過,Phaser支持樹形結構,在示例四中,也給出了一個經過分層提升併發性和程序執行效率的例子。一個複雜分層結構的Phaser樹的內部結構以下圖所示:
上面圖中的幾點關鍵點:
爲何須要向父結點註冊參與者?
首先咱們要明白對於Phaser來講,何時會發生躍遷(advance)進入下一階段?
廢話,固然是等它全部參與者都到達的時候。那麼它所等待的參與者都包含那幾類呢?
①對於一個孤立的Phaser結點(也能夠當作是隻有一個根結點的樹)
其等待的參與者,就是顯式註冊的參與者,這也是最多見的狀況。
好比下圖,若是有10個Task共用這個Phaser,那等待的參與者數就是10,當10個線程都到達後,Phaser就會躍遷至下一階段。
②對於一個非孤立的Phaser葉子結點,好比下圖中標綠的葉子結點
這種狀況和①同樣,子Phaser1和子Phaser2等待的參與者數是4,子Phaser3等待的參與者數是2。
③對於一個非孤立非葉子的Phaser結點,好比上圖中標藍色的結點
這是最特殊的一種狀況,這也是Phaser同步器關於分層的主要設計思路。
這種狀況,結點所等待的參與者數目包含兩部分:
也就是說在上圖中,當左一的子Phaser1的4個參與者都到達後,它會通知父結點Phaser,本身的狀態已經OK了,這時Phaser會認爲子Phaser1已經準備就緒,會將本身的到達者數量加1,同理,當子Phaser2和子Phaser3的全部參與者分別到達後,它們也會依次通知Phaser,只有當Phaser(根結點)的到達者數量爲3時,纔會釋放「無鎖棧」中等待着的線程,並將階段數phase增長1。
這是一種層層遞歸的設計, 只要當根結點的全部參與者都到達後(也就是到達參數者數等於其子結點數),全部等待線程纔會放行,柵欄纔會進入下一階段。
瞭解了上面這些,咱們再來看Phaser的源碼。
Phaser使用一個long類型來保存同步狀態值State,並按位劃分不一樣區域的含義,經過掩碼和位運算進行賦值和操做:
「無鎖棧」——Treiber Stack,保存在Phaser樹的根結點中,其他全部Phaser子結點共享這兩個棧:
結點的定義很是簡單,內部保存了線程信息和Phsaer對象信息:
注意:ForkJoinPool.ManagedBlocker
是當棧包含ForkJoinWorkerThread類型的QNode阻塞的時候,ForkJoinPool內部會增長一個工做線程來保證並行度,後續講ForkJoin框架時咱們會進行分析。
Phaser一共有4個構造器,能夠看到,最終其實都是調用了Phaser(Phaser parent, int parties)
這個構造器。
Phaser(Phaser parent, int parties)
的內部實現以下,關鍵就是給當前的Phaser對象指定父結點時,若是當前Phaser的參與者不爲0,須要向父Phaser註冊一個參與者(表明當前結點自己):
Phaser提供了兩個註冊參與者的方法:
這兩個方法都很簡單,內部調用了doRegister方法:
/** * 註冊指定數目{#registrations}的參與者 */ private int doRegister(int registrations) { // 首先計算註冊後當前State要調整的值adjust long adjust = ((long) registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; for (; ; ) { long s = (parent == null) ? state : reconcileState(); // reconcileState()調整當前Phaser的State與root一致 int counts = (int) s; int parties = counts >>> PARTIES_SHIFT; // 參與者數目 int unarrived = counts & UNARRIVED_MASK; // 未到達的數目 if (registrations > MAX_PARTIES - parties) throw new IllegalStateException(badRegister(s)); phase = (int) (s >>> PHASE_SHIFT); // 當前Phaser所處的階段phase if (phase < 0) break; if (counts != EMPTY) { // CASE1: 當前Phaser已經註冊過參與者 if (parent == null || reconcileState() == s) { if (unarrived == 0) // 參與者已所有到達柵欄, 當前Phaser正在Advance, 須要阻塞等待這一過程完成 root.internalAwaitAdvance(phase, null); else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) // 不然,直接更新State break; } } else if (parent == null) { // CASE2: 當前Phaser未註冊過參與者(第一次註冊),且沒有父結點 long next = ((long) phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) // CAS更新當前Phaser的State值 break; } else { // CASE3: 當前Phaser未註冊過參與者(第一次註冊),且有父結點 synchronized (this) { if (state == s) { phase = parent.doRegister(1); // 向父結點註冊一個參與者 if (phase < 0) break; while (!UNSAFE.compareAndSwapLong(this, stateOffset, s, ((long) phase << PHASE_SHIFT) | adjust)) { s = state; phase = (int) (root.state >>> PHASE_SHIFT); } break; } } } } return phase; }
doRegister方法用來給當前Phaser對象註冊參與者,主要有三個分支:
①當前Phaser已經註冊過參與者
若是參與者已經所有到達柵欄,則當前線程須要阻塞等待(由於此時phase正在變化,增長1到下一個phase),不然直接更新State。
②當前Phaser未註冊過參與者(第一次註冊),且沒有父結點
這種狀況最簡單,直接更新當前Phaser的State值。
③當前Phaser未註冊過參與者(第一次註冊),且有父結點
說明當前Phaser是新加入的葉子結點,須要向父結點註冊自身,同時更新自身的State值。
注意: reconcileState方法比較特殊,由於當出現樹形結構時,根結點首先進行phase的更新,因此須要顯式同步,使當前結點和根結點保持一致。
另外,阻塞等待調用的是internalAwaitAdvance方法,其實就是根據當前階段phase,將線程包裝成結點加入到root結點所指向的某個「無鎖棧」中:
/** * internalAwaitAdvance的主要邏輯就是:當前參與者線程等待Phaser進入下一個階段(就是phase值變化). * @return 返回新的階段 */ private int internalAwaitAdvance(int phase, QNode node) { // assert root == this; releaseWaiters(phase - 1); // 清空不用的Treiber Stack(奇偶Stack交替使用) boolean queued = false; // 入隊標識 int lastUnarrived = 0; int spins = SPINS_PER_ARRIVAL; long s; int p; while ((p = (int) ((s = state) >>> PHASE_SHIFT)) == phase) { if (node == null) { // spinning in noninterruptible mode int unarrived = (int) s & UNARRIVED_MASK; if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU) spins += SPINS_PER_ARRIVAL; boolean interrupted = Thread.interrupted(); if (interrupted || --spins < 0) { // need node to record intr node = new QNode(this, phase, false, false, 0L); node.wasInterrupted = interrupted; } } else if (node.isReleasable()) // done or aborted break; else if (!queued) { // 將結點壓入棧頂 AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; QNode q = node.next = head.get(); if ((q == null || q.phase == phase) && (int) (state >>> PHASE_SHIFT) == phase) // avoid stale enq queued = head.compareAndSet(q, node); } else { try { // 阻塞等待 ForkJoinPool.managedBlock(node); } catch (InterruptedException ie) { node.wasInterrupted = true; } } } if (node != null) { if (node.thread != null) node.thread = null; // avoid need for unpark() if (node.wasInterrupted && !node.interruptible) Thread.currentThread().interrupt(); if (p == phase && (p = (int) (state >>> PHASE_SHIFT)) == phase) return abortWait(phase); // possibly clean up on abort } releaseWaiters(phase); return p; }
arriveAndAwaitAdvance的主要邏輯以下:
首先將同步狀態值State中的未到達參與者數量減1,而後判斷未到達參與者數量是否爲0?
若是不爲0,則阻塞當前線程,以等待其餘參與者到來;
若是爲0,說明當前線程是最後一個參與者,若是有父結點則對父結點遞歸調用該方法。(由於只有根結點的未到達參與者數目爲0時),纔會進階phase。