Java併發之CyclicBarrier、CountDownLatch、Phaser

在Java多線程編程中,常常會須要咱們控制併發流程,等其餘線程執行完畢,或者分階段執行。Java在1.5的juc中引入了CountDownLatchCyclicBarrier,1.7中又引入了Phaserjava

CountDownLatch

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.編程

一個或多個線程等待其餘線程完成一系列操做後才執行。多線程

流程圖併發

基本使用:dom

使用兩個 countdown latches的示例。ide

第一個是開始信號,在發出執行命令前,阻止線程開始執行。函數

第二個是完成信號,直到全部線程執行完畢,主線程再開始執行。flex

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        doSomethingElse();            // don't let run yet
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        doneSignal.await();           // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }
    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {} // return;
    }

    void doWork() { ... }
}}

CyclicBarrier

A synchronizati on aid that allows a set of threads to all wait for each other to reach a common barrier point.this

多個線程互相等待,直到到達同一個同步點,再繼續一塊兒執行。CyclicBarrier適用於多個線程有固定的多步須要執行,線程間互相等待,當都執行完了,在一塊兒執行下一步。線程

流程圖:

基本使用:

public class CyclicBarrierTest {

    static CyclicBarrier c = new CyclicBarrier(2, new A());

    public static void main(String[] args) {
        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {

                }
                System.out.println(1);
            }
        }).start();

        try {
            c.await();
        } catch (Exception e) {

        }
        System.out.println(2);
    }

    static class A implements Runnable {
        @Override
        public void run() {
            System.out.println(3);
        }
    }
}

以上的例子利用了CyclicBarrier提供的一個更高級的構造函數CyclicBarrier(int parties, Runnable barrierAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。

Phaser

A reusable synchronization barrier, similar in functionality to* {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and* {@link java.util.concurrent.CountDownLatch CountDownLatch}* but supporting more flexible usage.

Phaser擁有與CyclicBarrierCountDownLatch相似的功勞.可是這個類提供了更加靈活的應用。它支持任務在多個點都進行同步,支持動態調整註冊任務的數量。固然你也能夠使用CountDownLatch,但你必須建立多個CountDownLatch對象,每一階段都須要一個CountDownLatch對象。

流程圖:

基本使用:

public class Match {

    // 模擬了100米賽跑,10名選手,只等裁判一聲令下。當全部人都到達終點時,比賽結束。
    public static void main(String[] args) throws InterruptedException {

        final Phaser phaser=new Phaser(1) ;
        // 十名選手
        for (int index = 0; index < 10; index++) {
            phaser.register();
            new Thread(new player(phaser),"player"+index).start();
        }
        System.out.println("Game Start");
        //註銷當前線程,比賽開始
        phaser.arriveAndDeregister();
        //是否非終止態一直等待
        while(!phaser.isTerminated()){
        }
        System.out.println("Game Over");
    }
}
class player implements Runnable{

    private  final Phaser phaser ;

    player(Phaser phaser){
        this.phaser=phaser;
    }
    @Override
    public void run() {
        try {
            // 第一階段——等待建立好全部線程再開始
            phaser.arriveAndAwaitAdvance();

            // 第二階段——等待全部選手準備好再開始
            Thread.sleep((long) (Math.random() * 10000));
            System.out.println(Thread.currentThread().getName() + " ready");
            phaser.arriveAndAwaitAdvance();

            // 第三階段——等待全部選手準備好到達,到達後,該線程從phaser中註銷,不在進行下面的階段。
            Thread.sleep((long) (Math.random() * 10000));
            System.out.println(Thread.currentThread().getName() + " arrived");
            phaser.arriveAndDeregister();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
相關文章
相關標籤/搜索