瘋狂創客圈 經典圖書 : 《Netty Zookeeper Redis 高併發實戰》 面試必備 + 面試必備 + 面試必備 【博客園總入口 】html
瘋狂創客圈 經典圖書 : 《SpringCloud、Nginx高併發核心編程》 大廠必備 + 大廠必備 + 大廠必備 【博客園總入口 】java
入大廠+漲工資必備: 高併發【 億級流量IM實戰】 實戰系列 【 SpringCloud Nginx秒殺】 實戰系列 【博客園總入口 】面試
從字面上的意思能夠知道,這個類的中文意思是「循環柵欄」。大概的意思就是一個可循環利用的屏障。編程
它的做用就是會讓全部線程都等待完成後纔會繼續下一步行動。多線程
現實生活中咱們常常會遇到這樣的情景,在進行某個活動前須要等待人所有都齊了纔開始。例如吃飯時要等全家人都上座了才動筷子,旅遊時要等所有人都到齊了纔出發,比賽時要等運動員都上場後纔開始。併發
在JUC包中爲咱們提供了一個同步工具類可以很好的模擬這類場景,它就是CyclicBarrier類。利用CyclicBarrier類能夠實現一組線程相互等待,當全部線程都到達某個屏障點後再進行後續的操做。下圖演示了這一過程。ide
CyclicBarrier字面意思是「可重複使用的柵欄」,CyclicBarrier 相比 CountDownLatch 來講,要簡單不少,其源碼沒有什麼高深的地方,它是 ReentrantLock 和 Condition 的組合使用。高併發
看以下示意圖,CyclicBarrier 和 CountDownLatch 是否是很像,只是 CyclicBarrier 能夠有不止一個柵欄,由於它的柵欄(Barrier)能夠重複使用(Cyclic)。工具
public CyclicBarrier(int parties) public CyclicBarrier(int parties, Runnable barrierAction)
解析:this
parties 是參與線程的個數
第二個構造方法有一個 Runnable 參數,這個參數的意思是最後一個到達線程要作的任務
public int await() throws InterruptedException, BrokenBarrierException public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
解析:
線程調用 await() 表示本身已經到達柵欄
BrokenBarrierException 表示柵欄已經被破壞,破壞的緣由多是其中一個線程 await() 時被中斷或者超時
2.3.1 需求
一個線程組的線程須要等待全部線程完成任務後再繼續執行下一次任務
2.3.2 代碼實現
public class CyclicBarrierDemo {
static class TaskThread extends Thread { CyclicBarrier barrier; public TaskThread(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { Thread.sleep(1000); System.out.println(getName() + " 到達柵欄 A"); barrier.await(); System.out.println(getName() + " 衝破柵欄 A"); Thread.sleep(2000); System.out.println(getName() + " 到達柵欄 B"); barrier.await(); System.out.println(getName() + " 衝破柵欄 B"); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) { int threadNum = 5; CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 完成最後任務"); } }); for(int i = 0; i < threadNum; i++) { new TaskThread(barrier).start(); } }
}
打印結果:
Thread-1 到達柵欄 A Thread-3 到達柵欄 A Thread-0 到達柵欄 A Thread-4 到達柵欄 A Thread-2 到達柵欄 A Thread-2 完成最後任務 Thread-2 衝破柵欄 A Thread-1 衝破柵欄 A Thread-3 衝破柵欄 A Thread-4 衝破柵欄 A Thread-0 衝破柵欄 A Thread-4 到達柵欄 B Thread-0 到達柵欄 B Thread-3 到達柵欄 B Thread-2 到達柵欄 B Thread-1 到達柵欄 B Thread-1 完成最後任務 Thread-1 衝破柵欄 B Thread-0 衝破柵欄 B Thread-4 衝破柵欄 B Thread-2 衝破柵欄 B Thread-3 衝破柵欄 B
從打印結果能夠看出,全部線程會等待所有線程到達柵欄以後纔會繼續執行,而且最後到達的線程會完成 Runnable 的任務。
能夠用於多線程計算數據,最後合併計算結果的場景。
而 CyclicBarrier 基於 Condition 來實現的。由於 CyclicBarrier 的源碼相對來講簡單許多,讀者只要熟悉了前面關於 Condition 的分析,那麼這裏的源碼是毫無壓力的,就是幾個特殊概念罷了。
在CyclicBarrier類的內部有一個計數器,每一個線程在到達屏障點的時候都會調用await方法將本身阻塞,此時計數器會減1,當計數器減爲0的時候全部因調用await方法而被阻塞的線程將被喚醒。這就是實現一組線程相互等待的原理,下面咱們先看看CyclicBarrier有哪些成員變量。
//同步操做鎖 private final ReentrantLock lock = new ReentrantLock(); //線程攔截器 private final Condition trip = lock.newCondition(); //每次攔截的線程數 private final int parties; //換代前執行的任務 private final Runnable barrierCommand; //表示柵欄的當前代 private Generation generation = new Generation(); //計數器 private int count; //靜態內部類Generation private static class Generation { boolean broken = false; }
上面貼出了CyclicBarrier全部的成員變量,能夠看到CyclicBarrier內部是經過條件隊列trip來對線程進行阻塞的,而且其內部維護了兩個int型的變量parties和count,parties表示每次攔截的線程數,該值在構造時進行賦值。count是內部計數器,它的初始值和parties相同,之後隨着每次await方法的調用而減1,直到減爲0就將全部線程喚醒。CyclicBarrier有一個靜態內部類Generation,該類的對象表明柵欄的當前代,就像玩遊戲時表明的本局遊戲,利用它能夠實現循環等待。barrierCommand表示換代前執行的任務,當count減爲0時表示本局遊戲結束,須要轉到下一局。在轉到下一局遊戲以前會將全部阻塞的線程喚醒,在喚醒全部線程以前你能夠經過指定barrierCommand來執行本身的任務。我用一圖來描繪下 CyclicBarrier 裏面的一些概念:
接下來咱們看看它的構造器。
//構造器1 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } //構造器2 public CyclicBarrier(int parties) { this(parties, null); }
CyclicBarrier有兩個構造器,其中構造器1是它的核心構造器,在這裏你能夠指定本局遊戲的參與者數量(要攔截的線程數)以及本局結束時要執行的任務,還能夠看到計數器count的初始值被設置爲parties。
CyclicBarrier類最主要的功能就是使先到達屏障點的線程阻塞並等待後面的線程,其中它提供了兩種等待的方法,分別是定時等待和非定時等待。
//非定時等待 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); } } //定時等待 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
能夠看到不論是定時等待仍是非定時等待,它們都調用了dowait方法,只不過是傳入的參數不一樣而已。下面咱們就來看看dowait方法都作了些什麼。
//核心等待方法 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; //檢查當前柵欄是否被打翻 if (g.broken) { throw new BrokenBarrierException(); } //檢查當前線程是否被中斷 if (Thread.interrupted()) { //若是當前線程被中斷會作如下三件事 //1.打翻當前柵欄 //2.喚醒攔截的全部線程 //3.拋出中斷異常 breakBarrier(); throw new InterruptedException(); } //每次都將計數器的值減1 int index = --count; //計數器的值減爲0則需喚醒全部線程並轉換到下一代 if (index == 0) { boolean ranAction = false; try { //喚醒全部線程前先執行指定的任務 final Runnable command = barrierCommand; if (command != null) { command.run(); } ranAction = true; //喚醒全部線程並轉到下一代 nextGeneration(); return 0; } finally { //確保在任務未成功執行時能將全部線程喚醒 if (!ranAction) { breakBarrier(); } } } //若是計數器不爲0則執行此循環 for (;;) { try { //根據傳入的參數來決定是定時等待仍是非定時等待 if (!timed) { trip.await(); }else if (nanos > 0L) { nanos = trip.awaitNanos(nanos); } } catch (InterruptedException ie) { //若當前線程在等待期間被中斷則打翻柵欄喚醒其餘線程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //若在捕獲中斷異常前已經完成在柵欄上的等待, 則直接調用中斷操做 Thread.currentThread().interrupt(); } } //若是線程由於打翻柵欄操做而被喚醒則拋出異常 if (g.broken) { throw new BrokenBarrierException(); } //若是線程由於換代操做而被喚醒則返回計數器的值 if (g != generation) { return index; } //若是線程由於時間到了而被喚醒則打翻柵欄並拋出異常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } ``` } finally { lock.unlock(); } }
上面貼出的代碼中註釋都比較詳細,咱們只挑一些重要的來說。能夠看到在dowait方法中每次都將count減1,減完後立馬進行判斷看看是否等於0,若是等於0的話就會先去執行以前指定好的任務,執行完以後再調用nextGeneration方法將柵欄轉到下一代,在該方法中會將全部線程喚醒,將計數器的值從新設爲parties,最後會從新設置柵欄代次,在執行完nextGeneration方法以後就意味着遊戲進入下一局。若是計數器此時還不等於0的話就進入for循環,根據參數來決定是調用trip.awaitNanos(nanos)仍是trip.await()方法,這兩方法對應着定時和非定時等待。若是在等待過程當中當前線程被中斷就會執行breakBarrier方法,該方法叫作打破柵欄,意味着遊戲在中途被掐斷,設置generation的broken狀態爲true並喚醒全部線程。同時這也說明在等待過程當中有一個線程被中斷整盤遊戲就結束,全部以前被阻塞的線程都會被喚醒。線程醒來後會執行下面三個判斷,看看是否由於調用breakBarrier方法而被喚醒,若是是則拋出異常;看看是不是正常的換代操做而被喚醒,若是是則返回計數器的值;看看是否由於超時而被喚醒,若是是的話就調用breakBarrier打破柵欄並拋出異常。這裏還須要注意的是,若是其中有一個線程由於等待超時而退出,那麼整盤遊戲也會結束,其餘線程都會被喚醒。下面貼出nextGeneration方法和breakBarrier方法的具體代碼。
最後,咱們來看看怎麼重置一個柵欄:
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
咱們設想一下,若是初始化時,指定了線程 parties = 4,前面有 3 個線程調用了 await 等待,在第 4 個線程調用 await 以前,咱們調用 reset 方法,那麼會發生什麼?
首先,打破柵欄,那意味着全部等待的線程(3個等待的線程)會喚醒,await 方法會經過拋出 BrokenBarrierException 異常返回。而後開啓新的一代,重置了 count 和 generation,至關於一切歸零了。
CountDownLatch 是一次性的,CyclicBarrier 是可循環利用的
CountDownLatch 參與的線程的職責是不同的,有的在倒計時,有的在等待倒計時結束。CyclicBarrier 參與的線程職責是同樣的。
CyclicBarrier 的源碼實現和 CountDownLatch 截然不同,CountDownLatch 基於 AQS 的共享模式的使用,而 CyclicBarrier 基於 Condition 來實現的。由於 CyclicBarrier 的源碼相對來講簡單許多,讀者只要熟悉了前面關於 Condition 的分析,那麼這裏的源碼是毫無壓力的,就是幾個特殊概念罷了。
瘋狂創客圈 - Java高併發研習社羣,爲你們開啓大廠之門