CyclicBarrier能夠理解爲Cyclic + Barrier, 可循環使用 + 屏障嘛。java
可讓一組線程所有到達一個屏障【同步點】,再所有衝破屏障,繼續向下執行。編程
public class CycleBarrierTest2 { private static final CyclicBarrier cyclicBarrier = new CyclicBarrier( 2, // 計數器的初始值 new Runnable() { // 計數器值爲0時須要執行的任務 @Override public void run () { System.out.println(Thread.currentThread() + " tripped ~"); } } ); public static void main (String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new Runnable() { @SneakyThrows @Override public void run () { Thread thread = Thread.currentThread(); System.out.println(thread + " step 1"); cyclicBarrier.await(); System.out.println(thread + " step 2"); cyclicBarrier.await(); System.out.println(thread + " step 3"); } }); executorService.submit(new Runnable() { @SneakyThrows @Override public void run () { Thread thread = Thread.currentThread(); System.out.println(thread + " step 1"); cyclicBarrier.await(); System.out.println(thread + " step 2"); cyclicBarrier.await(); System.out.println(thread + " step 3"); } }); executorService.shutdown(); } }
測試結果以下:併發
Thread[pool-1-thread-2,5,main] step 1 Thread[pool-1-thread-1,5,main] step 1 Thread[pool-1-thread-1,5,main] tripped ~ Thread[pool-1-thread-1,5,main] step 2 Thread[pool-1-thread-2,5,main] step 2 Thread[pool-1-thread-2,5,main] tripped ~ Thread[pool-1-thread-2,5,main] step 3 Thread[pool-1-thread-1,5,main] step 3
多個線程之間是相互等待的,加入當前計數器值爲N,以後N-1個線程調用await方法都會達到屏障點而阻塞,只有當第N個線程調用await方法時,計數器值爲0,第N個線程纔會喚醒以前等待的全部線程,再一塊兒向下執行。app
CyclicBarrier是可複用的,全部線程達到屏障點以後,CyclicBarrier會被重置。ide
public class CyclicBarrier { private static class Generation { boolean broken = false; } /** 獨佔鎖保證同步 */ private final ReentrantLock lock = new ReentrantLock(); /** condition實現等待通知機制 */ private final Condition trip = lock.newCondition(); /** 記錄線程個數 */ private final int parties; /* 達到屏障點執行的任務 */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** * 記錄仍在等待的parties數量, 每一代count都會從初始的parties遞減至0 */ private int count; // 指定barrierAction, 在線程達到屏障後,優先執行barrierAction public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } // 指定parties, 但願屏障攔截的線程數量 public CyclicBarrier(int parties) { this(parties, null); } }
CyclicBarrier是可複用的,所以使用兩個變量記錄線程個數,count變爲0時,會將parties賦值給count,進行復用。工具
本篇文章閱讀須要創建在必定獨佔鎖,Condition條件機制的基礎之上,這邊推薦幾篇前置文章,能夠瞅一眼:oop
CyclicBarrier是可複用的,Generation用於標記更新換代。性能
// 屏障的每一次使用都會生成一個新的Generation實例: 多是 tripped or reset private static class Generation { boolean broken = false; }
更新換代: 首先標記一下當前這代不用了, 而後換一個新的。學習
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break掉當前的 nextGeneration(); // 開啓一個新的 } finally { lock.unlock(); } }
標記一下broken爲true,喚醒一下await等待線程,重置count。測試
private void breakBarrier() { // 標記broken 爲true generation.broken = true; // 重置count count = parties; // 喚醒因await等待的線程 trip.signalAll(); }
喚醒一下await等待線程,重置count,更新爲下一代。
private void nextGeneration() { // 喚醒因await等待的線程 trip.signalAll(); // 重置count,意味着下一代了 count = parties; // 下一代了 generation = new Generation(); }
當前線程調用await方法時會阻塞,除非遇到如下幾種狀況:
它內部調用了int dowait(boolean timed, long nanos)
,詳細解析往下面翻哈。
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
相比於普通的await()方法,該方法增長了超時的控制,你懂的。
增長了一項:若是超時了,返回false。
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 獲取獨佔鎖 final ReentrantLock lock = this.lock; lock.lock(); try { // 與當前屏障點關聯的Generation final Generation g = generation; // broken標誌爲true,則異常 if (g.broken) throw new BrokenBarrierException(); // 若是被打斷,則breakBarrier,並拋出異常 if (Thread.interrupted()) { // 打破: 1 標記broken爲true 2 重置count 3 喚醒await等待的線程 breakBarrier(); throw new InterruptedException(); } int index = --count; // 說明已經到達屏障點了 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; // 執行一下任務 if (command != null) command.run(); ranAction = true; // 更新: 1 喚醒await等待的線程 2 更新Generation nextGeneration(); return 0; } finally { // 執行失敗了,可能被打斷了 if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out // 死循環, 結束的狀況有:到達屏障點, broken了, 中斷, 超時 for (;;) { try { // 超時控制 if (!timed) trip.await(); else if (nanos > 0L) // awaitNanos阻塞一段時間 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { // 標記broken爲true breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 正常被喚醒, 再次檢查當前這一代是否已經標記了broken if (g.broken) throw new BrokenBarrierException(); // 最後一個線程在等待線程醒來以前,已經經過nextGeneration將generation更新 if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
以parties爲N爲例,咱們來看看這一流程。
線程調用dowait方法後,首先會獲取獨佔鎖lock。若是是前N-1個線程,因爲index != 0
,會在條件隊列中等待trip.await() or trip.awaitNanos(nanos)
,會相應釋放鎖。
第N個線程調用dowait以後,此時index == 0
,將會執行命令command.run()
,而後調用nextGeneration()
更新換代,同時喚醒全部條件隊列中等待的N-1個線程。
第N個線程釋放鎖,後續被喚醒的線程移入AQS隊列,陸續獲取鎖,釋放鎖。
CountDownLatch基於AQS,state表示計數器的值,在構造時指定。CyclicBarrier基於ReentrantLock獨佔鎖與Condition條件機制實現屏障邏輯。
CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器能夠使用reset()方法重置,可複用性可以處理更爲複雜【分段任務有序執行】的業務場景。
CyclicBarrier還提供了其餘有用的方法,如getNumberWaiting
方法能夠得到CyclicBarrier阻塞的線程數量。isBroken()
方法用來了解阻塞的線程是否被中斷。
CyclicBarrier = Cyclic + Barrier, 可重用 + 屏障,可讓一組線程所有到達一個屏障【同步點】,再所有衝破屏障,繼續向下執行。
CyclicBarrier基於ReentrantLock獨佔鎖與Condition條件機制實現屏障邏輯。
CyclicBarrier須要指定parties【N】以及可選的任務,當N - 1個線程調用await的時候,會在條件隊列中阻塞,直到第N個線程調用await,執行指定的任務後,喚醒N - 1個等待的線程,並重置Generation,更新count。