CountdownLatch,CyclicBarrier是很是經常使用併發工具類,能夠說是Java工程師必會技能了。不但在項目實戰中常常涉及,並且在編寫壓測程序,多線程demo也是必不可少,因此掌握它們的用法和實現原理很是有必要。java
年年不忘,必有迴響!
點贊走一走,找到女友~nginx
CountDownLatch容許一個或多個線程等待其餘線程完成操做。也就是說經過使用CountDownLatch工具類,可讓一組線程等待彼此執行完畢後在共同執行下一個操做。具體流程以下圖所示,箭頭表示任務,矩形表示柵欄,當三個任務都到達柵欄時,柵欄後wait的任務纔開始執行。多線程
CountDownLatch維護有個int型的狀態碼,每次調用countDown時狀態值就會減1;調用wait方法的線程會阻塞,直到狀態碼爲0時纔會繼續執行。併發
在多線程協同工做時,可能須要等待其餘線程執行完畢以後,主線程才接着往下執行。首先咱們可能會想到使用線程的join方法(調用join方法的線程優先執行,該線程執行完畢後纔會執行其餘線程),顯然這是能夠完成的。框架
public class RunningRaceTest { public static void main(String[] args) throws InterruptedException { Thread runner1 = new Thread(new Runner(), "1號"); Thread runner2 = new Thread(new Runner(), "2號"); Thread runner3 = new Thread(new Runner(), "3號"); Thread runner4 = new Thread(new Runner(), "4號"); Thread runner5 = new Thread(new Runner(), "5號"); runner1.start(); runner2.start(); runner3.start(); runner4.start(); runner5.start(); runner1.join(); runner2.join(); runner3.join(); runner4.join(); runner5.join(); // 裁判等待5名選手準備完畢 System.out.println("裁判:比賽開始~~"); } } class Runner implements Runnable { @Override public void run() { try { int sleepMills = ThreadLocalRandom.current().nextInt(1000); Thread.sleep(sleepMills); System.out.println(Thread.currentThread().getName() + " 選手已就位, 準備共用時: " + sleepMills + "ms"); } catch (InterruptedException e) { e.printStackTrace(); } } }
Thread.join()徹底能夠實現這個需求,不過存在一個問題,若是調用join的線程一直存活,則當前線程則須要一直等待。這顯然不夠靈活,而且當前線程可能會出現死等的狀況。dom
jdk1.5以後的併發包中提供了CountDownLatch併發工具了,也能夠實現join的功能,而且功能更增強大。ide
// 參賽選手線程 class Runner implements Runnable { private CountDownLatch countdownLatch; public Runner(CountDownLatch countdownLatch) { this.countdownLatch = countdownLatch; } @Override public void run() { try { int sleepMills = ThreadLocalRandom.current().nextInt(1000); Thread.sleep(sleepMills); System.out.println(Thread.currentThread().getName() + " 選手已就位, 準備共用時: " + sleepMills + "ms"); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 準備完畢,舉手示意 countdownLatch.countDown(); } } } public class RunningRaceTest { public static void main(String[] args) throws InterruptedException { // 使用線程池的正確姿式 int size = 5; AtomicInteger counter = new AtomicInteger(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(size, size, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 號 "), new ThreadPoolExecutor.AbortPolicy()); CountDownLatch countDownLatch = new CountDownLatch(5); for (int i = 0; i < size; i++) { threadPoolExecutor.submit(new Runner(countDownLatch)); } // 裁判等待5名選手準備完畢 countDownLatch.await(); // 爲了不死等,也能夠添加超時時間 System.out.println("裁判:比賽開始~~"); threadPoolExecutor.shutdownNow(); } }
輸出結果:工具
5 號 選手已就位, 準備共用時: 20ms 4 號 選手已就位, 準備共用時: 156ms 1 號 選手已就位, 準備共用時: 288ms 2 號 選手已就位, 準備共用時: 519ms 3 號 選手已就位, 準備共用時: 945ms 比賽開始~~
CyclicBarrier能夠實現CountDownLatch同樣的功能,不一樣的是CountDownLatch屬於一次性對象,聲明後只能使用一次,而CyclicBarrier能夠循環使用。ui
從字面意義上來看,CyclicBarrier表示循環的屏障,當一組線程所有都到達屏障時,屏障纔會被移除,不然只能阻塞在屏障處。this
public class RunningRace { public static void main(String[] args) { // 使用線程池的正確姿式 int size = 5; AtomicInteger counter = new AtomicInteger(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(size, size, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 號 "), new ThreadPoolExecutor.AbortPolicy()); CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("裁判:比賽開始~~")); for (int i = 0; i < 10; i++) { threadPoolExecutor.submit(new Runner(cyclicBarrier)); } } } class Runner implements Runnable { private CyclicBarrier cyclicBarrier; public Runner(CyclicBarrier countdownLatch) { this.cyclicBarrier = countdownLatch; } @Override public void run() { try { int sleepMills = ThreadLocalRandom.current().nextInt(1000); Thread.sleep(sleepMills); System.out.println(Thread.currentThread().getName() + " 選手已就位, 準備共用時: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting()); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }
因爲CyclicBarrier能夠循環使用,因此CyclicBarrier的構造方法中能夠傳入一個Runnable參數,在每一輪執行完畢以後就會馬上執行這個Runnable任務。
CountDownLath是基於AQS框架的一種簡單實現,有兩個核心的方法,即await()和countDown(),經過構造方法傳入一個狀態值,調用await()方法時線程會阻塞,直到狀態碼被修改爲0時纔會返回,每次調用countDown()時會將狀態值減1。
wait方法:執行wait方法後,會嘗試獲取同步狀態,若是爲狀態爲0則方法繼續執行,否擇當前線程會被加入到同步隊列中,詳情可見筆者關於AQS的兩篇文章。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 若是狀態碼不爲0,嘗試獲取同步狀態,若是失敗則被加入到同步隊列中 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 當狀態碼爲0時返回1,否擇返回-1,這個方法中參數沒有任何用處 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
countDown方法:每次執行countDown方法時,會將狀態碼的值減1.
public void countDown() { sync.releaseShared(1); }
CyclicBarrier與CountDownLatch實現思想相同,也是基於AQS框架實現。不一樣的是CyclicBarrier內部維護一個狀態值, 藉助基於AQS實現的鎖ReentrantLock來實現狀態值的同步更新,以及AQS除了同步狀態以外的另外一個核心概念條件隊列來完成線程的阻塞。
parties: 和CountdownLatch中的狀態值同樣,用來記錄每次要相互等待的線程數量,只有parties個線程同時到達屏障時,纔會喚醒阻塞的線程。
count臨時計數器: 因爲CyclicBarrier是能夠循環使用的,count能夠理解爲是一個臨時變量,每一輪執行完畢或者被打斷都會重置count爲parties值。
Generation內部類: 只有一個屬性 broken表示當前這一輪執行是否被中斷,若是被中斷後其餘線程再執行await方法會拋出異常(目的是中止本輪線程未執行線程的繼續執行)。
await方法: 當執行await方法時,會同步得對內部的count執行--count操做, 若是count = 0,則執行barrierCommand任務(經過構造方法傳來的Runnable參數)。
reset方法:中斷本輪執行,重置count值,喚醒等待的線程而後開始下一輪,此時本輪正在執行的線程調用await方法會拋出異常。
// await方法實際執行的代碼 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 加鎖,保證併發操做的一致性 lock.lock(); try { // 若是當前這一輪操做被中斷,拋出中斷異常(該異常只是起警示做用,沒有任何其餘信息) final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 本輪執行的計數器 數值-1 int index = --count; if (index == 0) { // 計數器值=1, 本輪線程所有到達屏障,執行barrierCommand任務 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration();// 喚醒全部等待在條件隊列上的任務 return 0; } finally { if (!ranAction) breakBarrier(); } } // 若是狀態不等於0,循環等待直到計數器值爲0,本輪執行被打破,線程被中斷,或者等待超時 for (;;) { try { if (!timed) // 狀態碼不爲0,將當前線程加入到條件隊列中,進入阻塞狀態 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier();// 喚醒全部條件隊列中的線程,重置count的值 throw new TimeoutException(); } } } finally { lock.unlock(); } }
重置柵欄的狀態
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } /** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */ private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
當一輪執行完畢以後,既count=0後,CyclicBarrier的臨時狀態會重置爲parties
/** * 進入下一輪 * 喚醒全部等待線程,充值count */ private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }