阿里一面CyclicBarrier和CountDownLatch的區別是啥

引言

前面一篇文章咱們《Java高併發編程基礎三大利器之CountDownLatch》它有一個缺點,就是它的計數器只可以使用一次,也就是說當計數器(state)減到爲 0的時候,若是 再有線程調用去 await() 方法,該線程會直接經過,不會再起到等待其餘線程執行結果起到同步的做用。爲了解決這個問題CyclicBarrier就應運而生了。html

什麼是CyclicBarrier

CyclicBarrier是什麼?把它拆開來翻譯就是循環(Cycle)和屏障(Barrier
在這裏插入圖片描述
它的主要做用其實和CountDownLanch差很少,都是讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,屏障會被打開,全部被屏障阻塞的線程纔會繼續執行,不過它是能夠循環執行的,這是它與CountDownLanch最大的不一樣。CountDownLanch是隻有當最後一個線程把計數器置爲0的時候,其餘阻塞的線程纔會繼續執行。學習CyclicBarrier以前建議先去看看這幾篇文章:java

如何使用

咱們首先先來看下關於使用CyclicBarrier的一個demo:好比遊戲中有個關卡的時候,每次進入下一關的時候都須要進行加載一些地圖、特效背景音樂什麼的只有所有加載完了纔可以進行遊戲:編程

/**demo 來源https://blog.csdn.net/lstcui/article/details/107389371
 * 公衆號【java金融】
 */
public class CyclicBarrierExample {
    static class PreTaskThread implements Runnable {
        private String task;
        private CyclicBarrier cyclicBarrier;

        public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
            this.task = task;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            for (int i = 0; i < 4; i++) {
                Random random = new Random();
                try {
                    Thread.sleep(random.nextInt(1000));
                    System.out.println(String.format("關卡 %d 的任務 %s 完成", i, task));
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }

        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
                System.out.println("本關卡全部的前置任務完成,開始遊戲... ...");
            });
            new Thread(new PreTaskThread("加載地圖數據", cyclicBarrier)).start();
            new Thread(new PreTaskThread("加載人物模型", cyclicBarrier)).start();
            new Thread(new PreTaskThread("加載背景音樂", cyclicBarrier)).start();
        }
    }
}

輸出結果以下:
在這裏插入圖片描述
咱們能夠看到每次遊戲開始都會等當前關卡把遊戲的人物模型,地圖數據、背景音樂加載完成後纔會開始進行遊戲。而且仍是能夠循環控制的。併發

源碼分析

結構組成

/** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();
  • lock:用於保護屏障入口的鎖
  • trip :達到屏障而且不能放行的線程在trip條件變量上等待
  • parties :柵欄開啓須要的到達線程總數
  • barrierCommand:最後一個線程到達屏障後執行的回調任務
  • generation:這是一個內部類,經過它實現CyclicBarrier重複利用,每當await達到最大次數的時候,就會從新new 一個,表示進入了下一個輪迴。裏面只有一個boolean型屬性,用來表示當前輪迴是否有線程中斷。

主要方法

await方法app

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
 /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
         try {
               //獲取barrier當前的 「代」也就是當前循環
             final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            // 每來一個線程調用await方法都會進行減1
            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // new CyclicBarrier 傳入 的barrierCommand, command.run()這個方法是同步的,若是耗時比較多的話,是否執行的時候須要考慮下是否異步來執行。
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 這個方法1. 喚醒全部阻塞的線程,2. 重置下count(count 每來一個線程都會進行減1)和generation,以便於下次循環。
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                     // 進入if條件,說明是不帶超時的await
                    if (!timed)
                         // 當前線程會釋放掉lock,而後進入到trip條件隊列的尾部,而後掛起本身,等待被喚醒。
                        trip.await();
                    else if (nanos > 0L)
                         //說明當前線程調用await方法時 是指定了 超時時間的!
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                     //Node節點在 條件隊列內 時 收到中斷信號時 會拋出中斷異常!
                    //g == generation 成立,說明當前代並無變化。
                    //! g.broken 當前代若是沒有被打破,那麼當前線程就去打破,而且拋出異常..
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                    //執行到else有幾種狀況?
                    //1.代發生了變化,這個時候就不須要拋出中斷異常了,由於 代已經更新了,這裏喚醒後就走正常邏輯了..只不過設置下 中斷標記。
                    //2.代沒有發生變化,可是代被打破了,此時也不用返回中斷異常,執行到下面的時候會拋出  brokenBarrier異常。也記錄下中斷標記位。
                        Thread.currentThread().interrupt();
                    }
                }
               //喚醒後,執行到這裏,有幾種狀況?
              //1.正常狀況,當前barrier開啓了新的一代(trip.signalAll())
              //2.當前Generation被打破,此時也會喚醒全部在trip上掛起的線程
              //3.當前線程trip中等待超時,而後主動轉移到 阻塞隊列 而後獲取到鎖 喚醒。
                if (g.broken)
                    throw new BrokenBarrierException();
               //喚醒後,執行到這裏,有幾種狀況?
            //1.正常狀況,當前barrier開啓了新的一代(trip.signalAll())
            //2.當前線程trip中等待超時,而後主動轉移到 阻塞隊列 而後獲取到鎖 喚醒。
                if (g != generation)
                    return index;
               //喚醒後,執行到這裏,有幾種狀況?
            //.當前線程trip中等待超時,而後主動轉移到 阻塞隊列 而後獲取到鎖 喚醒。
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
             lock.unlock();
        }
    }

小結

到了這裏咱們是否是能夠知道爲啥CyclicBarrier能夠進行循環計數?
CyclicBarrier採用一個內部類Generation來維護當前循環,每個await方法都會存儲當前的generation,獲取到相同generation對象的屬於同一組,每當count的次數耗盡就會從新new一個Generation而且從新設置count的值爲parties,表示進入下一次新的循環。
從這個await方法咱們是否是能夠知道只要有一個線程被中斷了,當代的 generationbroken 就會被設置爲true,因此會致使其餘的線程也會被拋出BrokenBarrierException。至關於一個失敗其餘也必須失敗,感受有「強一致性「的味道。dom

總結

  • CountDownLanch是爲計數器是設置一個值,當屢次執行countdown後,計數器減爲0的時候全部線程被喚醒,而後CountDownLanch失效,只可以使用一次。
  • CyclicBarrier是當count0時一樣喚醒所有線程,同時會從新設置countparties,從新new一個generation來實現重複利用。

結束

相關文章
相關標籤/搜索