在併發編程時總會遇到一種這樣的場景:等待一系列任務作完後,才能開始作某個任務。當遇到這種場景時,兩個類cross our mind:CountDownLatch和CyclicBarrier。下面從使用方法和內部實現原理分別對這兩個類作出介紹。java
class MyThread extends Thread{ private CountDownLatch latch; public MyThread(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { System.out.println(Thread.currentThread().getName()); Thread.sleep(100); // 任務完成 state - 1 latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); }finally { latch.countDown(); } } }
在完成每個任務後,latch中的int數字作減一操做。編程
public class CountDownLatchTest { @Test public void main() { // 初始化值爲3 CountDownLatch latch = new CountDownLatch(3); // 啓動3個任務 for (int i = 3; i > 0; i --) { new MyThread(latch).start(); } try { // 等待三個任務完成 latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("count = " + latch.getCount()); System.out.println("finished"); } }
主線程中啓動了三個子線程,而後調用了latch.await()方法。併發
Thread-1 Thread-0 Thread-2 count = 0 finished
從輸出結果能夠看出,主線程在等待三個子線程完成任務以後才結束的。app
public class NormalTask implements Runnable { CyclicBarrier barrier; NormalTask(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { Thread.sleep(100); barrier.await(); System.out.println(System.currentTimeMillis() + " first step finished"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }
每一個任務完成須要100ms。ide
public class FinalTask implements Runnable { @Override public void run() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis() + " second step finished"); } }
後完成的任務須要10ms。函數
主線程啓動了兩個先執行的線程,將後完成的線程做爲參數傳入CyclicBarrier。oop
@Test public void testInterruptException() throws InterruptedException { // 主線程做爲參數傳入,主線程須要等待子線程完成 CyclicBarrier barrier = new CyclicBarrier(2,new FinalTask()); new Thread(new NormalTask(barrier)).start(); new Thread(new NormalTask(barrier)).start(); Thread.sleep(300); }
運行結果測試
1543326017854 first step finished 1543326017854 first step finished 1543326017870 second step finished
從運行結構能夠看出,先啓動的任務幾乎同時完成,然後完成的任務結束時間比前兩個線程完成時間晚16ms,其中6ms是啓動線程所花費的。主線程中sleep 300ms 是爲了等待全部的線程都執行完成。也可使用join實現相同的效果。在這裏解釋一下爲何不能像CountDownLatch同樣用主線程做爲等待線程。我剛開始也是這樣作的,發現主線程一下就跑完了,根本不停。查看了源碼才發現,CyclicBarrier沒有park主線程。具體邏輯相見下文的原理分析。this
兩個類均可以實現一個任務等待其餘幾個任務完成後再執行。線程
兩個類都是在初始化時,傳入一個整形數字,表示須要等待幾個任務完成後才能開始執行等待的任務。可是其底層實現的原理徹底不一樣。下面對兩個類的實現原理作具體介紹。
CountDownLatch park 的是主線程,是主線程和全部的子線程在競爭同一把鎖。可是初始化時,他把鎖默認給了子線程(將AQS中的state 置爲須要等待的子線程的個數)。
Sync(int count) { setState(count); } public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
而主線程在調用await方法時,先檢測state是否爲0,若是=0 就不用park了,這時說明子線程都已完成了。若是!= 0。則park。
每一個子線程在執行完任務後,將state使用cas的方式減1,並嘗試取喚醒(unpark)主線程。
public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // 經過cas將state減1 若是state = 0 則調用doReleaseShared喚醒AQS隊列中的主線程 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
上面完整的介紹了CountDownLatch的工做原理。
爲了與CountDwonLatch 對比,也爲了方便描述問題,咱們將先執行的任務叫作子線程,將後執行的任務叫作主線程。
CyclicBarrier 在初始化時將int值不但賦值給了state,其內部也留了一個備份,這就是CyclicBarrier能夠調用reset從新使用的一個緣由。並且其內部是在可重入鎖ReentrantLock和Condition的基礎上實現的,在其代碼內部幾乎看不到CAS代碼,看到更多的是重入鎖的lock和unlock以及Condition的await和singal。
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
其中的parties就是子線程個數的備份,而barrierAction無關緊要。
在子任務完成後就會調用await方法:
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
其核心邏輯在dowait方法中。dowait的核心邏輯是,先上鎖,然後檢查異常,若是有線程拋出過異常則當前線程也拋出異常。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } ....
若是沒有線程拋出異常,將count減一,並檢查count是否爲0 若是不爲0 將當前的線程放入Condition的等待隊列。若是等於0 則喚醒以前的全部線程。
int index = --count; if (index == 0) { // 若是等於0說明全部的任務都已完成,喚醒全部Condition中的線程。 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); // 放入Condition隊列中 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) {
至此,CyclicBarrier的原理頁介紹完成了。
經過以上分析能夠得出,CountDownLatch 更適合一個任務等待一些任務執行完成後再執行,而CyclicBarrier更適合保證一批任務同時結束。