CyclicBarrier是一個同步輔助類,容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。由於該 barrier 在釋放等待線程後能夠重用,因此稱它爲循環 的 barrier。html
注意比較CountDownLatch和CyclicBarrier:
(01) CountDownLatch的做用是容許1或N個線程等待其餘線程完成執行;而CyclicBarrier則是容許N個線程相互等待。
(02) CountDownLatch的計數器沒法被重置;CyclicBarrier的計數器能夠被重置後使用,所以它被稱爲是循環的barrier。java
CyclicBarrier(int parties) 建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,但它不會在啓動 barrier 時執行預約義的操做。 CyclicBarrier(int parties, Runnable barrierAction) 建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,並在啓動 barrier 時執行給定的屏障操做,該操做由最後一個進入 barrier 的線程執行。 int await() 在全部參與者都已經在此 barrier 上調用 await 方法以前,將一直等待。 int await(long timeout, TimeUnit unit) 在全部參與者都已經在此屏障上調用 await 方法以前將一直等待,或者超出了指定的等待時間。 int getNumberWaiting() 返回當前在屏障處等待的參與者數目。 int getParties() 返回要求啓動此 barrier 的參與者數目。 boolean isBroken() 查詢此屏障是否處於損壞狀態。 void reset() 將屏障重置爲其初始狀態。
CyclicBarrier是包含了"ReentrantLock對象lock"和"Condition對象trip",它是經過獨佔鎖實現的。下面經過源碼去分析究竟是如何實現的。數據結構
1. 構造函數app
CyclicBarrier的構造函數共2個:CyclicBarrier 和 CyclicBarrier(int parties, Runnable barrierAction)。第1個構造函數是調用第2個構造函數來實現的,下面第2個構造函數的源碼函數
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); // parties表示「必須同時到達barrier的線程個數」。 this.parties = parties; // count表示「處在等待狀態的線程個數」。 this.count = parties; // barrierCommand表示「parties個線程到達barrier時,會執行的動做」。 this.barrierCommand = barrierAction; }
2. 等待函數this
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } }
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 獲取「獨佔鎖(lock)」 lock.lock(); try { // 保存「當前的generation」 final Generation g = generation; // 若「當前generation已損壞」,則拋出異常。 if (g.broken) throw new BrokenBarrierException(); // 若是當前線程被中斷,則經過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中全部等待線程。 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 將「count計數器」-1 int index = --count; // 若是index=0,則意味着「有parties個線程到達barrier」。 if (index == 0) { // tripped boolean ranAction = false; try { // 若是barrierCommand不爲null,則執行該動做。 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 喚醒全部等待線程,並更新generation。 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // 當前線程一直阻塞,直到「有parties個線程到達barrier」 或 「當前線程被中斷」 或 「超時」這3者之一發生, // 當前線程才繼續執行。 for (;;) { try { // 若是不是「超時等待」,則調用awati()進行等待;不然,調用awaitNanos()進行等待。 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 若是等待過程當中,線程被中斷,則執行下面的函數。 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } // 若是「當前generation已經損壞」,則拋出異常。 if (g.broken) throw new BrokenBarrierException(); // 若是「generation已經換代」,則返回index。 if (g != generation) return index; // 若是是「超時等待」,而且時間已到,則經過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中全部等待線程。 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 釋放「獨佔鎖(lock)」 lock.unlock(); } }
說明:dowait()的做用就是讓當前線程阻塞,直到「有parties個線程到達barrier」 或 「當前線程被中斷」 或 「超時」這3者之一發生,當前線程才繼續執行。spa
(01) generation是CyclicBarrier的一個成員遍歷,在CyclicBarrier中,同一批的線程屬於同一代,即同一個Generation;CyclicBarrier中經過generation對象,記錄屬於哪一代。當有parties個線程到達barrier,generation就會被更新換代。線程
它的定義以下:code
private Generation generation = new Generation(); private static class Generation { boolean broken = false; }
(02) 若是當前線程被中斷,即Thread.interrupted()爲true;則經過breakBarrier()終止CyclicBarrier。breakBarrier()會設置當前中斷標記broken爲true,意味着「將該Generation中斷」;同時,設置count=parties,即從新初始化count;最後,經過signalAll()喚醒CyclicBarrier上全部的等待線程。breakBarrier()的源碼以下:htm
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
(03) 將「count計數器」-1,即--count;而後判斷是否是「有parties個線程到達barrier」,即index是否是爲0。首先,它會調用signalAll()喚醒CyclicBarrier上全部的等待線程;接着,從新初始化count;最後,更新generation的值。
當index=0時,若是barrierCommand不爲null,則執行該barrierCommand,barrierCommand就是咱們建立CyclicBarrier時,傳入的Runnable對象。而後,調用nextGeneration()進行換代工做,nextGeneration()的源碼以下:
private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); }
04) 在for(;;)循環中。timed是用來表示當前是否是「超時等待」線程。若是不是,則經過trip.await()進行等待;不然,調用awaitNanos()進行超時等待。
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.BrokenBarrierException; public class CyclicBarrierTest1 { private static int SIZE = 5; private static CyclicBarrier cb; public static void main(String[] args) { cb = new CyclicBarrier(SIZE); // 新建5個任務 for(int i=0; i<SIZE; i++) new InnerThread().start(); } static class InnerThread extends Thread{ public void run() { try { System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier."); // 將cb的參與者數量加1 cb.await(); // cb的參與者數量等於5時,才繼續日後執行 System.out.println(Thread.currentThread().getName() + " continued."); } catch (BrokenBarrierException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
Thread-1 wait for CyclicBarrier. Thread-2 wait for CyclicBarrier. Thread-3 wait for CyclicBarrier. Thread-4 wait for CyclicBarrier. Thread-0 wait for CyclicBarrier. Thread-0 continued. Thread-4 continued. Thread-2 continued. Thread-3 continued. Thread-1 continued.
結果說明:主線程中新建了5個線程,全部的這些線程都調用cb.await()等待。全部這些線程一直等待,直到cb中全部線程都達到barrier時,這些線程才繼續運行!
示例2
新建5個線程,當這5個線程達到必定的條件時,執行某項任務。
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.BrokenBarrierException; public class CyclicBarrierTest2 { private static int SIZE = 5; private static CyclicBarrier cb; public static void main(String[] args) { cb = new CyclicBarrier(SIZE, new Runnable () { public void run() { System.out.println("CyclicBarrier's parties is: "+ cb.getParties()); } }); // 新建5個任務 for(int i=0; i<SIZE; i++) new InnerThread().start(); } static class InnerThread extends Thread{ public void run() { try { System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier."); // 將cb的參與者數量加1 cb.await(); // cb的參與者數量等於5時,才繼續日後執行 System.out.println(Thread.currentThread().getName() + " continued."); } catch (BrokenBarrierException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
Thread-1 wait for CyclicBarrier. Thread-2 wait for CyclicBarrier. Thread-3 wait for CyclicBarrier. Thread-4 wait for CyclicBarrier. Thread-0 wait for CyclicBarrier. CyclicBarrier's parties is: 5 Thread-0 continued. Thread-4 continued. Thread-2 continued. Thread-3 continued. Thread-1 continued.
結果說明:主線程中新建了5個線程,全部的這些線程都調用cb.await()等待。全部這些線程一直等待,直到cb中全部線程都達到barrier時,執行新建cb時註冊的Runnable任務。