CyclicBarrier是併發包中提供的一個同步輔助類,可使必定數量的線程所有在柵欄位置處聚集,parties的線程才能繼續往下執行。當線程到達柵欄位置時調用await方法,這個方法將阻塞直到全部線程都到達柵欄位置。若是全部線程都到達柵欄位置,那麼柵欄將打開,此時全部的線程都將被釋放,而柵欄將被重置以便下次使用。CyclicBarrier內部是基於ReentrantLock和Condition(AQS內部類)進行實現,維護一個parties的線程數。和CountDownLatch的區別是一、CyclicBarrier能夠被重用。二、CountDownLatch是經過AQS的共享模式進行實現,CyclicBarrier是基於ReentrantLock和Condition,以及內部維護一個parties的線程數進行實現,對ReentrantLock不清楚的,能夠看下個人另外一篇對ReentrantLock的源碼分析juejin.im/post/5d1b0d…。基於Java8。java
//獨佔鎖,屬性generation、 count不是線程安全須要在獨佔鎖加鎖操做的前提下,對ReentrantLock不清楚的能夠看下個人另外一篇https://juejin.im/post/5d1b0da8f265da1bc5527d36
private final ReentrantLock lock = new ReentrantLock();
//獨佔鎖的條件變量,等待全部線程都到達柵欄位置的全部線程,都是在條件隊列中等待全部線程都到達柵欄位置
private final Condition trip = lock.newCondition();
//還未到達柵欄位置的起始線程數量,主要是CyclicBarrier重複使用的時候,從新賦值給count
private final int parties;
//最後一個到達柵欄的線程須要執行的任務
private final Runnable barrierCommand;
//CyclicBarrier柵欄的代數,有一代被打破,CyclicBarrier就不能再使用了,即等待在柵欄的其中一個線程在等待的過程當中,此屬性不是線程安全的
private Generation generation = new Generation();
//還未到達柵欄位置的線程數量,有一個線程到達柵欄count就作減1操做,此屬性的操做須要在ReentrantLock獨佔鎖的條件下
private int count;複製代碼
//柵欄的代數,有一代被打破,CyclicBarrier就不能再使用了,即等待在柵欄的其中一個線程在等待的過程當中
private static class Generation {
//若是柵欄被打破,broken就會置爲true
boolean broken = false;
}複製代碼
/** * 傳入parties和barrierAction構造CyclicBarrier實例 * * @param parties 還未到達柵欄位置的起始線程數量,主要是CyclicBarrier重複使用的時候,從新賦值給count * @param barrierAction 最後一個到達柵欄的線程須要執行的任務 */
public CyclicBarrier(int parties, Runnable barrierAction) {
//若是傳入進來的parties小於等於0,拋出IllegalArgumentException異常
if (parties <= 0) throw new IllegalArgumentException();
//將傳入進來的parties賦值給屬性parties
this.parties = parties;
//將傳入進來的parties賦值給屬性count
this.count = parties;
//將傳入進來的barrierAction任務賦值給屬性barrierAction
this.barrierCommand = barrierAction;
}
/** * 只傳入parties構造CyclicBarrier實例 * * @param parties 還未到達柵欄位置的起始線程數量,主要是CyclicBarrier重複使用的時候,從新賦值給count */
public CyclicBarrier(int parties) {
//調用上面介紹的CyclicBarrier構造函數,barrierAction任務傳入空
this(parties, null);
}
複製代碼
/** * 返回當前線程第幾個到達柵欄,當前線程等待其餘全部線程到達柵欄,不然會阻塞到其餘線程都到達柵欄被喚醒 * * @return 當前線程到達柵欄的指數,即第幾個到達柵欄 * @throws InterruptedException 若是當前線程在等待其餘全部的線程也到達柵欄時被中斷,會拋出中斷異常,此時柵欄已經不能被使用了,Generation已經被打破,broken已經爲true,再次使用柵欄會拋出BrokenBarrierException異常 * @throws BrokenBarrierException 若是CyclicBarrier柵欄的Generation已經被打破,即Generation的屬性broken爲true,當前線程再次使用會拋出BrokenBarrierException異常 */
public int await() throws InterruptedException, BrokenBarrierException {
try {
//等待其餘全部線程都到達柵欄,會一直阻塞到全部線程到達柵欄被喚醒,或者其中一個線程被中斷破壞了柵欄,也會被喚醒,不支持超時的調用下面介紹的dowait方法
return dowait(false, 0L);
} catch (TimeoutException toe) {//因爲是一直阻塞到被喚醒,爲此不會拋出超時異常,因爲TimeoutException是編譯異常,爲此須要對其進行捕獲
//不會發生,若是發生直接將超時異常封裝成Error
throw new Error(toe);
}
}
/** * 返回當前線程第幾個到達柵欄,當前線程超時的等待其餘全部線程到達柵欄,不然會超時的等待到其餘線程都到達柵欄被喚醒,若是超時其餘線程還都沒有所有到達柵欄,會拋出TimeoutException異常 * * @return 當前線程到達柵欄的指數,即第幾個到達柵欄 * @throws InterruptedException 若是當前線程在等待其餘全部的線程也到達柵欄時被中斷,會拋出中斷異常,此時柵欄已經不能被使用了,Generation已經被打破,broken已經爲true,再次使用柵欄會拋出BrokenBarrierException異常 * @throws BrokenBarrierException 若是CyclicBarrier柵欄的Generation已經被打破,即Generation的屬性broken爲true,當前線程再次使用會拋出BrokenBarrierException異常 * @throws TimeoutException 若是超時其餘線程還都沒有所有到達柵欄,會拋出TimeoutException異常 */
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
//超時的調用下面介紹的dowait方法
return dowait(true, unit.toNanos(timeout));
}
/** * 返回當前線程第幾個到達柵欄,當前線程等待其餘全部線程到達柵欄,不然會阻塞到其餘線程都到達柵欄被喚醒,支持當前線程超時的等待和非超時的等待 * * @param timed 是否超時等待其餘線程到達柵欄 * @param nanos 超時的時間參數 * @return 當前線程到達柵欄的指數,即第幾個到達柵欄 * @throws InterruptedException 若是當前線程在等待其餘全部的線程也到達柵欄時被中斷,會拋出中斷異常,此時柵欄已經不能被使用了,Generation已經被打破,broken已經爲true,再次使用柵欄會拋出BrokenBarrierException異常 * @throws BrokenBarrierException 若是CyclicBarrier柵欄的Generation已經被打破,即Generation的屬性broken爲true,當前線程再次使用會拋出BrokenBarrierException異常 */
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
//獲取獨佔鎖,因爲generation、count屬性都不是線程安全的,爲此須要在獨佔鎖下才能操做
final ReentrantLock lock = this.lock;
//加獨佔鎖
lock.lock();
try {
//獲取CyclicBarrier柵欄的代信息屬性generation
final Generation g = generation;
//判斷柵欄是否被打破,即generation的broken屬性,若是CyclicBarrier柵欄被打破,broken屬性爲true,CyclicBarrier柵欄不能再被使用
if (g.broken)
//若是CyclicBarrier柵欄被打破,拋出BrokenBarrierException異常
throw new BrokenBarrierException();
//判斷當前線程是否被中斷,若是被中斷,調用下面介紹的breakBarrier方法
if (Thread.interrupted()) {
//調用下面介紹的breakBarrier方法,將generation的broken屬性置爲true,表示CyclicBarrier柵欄被打破不能再被使用,將屬性count從新賦值爲parties屬性值,喚醒全部等待其餘線程都到達柵欄的線程
breakBarrier();
//拋出InterruptedException異常
throw new InterruptedException();
}
//將count先作減一操做,再賦值給index,表示當前線程第幾個到達柵欄,在當前線程被喚醒時,作爲返回值返回
int index = --count;
//若是當前線程是最後一個到達CyclicBarrier柵欄的線程
if (index == 0) {
//最後一個到達柵欄的線程執行任務是否成功標識,再finally中須要作對應的處理,喚醒其餘對待的線程,有可能傳入進來的任務執行拋出異常
boolean ranAction = false;
try {
//獲取到須要最後一個到達柵欄的線程執行的任務,可能爲空
final Runnable command = barrierCommand;
//若是任務不爲空
if (command != null)
//當前最後一個到達CyclicBarrier柵欄的線程執行任務,調用任務的run方法
command.run();
//若是任務不爲空,執行成功,ranAction標識爲置爲true
ranAction = true;
//調用下面介紹的nextGeneration方法,喚醒全部等待其餘線程都到達柵欄的線程
nextGeneration();
//返回0,由於是最後一個到達柵欄的線程
return 0;
} finally {
//若是最後一個到達CyclicBarrier柵欄的線程執行不爲空的任務失敗
if (!ranAction)
//調用下面介紹的breakBarrier方法,將generation的broken屬性置爲true,表示CyclicBarrier柵欄被打破不能再被使用,將屬性count從新賦值爲parties屬性值,喚醒全部等待其餘線程都到達柵欄的線程
breakBarrier();
}
}
//循環直到其餘線程都到達柵欄,或者柵欄不可用,或者等待的線程被中斷,或者等待其餘線程都到達柵欄超時
for (;;) {
try {
//若是不支持超時
if (!timed)
//調用Condition的await方法,Condition不清楚的能夠看個人另外一篇AQS源碼對Condition的分析https://juejin.im/post/5d0b3b55f265da1bc23f7dca
trip.await();
//若是支持超時,而且超時時間nanos小於0,調用await支持超時會拋出超時異常
else if (nanos > 0L)
//若是支持超時timed爲true,而且nanos大於0,調用Condition的await的超時方法
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {//若是線程在等待其餘全部線程到大柵欄的過程當中,被中斷
//若是柵欄的代信息對象沒有改變,而且代信息對象的broken屬性爲false,柵欄沒有被打破,爲何代信息對象會改變,由於調用下面介紹的reset的方法,會改變代信息
if (g == generation && ! g.broken) {
//調用下面介紹的breakBarrier方法,將generation的broken屬性置爲true,表示CyclicBarrier柵欄被打破不能再被使用,將屬性count從新賦值爲parties屬性值,喚醒全部等待其餘線程都到達柵欄的線程
breakBarrier();
//拋出線程被中斷異常
throw ie;
} else {
//若是上面的柵欄的代信息已經改變,或者柵欄已經被打破,即generation的屬性broken爲true,重置線程的中斷標誌位,由於異常線程的中斷標誌位會被重置
Thread.currentThread().interrupt();
}
}
//若是CyclicBarrier柵欄已經被打破,即generation的屬性broken爲true
if (g.broken)
//拋出BrokenBarrierException異常
throw new BrokenBarrierException();
//若是柵欄的代信息已經改變,主動調用reset方法,會改變柵欄的代信息
if (g != generation)
//返回線程第幾個到達柵欄
return index;
//若是超時的調用await方法,即timed爲true,而且nanos小於等於0
if (timed && nanos <= 0L) {
//調用下面介紹的breakBarrier方法,將generation的broken屬性置爲true,表示CyclicBarrier柵欄被打破不能再被使用,將屬性count從新賦值爲parties屬性值,喚醒全部等待其餘線程都到達柵欄的線程
breakBarrier();
//拋出超時異常
throw new TimeoutException();
}
}
} finally {
//釋放獨佔鎖
lock.unlock();
}
}
//重置CyclicBarrier柵欄的代信息,喚醒全部等待其餘線程都到達柵欄的線程,以及將count重置爲parties
private void nextGeneration() {
//喚醒全部等待其餘線程都到達柵欄的線程
trip.signalAll();
//將count重置爲parties
count = parties;
//重置CyclicBarrier柵欄的代信息
generation = new Generation();
}
//將generation的broken屬性置爲true,表示CyclicBarrier柵欄被打破不能再被使用,將屬性count從新賦值爲parties屬性值,喚醒全部等待其餘線程都到達柵欄的線程
private void breakBarrier() {
//將generation的broken屬性置爲true
generation.broken = true;
//將count重置爲parties
count = parties;
//喚醒全部等待其餘線程都到達柵欄的線程
trip.signalAll();
}
複製代碼
//判斷CyclicBarrier柵欄是否被打破,即CyclicBarrier柵欄的代信息generation的屬性broken是否爲true
public boolean isBroken() {
//獲取獨佔鎖,因爲generation屬性不是線程安全的,爲此須要在獨佔鎖下才能操做
final ReentrantLock lock = this.lock;
//加獨佔鎖
lock.lock();
try {
//返回CyclicBarrier柵欄的代信息generation的屬性broken,若是爲true代表柵欄被打破
return generation.broken;
} finally {
//釋放獨佔鎖
lock.unlock();
}
}
//重置CyclicBarrier柵欄
public void reset() {
//獲取獨佔鎖,因爲generation、count屬性都不是線程安全的,爲此須要在獨佔鎖下才能操做
final ReentrantLock lock = this.lock;
//加獨佔鎖
lock.lock();
try {
//調用上面介紹的breakBarrier方法,將generation的broken屬性置爲true,表示CyclicBarrier柵欄被打破不能再被使用,將屬性count從新賦值爲parties屬性值,喚醒全部等待其餘線程都到達柵欄的線程
breakBarrier(); // break the current generation
//重置CyclicBarrier柵欄的代信息,喚醒全部等待其餘線程都到達柵欄的線程,以及將count重置爲parties
nextGeneration(); // start a new generation
} finally {
//釋放獨佔鎖
lock.unlock();
}
}
//獲取有多少線程在等待其餘線程到達柵欄
public int getNumberWaiting() {
//獲取獨佔鎖,因爲count屬性都不是線程安全的,爲此須要在獨佔鎖下才能操做
final ReentrantLock lock = this.lock;
//加獨佔鎖
lock.lock();
try {
//使用還未到達柵欄位置的起始線程數量parties減去目前還未到達柵欄位置的線程數量,獲取到有多少線程在等待其餘線程到達柵欄
return parties - count;
} finally {
//釋放獨佔鎖
lock.unlock();
}
}
複製代碼