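CountDownLatch lets threads wait until a counter counts down to zero. I used to know only how to use it, not how it actually makes threads wait internally, and the best way to learn the underlying details is to read the source. The CountDownLatch source is not very complex, because its core functionality is implemented by AbstractQueuedSynchronizer (AQS below). CountDownLatch has only a few commonly used methods. Because AQS is involved, though, the logic is somewhat convoluted, and it takes a while to untangle it.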
The core functionality of CountDownLatch is implemented by its inner class Sync, which extends AQS:
```java
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    // Constructor: initialize the state field with the given count
    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    // The only job of tryAcquireShared is to check whether state equals 0
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        // Spin; the loop exits in two cases: a) state is already 0; b) this thread decrements state by 1
        for (;;) {
            int c = getState();
            // If state is already 0, return false
            if (c == 0)
                return false;
            int nextc = c-1;
            // As the statement below shows, true is returned only when state becomes 0
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}
```
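Sync is short, which shows how little an AQS subclass has to supply. Below is a minimal sketch of a latch assembled the same way. It assumes only the public AbstractQueuedSynchronizer API (setState, getState, compareAndSetState, acquireSharedInterruptibly, releaseShared). The class name OneShotLatch and its demo method are hypothetical and for illustration only; this is not the JDK class.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot latch built on AQS the same way as CountDownLatch.Sync.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count); // the count lives in the AQS state field
        }

        int getCount() {
            return getState();
        }

        // A shared acquire succeeds (non-negative result) only once state is 0.
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 0 ? 1 : -1;
        }

        // CAS loop that decrements state; true only on the transition to 0,
        // which is what makes AQS wake the queued waiters.
        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false; // already open, nothing to release
                }
                int next = c - 1;
                if (compareAndSetState(c, next)) {
                    return next == 0;
                }
            }
        }
    }

    private final Sync sync;

    OneShotLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        sync = new Sync(count);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void countDown() {
        sync.releaseShared(1);
    }

    int getCount() {
        return sync.getCount();
    }

    // Starts `waiters` threads that block in await(), counts down `count`
    // times, and returns how many waiters got past the latch.
    static int demo(int count, int waiters) {
        OneShotLatch latch = new OneShotLatch(count);
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (int i = 0; i < count; i++) {
            latch.countDown();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }
}
```

AQS does all of the queuing and parking. The subclass only decides when a shared acquire succeeds and when a release should wake waiters.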
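CountDownLatch has only one constructor, and it initializes the sync field. From the definition of Sync, the constructor's only job is to initialize the state field to the value passed in: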
```java
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
```
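The constructor's behavior is easy to confirm through the public API. In the small sketch below, the class LatchConstructorDemo and its helpers are hypothetical names for illustration. It shows three things: a negative count is rejected, a count of 0 is allowed, and getCount() simply reads back the stored state.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical helpers that probe what the CountDownLatch constructor does.
class LatchConstructorDemo {
    // True if the constructor rejects this count with IllegalArgumentException.
    static boolean rejects(int count) {
        try {
            new CountDownLatch(count);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    // The constructor only stores the count (as AQS state); getCount() reads it back.
    static long initialCount(int count) {
        return new CountDownLatch(count).getCount();
    }
}
```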
A waiting thread is wrapped in a node and placed in the wait queue. A node's waitStatus can take the following values:
```java
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate
 */
static final int PROPAGATE = -3;
```
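Note that CountDownLatch does not use the CONDITION status, so the rest of this article ignores it. Whenever waitStatus > 0, the status is CANCELLED. A newly created node starts with waitStatus 0.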
await()
When the counter has not yet reached 0, await() suspends the current thread. It delegates to the acquireSharedInterruptibly method of AQS:

```java
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // tryAcquireShared returns 1 only when state == 0, i.e. the counter has reached zero;
    // in that case this method returns immediately and the caller continues without blocking
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
```
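The two early exits in acquireSharedInterruptibly can be observed without touching AQS internals. The sketch below is a hypothetical helper class (AwaitEntryDemo) that uses only the public CountDownLatch API. It assumes the check order shown above, where the interrupt flag is tested before tryAcquireShared.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical helper observing the early exits of acquireSharedInterruptibly.
class AwaitEntryDemo {
    // Count already 0: tryAcquireShared returns 1, so await() returns without queuing.
    static boolean returnsImmediatelyWhenZero() {
        CountDownLatch latch = new CountDownLatch(0);
        try {
            latch.await();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Interrupt flag set before the call: Thread.interrupted() is checked first,
    // so InterruptedException is thrown even though the count is 0. Returns true
    // if the exception was thrown and the flag had been cleared along the way.
    static boolean throwsIfAlreadyInterrupted() {
        CountDownLatch latch = new CountDownLatch(0);
        Thread.currentThread().interrupt();
        try {
            latch.await();
            Thread.interrupted(); // unexpected path: clear the flag before returning
            return false;
        } catch (InterruptedException e) {
            return !Thread.currentThread().isInterrupted();
        }
    }
}
```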
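Usually the counter is not yet 0 when a thread calling await() reaches acquireSharedInterruptibly, so the thread has to go through the blocking logic in doAcquireSharedInterruptibly. That method calls three main helpers: addWaiter, shouldParkAfterFailedAcquire and parkAndCheckInterrupt. Explaining them inline would make the walkthrough jump around, which makes it harder to read and follow, so addWaiter is introduced first.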
addWaiter
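```java
private Node addWaiter(Node mode) {
    // Wrap the current thread in a Node
    Node node = new Node(Thread.currentThread(), mode);
    // Get the tail node
    Node pred = tail;
    // A non-null tail means the queue has already been initialized
    if (pred != null) {
        // Append node at the tail: first point node.prev at the tail, then atomically
        // make node the new tail; on success addWaiter is done
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            // The CAS succeeded, so link the old tail's next pointer to node
            pred.next = node;
            return node;
        }
    }
    // Reaching here means either: 1) the tail is null, i.e. the queue is not initialized yet;
    // or 2) the queue is initialized but the CAS above failed, so node is not in the queue yet
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // If the queue is not initialized, initialize it: install an empty node as head,
        // then point tail at that same empty node
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // Keep spinning until node is successfully made the tail
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
```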
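The main job of addWaiter is to append the current thread to the tail of the wait queue, initializing the queue first if necessary. The enq method spins so that the enqueue cannot fail.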
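The enqueue pattern can be reproduced outside AQS: link prev, CAS the tail, then link next, with a dummy head created lazily. The sketch below assumes AtomicReference in place of AQS's internal CAS helpers. MiniWaitQueue and its methods are hypothetical names, and the sketch models only enqueueing, not parking or cancellation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of addWaiter/enq: enqueueing only, no parking.
class MiniWaitQueue {
    static final class Node {
        volatile Node prev;
        volatile Node next;
        final Thread thread; // null for the dummy head

        Node(Thread thread) {
            this.thread = thread;
        }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends a node for thread t, spinning until the tail CAS succeeds.
    Node enqueue(Thread t) {
        Node node = new Node(t);
        for (;;) {
            Node oldTail = tail.get();
            if (oldTail == null) {
                // Lazy init: one thread installs an empty head, then tail points at it.
                if (head.compareAndSet(null, new Node(null))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = oldTail;             // 1) link backward first
                if (tail.compareAndSet(oldTail, node)) {
                    oldTail.next = node;         // 2) forward link only after the CAS wins
                    return node;
                }
            }
        }
    }

    // Counts real nodes by walking prev links from the tail back to the dummy head.
    int size() {
        Node h = head.get();
        int n = 0;
        for (Node p = tail.get(); p != null && p != h; p = p.prev) {
            n++;
        }
        return n;
    }

    // Enqueues perThread nodes from each of `threads` threads concurrently; returns the final size.
    static int demo(int threads, int perThread) {
        MiniWaitQueue q = new MiniWaitQueue();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    q.enqueue(Thread.currentThread());
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return q.size();
    }
}
```

size() walks prev links because each node's prev is set before the node is published as the tail, so that chain is never missing a link.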
doAcquireSharedInterruptibly
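Now for the doAcquireSharedInterruptibly method itself. Its source is as follows:

```java
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // Add the current thread to the wait queue; note the argument is Node.SHARED, which is used later
    final Node node = addWaiter(Node.SHARED);
    // Set to false only once state == 0 and the acquire succeeds
    boolean failed = true;
    try {
        // Another spin. It ends in one of two ways: 1) state == 0, the countdown finished
        // normally and the return statement runs; 2) the thread is interrupted and the exception breaks out
        for (;;) {
            // Get node's predecessor
            final Node p = node.predecessor();
            // If the predecessor is the head, run the if block
            if (p == head) {
                // Check state: returns 1 if state == 0, otherwise -1
                int r = tryAcquireShared(arg);
                // r >= 0 means the countdown is over and blocked threads must be woken
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    // On normal completion, set failed to false so the finally block does nothing
                    failed = false;
                    return;
                }
            }
            // Reaching here means the predecessor is not head or state != 0;
            // the actual blocking happens in parkAndCheckInterrupt
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // If the thread was interrupted, failed is still true, so cancelAcquire runs
        if (failed)
            cancelAcquire(node);
    }
}
```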
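doAcquireSharedInterruptibly first appends the current thread to the tail of the wait queue via addWaiter and then starts spinning. If the state field is not 0, execution reaches the conditional at the end of the loop: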
```java
if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    throw new InterruptedException();
```
First, let's look at what shouldParkAfterFailedAcquire does:
```java
// Note: pred is node's predecessor
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // If the predecessor is already SIGNAL, return true right away
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    // ws > 0 can only mean CANCELLED; relink the pointers to drop those cancelled nodes from the queue
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // The predecessor is neither SIGNAL nor CANCELLED, so it must be 0 or PROPAGATE;
        // update it to Node.SIGNAL. Note: 1) CONDITION is not used by CountDownLatch;
        // 2) every node starts with status 0 when created, and this is where it becomes SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
```
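I had always been a bit confused about where a node's SIGNAL status comes from. The code above shows that it is set in the final else branch. The source of shouldParkAfterFailedAcquire also shows that the method returns true only when the predecessor's status is SIGNAL, and only then does parkAndCheckInterrupt get a chance to run. Otherwise it returns false and the caller goes around the loop again. parkAndCheckInterrupt is where the thread is actually suspended; here is its source: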
```java
private final boolean parkAndCheckInterrupt() {
    // The thread blocks here, and resumes from here once it is woken
    LockSupport.park(this);
    return Thread.interrupted();
}
```
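parkAndCheckInterrupt uses LockSupport to block the thread. Once woken, it returns the thread's interrupt status (note that Thread.interrupted() also clears that status). If the thread was interrupted, it returns true and the loop throws InterruptedException. With that, the overall logic becomes fairly clear. If the thread is woken normally (i.e. because state reached 0), parkAndCheckInterrupt returns false and doAcquireSharedInterruptibly goes around the loop once more. Here is the loop again: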
```java
for (;;) {
    // Get node's predecessor
    final Node p = node.predecessor();
    // If the predecessor is the head, run the if block
    if (p == head) {
        // Check state: returns 1 if state == 0, otherwise -1
        int r = tryAcquireShared(arg);
        // r >= 0 means the countdown is over and blocked threads must be woken
        if (r >= 0) {
            setHeadAndPropagate(node, r);
            p.next = null; // help GC
            failed = false;
            return;
        }
    }
    // Reaching here means the predecessor is not head or state != 0;
    // the actual blocking happens in parkAndCheckInterrupt
    if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        throw new InterruptedException();
}
```
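The LockSupport behavior that parkAndCheckInterrupt relies on can be checked in isolation. In the sketch below, ParkDemo is a hypothetical name, and its parkAndCheckInterrupt copies the two-line AQS method. A parked thread is woken either by unpark or by an interrupt. An interrupt makes park return without throwing, and Thread.interrupted() then reports true and clears the flag. That is why doAcquireSharedInterruptibly has to throw InterruptedException itself.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical helper checking the LockSupport behavior parkAndCheckInterrupt depends on.
class ParkDemo {
    // Same shape as AQS's parkAndCheckInterrupt: park, then report and clear the flag.
    static boolean parkAndCheckInterrupt(Object blocker) {
        LockSupport.park(blocker);
        return Thread.interrupted();
    }

    // Wakes a parked thread either by interrupt or by unpark. Returns
    // {value returned by parkAndCheckInterrupt, interrupt flag afterwards}.
    static boolean[] wake(boolean byInterrupt) {
        AtomicBoolean released = new AtomicBoolean(false);
        boolean[] result = new boolean[2];
        Thread waiter = new Thread(() -> {
            boolean interrupted = false;
            // park() may return spuriously, so re-check the wake-up reason in a loop
            while (!released.get() && !interrupted) {
                interrupted = parkAndCheckInterrupt(released);
            }
            result[0] = interrupted;
            result[1] = Thread.currentThread().isInterrupted();
        });
        waiter.start();
        if (byInterrupt) {
            waiter.interrupt();          // park returns; no exception is thrown
        } else {
            released.set(true);
            LockSupport.unpark(waiter);  // a permit granted before park() is not lost
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }
}
```

The loop around park matters. LockSupport.park may return spuriously, and AQS likewise re-checks its condition on every pass of the spin.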
So what does setHeadAndPropagate do? Here is its source (with the original comments removed):
```java
// Recall: propagate is 1 here, and node is the node this thread appended to the queue earlier
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // Make node the new head
    setHead(node);
    // propagate > 0 already holds, so the if block runs
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // Release if there is no next node or the next node is in shared mode. Recall that
        // addWaiter was called with Node.SHARED when the node was built; isShared() checks that
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
```
Next, let's see how doReleaseShared is implemented:
```java
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        // Get the head node
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // If the head is SIGNAL, CAS it to 0; on failure, loop and retry until it succeeds
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // Once the head's status has been reset, wake its successor
                unparkSuccessor(h);
            }
            // If the head's status is already 0, set it to Node.PROPAGATE. Per the comment above,
            // this keeps propagation going: a later setHeadAndPropagate sees waitStatus < 0
            // and calls doReleaseShared again
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
```
Once the head's status has been set to 0, unparkSuccessor runs. Its source is as follows:
```java
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // Get the successor
    Node s = node.next;
    // If there is no successor, or it is CANCELLED, run the block below
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Walk from the tail toward the front; the last match is the non-CANCELLED node closest to the head
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // If such a non-CANCELLED node was found, wake its thread
    if (s != null)
        LockSupport.unpark(s.thread);
}
```
The main job of unparkSuccessor is to wake the thread of the first non-CANCELLED node after the head.
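Why scan backward from the tail? addWaiter sets a node's prev before the tail CAS and the predecessor's next only after it. A forward walk can therefore briefly find next == null, while the prev chain is always complete. The search itself can be sketched on a plain doubly linked list. SuccessorSearch, its Node class and build are hypothetical, single-threaded stand-ins for the AQS structures; only the findSuccessor logic mirrors the source above.

```java
// Hypothetical, single-threaded model of the successor search in unparkSuccessor.
class SuccessorSearch {
    static final class Node {
        final String name;
        final int waitStatus; // > 0 means CANCELLED, as in AQS
        Node prev;
        Node next;

        Node(String name, int waitStatus) {
            this.name = name;
            this.waitStatus = waitStatus;
        }
    }

    // Try node.next first; if it is missing or cancelled, walk backward from tail and keep
    // the last non-cancelled node seen, which is the one closest to `node`.
    static Node findSuccessor(Node node, Node tail) {
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;
                }
            }
        }
        return s;
    }

    // Builds a doubly linked list n0 <-> n1 <-> ...; index 0 plays the head.
    static Node[] build(int... statuses) {
        Node[] nodes = new Node[statuses.length];
        for (int i = 0; i < statuses.length; i++) {
            nodes[i] = new Node("n" + i, statuses[i]);
            if (i > 0) {
                nodes[i].prev = nodes[i - 1];
                nodes[i - 1].next = nodes[i];
            }
        }
        return nodes;
    }
}
```

For statuses (-1, 1, 1, -1, 0), the two cancelled nodes after the head are skipped and n3 is chosen.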
cancelAcquire
It has not been explained yet where the CANCELLED status is set, because one method remains to be analyzed. The finally block in doAcquireSharedInterruptibly handles the case where the thread is interrupted by running cancelAcquire, whose source is:

```java
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    // The thread was interrupted: clear the thread stored in its node
    node.thread = null;

    // Skip cancelled predecessors
    // Relink node.prev past any predecessors whose status is CANCELLED
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // This is where the CANCELLED status is set
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    // If node is the tail, its nearest non-CANCELLED predecessor becomes the new tail and the
    // nodes after pred become garbage. Note that neither CAS below is retried on failure
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            // node is not the tail and has a non-CANCELLED successor:
            // unlink node by pointing pred.next at that successor
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // Given the if condition above, we get here in these cases, and wake node's successor:
            // 1) pred == head: every node between head and this interrupted node is CANCELLED (or there are none)
            // 2) pred != head, but pred's status is not SIGNAL and could not be set to SIGNAL
            //    (pred was cancelled concurrently, or the CAS failed)
            // 3) pred.thread is null: this holds for a node that has just become head (setHead clears thread)
            //    and for a predecessor that is itself in cancelAcquire (which clears node.thread first)
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
```
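The cancellation path is internal, but its visible effects can be sketched with the public API. In the hypothetical CancelDemo below, one of two waiters is interrupted. That waiter gets InterruptedException, and the count is untouched, because cancelAcquire only changes the queue, never state. The remaining waiter is still released by countDown(). Depending on timing, the interrupted thread may not be queued yet. In that case the initial Thread.interrupted() check throws instead of cancelAcquire running, but the observable result is the same.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper observing the effect of cancelling one waiter.
class CancelDemo {
    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // True if the interrupted waiter got InterruptedException, the count was untouched,
    // and the other waiter was still released by the final countDown().
    static boolean run() {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean firstInterrupted = new AtomicBoolean();
        AtomicBoolean secondPassed = new AtomicBoolean();
        Thread first = new Thread(() -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                firstInterrupted.set(true); // its node is cancelled if it was already queued
            }
        });
        Thread second = new Thread(() -> {
            try {
                latch.await();
                secondPassed.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        first.start();
        second.start();
        first.interrupt();
        joinQuietly(first);
        long countAfterCancel = latch.getCount(); // cancelling never touches state
        latch.countDown();
        joinQuietly(second);
        return firstInterrupted.get() && countAfterCancel == 1 && secondPassed.get();
    }
}
```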
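At this point the await() logic has been fully analyzed. Reading the code alone is still not enough, though. When I got this far my head was still spinning, mainly because I lacked a big-picture view. Every piece of code is readable on its own, but why is it written this way? When the counter finishes (i.e. state=0), are the threads waiting in the queue woken all at once or one by one? When a woken thread runs the loop in doAcquireSharedInterruptibly again, what differs from its first pass? So it is worth walking through the whole flow.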
After reading this source, it turns out that the core logic all lives in doAcquireSharedInterruptibly, so now is the time to go back and trace through that method.
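Suppose a thread t1 calls await. The wait queue has not been initialized yet, so an empty head node is created first, and t1 is wrapped in a node and appended to the queue. The queue is now:

head [empty node, waitStatus=0] <-> t1 [waitStatus=0]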
Next, shouldParkAfterFailedAcquire changes the head's status, and t1 parks:

head [waitStatus=SIGNAL] <-> t1 [waitStatus=0]
Now another thread, t2, calls await, and the queue becomes:

head [waitStatus=SIGNAL] <-> t1 [waitStatus=SIGNAL] <-> t2 [waitStatus=0]
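In other words, each time a node is appended to the tail of the wait queue, its predecessor's status is updated to Node.SIGNAL (that is, -1), and all the threads block in parkAndCheckInterrupt. Now the counter reaches zero. The thread that makes the last countDown call also runs doReleaseShared, which sets the head's waitStatus to 0:

head [waitStatus=0] <-> t1 [waitStatus=SIGNAL] <-> t2 [waitStatus=0]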
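Execution continues into unparkSuccessor, which wakes t1. t1 returns from parkAndCheckInterrupt and spins again. Its predecessor is the head and state=0, so t1 runs setHeadAndPropagate and makes itself the head. In setHead, the node's thread and prev fields are both cleared:

head = t1's node [thread=null, waitStatus=SIGNAL] <-> t2 [waitStatus=0]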
Thread t1 then runs doReleaseShared, which sets the status of the head (now t1's own node) to 0 and wakes t2:

head = t1's node [thread=null, waitStatus=0] <-> t2 [waitStatus=0]
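Having woken t2, t1 returns from await and continues with the code after it. The queue is unchanged: t1's node is still the head, followed by t2.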
Once t2 runs, it likewise makes itself the head:

head = t2's node [thread=null, waitStatus=0], with no successor
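When t2 runs setHeadAndPropagate it has no successor, yet it still calls doReleaseShared, because s == null satisfies the condition. Inside doReleaseShared, however, t2 is both the head and the tail, so nothing is done: the loop exits and t2 returns from await. Only the head node is left in the queue.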
countDown
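Finally, let's look at the logic of the countDown method:

```java
public void countDown() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    // As analyzed earlier, this decrements state by 1 and returns true only if state
    // becomes 0 as a result, meaning the countdown has finished
    if (tryReleaseShared(arg)) {
        // Wake the first non-CANCELLED node among the head's successors
        doReleaseShared();
        return true;
    }
    return false;
}
```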
When a thread changes state to 0, it also runs doReleaseShared, which wakes the head's successor.
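tryReleaseShared returns false once state is 0. Extra countDown() calls are therefore harmless, and the count never goes negative. Here is a quick trace with CountDownTrace, a hypothetical helper that uses only the public API:

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical helper recording getCount() after each countDown() call.
class CountDownTrace {
    static long[] trace(int initial, int calls) {
        CountDownLatch latch = new CountDownLatch(initial);
        long[] counts = new long[calls];
        for (int i = 0; i < calls; i++) {
            latch.countDown(); // calls made at 0 are no-ops: tryReleaseShared returns false
            counts[i] = latch.getCount();
        }
        return counts;
    }
}
```

For an initial count of 2 and four calls, the recorded counts are 1, 0, 0, 0.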
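One small detail deserves attention: doReleaseShared is called from two places in the source. One entry point is the countDown method just described. The other comes through await, inside setHeadAndPropagate. The two happen in a fixed order: countDown wakes the frontmost waiting thread, and that thread then wakes the following threads one after another.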
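The public API cannot show this wake-up order, but it can check the outcome. In the sketch below, several threads park in await(), and a single countDown() still lets every one of them through, since each woken thread passes the wake-up along. ChainReleaseDemo is a hypothetical name. Waiting for Thread.State.WAITING is a heuristic so that the waiters are normally all queued before the release.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: park several waiters, then open the latch with one countDown().
class ChainReleaseDemo {
    static int run(int waiters) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicInteger passed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < waiters; i++) {
            Thread t = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        // Wait until each waiter reports WAITING (parked inside await), so the
        // waiters are normally all queued before the release.
        for (Thread t : threads) {
            while (t.getState() != Thread.State.WAITING) {
                Thread.yield();
            }
        }
        latch.countDown(); // directly wakes only the first queued waiter
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }
}
```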