CyclicBarrier的做用是讓一組線程互相等待至某個狀態後並行執行(相對外部來講是並行,其實內部仍是串行)java
基本的使用方法是建立一個CyclicBarrier實例,而且指定parties的個數,而後線程依次調用CyclicBarrier的await()方法讓本身進入等待狀態,當最後一個線程進入await()方法時,將會喚醒全部正在等待的線程,並行執行。app
CyclicBarrier雖然也是同步器,可是並不是直接經過AQS來進行實現的,而是藉助了ReentrantLock以及Condition來進行實現。源碼分析
public class CyclicBarrier { /** * 存在一個Generation靜態內部類 */ private static class Generation { boolean broken = false; // 標識CyclicBarrier是否被破壞 } private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); // 與ReentrantLock綁定的Condition實例 private final int parties; // 用於記錄一共有多少個線程須要等待 private final Runnable barrierCommand; // 由最後一個進入await()方法的線程進行調用 private Generation generation = new Generation(); private int count; // 用於記錄還須要多少個線程進行等待 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); } public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // ...... } // 其餘省略 }
能夠看到CyclicBarrier存在全局的lock、trip、parties、count、barrierCommand以及generation屬性,其中parties屬性用於記錄一共有多少個線程須要等待,而count用於記錄還須要多少個線程進行等待。this
同時CyclicBarrier中定義了一個Generation靜態內部類,該內部類只有一個broken全局屬性,用於標識CyclicBarrier是否被破壞,默認爲false。線程
同時CyclicBarrier的構造方法會初始化全局的parties、count以及barrierCommand屬性(CyclicBarrier初始化後,count的數量等於parties的數量)code
因爲當建立CyclicBarrier實例以後,線程須要依次調用CyclicBarrier的await()方法讓本身進入等待狀態,所以從await()方法開始入手。ip
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
await()方法存在兩個重載,區別是一個支持超時,一個不支持超時,最終都會調用dowait()方法(使用timed參數表示是否有超時限制,若是timed參數爲true則須要傳遞具體的超時時間)同步
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 獲取全局的ReentrantLock實例,並進行加鎖 final ReentrantLock lock = this.lock; lock.lock(); try { // 獲取全局的Generation實例,若是Generation中的broken屬性爲true則表示CyclicBarrier已經被破壞,則直接拋出異常(默認是false) final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); // 若是線程已經被設置了中斷標識,則調用breakBarrier()方法,破壞CyclicBarrier if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); // 拋出異常 } // index屬性用於記錄還須要多少個線程進行等待 int index = --count; // 若是index等於0,表示當前線程是最後一個進入await()方法的線程,若是barrierCommand不爲空,那麼執行barrierCommand的run()方法,而後調用nextGeneration()方法,喚醒在指定Condition實例中等待的全部線程,並重置CyclicBarrier,而後線程直接返回,作本身的事情 if (index == 0) { // 最後一個線程走這個邏輯 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { // 若是在執行barrierCommand的run()方法時拋出異常,那麼ranAction標識爲false,那麼須要調用breakBarrier()方法,破壞CyclicBarrier if (!ranAction) breakBarrier(); } } // 若是非最後一個線程那麼將會往下執行 // 循環 for (;;) { try { // 若是沒有超時限制,那麼直接調用Condition實例的await()方法,讓線程在指定的Condition實例中進行等待,並釋放掉它擁有的鎖 // 若是有超時限制,那麼調用Condition實例的awaitNanos()方法,至多讓線程在指定的Condition實例中等待指定的時間,該方法返回線程被喚醒後剩餘的毫秒數(超時返回小於等於0),並釋放掉它擁有的鎖 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } // 當線程被喚醒後將會串行執行如下的邏輯 // 若是發現CyclicBarrier被破壞了,那麼就拋出異常 if (g.broken) throw new BrokenBarrierException(); // 正常狀況下,當調用了nextGeneration()方法以後,generation引用就指向一個新的Generation實例,所以g!=generation,那麼線程直接返回,作本身的事情 if (g != generation) return index; // 若是線程在Condition實例等待的過程當中因爲達到了超時時間而被喚醒了,那麼將會調用breakBarrier()方法,破壞CyclicBarrier if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); // 拋出異常 } } } finally { lock.unlock(); // 解鎖 } }
當線程進入dowait()方法後,須要獲取鎖,若是當前線程並不是最後一個進入await()方法的線程,那麼將會在指定的Condition實例中進行等待,而後釋放掉它擁有的鎖,若是當前線程是最後一個進入await()方法的線程(index==0,表示還須要0個線程進行等待),若是barrierCommand不爲空,那麼將會執行barrierCommand的run()方法,最後調用nextGeneration()方法。源碼
若是在執行dowait()方法的過程當中,線程已經被設置了中斷標識,或者最後一個線程在執行barrierCommand的run()方法時拋出異常,或者在指定Condition實例等待的線程因爲達到了超時時間而被喚醒,那麼都會調用breakBarrier()方法。it
private void nextGeneration() { // 喚醒在指定Condition實例中等待的全部線程 trip.signalAll(); // 將count的數量設置成parties count = parties; // 將generation引用指向一個新的Generation實例 generation = new Generation(); }
nextGeneration()方法用於指向下一個Generation,該方法將會喚醒在指定Condition實例中等待的全部線程,而後將count的數量設置成parties,恢復成CyclicBarrier初始化後的狀態,最後將generation引用指向一個新的Generation實例。
private void breakBarrier() { // 將Generation實例的broken屬性設置爲true,表示CyclicBarrier已經被破壞 generation.broken = true; // 將count的數量設置回parties count = parties; // 喚醒在指定Condition實例中等待的全部線程 trip.signalAll(); }
breakBarrier()方法用於破壞CyclicBarrier,將Generation實例的broken屬性設置爲true,表示CyclicBarrier已經被破壞,而後將count的數量設置成parties,最後喚醒在指定Condition實例中等待的全部線程。
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // 感受是多餘的 nextGeneration(); // 指向下一個Generation } finally { lock.unlock(); } }
reset()方法用於重置CyclicBarrier,其根本是將generation引用指向一個新的Generation實例。
1.當建立了一個CyclicBarrier實例以後,線程須要依次調用CyclicBarrier的await()方法,讓本身進入等待狀態。
2.await()方法又會調用dowait()方法,當線程進入dowait()方法後,須要獲取鎖,若是當前線程並不是最後一個進入await()方法的線程,那麼將會在指定的Condition實例中進行等待,而後釋放掉它擁有的鎖,若是當前線程是最後一個進入await()方法的線程(index==0,表示還須要0個線程進行等待),若是barrierCommand不爲空,那麼將會執行barrierCommand的run()方法,最後調用nextGeneration()方法。
3.nextGeneration()方法用於指向下一個Generation,該方法將會喚醒在指定Condition實例中等待的全部線程,而後將count的數量設置成parties,恢復成CyclicBarrier初始化後的狀態,最後將generation引用指向一個新的Generation實例,當最後一個線程執行完nextGeneration()方法後,將會直接返回,作本身的事情,最後釋放掉它擁有的鎖。
4.當被喚醒的線程依次獲取到鎖後,將會繼續往下執行,若是判斷到generation引用已經指向一個新的Generation實例,那麼直接返回,作本身的事情,最後釋放掉它擁有鎖。
5.若是在執行dowait()方法的過程當中,線程已經被設置了中斷標識,或者最後一個線程在執行barrierCommand的run()方法時拋出異常,或者在指定Condition實例等待的線程因爲達到了超時時間而被喚醒,那麼都會調用breakBarrier()方法,破壞CyclicBarrier,將Generation實例的broken屬性設置爲true,表示CyclicBarrier已經被破壞,而後將count的數量設置成parties,最後喚醒在指定Condition實例中等待的全部線程。
6.當被喚醒的線程依次獲取到鎖後,將會繼續往下執行,若是判斷到Generation實例的broken屬性被設置了true,也就是CyclicBarrier已經被破壞,那麼將會直接拋出異常,最後釋放掉它擁有的鎖。
7.當CyclicBarrier被破壞後是不可以進行復用的,由於Generation的broken屬性已經被設置成true,所以須要先調用一次reset()方法進行重置。
爲何說CyclicBarrier是能夠複用的?
由於當最後一個線程進入await()方法,將會調用nextGeneration()方法,該方法除了喚醒在指定Condition中等待的線程以外,還會將count的數量設置成parties,恢復成CyclicBarrier初始化後的狀態,同時將generation引用指向一個新的Generation實例,所以CyclicBarrier是能夠複用的,同時須要注意的是,若是CyclicBarrier已經被破壞,那麼須要先調用一次reset()方法以後纔可以進行復用。