Java 線程同步組件 CountDownLatch 與 CyclicBarrier 原理分析

1.簡介

在分析完AbstractQueuedSynchronizer(如下簡稱 AQS)和ReentrantLock的原理後,本文將分析 java.util.concurrent 包下的兩個線程同步組件CountDownLatchCyclicBarrier。這兩個同步組件比較經常使用,也常常被放在一塊兒對比。經過分析這兩個同步組件,可以使咱們對 Java 線程間協同有更深刻的瞭解。同時經過分析其原理,也可以使咱們作到知其然,並知其因此然。java

這裏首先來介紹一下 CountDownLatch 的用途,CountDownLatch 容許一個或一組線程等待其餘線程完成後再恢復運行。線程可經過調用await方法進入等待狀態,在其餘線程調用countDown方法將計數器減爲0後,處於等待狀態的線程便可恢復運行。CyclicBarrier (可循環使用的屏障)則與此不一樣,CyclicBarrier 容許一組線程到達屏障後阻塞住,直到最後一個線程進入到達屏障,全部線程才恢復運行。它們之間主要的區別在於喚醒等待線程的時機。CountDownLatch 是在計數器減爲0後,喚醒等待線程。CyclicBarrier 是在計數器(等待線程數)增加到指定數量後,再喚醒等待線程。除此以外,兩種之間還有一些其餘的差別,這個將會在後面進行說明。app

在下一章中,我將會介紹一下二者的實現原理,繼續往下看吧。源碼分析

2.原理

2.1 CountDownLatch 的實現原理

CountDownLatch 的同步功能是基於 AQS 實現的,CountDownLatch 使用 AQS 中的 state 成員變量做爲計數器。在 state 不爲0的狀況下,凡是調用 await 方法的線程將會被阻塞,並被放入 AQS 所維護的同步隊列中進行等待。大體示意圖以下:學習

每一個阻塞的線程都會被封裝成節點對象,節點之間經過 prev 和 next 指針造成同步隊列。初始狀況下,隊列的頭結點是一個虛擬節點。該節點僅是一個佔位符,沒什麼特別的意義。每當有一個線程調用 countDown 方法,就將計數器 state--。當 state 被減至0時,隊列中的節點就會按照 FIFO 順序被喚醒,被阻塞的線程便可恢復運行。ui

CountDownLatch 自己的原理並不難理解,不過若是你們想深刻理解 CountDownLatch 的實現細節,那麼須要先去學習一下 AQS 的相關原理。CountDownLatch 是基於 AQS 實現的,因此理解 AQS 是學習 CountDownLatch 的前置條件。我在以前寫過一篇關於 AQS 的文章 Java 重入鎖 ReentrantLock 原理分析,有興趣的朋友能夠去讀一讀。this

2.2 CyclicBarrier 的實現原理

與 CountDownLatch 的實現方式不一樣,CyclicBarrier 並無直接經過 AQS 實現同步功能,而是在重入鎖 ReentrantLock 的基礎上實現的。在 CyclicBarrier 中,線程訪問 await 方法需先獲取鎖才能訪問。在最後一個線程訪問 await 方法前,其餘線程進入 await 方法中後,會調用 Condition 的 await 方法進入等待狀態。在最後一個線程進入 CyclicBarrier await 方法後,該線程將會調用 Condition 的 signalAll 方法喚醒全部處於等待狀態中的線程。同時,最後一個進入 await 的線程還會重置 CyclicBarrier 的狀態,使其能夠重複使用。spa

在建立 CyclicBarrier 對象時,須要轉入一個值,用於初始化 CyclicBarrier 的成員變量 parties,該成員變量表示屏障攔截的線程數。當到達屏障的線程數小於 parties 時,這些線程都會被阻塞住。當最後一個線程到達屏障後,此前被阻塞的線程纔會被喚醒。線程

3.源碼分析

經過前面簡單的分析,相信你們對 CountDownLatch 和 CyclicBarrier 的原理有必定的瞭解了。那麼接下來趁熱打鐵,咱們一塊兒探索一下這兩個同步組件的具體實現吧。指針

3.1 CountDownLatch 源碼分析

CountDownLatch 的原理不是很複雜,因此在具體的實現上,也不是很複雜。固然,前面說過 CountDownLatch 是基於 AQS 實現的,AQS 的實現則要複雜的多。不過這裏僅要求你們掌握 AQS 的基本原理,知道它內部維護了一個同步隊列,同步隊列中的線程會按照 FIFO 依次獲取同步狀態就好了。好了,下面咱們一塊兒去看一下 CountDownLatch 的源碼吧。code

3.1.1 源碼結構

CountDownLatch 的代碼量不大,加上註釋也不過300多行,因此它的代碼結構也會比較簡單。以下:

如上圖,CountDownLatch 源碼包含一個構造方法和一個私有成員變量,以及數個普通方法和一個重要的靜態內部類 Sync。CountDownLatch 的主要邏輯都是封裝在 Sync 和其父類 AQS 裏的。因此分析 CountDownLatch 的源碼,本質上是分析 Sync 和 AQS 的原理。相關的分析,將會在下一節中展開,本節先說到這。

3.1.2 構造方法及成員變量

本節來分析一下 CountDownLatch 的構造方法和其 Sync 類型的成員變量實現,以下:

public class CountDownLatch {

    private final Sync sync;

    /** CountDownLatch 的構造方法,該方法要求傳入大於0的整型數值做爲計數器 */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        // 初始化 Sync
        this.sync = new Sync(count);
    }
    
    /** CountDownLatch 的同步控制器,繼承自 AQS */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            // 設置 AQS state
            setState(count);
        }

        int getCount() {
            return getState();
        }

        /** 嘗試在共享狀態下獲取同步狀態,該方法在 AQS 中是抽象方法,這裏進行了覆寫 */
        protected int tryAcquireShared(int acquires) {
            /*
             * 若是 state = 0,則返回1,代表可獲取同步狀態,
             * 此時線程調用 await 方法時就不會被阻塞。
             */ 
            return (getState() == 0) ? 1 : -1;
        }

        /** 嘗試在共享狀態下釋放同步狀態,該方法在 AQS 中也是抽象方法 */
        protected boolean tryReleaseShared(int releases) {
            /*
             * 下面的邏輯是將 state--,state 減至0時,調用 await 等待的線程會被喚醒。
             * 這裏使用循環 + CAS,代表會存在競爭的狀況,也就是多個線程可能會同時調用 
             * countDown 方法。在 state 不爲0的狀況下,線程調用 countDown 是必需要完
             * 成 state-- 這個操做。因此這裏使用了循環 + CAS,確保 countDown 方法可正
             * 常運行。
             */
            for (;;) {
                // 獲取 state
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                
                // 使用 CAS 設置新的 state 值
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
}

須要說明的是,Sync 中的 tryAcquireShared 和 tryReleaseShared 方法並非直接給 await 和 countDown 方法調用了的,這兩個方法以「try」開頭的方法最終會在 AQS 中被調用。

3.1.3 await

CountDownLatch中有兩個版本的 await 方法,一個響應中斷,另外一個在此基礎上增長了超時功能。本節將分析無超時功能的 await,以下:

/** 
 * 該方法會使線程進入等待狀態,直到計數器減至0,或者線程被中斷。當計數器爲0時,調用
 * 此方法將會當即返回,不會被阻塞住。
 */
public void await() throws InterruptedException {
    // 調用 AQS 中的 acquireSharedInterruptibly 方法
    sync.acquireSharedInterruptibly(1);
}

/** 帶有超時功能的 await */
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

+--- AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // 若線程被中斷,則直接拋出中斷異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 調用 Sync 中覆寫的 tryAcquireShared 方法,嘗試獲取同步狀態
    if (tryAcquireShared(arg) < 0)
        /*
         * 若 tryAcquireShared 小於0,則表示獲取同步狀態失敗,
         * 此時將線程放入 AQS 的同步隊列中進行等待。
         */ 
        doAcquireSharedInterruptibly(arg);
}

從上面的代碼中能夠看出,CountDownLatch await 方法實際上調用的是 AQS 的 acquireSharedInterruptibly 方法。該方法會在內部調用 Sync 所覆寫的 tryAcquireShared 方法。在 state != 0時,tryAcquireShared 返回值 -1。此時線程將進入 doAcquireSharedInterruptibly 方法中,在此方法中,線程會被放入同步隊列中進行等待。若 state = 0,此時 tryAcquireShared 返回1,acquireSharedInterruptibly 會直接返回。此時調用 await 的線程也不會被阻塞住。

3.1.4 countDown

與 await 方法同樣,countDown 實際上也是對 AQS 方法的一層封裝。具體的實現以下:

/** 該方法的做用是將計數器進行自減操做,當計數器爲0時,喚醒正在同步隊列中等待的線程 */
public void countDown() {
    // 調用 AQS 中的 releaseShared 方法
    sync.releaseShared(1);
}

+--- AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
    // 調用 Sync 中的 tryReleaseShared 嘗試釋放同步狀態
    if (tryReleaseShared(arg)) {
        /*
         * tryReleaseShared 返回 true 時,代表 state = 0,即計數器爲0。此時調用
         * doReleaseShared 方法喚醒正在同步隊列中等待的線程
         */ 
        doReleaseShared();
        return true;
    }
    return false;
}

以上就是 countDown 的源碼分析,不是很難懂,這裏就不囉嗦了。

3.2 CyclicBarrier 源碼分析

3.2.1 源碼結構

如前面所說,CyclicBarrier 是基於重入鎖 ReentrantLock 實現相關邏輯的。因此要弄懂 CyclicBarrier 的源碼,僅需有 ReentrantLock 相關的背景知識便可。關於重入鎖 ReentrantLock 方面的知識,有興趣的朋友能夠參考我以前寫的文章 Java 重入鎖 ReentrantLock 原理分析。下面看一下 CyclicBarrier 的代碼結構吧,以下:

從上圖能夠看出,CyclicBarrier 包含了一個靜態內部類Generation、數個方法和一些成員變量。結構上比 CountDownLatch 略爲複雜一些,但整體仍比較簡單。好了,接下來進入源碼分析部分吧。

3.2.2 構造方法及成員變量

CyclicBarrier 包含兩個有參構造方法,分別以下:

/** 建立一個容許 parties 個線程通行的屏障 */
public CyclicBarrier(int parties) {
    this(parties, null);
}

/** 
 * 建立一個容許 parties 個線程通行的屏障,若 barrierAction 回調對象不爲 null,
 * 則在最後一個線程到達屏障後,執行相應的回調邏輯
 */
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

上面的第二個構造方法初始化了一些成員變量,下面咱們就來講明一下這些成員變量的做用。

成員變量 做用
parties 線程數,即當 parties 個線程到達屏障後,屏障纔會放行
count 計數器,當 count > 0 時,到達屏障的線程會進入等待狀態。當最後一個線程到達屏障後,count 自減至0。最後一個到達的線程會執行回調方法,並喚醒其餘處於等待狀態中的線程。
barrierCommand 回調對象,若是不爲 null,會在第 parties 個線程到達屏障後被執行

除了上面幾個成員變量,還有一個成員變量須要說明一下,以下:

/** 
 * CyclicBarrier 是可循環使用的屏障,這裏使用 Generation 記錄當前輪次 CyclicBarrier 
 * 的運行狀態。當全部線程到達屏障後,generation 將會被更新,表示 CyclicBarrier 進入新一
 * 輪的運行輪次中。
*/
private Generation generation = new Generation();

private static class Generation {
    // 用於記錄屏障有沒有被破壞
    boolean broken = false;
}

3.2.3 await

上一節所提到的幾個成員變量,在 await 方法中將會悉數登場。下面就來分析一下 await 方法的試下,以下:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // await 的邏輯封裝在 dowait 中
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
    
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException, TimeoutException {
    
    final ReentrantLock lock = this.lock;
    // 加鎖
    lock.lock();
    try {
        final Generation g = generation;

        // 若是 g.broken = true,代表屏障被破壞了,這裏直接拋出異常
        if (g.broken)
            throw new BrokenBarrierException();

        // 若是線程中斷,則調用 breakBarrier 破壞屏障
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        /*
         * index 表示線程到達屏障的順序,index = parties - 1 代表當前線程是第一個
         * 到達屏障的。index = 0,代表當前線程是最有一個到達屏障的。
         */ 
        int index = --count;
        // 當 index = 0 時,喚醒全部處於等待狀態的線程
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                // 若是回調對象不爲 null,則執行回調
                if (command != null)
                    command.run();
                ranAction = true;
                // 重置屏障狀態,使其進入新一輪的運行過程當中
                nextGeneration();
                return 0;
            } finally {
                // 若執行回調的過程當中發生異常,此時調用 breakBarrier 破壞屏障
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 線程運行到此處的線程都會被屏障擋住,並進入等待狀態。
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                /*
                 * 若下面的條件成立,則代表本輪運行還未結束。此時調用 breakBarrier 
                 * 破壞屏障,喚醒其餘線程,並拋出異常
                 */ 
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    /*
                     * 若上面的條件不成立,則有兩種可能:
                     * 1. g != generation
                     *     此種狀況下,代表循環屏障的第 g 輪次的運行已經結束,屏障已經
                     *     進入了新的一輪運行輪次中。當前線程在稍後返回 到達屏障 的順序便可
                     *     
                     * 2. g = generation 但 g.broken = true
                     *     此種狀況下,代表已經有線程執行過 breakBarrier 方法了,當前
                     *     線程則會在稍後拋出 BrokenBarrierException
                     */
                    Thread.currentThread().interrupt();
                }
            }
            
            // 屏障被破壞,則拋出 BrokenBarrierException 異常
            if (g.broken)
                throw new BrokenBarrierException();

            // 屏障進入新的運行輪次,此時返回線程在上一輪次到達屏障的順序
            if (g != generation)
                return index;

            // 超時判斷
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
    
/** 開啓新的一輪運行過程 */
private void nextGeneration() {
    // 喚醒全部處於等待狀態中的線程
    trip.signalAll();
    // 重置 count
    count = parties;
    // 從新建立 Generation,代表進入循環屏障進入新的一輪運行輪次中
    generation = new Generation();
}

/** 破壞屏障 */
private void breakBarrier() {
    // 設置屏障是否被破壞標誌
    generation.broken = true;
    // 重置 count
    count = parties;
    // 喚醒全部處於等待狀態中的線程
    trip.signalAll();
}

3.2.4 reset

reset 方法用於強制重置屏障,使屏障進入新一輪的運行過程當中。代碼以下:

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 破壞屏障
        breakBarrier();   // break the current generation
        // 開啓新一輪的運行過程
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

reset 方法並不複雜,沒什麼好講的。CyclicBarrier 中還有其餘一些方法,均不復雜,這裏就不一一分析了。

4.二者區別

看完上面的分析,相信你們對着兩個同步組件有了更深刻的認識。那麼下面趁熱打鐵,簡單對比一下二者之間的區別。這裏用一個表格列舉一下:

差別點 CountDownLatch CyclicBarrier
等待線程喚醒時機 計數器減至0時,喚醒等待線程 到達屏障的線程數達到 parties 時,喚醒等待線程
是否可循環使用
是否可設置回調

除了上面列舉的差別點,還有一些其餘方面的差別,這裏就不一一列舉了。

5.總結

分析完 CountDownLatch 和 CyclicBarrier,不知道你們有什麼感受。我我的的感受是這兩個類的源碼並不複雜,比較好理解。固然,前提是創建在對 AQS 以及 ReentrantLock 有較深的理解之上。因此在學習這兩個類的源碼時,仍是建議你們先看看前置知識。

好了,本文到這裏就結束了。謝謝閱讀,再見。

本文在知識共享許可協議 4.0 下發布,轉載需在明顯位置處註明出處
做者:coolblog
本文同步發佈在個人我的博客: http://www.coolblog.xyz

cc
本做品採用知識共享署名-非商業性使用-禁止演繹 4.0 國際許可協議進行許可。

相關文章
相關標籤/搜索