CountDownLatch與CyclicBarrier對比

在併發編程時總會遇到一種這樣的場景:等待一系列任務作完後,才能開始作某個任務。當遇到這種場景時,兩個類cross our mind:CountDownLatch和CyclicBarrier。下面從使用方法和內部實現原理分別對這兩個類作出介紹。java

使用方法

CountDownLatch

任務

class MyThread extends Thread{

    private CountDownLatch latch;

    public MyThread(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName());
            Thread.sleep(100);
            // 任務完成 state - 1
            latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            latch.countDown();
        }
    }
}

在完成每個任務後,latch中的int數字作減一操做。編程

測試

public class CountDownLatchTest {
    @Test
    public void main() {
        // 初始化值爲3
        CountDownLatch latch = new CountDownLatch(3);
        // 啓動3個任務
        for (int i = 3; i > 0; i --) {
            new MyThread(latch).start();
        }
        try {
            // 等待三個任務完成
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("count = " + latch.getCount());
        System.out.println("finished");
    }
}

主線程中啓動了三個子線程,而後調用了latch.await()方法。併發

輸出結果

Thread-1
Thread-0
Thread-2
count = 0
finished

從輸出結果能夠看出,主線程在等待三個子線程完成任務以後才結束的。app

CyclicBarrier

先完成的任務

public class NormalTask implements Runnable {
    CyclicBarrier barrier;

    NormalTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }
    @Override
    public void run() {

        try {
            Thread.sleep(100);
            barrier.await();
            System.out.println(System.currentTimeMillis() + " first step finished");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

每一個任務完成須要100ms。ide

後完成的任務

public class FinalTask implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis() + " second step finished");
    }
}

後完成的任務須要10ms。函數

主線程

主線程啓動了兩個先執行的線程,將後完成的線程做爲參數傳入CyclicBarrier。oop

@Test
public void testInterruptException() throws InterruptedException {
    // 主線程做爲參數傳入,主線程須要等待子線程完成
    CyclicBarrier barrier = new CyclicBarrier(2,new FinalTask());
    new Thread(new NormalTask(barrier)).start();
    new Thread(new NormalTask(barrier)).start();
    Thread.sleep(300);
}

運行結果測試

1543326017854 first step finished
1543326017854 first step finished
1543326017870 second step finished

從運行結構能夠看出,先啓動的任務幾乎同時完成,然後完成的任務結束時間比前兩個線程完成時間晚16ms,其中6ms是啓動線程所花費的。主線程中sleep 300ms 是爲了等待全部的線程都執行完成。也可使用join實現相同的效果。在這裏解釋一下爲何不能像CountDownLatch同樣用主線程做爲等待線程。我剛開始也是這樣作的,發現主線程一下就跑完了,根本不停。查看了源碼才發現,CyclicBarrier沒有park主線程。具體邏輯相見下文的原理分析。this

相同點

兩個類均可以實現一個任務等待其餘幾個任務完成後再執行。線程

不一樣點

  • 任務中調用的接口不一樣:CountDownLatch在任務完成後調用的是countDown函數。在等待線程中調用了await方法。而CyclicBarrier只在先開始的任務中調用了await方法。後運行任務中沒有涉及到任何和CyclicBarrier的信息。
  • CountDownLatch 在完成全部的操做後不可重用了,可是CyclicBarrier能夠在完成任務或者有線程拋出異常後調用reset方法繼續使用,這應該是這個類叫作循環屏障的緣由。

原理

兩個類都是在初始化時,傳入一個整形數字,表示須要等待幾個任務完成後才能開始執行等待的任務。可是其底層實現的原理徹底不一樣。下面對兩個類的實現原理作具體介紹。

CountDownLatch

CountDownLatch park 的是主線程,是主線程和全部的子線程在競爭同一把鎖。可是初始化時,他把鎖默認給了子線程(將AQS中的state 置爲須要等待的子線程的個數)。

Sync(int count) {
    setState(count);
}
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

而主線程在調用await方法時,先檢測state是否爲0,若是=0 就不用park了,這時說明子線程都已完成了。若是!= 0。則park。
每一個子線程在執行完任務後,將state使用cas的方式減1,並嘗試取喚醒(unpark)主線程。

public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
// 經過cas將state減1 若是state = 0 則調用doReleaseShared喚醒AQS隊列中的主線程
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

上面完整的介紹了CountDownLatch的工做原理。

CyclicBarrier

爲了與CountDwonLatch 對比,也爲了方便描述問題,咱們將先執行的任務叫作子線程,將後執行的任務叫作主線程。
CyclicBarrier 在初始化時將int值不但賦值給了state,其內部也留了一個備份,這就是CyclicBarrier能夠調用reset從新使用的一個緣由。並且其內部是在可重入鎖ReentrantLock和Condition的基礎上實現的,在其代碼內部幾乎看不到CAS代碼,看到更多的是重入鎖的lock和unlock以及Condition的await和singal。

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

其中的parties就是子線程個數的備份,而barrierAction無關緊要。
在子任務完成後就會調用await方法:

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

其核心邏輯在dowait方法中。dowait的核心邏輯是,先上鎖,然後檢查異常,若是有線程拋出過異常則當前線程也拋出異常。

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        ....

若是沒有線程拋出異常,將count減一,並檢查count是否爲0 若是不爲0 將當前的線程放入Condition的等待隊列。若是等於0 則喚醒以前的全部線程。

int index = --count;
    if (index == 0) {  // 若是等於0說明全部的任務都已完成,喚醒全部Condition中的線程。
        boolean ranAction = false;
        try {
            final Runnable command = barrierCommand;
            if (command != null)
                command.run();
            ranAction = true;
            nextGeneration();
            return 0;
        } finally {
            if (!ranAction)
                breakBarrier();
        }
    }

    // loop until tripped, broken, interrupted, or timed out
    for (;;) {
        try {
            if (!timed)
                trip.await(); // 放入Condition隊列中
            else if (nanos > 0L)
                nanos = trip.awaitNanos(nanos);
        } catch (InterruptedException ie) {

至此,CyclicBarrier的原理頁介紹完成了。

不一樣點

  • CountDownLatch 在AQS隊列中park的是主線程,而CyclicBarrier在AQS中park的是全部的子線程。
  • CountDownLatch 是放到AQS隊列中,而CyclicBarrier是將子線程放到Condition隊列中。
  • CountDownLatch 喚醒的是主線程,而CyclicBarrier 是經過singleAll函數,將全部的子線程移動到AQS隊列中,而後再開始執行。

總結

經過以上分析能夠得出,CountDownLatch 更適合一個任務等待一些任務執行完成後再執行,而CyclicBarrier更適合保證一批任務同時結束。

相關文章
相關標籤/搜索