CountDownLatch
是多線程控制的一種工具,它被稱爲 門閥
、 計數器
或者 閉鎖
。這個工具常常用來用來協調多個線程之間的同步,或者提及到線程之間的通訊(而不是用做互斥的做用)。下面咱們就來一塊兒認識一下 CountDownLatchjava
我把本身以往的文章彙總成爲了 Github ,歡迎各位大佬 star
https://github.com/crisxuan/bestJavaernode
CountDownLatch 可以使一個線程在等待另一些線程完成各自工做以後,再繼續執行。它至關因而一個計數器,這個計數器的初始值就是線程的數量,每當一個任務完成後,計數器的值就會減一,當計數器的值爲 0 時,表示全部的線程都已經任務了,而後在 CountDownLatch 上等待的線程就能夠恢復執行接下來的任務。git
CountDownLatch 提供了一個構造方法,你必須指定其初始值,還指定了 countDown
方法,這個方法的做用主要用來減少計數器的值,當計數器變爲 0 時,在 CountDownLatch 上 await
的線程就會被喚醒,繼續執行其餘任務。固然也能夠延遲喚醒,給 CountDownLatch 加一個延遲時間就能夠實現。程序員
其主要方法以下github
CountDownLatch 主要有下面這幾個應用場景微信
典型的應用場景就是當一個服務啓動時,同時會加載不少組件和服務,這時候主線程會等待組件和服務的加載。當全部的組件和服務都加載完畢後,主線程和其餘線程在一塊兒完成某個任務。數據結構
CountDownLatch 還能夠實現學生一塊兒比賽跑步的程序,CountDownLatch 初始化爲學生數量的線程,鳴槍後,每一個學生就是一條線程,來完成各自的任務,當第一個學生跑徹底程後,CountDownLatch 就會減一,直到全部的學生完成後,CountDownLatch 會變爲 0 ,接下來再一塊兒宣佈跑步成績。多線程
順着這個場景,你本身就能夠延伸、拓展出來不少其餘任務場景。併發
下面咱們經過一個簡單的計數器來演示一下 CountDownLatch 的用法框架
public class TCountDownLatch { public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(5); Increment increment = new Increment(latch); Decrement decrement = new Decrement(latch); new Thread(increment).start(); new Thread(decrement).start(); try { Thread.sleep(6000); } catch (InterruptedException e) { e.printStackTrace(); } } } class Decrement implements Runnable { CountDownLatch countDownLatch; public Decrement(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } @Override public void run() { try { for(long i = countDownLatch.getCount();i > 0;i--){ Thread.sleep(1000); System.out.println("countdown"); this.countDownLatch.countDown(); } } catch (InterruptedException e) { e.printStackTrace(); } } } class Increment implements Runnable { CountDownLatch countDownLatch; public Increment(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } @Override public void run() { try { System.out.println("await"); countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Waiter Released"); } }
在 main 方法中咱們初始化了一個計數器爲 5 的 CountDownLatch,在 Decrement 方法中咱們使用 countDown
執行減一操做,而後睡眠一段時間,同時在 Increment 類中進行等待,直到 Decrement 中的線程完成計數減一的操做後,喚醒 Increment 類中的 run 方法,使其繼續執行。
下面咱們再來經過學生賽跑這個例子來演示一下 CountDownLatch 的具體用法
public class StudentRunRace { CountDownLatch stopLatch = new CountDownLatch(1); CountDownLatch runLatch = new CountDownLatch(10); public void waitSignal() throws Exception{ System.out.println("選手" + Thread.currentThread().getName() + "正在等待裁判發佈口令"); stopLatch.await(); System.out.println("選手" + Thread.currentThread().getName() + "已接受裁判口令"); Thread.sleep((long) (Math.random() * 10000)); System.out.println("選手" + Thread.currentThread().getName() + "到達終點"); runLatch.countDown(); } public void waitStop() throws Exception{ Thread.sleep((long) (Math.random() * 10000)); System.out.println("裁判"+Thread.currentThread().getName()+"即將發佈口令"); stopLatch.countDown(); System.out.println("裁判"+Thread.currentThread().getName()+"已發送口令,正在等待全部選手到達終點"); runLatch.await(); System.out.println("全部選手都到達終點"); System.out.println("裁判"+Thread.currentThread().getName()+"彙總成績排名"); } public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); StudentRunRace studentRunRace = new StudentRunRace(); for (int i = 0; i < 10; i++) { Runnable runnable = () -> { try { studentRunRace.waitSignal(); } catch (Exception e) { e.printStackTrace(); } }; service.execute(runnable); } try { studentRunRace.waitStop(); } catch (Exception e) { e.printStackTrace(); } service.shutdown(); } }
下面咱們就來一塊兒分析一下 CountDownLatch
的源碼
CountDownLatch 使用起來比較簡單,可是卻很是有用,如今你能夠在你的工具箱中加上 CountDownLatch 這個工具類了。下面咱們就來深刻認識一下 CountDownLatch。
CountDownLatch 的底層是由 AbstractQueuedSynchronizer
支持,而 AQS 的數據結構的核心就是兩個隊列,一個是 同步隊列(sync queue)
,一個是條件隊列(condition queue)
。
CountDownLatch 在其內部是一個 Sync ,它繼承了 AQS 抽象類。
private static final class Sync extends AbstractQueuedSynchronizer {...}
CountDownLatch 其實其內部只有一個 sync
屬性,而且是 final 的
private final Sync sync;
CountDownLatch 只有一個帶參數的構造方法
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
也就是說,初始化的時候必須指定計數器的數量,若是數量爲負會直接拋出異常。
而後把 count 初始化爲 Sync 內部的 count,也就是
Sync(int count) { setState(count); }
注意這裏有一個 setState(count),這是什麼意思呢?見聞知意這只是一個設置狀態的操做,可是實際上不僅僅是,還有一層意思是 state 的值表明着待達到條件的線程數。這個咱們在聊 countDown 方法的時候再討論。
getCount()
方法的返回值是 getState()
方法,它是 AbstractQueuedSynchronizer 中的方法,這個方法會返回當前線程計數,具備 volatile 讀取的內存語義。
// ---- CountDownLatch ---- int getCount() { return getState(); } // ---- AbstractQueuedSynchronizer ---- protected final int getState() { return state; }
tryAcquireShared()
方法用於獲取·共享狀態下對象的狀態,判斷對象是否爲 0 ,若是爲 0 返回 1 ,表示可以嘗試獲取,若是不爲 0,那麼返回 -1,表示沒法獲取。
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // ---- getState() 方法和上面的方法相同 ----
這個 共享狀態
屬於 AQS 中的概念,在 AQS 中分爲兩種模式,一種是 獨佔模式
,一種是 共享模式
。
tryReleaseShared()
方法用於共享模式下的釋放
protected boolean tryReleaseShared(int releases) { // 減少數量,變爲 0 的時候進行通知。 for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
這個方法是一個無限循環,獲取線程狀態,若是線程狀態是 0 則表示沒有被線程佔有,沒有佔有的話那麼直接返回 false ,表示已經釋放;而後下一個狀態進行 - 1 ,使用 compareAndSetState CAS 方法進行和內存值的比較,若是內存值也是 1 的話,就會更新內存值爲 0 ,判斷 nextc 是否爲 0 ,若是 CAS 比較不成功的話,會再次進行循環判斷。
若是 CAS 用法不清楚的話,讀者朋友們能夠參考這篇文章 告訴你一個 AtomicInteger 的驚天大祕密!
await()
方法是 CountDownLatch 一個很是重要的方法,基本上能夠說只有 countDown 和 await 方法纔是 CountDownLatch 的精髓所在,這個方法將會使當前線程在 CountDownLatch 計數減至零以前一直等待,除非線程被中斷。
CountDownLatch 中的 await 方法有兩種,一種是不帶任何參數的 await()
,一種是能夠等待一段時間的await(long timeout, TimeUnit unit)
。下面咱們先來看一下 await() 方法。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
await 方法內部會調用 acquireSharedInterruptibly 方法,這個 acquireSharedInterruptibly 是 AQS 中的方法,以共享模式進行中斷。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
能夠看到,acquireSharedInterruptibly 方法的內部會首先判斷線程是否中斷
,若是線程中斷,則直接拋出線程中斷異常。若是沒有中斷,那麼會以共享的方式獲取。若是可以在共享的方式下不能獲取鎖,那麼就會以共享的方式斷開連接。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
這個方法有些長,咱們分開來看
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
首先會設置頭節點,而後進行一系列的判斷,獲取節點的獲取節點的後繼,以共享模式進行釋放,就會調用 doReleaseShared 方法,咱們再來看一下 doReleaseShared 方法
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
這個方法會以無限循環的方式首先判斷頭節點是否等於尾節點,若是頭節點等於尾節點的話,就會直接退出。若是頭節點不等於尾節點,會判斷狀態是否爲 SIGNAL,不是的話就繼續循環 compareAndSetWaitStatus,而後斷開後繼節點。若是狀態不是 SIGNAL,也會調用 compareAndSetWaitStatus 設置狀態爲 PROPAGATE,狀態爲 0 而且不成功,就會繼續循環。
也就是說 setHeadAndPropagate 就是設置頭節點而且釋放後繼節點的一系列過程。
shouldParkAfterFailedAcquire(p, node)
這裏if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException();
若是上面 Node p = node.predecessor() 獲取前驅節點不是頭節點,就會進行 park 斷開操做,判斷此時是否可以斷開,判斷的標準以下
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
這個方法會判斷 Node p 的前驅節點的結點狀態(waitStatus)
,節點狀態一共有五種,分別是
CANCELLED(1)
:表示當前結點已取消調度。當超時或被中斷(響應中斷的狀況下),會觸發變動爲此狀態,進入該狀態後的結點將不會再變化。
SIGNAL(-1)
:表示後繼結點在等待當前結點喚醒。後繼結點入隊時,會將前繼結點的狀態更新爲 SIGNAL。
CONDITION(-2)
:表示結點等待在 Condition 上,當其餘線程調用了 Condition 的 signal() 方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
PROPAGATE(-3)
:共享模式下,前繼結點不只會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點。
0
:新結點入隊時的默認狀態。
若是前驅節點是 SIGNAL 就會返回 true 表示能夠斷開,若是前驅節點的狀態大於 0 (此時爲何不用 ws == Node.CANCELLED ) 呢?由於 ws 大於 0 的條件只有 CANCELLED 狀態了。而後就是一系列的查找遍歷操做直到前驅節點的 waitStatus > 0。若是 ws <= 0 ,並且還不是 SIGNAL 狀態的話,就會使用 CAS 替換前驅節點的 ws 爲 SIGNAL 狀態。
若是檢查判斷是中斷狀態的話,就會返回 false。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
這個方法使用 LockSupport.park
斷開鏈接,而後返回線程是否中斷的標誌。
cancelAcquire()
用於取消等待隊列,若是等待過程當中沒有成功獲取資源(如timeout,或者可中斷的狀況下被中斷了),那麼取消結點在隊列中的等待。private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
因此,對 CountDownLatch 的 await 調用大體會有以下的調用過程。
一個和 await 重載的方法是 await(long timeout, TimeUnit unit)
,這個方法和 await 最主要的區別就是這個方法可以能夠等待計數器一段時間再執行後續操做。
countDown 是和 await 同等重要的方法,countDown 用於減小計數器的數量,若是計數減爲 0 的話,就會釋放全部的線程。
public void countDown() { sync.releaseShared(1); }
這個方法會調用 releaseShared 方法,此方法用於共享模式下的釋放操做,首先會判斷是否可以進行釋放,判斷的方法就是 CountDownLatch 內部類 Sync 的 tryReleaseShared 方法
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // ---- CountDownLatch ---- protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
tryReleaseShared 會進行 for 循環判斷線程狀態值,使用 CAS 不斷嘗試進行替換。
若是可以釋放,就會調用 doReleaseShared 方法
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
能夠看到,doReleaseShared 其實也是一個無限循環不斷使用 CAS 嘗試替換的操做。
本文是 CountDownLatch 的基本使用和源碼分析,CountDownLatch 就是一個基於 AQS 的計數器,它內部的方法都是圍繞 AQS 框架來談的,除此以外還有其餘好比 ReentrantLock、Semaphore 等都是 AQS 的實現,因此要研究併發的話,離不開對 AQS 的探討。CountDownLatch 的源碼看起來不多,比較簡單,可是其內部好比 await 方法的調用鏈路卻很長,也值得花費時間深刻研究。
我是 cxuan,一枚技術創做的程序員。若是本文你以爲不錯的話,跪求讀者點贊、在看、分享!
另外,我本身肝了六本 PDF,微信搜索「程序員cxuan」關注公衆號後,在後臺回覆 cxuan ,領取所有 PDF,這些 PDF 以下