CyclicBarrier源碼分析

1、簡介

          CyclicBarrier是併發包中提供的一個同步輔助類,可使必定數量的線程所有在柵欄位置處聚集,parties的線程才能繼續往下執行。當線程到達柵欄位置時調用await方法,這個方法將阻塞直到全部線程都到達柵欄位置。若是全部線程都到達柵欄位置,那麼柵欄將打開,此時全部的線程都將被釋放,而柵欄將被重置以便下次使用。CyclicBarrier內部是基於ReentrantLock和Condition(AQS內部類)進行實現,維護一個parties的線程數。和CountDownLatch的區別是一、CyclicBarrier能夠被重用。二、CountDownLatch是經過AQS的共享模式進行實現,CyclicBarrier是基於ReentrantLock和Condition,以及內部維護一個parties的線程數進行實現,對ReentrantLock不清楚的,能夠看下個人另外一篇對ReentrantLock的源碼分析juejin.im/post/5d1b0d…。基於Java8。java

2、屬性

//獨佔鎖,屬性generation、 count不是線程安全須要在獨佔鎖加鎖操做的前提下,對ReentrantLock不清楚的能夠看下個人另外一篇https://juejin.im/post/5d1b0da8f265da1bc5527d36
private final ReentrantLock lock = new ReentrantLock();
//獨佔鎖的條件變量,等待全部線程都到達柵欄位置的全部線程,都是在條件隊列中等待全部線程都到達柵欄位置
private final Condition trip = lock.newCondition();
//還未到達柵欄位置的起始線程數量,主要是CyclicBarrier重複使用的時候,從新賦值給count
private final int parties;
//最後一個到達柵欄的線程須要執行的任務
private final Runnable barrierCommand;
//CyclicBarrier柵欄的代數,有一代被打破,CyclicBarrier就不能再使用了,即等待在柵欄的其中一個線程在等待的過程當中,此屬性不是線程安全的
private Generation generation = new Generation();
//還未到達柵欄位置的線程數量,有一個線程到達柵欄count就作減1操做,此屬性的操做須要在ReentrantLock獨佔鎖的條件下
private int count;複製代碼

3、內部類

//柵欄的代數,有一代被打破,CyclicBarrier就不能再使用了,即等待在柵欄的其中一個線程在等待的過程當中
private static class Generation {
        //若是柵欄被打破,broken就會置爲true
        boolean broken = false;
}複製代碼

4、構造函數

/** * 傳入parties和barrierAction構造CyclicBarrier實例 * * @param parties 還未到達柵欄位置的起始線程數量,主要是CyclicBarrier重複使用的時候,從新賦值給count * @param barrierAction 最後一個到達柵欄的線程須要執行的任務 */
public CyclicBarrier(int parties, Runnable barrierAction) {
        //若是傳入進來的parties小於等於0,拋出IllegalArgumentException異常 
        if (parties <= 0) throw new IllegalArgumentException();
        //將傳入進來的parties賦值給屬性parties
        this.parties = parties;
        //將傳入進來的parties賦值給屬性count
        this.count = parties;
        //將傳入進來的barrierAction任務賦值給屬性barrierAction 
        this.barrierCommand = barrierAction;
}

/** * 只傳入parties構造CyclicBarrier實例 * * @param parties 還未到達柵欄位置的起始線程數量,主要是CyclicBarrier重複使用的時候,從新賦值給count */
public CyclicBarrier(int parties) {
        //調用上面介紹的CyclicBarrier構造函數,barrierAction任務傳入空
        this(parties, null);
}
複製代碼

5、等待全部線程到達

/** * 返回當前線程第幾個到達柵欄,當前線程等待其餘全部線程到達柵欄,不然會阻塞到其餘線程都到達柵欄被喚醒 * * @return 當前線程到達柵欄的指數,即第幾個到達柵欄 * @throws InterruptedException 若是當前線程在等待其餘全部的線程也到達柵欄時被中斷,會拋出中斷異常,此時柵欄已經不能被使用了,Generation已經被打破,broken已經爲true,再次使用柵欄會拋出BrokenBarrierException異常 * @throws BrokenBarrierException 若是CyclicBarrier柵欄的Generation已經被打破,即Generation的屬性broken爲true,當前線程再次使用會拋出BrokenBarrierException異常 */
public int await() throws InterruptedException, BrokenBarrierException {
        try {
            //等待其餘全部線程都到達柵欄,會一直阻塞到全部線程到達柵欄被喚醒,或者其中一個線程被中斷破壞了柵欄,也會被喚醒,不支持超時的調用下面介紹的dowait方法 
            return dowait(false, 0L);
        } catch (TimeoutException toe) {//因爲是一直阻塞到被喚醒,爲此不會拋出超時異常,因爲TimeoutException是編譯異常,爲此須要對其進行捕獲
            //不會發生,若是發生直接將超時異常封裝成Error
            throw new Error(toe); 
        }
}

/** * 返回當前線程第幾個到達柵欄,當前線程超時的等待其餘全部線程到達柵欄,不然會超時的等待到其餘線程都到達柵欄被喚醒,若是超時其餘線程還都沒有所有到達柵欄,會拋出TimeoutException異常 * * @return 當前線程到達柵欄的指數,即第幾個到達柵欄 * @throws InterruptedException 若是當前線程在等待其餘全部的線程也到達柵欄時被中斷,會拋出中斷異常,此時柵欄已經不能被使用了,Generation已經被打破,broken已經爲true,再次使用柵欄會拋出BrokenBarrierException異常 * @throws BrokenBarrierException 若是CyclicBarrier柵欄的Generation已經被打破,即Generation的屬性broken爲true,當前線程再次使用會拋出BrokenBarrierException異常 * @throws TimeoutException 若是超時其餘線程還都沒有所有到達柵欄,會拋出TimeoutException異常 */
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
        //超時的調用下面介紹的dowait方法 
        return dowait(true, unit.toNanos(timeout));
}

/** * 返回當前線程第幾個到達柵欄,當前線程等待其餘全部線程到達柵欄,不然會阻塞到其餘線程都到達柵欄被喚醒,支持當前線程超時的等待和非超時的等待 * * @param timed 是否超時等待其餘線程到達柵欄 * @param nanos 超時的時間參數 * @return 當前線程到達柵欄的指數,即第幾個到達柵欄 * @throws InterruptedException 若是當前線程在等待其餘全部的線程也到達柵欄時被中斷,會拋出中斷異常,此時柵欄已經不能被使用了,Generation已經被打破,broken已經爲true,再次使用柵欄會拋出BrokenBarrierException異常 * @throws BrokenBarrierException 若是CyclicBarrier柵欄的Generation已經被打破,即Generation的屬性broken爲true,當前線程再次使用會拋出BrokenBarrierException異常 */
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        //獲取獨佔鎖,因爲generation、count屬性都不是線程安全的,爲此須要在獨佔鎖下才能操做
        final ReentrantLock lock = this.lock;
        //加獨佔鎖
        lock.lock();
        try {
            //獲取CyclicBarrier柵欄的代信息屬性generation
            final Generation g = generation;
            
            //判斷柵欄是否被打破,即generation的broken屬性,若是CyclicBarrier柵欄被打破,broken屬性爲true,CyclicBarrier柵欄不能再被使用 
            if (g.broken)
                //若是CyclicBarrier柵欄被打破,拋出BrokenBarrierException異常
                throw new BrokenBarrierException();
            //判斷當前線程是否被中斷,若是被中斷,調用下面介紹的breakBarrier方法
            if (Thread.interrupted()) {
                //調用下面介紹的breakBarrier方法,將generation的broken屬性置爲true,表示CyclicBarrier柵欄被打破不能再被使用,將屬性count從新賦值爲parties屬性值,喚醒全部等待其餘線程都到達柵欄的線程 
                breakBarrier();
                //拋出InterruptedException異常
                throw new InterruptedException();
            }
            
            //將count先作減一操做,再賦值給index,表示當前線程第幾個到達柵欄,在當前線程被喚醒時,作爲返回值返回
            int index = --count;
            //若是當前線程是最後一個到達CyclicBarrier柵欄的線程
            if (index == 0) {  
                //最後一個到達柵欄的線程執行任務是否成功標識,再finally中須要作對應的處理,喚醒其餘對待的線程,有可能傳入進來的任務執行拋出異常
                boolean ranAction = false;
                try {
                    //獲取到須要最後一個到達柵欄的線程執行的任務,可能爲空 
                    final Runnable command = barrierCommand;
                    //若是任務不爲空
                    if (command != null)
                        //當前最後一個到達CyclicBarrier柵欄的線程執行任務,調用任務的run方法
                        command.run();
                    //若是任務不爲空,執行成功,ranAction標識爲置爲true
                    ranAction = true;
                    //調用下面介紹的nextGeneration方法,喚醒全部等待其餘線程都到達柵欄的線程
                    nextGeneration();
                    //返回0,由於是最後一個到達柵欄的線程
                    return 0;
                } finally {
                    //若是最後一個到達CyclicBarrier柵欄的線程執行不爲空的任務失敗
                    if (!ranAction)
                        //調用下面介紹的breakBarrier方法,將generation的broken屬性置爲true,表示CyclicBarrier柵欄被打破不能再被使用,將屬性count從新賦值爲parties屬性值,喚醒全部等待其餘線程都到達柵欄的線程
                        breakBarrier();
                }
            }
            
            //循環直到其餘線程都到達柵欄,或者柵欄不可用,或者等待的線程被中斷,或者等待其餘線程都到達柵欄超時 
            for (;;) {
                try {
                    //若是不支持超時 
                    if (!timed)
                        //調用Condition的await方法,Condition不清楚的能夠看個人另外一篇AQS源碼對Condition的分析https://juejin.im/post/5d0b3b55f265da1bc23f7dca
                        trip.await();
                    //若是支持超時,而且超時時間nanos小於0,調用await支持超時會拋出超時異常 
                    else if (nanos > 0L)
                        //若是支持超時timed爲true,而且nanos大於0,調用Condition的await的超時方法
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {//若是線程在等待其餘全部線程到大柵欄的過程當中,被中斷
                    //若是柵欄的代信息對象沒有改變,而且代信息對象的broken屬性爲false,柵欄沒有被打破,爲何代信息對象會改變,由於調用下面介紹的reset的方法,會改變代信息
                    if (g == generation && ! g.broken) {
                        //調用下面介紹的breakBarrier方法,將generation的broken屬性置爲true,表示CyclicBarrier柵欄被打破不能再被使用,將屬性count從新賦值爲parties屬性值,喚醒全部等待其餘線程都到達柵欄的線程 
                        breakBarrier();
                        //拋出線程被中斷異常 
                        throw ie;
                    } else {
                        //若是上面的柵欄的代信息已經改變,或者柵欄已經被打破,即generation的屬性broken爲true,重置線程的中斷標誌位,由於異常線程的中斷標誌位會被重置
                        Thread.currentThread().interrupt();
                    }
                }
                //若是CyclicBarrier柵欄已經被打破,即generation的屬性broken爲true
                if (g.broken)
                    //拋出BrokenBarrierException異常
                    throw new BrokenBarrierException();
                //若是柵欄的代信息已經改變,主動調用reset方法,會改變柵欄的代信息
                if (g != generation)
                    //返回線程第幾個到達柵欄
                    return index;
                //若是超時的調用await方法,即timed爲true,而且nanos小於等於0
                if (timed && nanos <= 0L) {
                    //調用下面介紹的breakBarrier方法,將generation的broken屬性置爲true,表示CyclicBarrier柵欄被打破不能再被使用,將屬性count從新賦值爲parties屬性值,喚醒全部等待其餘線程都到達柵欄的線程 
                    breakBarrier();
                    //拋出超時異常
                    throw new TimeoutException();
                }
            }
        } finally {
            //釋放獨佔鎖
            lock.unlock();
        }
}

//重置CyclicBarrier柵欄的代信息,喚醒全部等待其餘線程都到達柵欄的線程,以及將count重置爲parties
private void nextGeneration() {
        //喚醒全部等待其餘線程都到達柵欄的線程
        trip.signalAll();
        //將count重置爲parties 
        count = parties;
        //重置CyclicBarrier柵欄的代信息 
        generation = new Generation();
}

//將generation的broken屬性置爲true,表示CyclicBarrier柵欄被打破不能再被使用,將屬性count從新賦值爲parties屬性值,喚醒全部等待其餘線程都到達柵欄的線程
private void breakBarrier() { 
        //將generation的broken屬性置爲true 
        generation.broken = true;
        //將count重置爲parties
        count = parties;
        //喚醒全部等待其餘線程都到達柵欄的線程
        trip.signalAll();
}
複製代碼

6、其餘方法

//判斷CyclicBarrier柵欄是否被打破,即CyclicBarrier柵欄的代信息generation的屬性broken是否爲true
public boolean isBroken() {
        //獲取獨佔鎖,因爲generation屬性不是線程安全的,爲此須要在獨佔鎖下才能操做
        final ReentrantLock lock = this.lock;
        //加獨佔鎖
        lock.lock();
        try {
            //返回CyclicBarrier柵欄的代信息generation的屬性broken,若是爲true代表柵欄被打破
            return generation.broken;
        } finally {
            //釋放獨佔鎖
            lock.unlock();
        }
}

//重置CyclicBarrier柵欄
public void reset() {
        //獲取獨佔鎖,因爲generation、count屬性都不是線程安全的,爲此須要在獨佔鎖下才能操做
        final ReentrantLock lock = this.lock;
        //加獨佔鎖
        lock.lock();
        try {
            //調用上面介紹的breakBarrier方法,將generation的broken屬性置爲true,表示CyclicBarrier柵欄被打破不能再被使用,將屬性count從新賦值爲parties屬性值,喚醒全部等待其餘線程都到達柵欄的線程 
            breakBarrier();   // break the current generation
            //重置CyclicBarrier柵欄的代信息,喚醒全部等待其餘線程都到達柵欄的線程,以及將count重置爲parties
            nextGeneration(); // start a new generation
        } finally {
            //釋放獨佔鎖
            lock.unlock();
        }
}

//獲取有多少線程在等待其餘線程到達柵欄
public int getNumberWaiting() {
        //獲取獨佔鎖,因爲count屬性都不是線程安全的,爲此須要在獨佔鎖下才能操做
        final ReentrantLock lock = this.lock;
        //加獨佔鎖
        lock.lock();
        try {
            //使用還未到達柵欄位置的起始線程數量parties減去目前還未到達柵欄位置的線程數量,獲取到有多少線程在等待其餘線程到達柵欄 
            return parties - count;
        } finally {
            //釋放獨佔鎖
            lock.unlock();
        }
}
複製代碼
相關文章
相關標籤/搜索