JUC 中 迴環屏障 CyclicBarrier 的使用與分析,它也能夠實現像 CountDownLatch 同樣讓一組線程所有到達一個狀態後再所有同時執行,可是 CyclicBarrier 能夠被複用。那麼 CyclicBarrier 內部的實現與 CountDownLatch 有何不一樣那?java
CounDownLatch在解決多個線程同步方面相對於調用線程的 join 已經提供了很多改進,可是CountDownLatch的計數器是一次性的,也就是等到計數器變爲0後,再調用CountDownLatch的await ()和countDown()方法都會馬上返回,這就起不到線程同步的效果了。CyclicBarrier類的功能不限於CountDownLatch所提供的功能,從字面意思理解CyclicBarrier是迴環屏障的意思,它能夠實現讓一組線程所有達到一個狀態後再所有同時執行。這裏之因此叫作迴環是由於當全部等待線程執行完畢以後,重置CyclicBarrier的狀態後能夠被重用。下圖演示了這一過程。架構
一.CyclicBarrier的實現原理app
爲了能一覽CyclicBarrier的架構設計,下面先看下CyclicBarrier的類圖,以下圖:函數
如上面類圖,能夠知道CyclicBarrier 內部並非直接使用AQS實現,而是使用了獨佔鎖ReentrantLock來實現的同步;parties用來記錄線程個數,用來表示須要多少線程先調用await後,全部線程纔會衝破屏障繼續往下運行;而 count 一開始等一parties,每當線程調用await方法後就遞減 1 ,當爲 0 的時候就表示全部線程都到了屏障點,另外你可能會疑惑爲什麼維護parties 和 count 這兩個變量,只有count 不就好了嗎?別忘了CyclicBarries是能夠被複用的,使用兩個變量緣由是用parties始終來記錄總的線程個數,當count計數器變爲 0 後,會使用parties 賦值給count,已達到複用的做用。這兩個變量是在構造CyclicBarries對象的時候傳遞的,源碼以下:this
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
這裏還有一個變量barrierConmmand也經過構造函數傳遞而來,這是一個任務,這個任務的執行時機是當全部線程都達到屏障點後。另外CyclicBarrier內部使用獨佔鎖Lock來保證同時只有一個線程調用await方法時候才能夠返回,使用lock首先保證了更新計數器count 的原子性,另外使用lock的條件變量 trip 支持了 線程間使用 notify,await 操做進行同步。spa
最後變量generation內部就一個變量broken用來記錄當前屏障是否被打破,另外注意這裏broken並無被聲明爲volatile ,這是由於鎖內使用變量不須要。源碼以下:線程
private static class Generation { boolean broken = false; }
接下來重點看一下CyclicBarrier的幾個重要的函數,以下:架構設計
1.int await() 當前線程調用 CyclicBarrier 的該方法時候,當前線程會被阻塞,知道知足下面條件之一纔會返回:(1)parties 個線程都調用了 await()方法,也就是線程都到了屏障點。(2)其餘線程調用了當前線程的interrupt()方法中斷了當前線程,則當前線程會拋出InterruptedException 異常返回。(3)當前屏障點關聯的Generation對象的broken標誌被設置爲true的時候,會拋出 BrokenBarrierException 異常。源碼以下:設計
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
正如上面代碼能夠知道內部調用了dowait 方法,第一個參數false說明不設置超時時間,這時候第二個參數沒有意義。code
2.boolean await(long timeout, TimeUnit unit) 當前線程調用 CyclicBarrier 的該方法時候當前線程會被阻塞,直到知足下面條件之一纔會返回: (1) parties 個線程都調用了 await() 函數,也就是線程都到了屏障點,這時候返回 true。 (2) 當設置的超時時間到了後返回 false (3) 其它線程調用了當前線程的 interrupt()方法中斷了當前線程,則當前線程會拋出 InterruptedException 異常返回。 (4) 當前屏障點關聯的 Generation 對象的 broken 標誌被設置爲 true 時候,會拋出 BrokenBarrierException 異常。源碼以下:
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
正如上面代碼能夠知道內部調用了dowait 方法,第一個參數true說明設置超時時間,這時候第二個參數是超時時間。
3.int dowait(boolean timed, long nanos) 該方法是實現 CyclicBarrier 的核心功能,源碼以下:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { ... //(1)若是index==0說明全部線程都到到了屏障點,則執行初始化時候傳遞的任務 int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { //(2)執行任務 if (command != null) command.run(); ranAction = true; //(3)激活其它調用await而被阻塞的線程,並重置CyclicBarrier nextGeneration(); //返回 return 0; } finally { if (!ranAction) breakBarrier(); } } // (4)若是index!=0 for (;;) { try { //(5)沒有設置超時時間, if (!timed) trip.await(); //(6)設置了超時時間 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { ... } ... } } finally { lock.unlock(); } } private void nextGeneration() { //(7)喚醒條件隊列裏面阻塞線程 trip.signalAll(); //(8) 重置CyclicBarrier count = parties; generation = new Generation(); }
上面代碼是dowait方法的主幹代碼,當一個線程調用了dowait方法後首先會獲取獨佔鎖lock,若是建立CyclicBarrier的時候傳遞的參數爲 10 ,那麼後面 9 個調用線程會被阻塞;而後當前獲取線程對計數器count進行遞減操做,遞減後的count = index = 9 ,由於 index != 0 ,因此當前線程會執行代碼(4)。若是是無參數的當前線程調用的是無參數的await()方法,則這裏 timed = false,因此當前線程會被放入條件變量trip的阻塞隊列,當前線程會被掛起並釋放獲取的Lock鎖;若是調用的有參數的await 方法 則timed = true,則當前線程也會被放入條件變量阻塞隊列並釋放鎖的資源,可是不一樣的是當前線程會在指定時間超時後自動激活。
當第一個獲取鎖的線程因爲被阻塞釋放鎖後,被阻塞的 9 個線程中有一個會競爭到lock鎖,而後執行第一個線程一樣的操做,直到最後一個線程獲取到lock的時候,已經有 9 個線程被放入了Lock 的條件隊列裏面,最後一個線程 count 遞減後,count = index 等於 0 ,因此執行代碼(2),若是建立CyclicBarrier的時候傳遞了任務,則在其餘線程被喚醒前先執行任務,任務執行完畢後再執行代碼(3),喚醒其餘 9 個線程,並重置CyclicBarrier,而後這 10個線程就能夠繼續向下執行了。
到目前位置理解了CyclicBarrier的原理後,接下來用幾個例子來加深對CyclicBarrier的理解,下面例子咱們要實現的是使用兩個線程去執行一個被分解的任務 A,當兩個線程把本身的任務都執行完畢後在對它們的結果進行彙總處理。例子以下:
package com.hjc; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by cong on 2018/7/7. */ public class CyclicBarrierTest1 { // 建立一個CyclicBarrier實例,添加一個全部子線程所有到達屏障後執行的一個任務 private static volatile CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() { public void run() { System.out.println(Thread.currentThread() + " task1 merge result"); } }); public static void main(String[] args) throws InterruptedException { //建立一個線程個數固定爲2的線程池 ExecutorService executorService = Executors.newFixedThreadPool(2); // 加入線程A到線程池 executorService.submit(new Runnable() { public void run() { try { System.out.println(Thread.currentThread() + " task1-1"); System.out.println(Thread.currentThread() + " enter in barrier"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + " enter out barrier"); } catch (Exception e) { e.printStackTrace(); } } }); // 加入線程B到線程池 executorService.submit(new Runnable() { public void run() { try { System.out.println(Thread.currentThread() + " task1-2"); System.out.println(Thread.currentThread() + " enter in barrier"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + " enter out barrier"); } catch (Exception e) { e.printStackTrace(); } } }); // 關閉線程池 executorService.shutdown(); } }
運行結果以下:
如上代碼建立了一個 CyclicBarrier 對象,第一個參數爲計數器初始值,第二個參數 Runable 是指當計數器爲 0 時候須要執行的任務。main 函數裏面首先建立了固定大小爲 2 的線程池,而後添加兩個子任務到線程池,每一個子任務在執行完本身的邏輯後會調用 await 方法。
一開始計數器爲 2,當第一個線程調用 await 方法時候,計數器會遞減爲 1,因爲計數器不爲 0,因此當前線程就到了屏障點會被阻塞,而後第二個線程調用 await 時候,會進入屏障,計數器也會遞減如今計數器爲 0,就會去執行在 CyclicBarrier 構造時候的任務,執行完畢後就會退出屏障點,而且會喚醒被阻塞的第一個線程,這時候第一個線程也會退出屏障點繼續向下運行。
上面的例子說明了多個線程之間是相互等待的,假如計數器爲 N,那麼調用 await 方法的前面 N-1 的線程都會由於到達屏障點被阻塞,當第 N 個線程調用 await 後,計數器爲 0 了,這時候第 N 個線程纔會發出通知喚醒前面的 N-1 個線程。也就是所有線程達到屏障點時候才能一塊繼續向下執行,對與這個例子來講使用 CountDownLatch 也能夠達到相似輸出結果。
下面在放個例子來講明 CyclicBarrier 的可複用性。
假設一個任務由階段 一、階段 二、階段 3 組成,每一個線程要串行的執行階段 1 和 2 和 3,多個線程執行該任務時候,必需要保證全部線程的階段 1 所有完成後才能進行階段 2 執行,全部線程的階段 2 所有完成後才能進行階段 3 執行,下面使用 CyclicBarrier 來完成這個需求。例子以下:
package com.hjc; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by cong on 2018/7/7. */ public class CyclicBarrierTest2 { // 建立一個CyclicBarrier實例 private static volatile CyclicBarrier cyclicBarrier = new CyclicBarrier(2); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); // 加入線程A到線程池 executorService.submit(new Runnable() { public void run() { try { System.out.println(Thread.currentThread() + " step1"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + " step2"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + " step3"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); // 加入線程B到線程池 executorService.submit(new Runnable() { public void run() { try { System.out.println(Thread.currentThread() + " step1"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + " step2"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + " step3"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //關閉線程池 executorService.shutdown(); } }
運行結果以下:
如上代碼,在每一個子線程執行完 step1 後都調用了 await 方法,全部線程都到達屏障點後纔會一塊往下執行,這就保證了全部線程完成了 step1 後纔會開始執行 step2,而後在 step2 後面調用了 await 方法,這保證了全部線程的 step2 完成後,線程才能開始 step3 的執行,這個功能使用單個 CountDownLatch 是沒法完成的。