CyclicBarrier
是一種相似於柵欄的存在,意思就是在柵欄開放以前你都只能被擋在柵欄的一側,當柵欄移除以後,以前被擋在一側的多個對象則同時開始動起來。html
在介紹其原理以前,先了解一下CyclicBarrier
應該如何使用。java
假設如今有這樣的場景,咱們須要開一個會議,須要張一、張二、張3三我的參加,
會議須要三我的都到齊以後才能開始,不然只能乾等着;這個場景用CyclicBarrier
能夠很契合的模擬出來。代碼以下:app
public static void main(String[] args) { // 線程池,每一個線程表明一我的 ThreadPoolExecutor executor = ThreadPoolProvider.getInstance(); // 會議所需的人數爲3 CyclicBarrier barrier = new CyclicBarrier(3); executor.execute(() -> { try { System.err.println("張1到達會議室"); barrier.await(); System.err.println("會議開始,張1開始發言"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executor.execute(() -> { try { System.err.println("張2到達會議室"); barrier.await(); System.err.println("會議開始,張2開始發言"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executor.execute(() -> { try { System.err.println("張3先去個廁所,內急解決再去開會"); TimeUnit.SECONDS.sleep(1); System.err.println("張3到達會議室"); barrier.await(); System.err.println("會議開始,張3開始發言"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executor.shutdown(); }
結果圖:
經過上方代碼能夠知道CyclicBarrier
的幾點:ide
await()
來表示完成了某些事情。(上方例子的表現爲到達了會議室)await()
以後當前線程就進入阻塞狀態,須要等待徹底知足CyclicBarrier
的條件後喚醒才能繼續接下來的操做。(上方例子中 爲3我的都到達會議室) 這個簡單的例子也讓咱們瞭解CyclicBarrier
的使用方法,那來看看其內部到底是如何實現柵欄的效果的。函數
從第一節的代碼中咱們也能看到,須要關注的就兩個地方this
- 構造函數
- await()方法
只要瞭解這兩個方法的內部,至關於瞭解了CyclicBarrier
的內部。
那在深刻了解以前,先來看下CyclicBarrier
的幾個變量,不用刻意去記,看代碼的時候知道這個東西作什麼用的就好了:線程
lock:
CyclicBarrier
類建立的ReentrantLock
實例,關於ReentrantLock
不清楚的能夠->傳送。codetrip:
lock
中的condition
,CyclicBarrier
使用該變量來實現各線程之間的阻塞和同時喚醒。一樣,不明白condition
做用的=>傳送門。htmparties:須要知足條件(調用
await
方法)的總數,就是說當有parties個線程await()以後就會喚醒所有線程。對象barrierCommand:一個
Runnable
變量,在await
方法的調用次數到達總數parties
以後,在喚醒所有線程以前執行其run()
方法generation:其內部類,能夠理解爲週期,週期內須要完成n個任務,只要一個任務失敗,當前週期的全部任務就算失敗,結束當前週期,再開啓下個週期。
count:當前週期剩餘須要完成的任務數(剩餘調用
await
方法的次數)
如下爲源碼:
public class CyclicBarrier { // 內部類,可理解爲週期 private static class Generation { // 當前週期是否失敗 boolean broken = false; } // 鎖的實例 private final ReentrantLock lock = new ReentrantLock(); // ReentrantLock的condition變量,用來控制線程喚醒和阻塞 private final Condition trip = lock.newCondition(); // 須要知足條件的次數,即須要調用await方法的次數 private final int parties; // 知足條件次數達到parties以後,喚醒全部線程以前執行其 run()方法 private final Runnable barrierCommand; // 當前週期 private Generation generation = new Generation(); // 剩餘知足條件次數 private int count; // ... }
看完CyclicBarrier
的幾個變量後,來看其具體的內部實現。
首先來看構造函數,其構造函數有兩個,一個在達到條件總數(parties)後直接叫醒全部線程;另外一個指定一個Runnable
在達到條件總數後先執行其run()方法再叫醒。
Runnable
,參數只有一個:須要達成的任務數public CyclicBarrier(int parties) { // 直接調用另外一個構造方法,Runnable傳null,表示不執行 this(parties, null); }
Runnable
的構造方法,賦值任務總數、剩餘任務數、喚醒操做以前的Runnable
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); // 任務總數 this.parties = parties; // 剩餘須要完成的任務數 this.count = parties; // 喚醒以前執行的Runnable this.barrierCommand = barrierAction; }
在第一節咱們使用的是第一個構造方法,來試試第二個
public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor executor = ThreadPoolProvider.getInstance(); /** =======增長Runnable,其餘地方保持一致=============*/ CyclicBarrier barrier = new CyclicBarrier(3, ()-> System.err.println("在會議開始以前,先給你們發下開會資料")); executor.execute(() -> { try { System.err.println("張1到達會議室"); barrier.await(); System.err.println("會議開始,張1開始發言"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executor.execute(() -> { try { System.err.println("張2到達會議室"); barrier.await(); System.err.println("會議開始,張2開始發言"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executor.execute(() -> { try { System.err.println("張3先去個廁所,內急解決再去開會"); TimeUnit.SECONDS.sleep(1); System.err.println("張3到達會議室"); barrier.await(); System.err.println("會議開始,張3開始發言"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executor.shutdown(); }
結果圖:
看完構造函數,就算理解了一半CyclicBarrier
了,接下來來看另外一半——await()
;跟蹤代碼,看到是這樣的
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
直接調用dowait
方法,傳參爲false
和0,意思就是不限時等待,除非線程被打斷或者喚醒。再進入dowait
方法,這個方法就是CyclicBarrier
的另外一半,在下方的代碼中很清楚的寫了整個執行流程
/** 參數說明, timed:是否限時, nanos:限時時間*/ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 鎖 final ReentrantLock lock = this.lock; // 獲取鎖,若是失敗的話線程睡眠,進入同步隊列(AQS中的知識) lock.lock(); try { /* 拿到鎖以後進入代碼處理邏輯*/ // 當前週期 final Generation g = generation; // 若是當前週期是失敗的,那麼直接拋錯 if (g.broken) throw new BrokenBarrierException(); // 若是當前線程被打斷了,那麼這次週期失敗,設置相關參數,而後拋錯 if (Thread.interrupted()) { // 實現代碼在下行的註釋中,設置相關參數來提醒其餘線程週期失敗了 breakBarrier(); /* * private void breakBarrier() { * generation.broken = true; * count = parties; * // 喚醒condition中的全部線程 * trip.signalAll(); * } */ throw new InterruptedException(); } // 若是成功了,那麼剩餘任務數(count)減1 int index = --count; // 若是爲0則表示達到剩餘的任務數沒有了,達到CyclicBarrier的條件總數了,須要喚醒其餘線程 if (index == 0) { boolean ranAction = false; try { // 喚醒以前的Runnable final Runnable command = barrierCommand; // 若是不爲空的話執行其run方法 if (command != null) command.run(); ranAction = true; // 開啓下個週期,這個方法是CyclicBarrier能夠複用的緣由,具體實如今下行註釋 nextGeneration(); /* private void nextGeneration() { * // 首先叫醒當前週期的其餘線程,告訴其週期結束了,能夠執行接下來的操做了 * trip.signalAll(); * // 而後開啓下個週期,剩餘任務數重置 * count = parties; * // 下個週期 * generation = new Generation(); * } */ return 0; } finally { if (!ranAction) breakBarrier(); } } // 若是還不能結束本週期,就一直等待直到結束或者週期失敗 for (;;) { try { // await的過程當中是釋放鎖的 // 不限時的話就一直等待直到被喚醒或者打斷 if (!timed) trip.await(); else if (nanos > 0L) // 不然的話等待一段時間後醒來 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 釋放鎖 lock.unlock(); } }
到這裏就基本理解CyclicBarrier
的內部實現了,其餘像帶參數的await
也是同樣邏輯,只不過是多了限時的條件而已。
其實若是你瞭解ReentrantLock
的話,就知道CyclicBarrier
整個就是對ReentrantLock
的condition
的活用而已。
總體來講CyclicBarrier
的實現相對較簡單,說是ReentrantLock
中condition
的升級版也不爲過。其關鍵點爲兩個,一個爲其構造函數,決定任務個數和喚醒前操做;另一個點爲await
方法,在正常狀況下每次await
都會減小一個任務數(總數由構造方法決定),在任務數變爲0的時候表示週期結束,須要喚醒condition
的其餘線程,而途中遇到失敗的話當前週期失敗,喚醒其餘線程一塊兒拋錯。
失敗不會讓你變得弱小,懼怕失敗會。