本文首發於一世流雲專欄: https://segmentfault.com/blog...
CyclicBarrier
是一個輔助同步器類,在JDK1.5時隨着J.U.C一塊兒引入。segmentfault
這個類的功能和咱們以前介紹的CountDownLatch有些相似。咱們知道,CountDownLatch
是一個倒數計數器,在計數器不爲0時,全部調用await的線程都會等待,當計數器降爲0,線程纔會繼續執行,且計數器一旦變爲0,就不能再重置了。微信
CyclicBarrier
能夠認爲是一個柵欄,柵欄的做用是什麼?就是阻擋前行。
顧名思義,CyclicBarrier是一個能夠循環使用的柵欄,它作的事情就是:
讓線程到達柵欄時被阻塞(調用await方法),直到到達柵欄的線程數知足指定數量要求時,柵欄纔會打開放行。框架
這其實有點像軍訓報數,報數總人數知足教官認爲的總數時,教官纔會安排後面的訓練。
能夠看下面這個圖來理解下:
一共4個線程A、B、C、D,它們到達柵欄的順序可能各不相同。當A、B、C到達柵欄後,因爲沒有知足總數【4】的要求,因此會一直等待,當線程D到達後,柵欄纔會放行。ide
從CyclicBarrier的構造器,咱們也能夠看出關於這個類的一些端倪,CyclicBarrier有兩個構造器:this
構造器一:
這個構造器的參數parties
就是以前說的須要知足的計數總數。spa
構造器二:
這個構造器稍微特殊一些,除了指定了計數總數外,傳入了一個Runnable
任務。線程
Runnable任務其實就是當最後一個線程到達柵欄時,後續當即要執行的任務。3d
好比,軍訓報數完畢後,總人數知足了要求,教官就會開始命令你們執行下一個任務,這個【下一個任務】就是這裏的Runnable。
咱們來看一個CyclicBarrier的示例,來理解下它的功能。code
假設如今有這樣一個場景:
5個運動員準備跑步比賽,運動員在賽跑前會準備一段時間,當裁判發現全部運動員準備完畢後,就舉起發令槍,比賽開始。
這裏的起跑線就是屏障,運動員必須在起跑線等待其餘運動員準備完畢。
public class CyclicBarrierTest { public static void main(String[] args) { int N = 5; // 運動員數 CyclicBarrier cb = new CyclicBarrier(N, new Runnable() { @Override public void run() { System.out.println("****** 全部運動員已準備完畢,發令槍:跑!******"); } }); for (int i = 0; i < N; i++) { Thread t = new Thread(new PrepareWork(cb), "運動員[" + i + "]"); t.start(); } } private static class PrepareWork implements Runnable { private CyclicBarrier cb; PrepareWork(CyclicBarrier cb) { this.cb = cb; } @Override public void run() { try { Thread.sleep(500); System.out.println(Thread.currentThread().getName() + ": 準備完成"); cb.await(); // 在柵欄等待 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }
執行上面的程序,可能的輸出結果以下:對象
運動員[3]: 準備完成 運動員[1]: 準備完成 運動員[0]: 準備完成 運動員[2]: 準備完成 運動員[4]: 準備完成 ****** 全部運動員已準備完畢,發令槍:跑!******
從輸出能夠看到,線程到達柵欄時會被阻塞(調用await
方法),直到到達柵欄的線程數知足指定數量要求時,柵欄纔會打開放行。
咱們知道,線程在阻塞過程當中,可能被中斷,那麼既然CyclicBarrier放行的條件是等待的線程數達到指定數目,萬一線程被中斷致使最終的等待線程數達不到柵欄的要求怎麼辦?
CyclicBarrier必定有考慮到這種異常狀況,否則其它全部等待線程都會無限制地等待下去。
那麼CyclicBarrier是如何處理的呢?
咱們看下CyclicBarrier的await()
方法:
public int await() throws InterruptedException, BrokenBarrierException { //... }
能夠看到,這個方法除了拋出InterruptedException異常外,還會拋出BrokenBarrierException
。
BrokenBarrierException表示當前的CyclicBarrier已經損壞了,可能等不到全部線程都到達柵欄了,因此已經在等待的線程也不必再等了,能夠散夥了。
出現如下幾種狀況之一時,當前等待線程會拋出BrokenBarrierException異常:
另外,只要正在Barrier上等待的任一線程拋出了異常,那麼Barrier就會認爲確定是湊不齊全部線程了,就會將柵欄置爲損壞(Broken)狀態,並傳播BrokenBarrierException給其它全部正在等待(await)的線程。
咱們來對上面的例子作個改造,模擬下異常狀況:
public class CyclicBarrierTest { public static void main(String[] args) throws InterruptedException { int N = 5; // 運動員數 CyclicBarrier cb = new CyclicBarrier(N, new Runnable() { @Override public void run() { System.out.println("****** 全部運動員已準備完畢,發令槍:跑!******"); } }); List<Thread> list = new ArrayList<>(); for (int i = 0; i < N; i++) { Thread t = new Thread(new PrepareWork(cb), "運動員[" + i + "]"); list.add(t); t.start(); if (i == 3) { t.interrupt(); // 運動員[3]置中斷標誌位 } } Thread.sleep(2000); System.out.println("Barrier是否損壞:" + cb.isBroken()); } private static class PrepareWork implements Runnable { private CyclicBarrier cb; PrepareWork(CyclicBarrier cb) { this.cb = cb; } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + ": 準備完成"); cb.await(); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + ": 被中斷"); } catch (BrokenBarrierException e) { System.out.println(Thread.currentThread().getName() + ": 拋出BrokenBarrierException"); } } } }
可能的輸出結果:
運動員[0]: 準備完成 運動員[2]: 準備完成 運動員[1]: 準備完成 運動員[3]: 準備完成 運動員[3]: 被中斷 運動員[4]: 準備完成 運動員[4]: 拋出BrokenBarrierException 運動員[0]: 拋出BrokenBarrierException 運動員[1]: 拋出BrokenBarrierException 運動員[2]: 拋出BrokenBarrierException Barrier是否損壞:true
這段代碼,模擬了中斷線程3的狀況,從輸出能夠看到,線程0、一、2首先到達Brrier等待。
而後線程3到達,因爲以前設置了中斷標誌位,因此線程3拋出中斷異常,致使Barrier損壞,此時全部已經在柵欄等待的線程(0、一、2)都會拋出BrokenBarrierException異常。
此時,即便再有其它線程到達柵欄(線程4),都會拋出BrokenBarrierException異常。
注意:使用
CyclicBarrier
時,對異常的處理必定要當心,好比線程在到達柵欄前就拋出異常,此時若是沒有重試機制,其它已經到達柵欄的線程會一直等待(由於沒有尚未知足總數),最終致使程序沒法繼續向下執行。
CyclicBarrier有兩個構造器:CyclicBarrier cb = new CyclicBarrier(10);
構造器內部的各個字段含義以下:
字段名 | 做用 |
---|---|
parties | 柵欄開啓須要的到達線程總數 |
count | 剩餘未到達的線程總數 |
barrierCommand | 最後一個線程到達後執行的任務 |
CyclicBarrier 並無本身去實現AQS框架的API,而是利用了ReentrantLock
和Condition
。
public class CyclicBarrier { private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); // 柵欄開啓須要的到達線程總數 private final int parties; // 最後一個線程到達後執行的任務 private final Runnable barrierCommand; // 剩餘未到達的線程總數 private int count; // 當前輪次的運行狀態 private Generation generation = new Generation(); // ... }
須要注意的是generation
這個字段:
咱們知道,CyclicBarrier 是能夠循環複用的,因此CyclicBarrier 的每一輪任務都須要對應一個generation 對象。
generation 對象內部有個broken字段,用來標識當前輪次的CyclicBarrier 是否已經損壞。
nextGeneration方法用來建立一個新的generation 對象,並喚醒全部等待線程,重置內部參數。
咱們先來看下await方法:
能夠看到,不管有沒有超時功能,內部都是調了dowait這個方法:
dowait方法並不複雜,一共有3部分:
破壞柵欄用的是breakBarrier方法:
再來看下CyclicBarrier的reset方法:
該方法先破壞柵欄,而後開始下一輪(新建一個generation對象)。
類聲明:
構造器聲明:
接口聲明: