在分析完AbstractQueuedSynchronizer
(如下簡稱 AQS)和ReentrantLock
的原理後,本文將分析 java.util.concurrent 包下的兩個線程同步組件CountDownLatch
和CyclicBarrier
。這兩個同步組件比較經常使用,也常常被放在一塊兒對比。經過分析這兩個同步組件,可以使咱們對 Java 線程間協同有更深刻的瞭解。同時經過分析其原理,也可以使咱們作到知其然,並知其因此然。java
這裏首先來介紹一下 CountDownLatch 的用途,CountDownLatch 容許一個或一組線程等待其餘線程完成後再恢復運行。線程可經過調用await
方法進入等待狀態,在其餘線程調用countDown
方法將計數器減爲0後,處於等待狀態的線程便可恢復運行。CyclicBarrier (可循環使用的屏障)則與此不一樣,CyclicBarrier 容許一組線程到達屏障後阻塞住,直到最後一個線程進入到達屏障,全部線程才恢復運行。它們之間主要的區別在於喚醒等待線程的時機。CountDownLatch 是在計數器減爲0後,喚醒等待線程。CyclicBarrier 是在計數器(等待線程數)增加到指定數量後,再喚醒等待線程。除此以外,兩種之間還有一些其餘的差別,這個將會在後面進行說明。app
在下一章中,我將會介紹一下二者的實現原理,繼續往下看吧。源碼分析
CountDownLatch 的同步功能是基於 AQS 實現的,CountDownLatch 使用 AQS 中的 state 成員變量做爲計數器。在 state 不爲0的狀況下,凡是調用 await 方法的線程將會被阻塞,並被放入 AQS 所維護的同步隊列中進行等待。大體示意圖以下:學習
每一個阻塞的線程都會被封裝成節點對象,節點之間經過 prev 和 next 指針造成同步隊列。初始狀況下,隊列的頭結點是一個虛擬節點。該節點僅是一個佔位符,沒什麼特別的意義。每當有一個線程調用 countDown 方法,就將計數器 state--。當 state 被減至0時,隊列中的節點就會按照 FIFO 順序被喚醒,被阻塞的線程便可恢復運行。ui
CountDownLatch 自己的原理並不難理解,不過若是你們想深刻理解 CountDownLatch 的實現細節,那麼須要先去學習一下 AQS 的相關原理。CountDownLatch 是基於 AQS 實現的,因此理解 AQS 是學習 CountDownLatch 的前置條件。我在以前寫過一篇關於 AQS 的文章 Java 重入鎖 ReentrantLock 原理分析,有興趣的朋友能夠去讀一讀。this
與 CountDownLatch 的實現方式不一樣,CyclicBarrier 並無直接經過 AQS 實現同步功能,而是在重入鎖 ReentrantLock 的基礎上實現的。在 CyclicBarrier 中,線程訪問 await 方法需先獲取鎖才能訪問。在最後一個線程訪問 await 方法前,其餘線程進入 await 方法中後,會調用 Condition 的 await 方法進入等待狀態。在最後一個線程進入 CyclicBarrier await 方法後,該線程將會調用 Condition 的 signalAll 方法喚醒全部處於等待狀態中的線程。同時,最後一個進入 await 的線程還會重置 CyclicBarrier 的狀態,使其能夠重複使用。spa
在建立 CyclicBarrier 對象時,須要轉入一個值,用於初始化 CyclicBarrier 的成員變量 parties,該成員變量表示屏障攔截的線程數。當到達屏障的線程數小於 parties 時,這些線程都會被阻塞住。當最後一個線程到達屏障後,此前被阻塞的線程纔會被喚醒。線程
經過前面簡單的分析,相信你們對 CountDownLatch 和 CyclicBarrier 的原理有必定的瞭解了。那麼接下來趁熱打鐵,咱們一塊兒探索一下這兩個同步組件的具體實現吧。指針
CountDownLatch 的原理不是很複雜,因此在具體的實現上,也不是很複雜。固然,前面說過 CountDownLatch 是基於 AQS 實現的,AQS 的實現則要複雜的多。不過這裏僅要求你們掌握 AQS 的基本原理,知道它內部維護了一個同步隊列,同步隊列中的線程會按照 FIFO 依次獲取同步狀態就好了。好了,下面咱們一塊兒去看一下 CountDownLatch 的源碼吧。code
CountDownLatch 的代碼量不大,加上註釋也不過300多行,因此它的代碼結構也會比較簡單。以下:
如上圖,CountDownLatch 源碼包含一個構造方法和一個私有成員變量,以及數個普通方法和一個重要的靜態內部類 Sync。CountDownLatch 的主要邏輯都是封裝在 Sync 和其父類 AQS 裏的。因此分析 CountDownLatch 的源碼,本質上是分析 Sync 和 AQS 的原理。相關的分析,將會在下一節中展開,本節先說到這。
本節來分析一下 CountDownLatch 的構造方法和其 Sync 類型的成員變量實現,以下:
public class CountDownLatch { private final Sync sync; /** CountDownLatch 的構造方法,該方法要求傳入大於0的整型數值做爲計數器 */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); // 初始化 Sync this.sync = new Sync(count); } /** CountDownLatch 的同步控制器,繼承自 AQS */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { // 設置 AQS state setState(count); } int getCount() { return getState(); } /** 嘗試在共享狀態下獲取同步狀態,該方法在 AQS 中是抽象方法,這裏進行了覆寫 */ protected int tryAcquireShared(int acquires) { /* * 若是 state = 0,則返回1,代表可獲取同步狀態, * 此時線程調用 await 方法時就不會被阻塞。 */ return (getState() == 0) ? 1 : -1; } /** 嘗試在共享狀態下釋放同步狀態,該方法在 AQS 中也是抽象方法 */ protected boolean tryReleaseShared(int releases) { /* * 下面的邏輯是將 state--,state 減至0時,調用 await 等待的線程會被喚醒。 * 這裏使用循環 + CAS,代表會存在競爭的狀況,也就是多個線程可能會同時調用 * countDown 方法。在 state 不爲0的狀況下,線程調用 countDown 是必需要完 * 成 state-- 這個操做。因此這裏使用了循環 + CAS,確保 countDown 方法可正 * 常運行。 */ for (;;) { // 獲取 state int c = getState(); if (c == 0) return false; int nextc = c-1; // 使用 CAS 設置新的 state 值 if (compareAndSetState(c, nextc)) return nextc == 0; } } } }
須要說明的是,Sync 中的 tryAcquireShared 和 tryReleaseShared 方法並非直接給 await 和 countDown 方法調用了的,這兩個方法以「try」開頭的方法最終會在 AQS 中被調用。
CountDownLatch中有兩個版本的 await 方法,一個響應中斷,另外一個在此基礎上增長了超時功能。本節將分析無超時功能的 await,以下:
/** * 該方法會使線程進入等待狀態,直到計數器減至0,或者線程被中斷。當計數器爲0時,調用 * 此方法將會當即返回,不會被阻塞住。 */ public void await() throws InterruptedException { // 調用 AQS 中的 acquireSharedInterruptibly 方法 sync.acquireSharedInterruptibly(1); } /** 帶有超時功能的 await */ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } +--- AbstractQueuedSynchronizer public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 若線程被中斷,則直接拋出中斷異常 if (Thread.interrupted()) throw new InterruptedException(); // 調用 Sync 中覆寫的 tryAcquireShared 方法,嘗試獲取同步狀態 if (tryAcquireShared(arg) < 0) /* * 若 tryAcquireShared 小於0,則表示獲取同步狀態失敗, * 此時將線程放入 AQS 的同步隊列中進行等待。 */ doAcquireSharedInterruptibly(arg); }
從上面的代碼中能夠看出,CountDownLatch await 方法實際上調用的是 AQS 的 acquireSharedInterruptibly 方法。該方法會在內部調用 Sync 所覆寫的 tryAcquireShared 方法。在 state != 0時,tryAcquireShared 返回值 -1。此時線程將進入 doAcquireSharedInterruptibly 方法中,在此方法中,線程會被放入同步隊列中進行等待。若 state = 0,此時 tryAcquireShared 返回1,acquireSharedInterruptibly 會直接返回。此時調用 await 的線程也不會被阻塞住。
與 await 方法同樣,countDown 實際上也是對 AQS 方法的一層封裝。具體的實現以下:
/** 該方法的做用是將計數器進行自減操做,當計數器爲0時,喚醒正在同步隊列中等待的線程 */ public void countDown() { // 調用 AQS 中的 releaseShared 方法 sync.releaseShared(1); } +--- AbstractQueuedSynchronizer public final boolean releaseShared(int arg) { // 調用 Sync 中的 tryReleaseShared 嘗試釋放同步狀態 if (tryReleaseShared(arg)) { /* * tryReleaseShared 返回 true 時,代表 state = 0,即計數器爲0。此時調用 * doReleaseShared 方法喚醒正在同步隊列中等待的線程 */ doReleaseShared(); return true; } return false; }
以上就是 countDown 的源碼分析,不是很難懂,這裏就不囉嗦了。
如前面所說,CyclicBarrier 是基於重入鎖 ReentrantLock 實現相關邏輯的。因此要弄懂 CyclicBarrier 的源碼,僅需有 ReentrantLock 相關的背景知識便可。關於重入鎖 ReentrantLock 方面的知識,有興趣的朋友能夠參考我以前寫的文章 Java 重入鎖 ReentrantLock 原理分析。下面看一下 CyclicBarrier 的代碼結構吧,以下:
從上圖能夠看出,CyclicBarrier 包含了一個靜態內部類Generation
、數個方法和一些成員變量。結構上比 CountDownLatch 略爲複雜一些,但整體仍比較簡單。好了,接下來進入源碼分析部分吧。
CyclicBarrier 包含兩個有參構造方法,分別以下:
/** 建立一個容許 parties 個線程通行的屏障 */ public CyclicBarrier(int parties) { this(parties, null); } /** * 建立一個容許 parties 個線程通行的屏障,若 barrierAction 回調對象不爲 null, * 則在最後一個線程到達屏障後,執行相應的回調邏輯 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
上面的第二個構造方法初始化了一些成員變量,下面咱們就來講明一下這些成員變量的做用。
成員變量 | 做用 |
---|---|
parties | 線程數,即當 parties 個線程到達屏障後,屏障纔會放行 |
count | 計數器,當 count > 0 時,到達屏障的線程會進入等待狀態。當最後一個線程到達屏障後,count 自減至0。最後一個到達的線程會執行回調方法,並喚醒其餘處於等待狀態中的線程。 |
barrierCommand | 回調對象,若是不爲 null,會在第 parties 個線程到達屏障後被執行 |
除了上面幾個成員變量,還有一個成員變量須要說明一下,以下:
/** * CyclicBarrier 是可循環使用的屏障,這裏使用 Generation 記錄當前輪次 CyclicBarrier * 的運行狀態。當全部線程到達屏障後,generation 將會被更新,表示 CyclicBarrier 進入新一 * 輪的運行輪次中。 */ private Generation generation = new Generation(); private static class Generation { // 用於記錄屏障有沒有被破壞 boolean broken = false; }
上一節所提到的幾個成員變量,在 await 方法中將會悉數登場。下面就來分析一下 await 方法的試下,以下:
public int await() throws InterruptedException, BrokenBarrierException { try { // await 的邏輯封裝在 dowait 中 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); try { final Generation g = generation; // 若是 g.broken = true,代表屏障被破壞了,這裏直接拋出異常 if (g.broken) throw new BrokenBarrierException(); // 若是線程中斷,則調用 breakBarrier 破壞屏障 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } /* * index 表示線程到達屏障的順序,index = parties - 1 代表當前線程是第一個 * 到達屏障的。index = 0,代表當前線程是最有一個到達屏障的。 */ int index = --count; // 當 index = 0 時,喚醒全部處於等待狀態的線程 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; // 若是回調對象不爲 null,則執行回調 if (command != null) command.run(); ranAction = true; // 重置屏障狀態,使其進入新一輪的運行過程當中 nextGeneration(); return 0; } finally { // 若執行回調的過程當中發生異常,此時調用 breakBarrier 破壞屏障 if (!ranAction) breakBarrier(); } } // 線程運行到此處的線程都會被屏障擋住,並進入等待狀態。 for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { /* * 若下面的條件成立,則代表本輪運行還未結束。此時調用 breakBarrier * 破壞屏障,喚醒其餘線程,並拋出異常 */ if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { /* * 若上面的條件不成立,則有兩種可能: * 1. g != generation * 此種狀況下,代表循環屏障的第 g 輪次的運行已經結束,屏障已經 * 進入了新的一輪運行輪次中。當前線程在稍後返回 到達屏障 的順序便可 * * 2. g = generation 但 g.broken = true * 此種狀況下,代表已經有線程執行過 breakBarrier 方法了,當前 * 線程則會在稍後拋出 BrokenBarrierException */ Thread.currentThread().interrupt(); } } // 屏障被破壞,則拋出 BrokenBarrierException 異常 if (g.broken) throw new BrokenBarrierException(); // 屏障進入新的運行輪次,此時返回線程在上一輪次到達屏障的順序 if (g != generation) return index; // 超時判斷 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } /** 開啓新的一輪運行過程 */ private void nextGeneration() { // 喚醒全部處於等待狀態中的線程 trip.signalAll(); // 重置 count count = parties; // 從新建立 Generation,代表進入循環屏障進入新的一輪運行輪次中 generation = new Generation(); } /** 破壞屏障 */ private void breakBarrier() { // 設置屏障是否被破壞標誌 generation.broken = true; // 重置 count count = parties; // 喚醒全部處於等待狀態中的線程 trip.signalAll(); }
reset 方法用於強制重置屏障,使屏障進入新一輪的運行過程當中。代碼以下:
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { // 破壞屏障 breakBarrier(); // break the current generation // 開啓新一輪的運行過程 nextGeneration(); // start a new generation } finally { lock.unlock(); } }
reset 方法並不複雜,沒什麼好講的。CyclicBarrier 中還有其餘一些方法,均不復雜,這裏就不一一分析了。
看完上面的分析,相信你們對着兩個同步組件有了更深刻的認識。那麼下面趁熱打鐵,簡單對比一下二者之間的區別。這裏用一個表格列舉一下:
差別點 | CountDownLatch | CyclicBarrier |
---|---|---|
等待線程喚醒時機 | 計數器減至0時,喚醒等待線程 | 到達屏障的線程數達到 parties 時,喚醒等待線程 |
是否可循環使用 | 否 | 是 |
是否可設置回調 | 否 | 是 |
除了上面列舉的差別點,還有一些其餘方面的差別,這裏就不一一列舉了。
分析完 CountDownLatch 和 CyclicBarrier,不知道你們有什麼感受。我我的的感受是這兩個類的源碼並不複雜,比較好理解。固然,前提是創建在對 AQS 以及 ReentrantLock 有較深的理解之上。因此在學習這兩個類的源碼時,仍是建議你們先看看前置知識。
好了,本文到這裏就結束了。謝謝閱讀,再見。
本文在知識共享許可協議 4.0 下發布,轉載需在明顯位置處註明出處
做者:coolblog
本文同步發佈在個人我的博客: http://www.coolblog.xyz
本做品採用知識共享署名-非商業性使用-禁止演繹 4.0 國際許可協議進行許可。