併發工具類(五) Phaser類

前言

  JDK中爲了處理線程之間的同步問題,除了提供鎖機制以外,還提供了幾個很是有用的併發工具類:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;
  CountDownLatch、CyclicBarrier、Semphore、Phaser 這四個工具類提供一種併發流程的控制手段;而Exchanger工具類則提供了在線程之間交換數據的一種手段。html

簡介

  Phaser 是JDK1.7版本中新增的,是一個可重用的同步barrier,它的功能與 CountDownLatch、CyclicBarrier 類似,可是使用起來更加靈活。能夠java

用來解決控制多個線程分階段共同完成任務的情景問題。

   Phaser中有兩個重要的計數:

  • phase
    :當前的週期索引(或者 階段索引),初始值爲0,當全部線程執行完本階段的任務後,phase就會加一,進入下一階段;能夠結合onAdvance()方法,在不一樣的階段,執行不一樣的屏障方法。
  • parties
    :註冊的線程數,即Phaser要監控的線程數量,或者說是 創建的屏障的數量。屏障的數量不是固定的,每一個階段的屏障的數量均可以是不同。

下面詳細介紹Phaser一些機制

一、Registration(註冊機制):
與其餘barrier不一樣的是,Phaser中的
「註冊的同步者(parties)」
會隨時間而變化,Phaser能夠經過構造器初始化parties個數,也能夠在Phaser運行期間隨時加入(方法
register( ), bulkRegister(int)
)新的parties,以及在運行期間註銷(方法
arriveAndDeregister( )
)parties。
運行時能夠隨時加入、註銷parties,只會影響Phaser內部的計數器,它創建任何內部的bookkeeping(帳本),所以task不能查詢本身是否已經註冊了,固然你能夠經過實現子類來達成這一設計要求。

二、Synchronization(同步機制):
相似於CyclicBarrier,Phaser也能夠awaited屢次,它的arrivedAndAwaitAdvance()方法的效果相似於CyclicBarrier的await()。Phaser的每一個週期(generation)都有一個phase數字,phase 從0開始,當全部的已註冊的parties都到達後(arrive)將會致使此phase數字自增(advance),當達到Integer.MAX_VALUE後繼續從0開始。這個phase數字用於表示當前parties所處於的「階段週期」,它既能夠標記和控制parties的wait行爲、喚醒等待的時機。

  • Arrival:
    Phaser中的arrive()、arriveAndDeregister()方法,這兩個方法不會阻塞(block),可是會返回相應的phase數字,當此phase中最後一個party也arrive之後,phase數字將會增長,即phase進入下一個週期,同時觸發(onAdvance)那些阻塞在上一phase的線程。這一點相似於CyclicBarrier的barrier到達機制;更靈活的是,咱們能夠經過重寫onAdvance方法來實現更多的觸發行爲。
  • Waiting:
    Phaser中的awaitAdvance()方法,須要指定一個phase數字,表示此Thread阻塞直到phase推動到此週期,arriveAndAwaitAdvance()方法阻塞到下一週期開始(或者當前phase結束)。不像CyclicBarrier,即便等待Thread已經interrupted,awaitAdvance方法會繼續等待。Phaser提供了Interruptible和Timout的阻塞機制,不過當線程Interrupted或者timout以後將會拋出異常,而不會修改Phaser的內部狀態。若是必要的話,你能夠在遇到此類異常時,進行相應的恢復操做,一般是在調用forceTermination()方法以後。
    Phaser一般在ForJoinPool中執行tasks,它能夠在有task阻塞等待advance時,確保其餘tasks的充分並行能力。

三、Termination(終止):
Phaser能夠進入Termination狀態,能夠經過isTermination()方法判斷;當Phaser被終止後,全部的同步方法將會當即返回(解除阻塞),不須要等到advance(即advance也會解除阻塞),且這些阻塞方法將會返回一個負值的phase值(awaitAdvance方法、arriveAndAwaitAdvance方法)。固然,向一個termination狀態的Phaser註冊party將不會有效;此時onAdvance()方法也將會返回true(默認實現),即全部的parties都會被deregister,即register個數爲0。

四、Tiering(分層):
Phaser能夠「分層」,以tree的方式構建Phaser來下降「競爭」。若是一個Phaser中有大量parties,這會致使嚴重的同步競爭,因此咱們能夠將它們分組並共享一個parent Phaser,這樣能夠提升吞吐能力;Phaser中註冊和註銷parties都會有Child 和parent Phaser自動管理。當Child Phaser中中註冊的parties變爲非0時(在構造函數Phaser(Phaser parent,int parties),或者register()方法),Child Phaser將會註冊到其Parent上;當Child Phaser中的parties變爲0時(好比由arrivedAndDegister()方法),那麼此時Child Phaser也將從其parent中註銷出去。

五、Monitoring.(監控):
同步的方法只會被register操做調用,對於當前state的監控方法能夠在任什麼時候候調用,好比getRegisteredParties()獲取已經註冊的parties個數,getPhase()獲取當前phase週期數等;由於這些方法並不是同步,因此只能反映當時的瞬間狀態。

Phaser的API介紹

構造方法bash

方法名 描述
Phaser() 構建一個Phaser
Phaser(int parties) 建立一個指定屏障數量的Phaser
Phaser(Phaser parent) 至關於 Phaser(parent, 0)
Phaser(Phaser parent, int parties) 建立一個指定屏障數量的Phaser,此phaser是註冊在另外一個Phaser parent下

方法摘要併發

方法名 描述
public int arrive() 到達此phaser的屏障點,使phaser的到達的線程數加一,但不會阻塞等待其餘線程。
返回:
phase值,即當前階段(週期)的索引,或者是負值(當Phaser 中止時)
public int arriveAndDeregister() 到達此phaser的屏障點,使phaser的到達的線程數加一,而且會取消一個屏障點的註冊。也不會阻塞等待其餘線程。
返回:
phase值,即當前階段(週期)的索引,或者是負值(當Phaser 中止時)
public int arriveAndAwaitAdvance() 到達此phaser的屏障點,而且阻塞等待其餘線程到達此屏障點。注意:這是
非中斷的阻塞
,此方法與awaitAdvance(arrive())等同。若是你但願阻塞機制支持timeout、interrupted響應,可使用相似的其餘方法(參見下文)。若是你但願到達後且註銷,並且阻塞等到當前phase下其餘的parties到達,可使用awaitAdvance(arriveAndDeregister())方法組合。
返回:
phase值,即當前階段(週期)的索引;若是Phaser 中止,則返回負值
public int awaitAdvance(int phase) 在指定的階段(週期)phase下等待其餘線程到達屏障點,注意:這是
非中斷的阻塞
。若是指定的phase與Phaser當前的phase不一致,或者Phaser 中止了,則當即返回。
參數 phase:
一般就是arrive()、arriveAndDeregister()的返回值;
public int awaitAdvanceInterruptibly(int phase)
throws InterruptedException
此方法是可中斷的,其餘與awaitAdvance()一致
public int awaitAdvanceInterruptibly(
int phase, long timeout,TimeUnit unit)
throws InterruptedException, TimeoutException
超時等待方法,其餘與awaitAdvance()一致
public int register() 新註冊一個party,致使Phaser內部registerPaties數量加1;若是此時onAdvance方法正在執行,此方法將會等待它執行完畢後纔會返回。此方法返回當前的phase週期數,若是Phaser已經中斷,將會返回負數。
public int bulkRegister(int parties) 批量註冊多個party,與register()類似
protected boolean onAdvance(int phase, int registeredParties) barrier action(屏障方法)。若是須要,則必須繼承Phaser類,重寫此方法。若是返回true表示此Phaser應該終止(此後將會把Phaser的狀態爲termination,即isTermination()將返回true。),不然能夠繼續進行。phase參數表示當前週期數,registeredParties表示當前已經註冊的parties個數。
默認實現爲:return registeredParties == 0;在不少狀況下,開發者能夠經過重寫此方法,來實現自定義的
public void forceTermination() 強制終止,此後Phaser對象將不可用,即register等將再也不有效。此方法將會致使Queue中全部的waiter線程被喚醒。這個方法對於在一個或多個任務遇到意外異常以後協調恢復是頗有用的。
public int getArrivedParties() 獲取已經到達的parties個數。
public int getUnarrivedParties() 獲取沒有到達的parties個數。
public Phaser getParent() 獲取其父親類Phaser,沒有則返回null
public Phaser getRoot() 返回該phaser的根祖先,若是沒有父類,返回此phaser。
public boolean isTerminated() 若是該phaser被終止,則返回true。

@ Example1 多階段(週期)、帶屏障事件示例

  例子很簡單,模擬跑步比賽的過程,分爲三個階段:一、參賽者到達起跑點,並在起跑點等待其餘參賽者;二、參賽者齊人後,開始準備,並等待槍聲。三、參賽這到達終點,並結束比賽,再也不等待任何狀況。ide

public class PhaserTest{

public static MyPhaser myPhaser = new MyPhaser();

    public static void main(String[] args) {
        MyPhaser myPhaser = new MyPhaser();
        // 一次性註冊5個party,即創建5個屏障點
        myPhaser.bulkRegister(5);
        for (int i = 0; i < 5; i++) {
            Thread runner = new Thread(new Runnable() {

                @Override
                public void run() {
                    // 第一階段(週期),phaser的週期數初始值爲0
                    System.out.println(Thread.currentThread().getName() + "到達了起跑點!");
                    // 到達了屏障點(起跑點),阻塞等待其餘線程
                    myPhaser.arriveAndAwaitAdvance();

                    // 繼續運行,將進入第二階段,phaser的週期數加一
                    System.out.println(Thread.currentThread().getName() + "準備起跑!");
                    // 到達了屏障點(準備起跑),阻塞等待其餘線程
                    myPhaser.arriveAndAwaitAdvance();

                    // 進入第三階段
                    System.out.println(Thread.currentThread().getName() + "到達了終點!");
                    // 參數者到達了終點,結束比賽,再也不等待其餘參賽者
                    myPhaser.arriveAndDeregister();// 取消註冊一個party
                }
            }, "參賽者" + i + "號");
            runner.start();
        }
    }
    }
複製代碼

MyPhaser類,定製 barrier action(屏障事件)
函數

public class MyPhaser extends Phaser {

    @Override
    //改寫onAdvance方法
    public boolean onAdvance(int phase, int registeredParties) {
        //判斷當前的Phaser是否終止
        if (!isTerminated()) {
            // 分紅三個階段,在不一樣的階段(週期),執行不一樣的屏障事件
            if (phase == 0) {
                // ....
                System.out.println("第一階段:全部參賽者都到達了起跑點!");
            } else if (phase == 1) {
                // ....
                System.out.println("第二階段:全部參賽者都已經就位,並準備好!比賽正式開始");
            } else if (phase == 2) {
                // ....
                System.out.println("第三階段:全部參賽者都到達終點,比賽結束!!");
            }
        }
        return super.onAdvance(phase, registeredParties);
    }
    }
複製代碼

運行結果:
工具

參賽者0號到達了起跑點!
參賽者3號到達了起跑點!
參賽者4號到達了起跑點!
參賽者2號到達了起跑點!
參賽者1號到達了起跑點!
第一階段:全部參賽者都到達了起跑點!
參賽者0號準備起跑!
參賽者1號準備起跑!
參賽者2號準備起跑!
參賽者3號準備起跑!
參賽者4號準備起跑!
第二階段:全部參賽者都已經就位,並準備好!比賽正式開始
參賽者4號到達了終點!
參賽者1號到達了終點!
參賽者0號到達了終點!
參賽者2號到達了終點!
參賽者3號到達了終點!
第三階段:全部參賽者都到達終點,比賽結束!
複製代碼

@ Example2 分層示例

下面的例子:每個Phaser週期類註冊的線程數目不能超過TASKS_PER_PHASER(例子中是4個),不然就要增長一層子phaser層。ui

public class PhaserTest6 {  
    // 
    private static final int  = 4;  
  
    public static void main(String args[]) throws Exception {  
        // 
        final int phaseToTerminate = 3;  
        //建立一個Phaser父類對象
        final Phaser phaser = new Phaser() {  
            @Override  
            protected boolean onAdvance(int phase, int registeredParties) { //屏障方法 
                System.out.println("====== " + phase + " ======");  
                return phase == phaseToTerminate || registeredParties == 0;  
            }  
        };  
          
        //建立10個任務 
        final Task tasks[] = new Task[10];  
        build(tasks, 0, tasks.length, phaser);  
        for (int i = 0; i < tasks.length; i++) {  
            System.out.println("starting thread, id: " + i);  
            final Thread thread = new Thread(tasks[i]);  
            thread.start();  
        }  
    }  
  
  //遞歸分層,
    public static void build(Task[] tasks, int lo, int hi, Phaser ph) {  
        
        //若是任務的數量超過每一層的phaser的閾值TASKS_PER_PHASER,則要繼續分層
        if (hi - lo > TASKS_PER_PHASER) {  
            for (int i = lo; i < hi; i += TASKS_PER_PHASER) {  
                int j = Math.min(i + TASKS_PER_PHASER, hi);  
                //當前的phaser(ph)做爲父週期,來建立一個子phaser
                build(tasks, i, j, new Phaser(ph));  
            }  
        } else { 
            //線程的數量在閾值內,無需分紅,能夠直接註冊線程到當前的Phaser
            for (int i = lo; i < hi; ++i)  
                tasks[i] = new Task(i, ph);  
        }  
    }  
  
    public static class Task implements Runnable {  
        // 
        private final int id;  
        private final Phaser phaser;  
  
        public Task(int id, Phaser phaser) {  
            this.id = id;  
            this.phaser = phaser;  
            this.phaser.register();  
        }  
  
        @Override  
        public void run() {  
            while (!phaser.isTerminated()) {  
                try {  
                    Thread.sleep(200);  
                } catch (InterruptedException e) {  
                    // NOP 
                }  
                System.out.println("in Task.run(), phase: " + phaser.getPhase()    + ", id: " + this.id);  
                phaser.arriveAndAwaitAdvance();  
            }  
        }  
    }  
}  
複製代碼

須要注意的是,TASKS_PER_PHASER的值取決於具體的Task實現。對於Task執行時間很短的場景(也就是競爭相對激烈),能夠考慮使用較小的TASKS_PER_PHASER值,例如4。反之能夠適當增大this

運行結果:spa

in Task.run(), phase: 0, id: 2
in Task.run(), phase: 0, id: 1
in Task.run(), phase: 0, id: 3
in Task.run(), phase: 0, id: 0
in Task.run(), phase: 0, id: 8
in Task.run(), phase: 0, id: 5
in Task.run(), phase: 0, id: 9
in Task.run(), phase: 0, id: 7
in Task.run(), phase: 0, id: 4
in Task.run(), phase: 0, id: 6
====== 0 ======
in Task.run(), phase: 1, id: 9
in Task.run(), phase: 1, id: 6
in Task.run(), phase: 1, id: 1
in Task.run(), phase: 1, id: 7
in Task.run(), phase: 1, id: 8
in Task.run(), phase: 1, id: 5
in Task.run(), phase: 1, id: 0
in Task.run(), phase: 1, id: 4
in Task.run(), phase: 1, id: 3
in Task.run(), phase: 1, id: 2
====== 1 ======
in Task.run(), phase: 2, id: 6
in Task.run(), phase: 2, id: 0
in Task.run(), phase: 2, id: 2
in Task.run(), phase: 2, id: 3
in Task.run(), phase: 2, id: 7
in Task.run(), phase: 2, id: 5
in Task.run(), phase: 2, id: 8
in Task.run(), phase: 2, id: 9
in Task.run(), phase: 2, id: 1
in Task.run(), phase: 2, id: 4
====== 2 ======
in Task.run(), phase: 3, id: 3
in Task.run(), phase: 3, id: 4
in Task.run(), phase: 3, id: 9
in Task.run(), phase: 3, id: 5
in Task.run(), phase: 3, id: 8
in Task.run(), phase: 3, id: 1
in Task.run(), phase: 3, id: 7
in Task.run(), phase: 3, id: 0
in Task.run(), phase: 3, id: 2
in Task.run(), phase: 3, id: 6
====== 3 ======
複製代碼

文章源地址:https://www.cnblogs.com/jinggod/p/8494624.html

相關文章
相關標籤/搜索