CountDownLatch是JDK1.5提供用來多線程併發同步的工具,可讓一個或多個線程等待另外一個線程執行完再執行。java
private static CountDownLatch countDownLatch = new CountDownLatch(1); static class ThreadRunnable1 implements Runnable{ @Override public void run() { countDownLatch.countDown(); } } public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(new ThreadRunnable1()); thread1.setName("線程1"); thread1.start(); countDownLatch.await(); }
如上例子,先建立了計數器爲1的CountDownLatch,運行線程1調用countDown()方法,主線程調用await()方法並阻塞。調用countDown()會將計數器減1當計數器到達0,會喚醒await()的阻塞並繼續執行下面的邏輯。node
CountDownLatch定義了一個成員變量多線程
private final Sync sync;
Sync繼承至AbstractQueuedSynchronizer是CountDownLatch的內部類,提供了阻塞隊列的支持。CountDownLatch的UML關係圖以下併發
CountDownLatch主要利用AbstractQueuedSynchronizer(下文稱AQS)中的隊列和隊列的通知傳播的技術實現。建立CountDownLatch時先初始化設置的計數器,調用countDown()能夠減計數器,當計數器到0時會通知去喚醒調用了await()的線程,而調用了await()線程會被加入到AQS同步阻塞隊列,因此先喚醒AQS的同步阻塞隊列的頭節點線程,頭節點線程被喚醒後會一樣喚醒AQS同步阻塞隊列的下個節點,以此傳播喚醒全部調用了await()的線程。ide
下面來分析下CountDownLatch的2個主要方法countDown()和await()工具
countDown()源碼以下oop
public void countDown() { sync.releaseShared(1); }
countDown()是直接調用了AQS裏的releaseShared(1)方法咱們來看下releaseShared(1)方法源碼分析
releaseShared(1)源碼以下ui
public final boolean releaseShared(int arg) { //計數器減一併返回剩餘計數器是否等於0 if (tryReleaseShared(arg)) { //喚醒同步阻塞隊列中的頭節點的下個節點線程 doReleaseShared(); return true; } return false; }
先調用tryReleaseShared(arg)將計數器減一併返回剩餘計數器是否等於0,若是是調用doReleaseShared()喚醒同步阻塞隊列中的頭節點的下個節點線程。.net
tryReleaseShared(arg)方法是CountDownLatch內部類Sync的實現。咱們來看下tryReleaseShared(arg)的源碼。
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { //獲取當前計時器 int c = getState(); //若是已經爲0則返回false,說明被以前線程減成0了 if (c == 0) return false; //計時器減一 int nextc = c-1; //CSA更新值,若是更新失敗自旋更新直至成功 if (compareAndSetState(c, nextc)) //更新成功後剩餘的計時器是否爲0 return nextc == 0; } }
tryReleaseShared(arg)實現很簡單,獲取當前計時器減一併使用CAS方式更新,若是更新失敗自旋更新直至成功,最後返回剩餘的計時器是否爲0。
doReleaseShared()這個方法比較重要await()中也會調用,這裏先不作分析,下面會在await()方法中一塊兒分析。
await()源碼以下
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
await()是直接調用了AQS裏的acquireSharedInterruptibly(1)方法咱們來看下acquireSharedInterruptibly(1)方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //判斷當前線程是否被中斷,中斷的話拋出異常 if (Thread.interrupted()) throw new InterruptedException(); //判斷當前計數器是否爲0是的話返回1不然返回-1 if (tryAcquireShared(arg) < 0) //加入到同步阻塞隊列等待被喚醒 doAcquireSharedInterruptibly(arg); }
先判斷當前線程是否被中斷,中斷的話拋出異常。而後調用tryAcquireShared(arg)判斷當前計數器是否爲0是的話返回1不然返回-1,這裏若是計數器爲0了就表示計數器已經被countDown()減完了無需進行阻塞。若是計數器未被減完則調用 doAcquireSharedInterruptibly(arg)加入到同步阻塞隊列等待被喚醒。
tryAcquireShared(arg)方法也是CountDownLatch內部類Sync的實現,下面看下tryAcquireShared(arg)的源碼,源碼以下
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
實現很簡單取計數器判斷是否爲0是的話返回1不然返回-1。
下面doAcquireSharedInterruptibly(arg)來看下的源碼
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //以當前線程爲基礎增長共享節點到同步阻塞隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //獲取前一個節點 final Node p = node.predecessor(); if (p == head) { //判斷當前計數器是否爲0是的話返回1不然返回-1 int r = tryAcquireShared(arg); if (r >= 0) { //設置頭節點並傳播喚醒同步阻塞隊列中的下個節點 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //阻塞線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) 刪除失敗節點 cancelAcquire(node); } }
doAcquireSharedInterruptibly裏有個循環,這個循環的主要做用就是在線程喚醒後重試獲取鎖直到獲取鎖。node.predecessor()獲取當前線程節點的前一個節點,若是是頭節點,則當前線程嘗試獲取鎖,獲取鎖成功調用setHeadAndPropagate(node, r)設置頭節點並傳播通知。若是獲取失敗或者非頭節點則調用shouldParkAfterFailedAcquire(p, node)判斷是否須要阻塞等待,若是須要阻塞等待則調用parkAndCheckInterrupt()阻塞當前線程並讓出cup資源資質被前一個節點喚醒,若是線程被中斷則拋出InterruptedException()異常。
doAcquireSharedInterruptibly中的addWaiter(Node.SHARED),shouldParkAfterFailedAcquire(p, node)和parkAndCheckInterrupt()方法以前的文章已經描述過(連接)這裏就再也不贅述。
下面來看下setHeadAndPropagate(node, r)的實現
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; //更新當前節點爲頭節點 setHead(node); //這裏propagate必大於0 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //若是下個節點爲空或者是共享模式,就要傳播喚醒下個節點 if (s == null || s.isShared()) doReleaseShared(); } }
主要邏輯就是設置當前節點爲頭節點,若是當前節點的下個節點不爲空就要傳播喚醒下個節點。
下面主要看下doReleaseShared()方法
private void doReleaseShared() { for (;;) { //1.獲取頭節點 Node h = head; //2.頭節點不會空,並且頭節點有下個節點 if (h != null && h != tail) { int ws = h.waitStatus; //3.若是頭節點狀態是SIGNAL,則喚醒 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } //4.若是節點爲初始狀態則設置節點爲PROPAGATE狀態 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //5.若是頭節點改變,繼續循環 if (h == head) // loop if head changed break; } }
整個循環就是利用樂觀鎖的方式改變節點狀態並喚醒下個節點線程。5個步驟以下
(1)獲取頭節點
(2)頭節點不會空,並且頭節點有下個節點
(3)步驟就是判斷頭節點狀態是否是SIGNAL狀態,這裏若是頭節點狀態是SIGNAL說明此節點已經被它的下個節點設置爲SIGNAL狀態,須要喚醒此節點的下個節點, 這裏調用unparkSuccessor(h)喚醒,unparkSuccessor(h)源碼方法以前的文章已經描述過(連接)這裏就再也不贅述。
(4)若是頭節點爲初始狀態0這設置狀態爲PROPAGATE。這裏不會喚醒他的下個節點,那麼下個節點會不會一直阻塞。其實下個節點線程不會一直阻塞,若是頭節點爲初始狀態0說明它的下個節點還沒調用shouldParkAfterFailedAcquire(p, node)方法改變頭節點的狀態爲SIGNAL。若是在步驟4前下個節點線程調用shouldParkAfterFailedAcquire(p, node)改變頭節點的狀態爲SIGNAL,shouldParkAfterFailedAcquire(p, node)會返回false,繼續循環這時下個節點線程不會阻塞直接進入setHeadAndPropagate(node, r)方法。若是當前線程執行步驟4設置失敗,說明下個節點線程在調用此方法以前先調用shouldParkAfterFailedAcquire(p, node)改變頭節點的狀態,這樣也不影響流程,這時從新循環調用unparkSuccessor(h)便可。不過本人沒看出步驟4在CountDownLatch有什麼實質用處。
(5)步驟5是循環退出的條件若是頭節點未變退出循環,不然繼續循環,頭節點何時會改變,一個是喚醒下個節點後下個節點在步驟5前調用setHeadAndPropagate(node, r)方法更新頭節點,另一個就是另外一個線程添加節點後沒有阻塞直接setHeadAndPropagate(node, r)方法更新頭節點。