CountDownLatch是AbstractQueuedSynchronizer中共享鎖模式的一個的實現,是一個同步工具類,用來協調多個線程之間的同步。CountDownLatch可以使一個或多個線程在等待另一些線程完成各自工做以後,再繼續執行。CountDownLatch內部使用一個計數器進行實現線程通知條件,計數器初始值爲進行通知線程的數量。當每個通知線程完成本身任務後,計數器的值就會減一。當計數器的值爲0時,表示全部的通知線程都已經完成一些任務,而後在CountDownLatch上全部等待的線程就能夠恢復執行接下來的任務。基本上CountDownLatch的原理就是這樣,下面咱們一塊兒去看看源碼。html
public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } .... } private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } .... }
從上面簡略的源碼能夠看出,CountDownLatch和ReentrantLock同樣,在內部聲明瞭一個繼承AbstractQueuedSynchronizer的Sync內部類(重寫了父類的tryAcquireShared(int acquires)和tryReleaseShared(int releases)),並在聲明瞭一個sync屬性。CountDownLatch只有一個有參構造器,參數count就是上面說的的進行通知的線程數目,說白點也就是countDown()方法須要被調用的次數。node
CountDownLatch的主要方法是wait()和countDown(),咱們先看wait()方法源碼。多線程
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
和ReentrantLock同樣,CountDownLatch依然算是一件外衣,實際仍是靠sync進行操做。咱們接着看AQS的acquireSharedInterruptibly(int arg)方法(實際上參數在CountDownLatch裏沒什麼用)工具
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
看到先判斷當前線程是不是中斷狀態,而後調用子類重寫的tryAcquireShared(int acquires)方法去判斷是否須要進行阻塞(也便是嘗試獲取鎖),若是返回值小於0 ,就調用doAcquireSharedInterruptibly(int acquires)方法進行線程阻塞。先看tryAcquireShared(int acquires)方法oop
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
源碼很簡單,就是看下state是否等於0,等於0,就返回1,表明不須要線程阻塞,不等於0(實際上state只會大於或者等於0),就返回-1,表示須要進行線程阻塞。這裏有個伏筆就是若是CountDownLatch的計數器state被減至0時,後續再有線程調用CountDownLatch的wait()方法時,會直接往下執行調用者方法的代碼,不會形成線程阻塞。源碼分析
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
在doAcquireSharedInterruptibly(int acquires)方法中進行線程阻塞的步驟依然是先調用addWaiter(Node mode)方法將該線程封裝到AQS內部的CLH隊列的Node.SHARE(共享)模式的Node節點,並放入到隊尾,而後在循環中去嘗試持有鎖和進行線程阻塞。在循環體內,先獲取到前任隊尾,而後判斷前任隊尾是不是隊首head,若是是就調用tryAcquireShared(int acquires)嘗試獲取鎖,若是返回1表示獲取到了鎖,就調用setHeadAndPropagate(Node node, int propagate)方法將節點設置head而後再往下傳播,解除後續節點的線程阻塞狀態(這就是共享鎖的核心)。若是返回-1,表示沒有獲取到鎖,就調用shouldParkAfterFailedAcquire(Node pre, Node node)進行pre節點的waitStatus賦值爲Node.SIGNAL,而後在墨跡一次循環,調用parkAndCheckInterrupt()方法進行線程阻塞。咱們先看setHeadAndPropagate(Node node, int propagate)方法源碼ui
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 釋放共享鎖,這是這一步是最關鍵的一步 doReleaseShared(); } }
在setHeadAndPropagate(Node node, int propagate)方法中,直接將node設置從head,由於參數propagate爲始終爲1(到這一步就表示獲取了共享鎖,state等於0,在tryAcquireShared(int acquires)方法中就只會返回1),因此也就是後面直接獲取head的next節點,若是head的next節點存在,而且是共享模式,就調用doReleaseShared()方法去釋放CLH中head的next節點。this
shouldParkAfterFailedAcquire(Node pre, Node node)和parkAndCheckInterrupt()兩個方法在JUC之ReentrantLock源碼分析一篇博客中已經講過了,這裏再也不贅述了。spa
doReleaseShared()這個方法其實也是countDown()方法的核心實現,這裏先賣個關子,後面會講到。咱們接着看doReleaseShared()方法。線程
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
當調用wait()方法進行線程阻塞等待被別的線程解除阻塞後,對於AQS中共享鎖最核心的代碼就是doReleaseShared()這個方法。先獲取head節點,若是head節點存在而且有後續節點,就先判斷head節點的狀態,若是狀態是Node.SIGNAL(表示後續節點須要鎖釋放通知),將head節點狀態改成0,而後解除下一節點的線程阻塞狀態,而後判斷下以前獲取的head和如今的head是否仍是同一個,若是是就結束循環,若是不是,那就接着循環。其實就算是存在線程在執行完unparkSuccessor(Node node)方法後失去了CPU的執行權,一直到被解除線程阻塞的next節點坐上了head節點後纔有機會獲取到CPU執行權這種狀況,無非就是以前獲取到head和如今的head不相同了,大不了再循環一次,也算是多線程去解除當前head節點的next節點線程阻塞,影響不大;若是狀態是0,嘗試將狀態有0改成Node.PROPAGATE,而後重複循環,head節點是0的這種狀態在CountDownLatch中應該是不會出現的,多是AQS對別的類的兼容,也多是我眼拙沒看出來。若是head爲null或者head與tail相同,就結束循環。
到這裏CountDownLatch的wait()方法就分析完成了,這裏總結下wait()方法流程:
一、若是state是0,直接往下執行調用者的代碼,不會進行線程等待阻塞
二、將當前線程封裝到共享模式的Node節點中,而後放入到CLH隊列的隊尾
三、將前任隊尾的waitStatus改變爲Node.SIGNAL,而後調用LockSupport.park()阻塞當前線程,等待前節點喚醒
四、被前節點喚醒後,把本身設爲head,而後去喚醒next節點
咱們看完了wait()方法,如今在來看下countDown()方法的源碼
public void countDown() { sync.releaseShared(1); }
一如既往的簡單,直接調用AQS的releaseShared(int arg)方法,咱們直接去看AQS的releaseShared(int arg)方法
AQS方法 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } CountDownLatch方法 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
在AQS的releaseShared(int arg)中先去調用必定要被子類重寫的tryReleaseShared(int releases)方法,返回值表示是否須要進行喚醒操做,若是返回true,那就調用doReleaseShared()方法去解除head節點的next節點線程阻塞狀態。是的,你沒看錯,就是咱們前面分析的doReleaseShared()方法,他們複用了同一個方法。而在CountDownLatch的tryReleaseShared(int releases)方法實現也很是簡單,就是判斷下當前state是不是0,若是是0,表示已經減完了,不須要再減了,等待線程已經在被依次喚醒了,返回false表示不須要去喚醒後續節點了。最後再看看減完後的state是不是等於0,等於0,表示須要去解除後續節點的阻塞狀態;不等於0(實際上是大於0),表示調用countDown()方法去減state的次數還不夠,暫時還不能解除後續節點的阻塞狀態。
countDown()方法比較簡單,咱們也總結下countDown()流程:
一、若是state爲0,表示已經有線程在解除CLH隊列節點的阻塞狀態了,這裏直接結束
二、若是state減去1後不等於0,表示還要等有線程再次調用countDown()方法進行state減1,這裏直接結束
三、若是state減去1後等於0,表示已經線程調用countDown()方法的次數已經達到最初設定的次數,而後去解除CLH隊列上節點的阻塞狀態