一、在前面章節瞭解了CAS、AQS後,想必你們已經對這塊知識有了深入的瞭解了; 二、而JDK中有一個關於計數同步器的工具類,它也是基於AQS實現的; 三、那麼本章節就和你們分享分析一下JDK1.8的CountDownLatch的工做原理;
一、CountDownLatch從英文字面上理解,count計數作down的減法動做,而Latch又是門閂的意思; 二、CountDownLatch是一種同步幫助,容許一個或多個線程等待,直到在其餘線程中執行的一組操做完成。; 三、CountDownLatch內部沒有所謂的公平鎖\非公平鎖的靜態內部類,只有一個Sync靜態內部類,CountDownLatch內部基本上也是經過sync.xxx之類的這種調用方式的; 四、CountDownLatch內部維護了一個虛擬的資源池,若是許可數不爲爲0一直線程阻塞等待,直到許可數爲0時才釋放繼續往下執行;
一、其實CountDownLatch的實現也偏偏很好利用了其父類AQS的state變量值; 二、初始化一個數量值做爲計數器的默認值,假設爲N,那麼當任何線程調用一次countDown則計數值減1,直到許可爲0時才釋放等待; 三、CountDownLatch,簡單大體意思爲:A組線程等待另外B組線程,B組線程執行完了,A組線程才能夠執行;
一、public CountDownLatch(int count) // 建立一個給定許計數值的計數同步器對象 二、public void await() // 入隊等待,直到計數器值爲0則釋放等待 三、public void countDown() // 釋放許可,計數器值減1,若計數器值爲0則觸發釋放無用結點 四、public long getCount() // 獲取目前最新的共享資源計數器值
一、獲取共享鎖: public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } await{ 若是檢測中斷狀態發現被中斷過的話,那麼則拋出InterruptedException異常 若是嘗試獲取共享鎖失敗的話( 嘗試獲取共享鎖的各類方式由AQS的子類實現 ), 那麼就新增共享鎖結點經過自旋操做加入到隊列中,而後經過調用LockSupport.park進入阻塞等待,直到計數器值爲零才釋放等待 } 二、釋放共享鎖: public void countDown() { sync.releaseShared(1); } release{ 若是嘗試釋放共享鎖失敗的話( 嘗試釋放共享鎖的各類方式由AQS的子類實現 ), 那麼經過自旋操做完成阻塞線程的喚起操做 }
好比百米賽跑,我就以賽跑爲例生活化闡述該CountDownLatch原理: 一、場景:百米賽跑十人蔘賽,終點處有一個裁判計數; 二、開跑一聲槍響,十我的爭先恐後的向終點跑去,真的是振奮多秒,使人振奮; 三、當一我的到達終點,這我的就完成了他的賽跑事情了,就沒事一邊玩去了,那麼裁判則減去一我的; 四、隨着人員陸陸續續的都跑到了終點,最後裁判計數顯示還有0我的未到達,意思就是人員都達到了; 五、而後裁判就拿着登記的成績屁顛屁顛去輸入電腦登記了; 八、到此打止,這一系列的動做認爲是A組線程等待另外其餘組線程的操做,直到計數器爲零,那麼A則再幹其餘事情;
一、構造器源碼: /** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } 二、建立一個給定許計數值的計數同步器對象,計數器值必須大於零,count值最後賦值給了state這個共享資源值;
一、AQS --> Sync 二、CountDownLatch內的同步器都是經過Sync抽象接口來操做調用關係的,細看會發現基本上都是經過sync.xxx之類的這種調用方式的;
一、源碼: /** * Causes the current thread to wait until the latch has counted down to * zero, unless the thread is {@linkplain Thread#interrupt interrupted}. * * // 致使當前線程等待,直到計數器值減爲零則釋放等待,或者因爲線程被中斷也可致使釋放等待; * * <p>If the current count is zero then this method returns immediately. * * <p>If the current count is greater than zero then the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of two things happen: * <ul> * <li>The count reaches zero due to invocations of the * {@link #countDown} method; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread. * </ul> * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting, * </ul> * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @throws InterruptedException if the current thread is interrupted * while waiting */ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } 二、await此方法被調用後,則一直會處於等待狀態,其核心仍是因爲調用了LockSupport.park進入阻塞等待; 當計數器值state=0時能夠打破等待現狀,固然還有線程被中斷後也能夠打破線程等待現狀;
一、源碼: /** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 調用以前先檢測該線程中斷標誌位,檢測該線程在以前是否被中斷過 throw new InterruptedException(); // 若被中斷過的話,則拋出中斷異常 if (tryAcquireShared(arg) < 0) // 嘗試獲取共享資源鎖,小於0則獲取失敗,此方法由AQS的具體子類實現 doAcquireSharedInterruptibly(arg); // 將嘗試獲取鎖資源的線程進行入隊操做 } 二、因爲是實現同步計數器功能,因此tryAcquireShared首次調用一定小於0,則就順利了進入了doAcquireSharedInterruptibly線程等待; 至於首次調用爲何會小於0,請看子類的實現,子類的實現判斷爲 "(getState() == 0) ? 1 : -1" ;
一、源碼: protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; // 計數器值與零比較判斷,小於零則獲取鎖失敗,大於零則獲取鎖成功 } 二、嘗試獲取共享鎖資源,可是在計數器CountDownLatch這個功能中,小於零則須要入隊,進入阻塞隊列進行等待;大於零則喚醒等待隊列,釋放await方法的阻塞等待;
一、源碼: /** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 按照給定的mode模式建立新的結點,模式有兩種:Node.EXCLUSIVE獨佔模式、Node.SHARED共享模式; final Node node = addWaiter(Node.SHARED); // 建立共享模式的結點 boolean failed = true; try { for (;;) { // 自旋的死循環操做方式 final Node p = node.predecessor(); // 獲取結點的前驅結點 if (p == head) { // 若前驅結點爲head的話,那麼說明當前結點天然不用說了,僅次於老大以後的即是老二了咯 int r = tryAcquireShared(arg); // 並且老二也但願嘗試去獲取一下鎖,萬一頭結點恰巧剛剛釋放呢?但願仍是要有的,萬一實現了呢。。。 if (r >= 0) { // 若r>=0,說明已經成功的獲取到了共享鎖資源 setHeadAndPropagate(node, r); // 把當前node結點設置爲頭結點,而且調用doReleaseShared釋放一下無用的結點 p.next = null; // help GC failed = false; return; } // 可是在await方法首次被調用會流轉到此,這個時候獲取鎖資源會失敗,即r<0,因此會進入是否須要休眠的判斷 // 可是第一次進入休眠方法,由於被建立的結點waitStatus=0,因此會被修改一次爲SIGNAL狀態,再次循環一次 // 而第二次循環進入shouldParkAfterFailedAcquire方法時,返回true就是須要休眠,則順利調用park方式阻塞等待 } if (shouldParkAfterFailedAcquire(p, node) && // 根據前驅結點看看是否須要休息一下子 parkAndCheckInterrupt()) // 阻塞操做,正常狀況下,獲取不到共享鎖,代碼就在該方法中止了,直到被喚醒 // 被喚醒後,發現parkAndCheckInterrupt()裏面檢測了被中斷了的話,則補上中斷異常,所以拋了個異常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 二、doAcquireSharedInterruptibly在實現計數器原理的時候,主要的乾的事情就是等待再等待,等到計數器值爲零時才甦醒;
一、源碼: /** * Decrements the count of the latch, releasing all waiting threads if * the count reaches zero. * * <p>If the current count is greater than zero then it is decremented. * If the new count is zero then all waiting threads are re-enabled for * thread scheduling purposes. * * <p>If the current count equals zero then nothing happens. */ public void countDown() { sync.releaseShared(1); // 釋放一個許可資源 } 二、釋放許可資源,也就是計數器值不斷的作減1操做,當計數器值爲零的時候,該方法將會釋放全部正在等待的線程隊列; 至於爲何還會釋放全部,請看後續的releaseShared(int arg)講解;
一、源碼: /** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 嘗試釋放共享鎖資源,此方法由AQS的具體子類實現 doReleaseShared(); // 自旋操做,喚醒後繼結點 return true; // 返回true代表全部線程已釋放 } return false; // 返回false代表目前還沒釋放徹底,只要計數器值不爲零的話,那麼都會返回false } 二、releaseShared方法首先就判斷了tryReleaseShared(arg)的返回值,可是計數器值只要不爲零,都會返回false,所以releaseShared該方法就立馬返回false了; 三、因此當計數器值慢慢減至零時,則立馬返回true,那麼也就立馬會調用doReleaseShared釋放全部等待的線程隊列;
一、源碼: // CountDownLatch 的靜態內部類 Sync 類的 tryReleaseShared 方法 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 自旋的死循環操做方式 int c = getState(); // 獲取最新的計數器值 if (c == 0) // 若計數器值爲零,說明已經經過CAS操做減至零了,因此在併發中讀取到零時並不須要作什麼操做,所以返回false return false; int nextc = c-1; // 計數器值減1操做 if (compareAndSetState(c, nextc)) // 經過CAS比較,順利狀況下設置成功返回true return nextc == 0; // 當經過計算操做獲得的nextc爲零時經過CAS修改爲功,那麼代表全部事情都已經作完,須要釋放全部等待的線程隊列 // 若CAS失敗,想都不用想確定是因爲併發操做,致使CAS失敗,那麼惟一可作的就是下一次循環查看是否已經被其餘線程處理了 } } 二、CountDownLatch的靜態內部類實現父類AQS的方法,用來處理如何釋放鎖,籠統的講,若返回負數則須要進入阻塞隊列,不然須要釋放全部等待隊列;
一、源碼: /** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { // 自旋的死循環操做方式 Node h = head; // 每次都是取出隊列的頭結點 if (h != null && h != tail) { // 若頭結點不爲空且也不是隊尾結點 int ws = h.waitStatus; // 那麼則獲取頭結點的waitStatus狀態值 if (ws == Node.SIGNAL) { // 若頭結點是SIGNAL狀態則意味着頭結點的後繼結點須要被喚醒了 // 經過CAS嘗試設置頭結點的狀態爲空狀態,失敗的話,則繼續循環,由於併發有可能其它地方也在進行釋放操做 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); // 喚醒頭結點的後繼結點 } // 如頭結點爲空狀態,則把其改成PROPAGATE狀態,失敗的則多是由於併發而被改動過,則再次循環處理 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若頭結點沒有發生什麼變化,則說明上述設置已經完成,大功告成,功成身退 // 若發生了變化,多是操做過程當中頭結點有了新增或者啥的,那麼則必須進行重試,以保證喚醒動做能夠延續傳遞 if (h == head) // loop if head changed break; } } 二、主要目的是釋放線程中全部等待的隊列,當計數器值爲零時,此方法立刻會被調用,經過自旋方式輪詢幹掉全部等待的隊列;
一、有了分析AQS的基礎後,再來分析CountDownLatch便快了不少; 二、在這裏我簡要總結一下CountDownLatch的流程的一些特性: • 管理一個大於零的計數器值; • 每countDown一次則state就減1一次,直到許可證數量等於0則釋放隊列中全部的等待線程; • 也能夠經過countDown/await組合一塊兒使用,來實現CyclicBarrier的功能;
