Java併發編程中級篇(五):更強大的多階段併發控制Phaser

Java API還提供了一個強大的同步輔助類Pahser,它能夠控制多階段併發輔助任務。當咱們有併發任務,而且須要分階段執行,每階段都須要等待全部線程執行本階段執行完畢纔可以繼續執行,這種機制就很是好用。Phaser類一樣須要一個整形做爲初始化參數來肯定有幾個線程參與執行。java

下面咱們來看一個例子,在這個例子中咱們把一個併發任務分爲三個階段,每一階段都須要全部線程完成後才能繼續執行下一階段的任務。併發

咱們來定義一個帶有Phaser機制的線程類PhaserRunnable,在線程開始運行的時候調用arriverAndAwaitAdvance()方法來表明線程已經進入執行狀態,其實這個也能夠算做一步,就是等待全部任務線程都啓動後你們一塊兒執行。而後開始執行第一步任務,每一個線程隨機休眠一段時間來模擬任務執行耗時,執行完畢後調用arriverAndAwaitAdvance()來表示任務執行完畢,而後等待其餘線程,等全部線程都滴啊用arriverAndAwaitAdvance()方法後,全部休眠的線程都被喚醒而後繼續執行第二步。也是隨機休眠一段時間後,第二部執行完畢,可是這裏有一個不一樣,若是休眠時間是一個雙數那麼線程將再也不執行第三步操做而是直接返回,這裏要調用phaser.arriveAndDeregister()方法來表示線程已經結束,之後再也不須要等待我了。dom

public class PhaserRunnable implements Runnable{
    private Phaser phaser;

    public PhaserRunnable(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        phaser.arriveAndAwaitAdvance();
        System.out.printf("%s: start.\n", Thread.currentThread().getName());

        long duration1 = (long) (Math.random() * 10) + 1;
        System.out.printf("%s: step1 start duration %d seconds.\n", Thread.currentThread().getName(), duration1);
        try {
            TimeUnit.SECONDS.sleep(duration1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("%s: step1 done.\n", Thread.currentThread().getName());
        phaser.arriveAndAwaitAdvance();

        long duration2 = (long) (Math.random() * 10) + 1;
        System.out.printf("%s: step2 start duration %d seconds.\n", Thread.currentThread().getName(), duration2);
        try {
            TimeUnit.SECONDS.sleep(duration2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (duration2 % 2 == 0) {
            System.out.printf("%s: step2 done and task finished.\n", Thread.currentThread().getName());
            phaser.arriveAndDeregister();
            return;
        } else {
            System.out.printf("%s: step2 done.\n", Thread.currentThread().getName());
            phaser.arriveAndAwaitAdvance();
        }

        long duration3 = (long) (Math.random() * 10) + 1;
        System.out.printf("%s: step3 start duration %d seconds.\n", Thread.currentThread().getName(), duration3);
        try {
            TimeUnit.SECONDS.sleep(duration3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("%s: step3 done.\n", Thread.currentThread().getName());
        phaser.arriveAndDeregister();
    }
}

主方法類中啓動三個任務線程,若是第二步都是休眠單數秒你能夠嘗試多運行幾回,最後打印Phaser狀態。ide

public class Main {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);

        Thread[] threads = new Thread[3];

        for (int i = 0; i < 3; i++) {
            threads[i] = new Thread(new PhaserRunnable(phaser));
            threads[i].start();
        }

        for (int i = 0; i < 3; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.printf("%s: Phaser terminated.\n", Thread.currentThread().getName());
    }
}

查看控制檯日誌: this

Thread-2: start.
Thread-1: start.
Thread-0: start.
Thread-2: step1 start duration 9 seconds.
Thread-0: step1 start duration 2 seconds.
Thread-1: step1 start duration 6 seconds.
Thread-0: step1 done.
Thread-1: step1 done.
Thread-2: step1 done.
Thread-2: step2 start duration 7 seconds.
Thread-0: step2 start duration 2 seconds.
Thread-1: step2 start duration 1 seconds.
Thread-1: step2 done.
Thread-0: step2 done and task finished.
Thread-2: step2 done.
Thread-2: step3 start duration 3 seconds.
Thread-1: step3 start duration 7 seconds.
Thread-2: step3 done.
Thread-1: step3 done.
main: Phaser terminated.

Phaser類有兩種狀態:Active和Termination。有任務參與的時候Phaser狀態爲Active;當全部參與同步的線程都結束後Phaser也就沒有參與者了,這時Phaser進入了Termination態。當Phaser處於終止態,同步方法arriveAndAwaitAdvance()會當即返回。線程

Phaser類的一個重大特性就是沒必要對它的方法進行異常處理。不像其餘通不輔助類,被Phaser類置於休眠狀態的線程不會響應中斷事件,也不會拋出InterruptedException異常。日誌

Phaser類還提供了一些改變Phaser對象的方法,這些方法以下。code

  • arriver():這個方法通知phaser對象一個參與者已經完成了當前階段,可是他等待其餘參與者都完成當前階段。必須當心使用這個方法,由於他不會與其它線程同步。
  • awaitAdvance(int phase):若是傳入的階段參數與當前階段一致,那麼這個方法會將當前線程置於休眠,直到這個階段的全部參與者都完成運行。若是傳入階段參數與當前階段不一致,這個方法會當即返回。
  • awaitAdvanceInterruptibly(int phaser):這個方法跟awaitAdvance(int phase)同樣,不一樣之處在於,若是這個方法中休眠的線程被中斷,它將拋出InterruptedException異常。

Phaser類能夠動態地改變參與線程的數量:對象

  • register():這個方法能夠將一個新的線程註冊到Phaser中,這個新的參與者被當成本階段的任務來執行。
  • bulkRegister(int Parties):這個方法將指定數目的參與者註冊到Phaser中,全部這些新的參與者都將被當成本階段的任務來執行。

Phaser提供了一個方法forceTermination()方法來強制Phaser進入Termination狀態,這個方法無論Phaser中是否還有註冊的線程。當一個參與的線程出現錯誤,強制phaser終止是有意義的。當一個Phaser處於Termination狀態的時候,awaitAdvance()和arriveAndAwaitAdvance()方法都馬上返回一個負數,這樣就能夠驗證Phaser狀態是否是終止了,能夠根據這個狀態來終止線程的執行,直接返回或者作一些處理,好比數據回滾什麼的。事件

相關文章
相關標籤/搜索