CyclicBarrier是如何成爲一個"柵欄"的

CyclicBarrier是一種相似於柵欄的存在,意思就是在柵欄開放以前你都只能被擋在柵欄的一側,當柵欄移除以後,以前被擋在一側的多個對象則同時開始動起來。html

1. 如何使用CyclicBarrier

  在介紹其原理以前,先了解一下CyclicBarrier應該如何使用。java

  假設如今有這樣的場景,咱們須要開一個會議,須要張一、張二、張3三我的參加,
會議須要三我的都到齊以後才能開始,不然只能乾等着;這個場景用CyclicBarrier能夠很契合的模擬出來。代碼以下:app

public static void main(String[] args) {
    // 線程池,每一個線程表明一我的
    ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();
    // 會議所需的人數爲3
    CyclicBarrier barrier = new CyclicBarrier(3);

    executor.execute(() -> {
        try {
            System.err.println("張1到達會議室");
            barrier.await();
            System.err.println("會議開始,張1開始發言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("張2到達會議室");
            barrier.await();
            System.err.println("會議開始,張2開始發言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("張3先去個廁所,內急解決再去開會");
            TimeUnit.SECONDS.sleep(1);
            System.err.println("張3到達會議室");
            barrier.await();
            System.err.println("會議開始,張3開始發言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });


    executor.shutdown();
}

結果圖:
例子圖
  經過上方代碼能夠知道CyclicBarrier的幾點:ide

  1. 使用await()來表示完成了某些事情。(上方例子的表現爲到達了會議室)
  2. 使用await()以後當前線程就進入阻塞狀態,須要等待徹底知足CyclicBarrier的條件後喚醒才能繼續接下來的操做。(上方例子中 爲3我的都到達會議室)
  3. 在最後一個線程達到條件以後,以前阻塞的線程所有放開,繼續接下來的操做。(上方例子爲張3到達會議室)

  這個簡單的例子也讓咱們瞭解CyclicBarrier的使用方法,那來看看其內部到底是如何實現柵欄的效果的。函數

2. CyclicBarrier是如何成爲"柵欄"的

  從第一節的代碼中咱們也能看到,須要關注的就兩個地方this

  1. 構造函數
  2. await()方法

只要瞭解這兩個方法的內部,至關於瞭解了CyclicBarrier的內部。
那在深刻了解以前,先來看下CyclicBarrier的幾個變量,不用刻意去記,看代碼的時候知道這個東西作什麼用的就好了:線程

lock:CyclicBarrier類建立的ReentrantLock實例,關於ReentrantLock不清楚的能夠->傳送。code

trip:lock中的conditionCyclicBarrier使用該變量來實現各線程之間的阻塞和同時喚醒。一樣,不明白condition做用的=>傳送門htm

parties:須要知足條件(調用await方法)的總數,就是說當有parties個線程await()以後就會喚醒所有線程。對象

barrierCommand:一個Runnable變量,在await方法的調用次數到達總數parties以後,在喚醒所有線程以前執行其run()方法

generation:其內部類,能夠理解爲週期,週期內須要完成n個任務,只要一個任務失敗,當前週期的全部任務就算失敗,結束當前週期,再開啓下個週期。

count:當前週期剩餘須要完成的任務數(剩餘調用await方法的次數)

如下爲源碼:

public class CyclicBarrier {
    // 內部類,可理解爲週期
    private static class Generation {
        // 當前週期是否失敗
        boolean broken = false;
    }

    // 鎖的實例
    private final ReentrantLock lock = new ReentrantLock();
    // ReentrantLock的condition變量,用來控制線程喚醒和阻塞
    private final Condition trip = lock.newCondition();
    // 須要知足條件的次數,即須要調用await方法的次數
    private final int parties;
    // 知足條件次數達到parties以後,喚醒全部線程以前執行其 run()方法
    private final Runnable barrierCommand;
    // 當前週期
    private Generation generation = new Generation();
    // 剩餘知足條件次數
    private int count;
    
    // ...
}

  看完CyclicBarrier的幾個變量後,來看其具體的內部實現。

  首先來看構造函數,其構造函數有兩個,一個在達到條件總數(parties)後直接叫醒全部線程;另外一個指定一個Runnable在達到條件總數後先執行其run()方法再叫醒。

  • 不指定Runnable,參數只有一個:須要達成的任務數
public CyclicBarrier(int parties) {
    // 直接調用另外一個構造方法,Runnable傳null,表示不執行
    this(parties, null);
}
  • 指定Runnable的構造方法,賦值任務總數、剩餘任務數、喚醒操做以前的Runnable
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // 任務總數
    this.parties = parties;
    // 剩餘須要完成的任務數
    this.count = parties;
    // 喚醒以前執行的Runnable
    this.barrierCommand = barrierAction;
}

  在第一節咱們使用的是第一個構造方法,來試試第二個

public static void main(String[] args) throws InterruptedException {

    ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();
    /** =======增長Runnable,其餘地方保持一致=============*/
    CyclicBarrier barrier = new CyclicBarrier(3, ()-> System.err.println("在會議開始以前,先給你們發下開會資料"));

    executor.execute(() -> {
        try {
            System.err.println("張1到達會議室");
            barrier.await();
            System.err.println("會議開始,張1開始發言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("張2到達會議室");
            barrier.await();
            System.err.println("會議開始,張2開始發言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("張3先去個廁所,內急解決再去開會");
            TimeUnit.SECONDS.sleep(1);
            System.err.println("張3到達會議室");
            barrier.await();
            System.err.println("會議開始,張3開始發言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });


    executor.shutdown();
}

結果圖:

pic2

 看完構造函數,就算理解了一半CyclicBarrier了,接下來來看另外一半——await();跟蹤代碼,看到是這樣的

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

直接調用dowait方法,傳參爲false0,意思就是不限時等待,除非線程被打斷或者喚醒。再進入dowait方法,這個方法就是CyclicBarrier的另外一半,在下方的代碼中很清楚的寫了整個執行流程

/** 參數說明, timed:是否限時, nanos:限時時間*/
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException, TimeoutException {
    // 鎖
    final ReentrantLock lock = this.lock;
    // 獲取鎖,若是失敗的話線程睡眠,進入同步隊列(AQS中的知識)
    lock.lock();
    try {
        /* 拿到鎖以後進入代碼處理邏輯*/
        
        // 當前週期
        final Generation g = generation;

        // 若是當前週期是失敗的,那麼直接拋錯
        if (g.broken)
            throw new BrokenBarrierException();

        // 若是當前線程被打斷了,那麼這次週期失敗,設置相關參數,而後拋錯
        if (Thread.interrupted()) {
            // 實現代碼在下行的註釋中,設置相關參數來提醒其餘線程週期失敗了
            breakBarrier();
            /*
             * private void breakBarrier() {
             *     generation.broken = true;
             *     count = parties;
             *     // 喚醒condition中的全部線程
             *     trip.signalAll();
             * }
             */
            throw new InterruptedException();
        }

        // 若是成功了,那麼剩餘任務數(count)減1
        int index = --count;
        // 若是爲0則表示達到剩餘的任務數沒有了,達到CyclicBarrier的條件總數了,須要喚醒其餘線程
        if (index == 0) {  
            boolean ranAction = false;
            try {
                // 喚醒以前的Runnable
                final Runnable command = barrierCommand;
                // 若是不爲空的話執行其run方法
                if (command != null)
                    command.run();
                ranAction = true;
                // 開啓下個週期,這個方法是CyclicBarrier能夠複用的緣由,具體實如今下行註釋
                nextGeneration();
                /* private void nextGeneration() {
                 *     // 首先叫醒當前週期的其餘線程,告訴其週期結束了,能夠執行接下來的操做了
                 *     trip.signalAll();
                 *     // 而後開啓下個週期,剩餘任務數重置
                 *     count = parties;
                 *     // 下個週期
                 *     generation = new Generation();
                 * }
                 */
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 若是還不能結束本週期,就一直等待直到結束或者週期失敗
        for (;;) {
            try {
                // await的過程當中是釋放鎖的
                // 不限時的話就一直等待直到被喚醒或者打斷
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    // 不然的話等待一段時間後醒來
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

  到這裏就基本理解CyclicBarrier的內部實現了,其餘像帶參數的await也是同樣邏輯,只不過是多了限時的條件而已。

  其實若是你瞭解ReentrantLock的話,就知道CyclicBarrier整個就是對ReentrantLockcondition的活用而已。

3.總結

  總體來講CyclicBarrier的實現相對較簡單,說是ReentrantLockcondition的升級版也不爲過。其關鍵點爲兩個,一個爲其構造函數,決定任務個數和喚醒前操做;另一個點爲await方法,在正常狀況下每次await都會減小一個任務數(總數由構造方法決定),在任務數變爲0的時候表示週期結束,須要喚醒condition的其餘線程,而途中遇到失敗的話當前週期失敗,喚醒其餘線程一塊兒拋錯。



失敗不會讓你變得弱小,懼怕失敗會。

相關文章
相關標籤/搜索