1、前言html
分析完了CyclicBarrier後,下面分析CountDownLatch,CountDownLatch用於同步一個或多個任務,強制他們等待由其餘任務執行的一組操做完成。CountDownLatch典型的用法是將一個程序分爲n個互相獨立的可解決任務,並建立值爲n的CountDownLatch。當每個任務完成時,都會在這個鎖存器上調用countDown,等待問題被解決的任務調用這個鎖存器的await,將他們本身攔住,直至鎖存器計數結束。下面開始分析源碼。java
2、CountDownLatch數據結構node
從源碼可知,其底層是由AQS提供支持,因此其數據結構能夠參考AQS的數據結構,而AQS的數據結構核心就是兩個虛擬隊列:同步隊列sync queue 和條件隊列condition queue,不一樣的條件會有不一樣的條件隊列。讀者能夠參考以前介紹的AQS。數據結構
3、CountDownLatch源碼分析app
3.1 類的繼承關係 ide
public class CountDownLatch {}
說明:能夠看到CountDownLatch沒有顯示繼承哪一個父類或者實現哪一個父接口,根據Java語言規定,可知其父類是Object。函數
3.2 類的內部類oop
CountDownLatch類存在一個內部類Sync,繼承自AbstractQueuedSynchronizer,其源代碼以下。 源碼分析
private static final class Sync extends AbstractQueuedSynchronizer { // 版本號 private static final long serialVersionUID = 4982264981922014374L; // 構造器 Sync(int count) { setState(count); } // 返回當前計數 int getCount() { return getState(); } // 試圖在共享模式下獲取對象狀態 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 試圖設置狀態來反映共享模式下的一個釋放 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 無限循環 for (;;) { // 獲取狀態 int c = getState(); if (c == 0) // 沒有被線程佔有 return false; // 下一個狀態 int nextc = c-1; if (compareAndSetState(c, nextc)) // 比較而且設置成功 return nextc == 0; } } }
說明:對CountDownLatch方法的調用會轉發到對Sync或AQS的方法的調用,因此,AQS對CountDownLatch提供支持。ui
3.3 類的屬性
public class CountDownLatch { // 同步隊列 private final Sync sync; }
說明:能夠看到CountDownLatch類的內部只有一個Sync類型的屬性,這個屬性至關重要,後面會進行分析。
3.4 類的構造函數
1. CountDownLatch(int) 型構造函數
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); // 初始化狀態數 this.sync = new Sync(count); }
說明:該構造函數能夠構造一個用給定計數初始化的CountDownLatch,而且構造函數內完成了sync的初始化,並設置了狀態數。
3.5 核心函數分析
1. await函數
此函數將會使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷。其源碼以下
public void await() throws InterruptedException { // 轉發到sync對象上 sync.acquireSharedInterruptibly(1); }
說明:由源碼可知,對CountDownLatch對象的await的調用會轉發爲對Sync的acquireSharedInterruptibly(從AQS繼承的方法)方法的調用,acquireSharedInterruptibly源碼以下
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
說明:從源碼中可知,acquireSharedInterruptibly又調用了CountDownLatch的內部類Sync的tryAcquireShared和AQS的doAcquireSharedInterruptibly函數。tryAcquireShared函數的源碼以下
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
說明:該函數只是簡單的判斷AQS的state是否爲0,爲0則返回1,不爲0則返回-1。doAcquireSharedInterruptibly函數的源碼以下
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 添加節點至等待隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 無限循環 // 獲取node的前驅節點 final Node p = node.predecessor(); if (p == head) { // 前驅節點爲頭結點 // 試圖在共享模式下獲取對象狀態 int r = tryAcquireShared(arg); if (r >= 0) { // 獲取成功 // 設置頭結點並進行繁殖 setHeadAndPropagate(node, r); // 設置節點next域 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 在獲取失敗後是否須要禁止線程而且進行中斷檢查 // 拋出異常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
說明:在AQS的doAcquireSharedInterruptibly中可能會再次調用CountDownLatch的內部類Sync的tryAcquireShared方法和AQS的setHeadAndPropagate方法。setHeadAndPropagate方法源碼以下。
private void setHeadAndPropagate(Node node, int propagate) { // 獲取頭結點 Node h = head; // Record old head for check below // 設置頭結點 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ // 進行判斷 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 獲取節點的後繼 Node s = node.next; if (s == null || s.isShared()) // 後繼爲空或者爲共享模式 // 以共享模式進行釋放 doReleaseShared(); } }
說明:該方法設置頭結點而且釋放頭結點後面的知足條件的結點,該方法中可能會調用到AQS的doReleaseShared方法,其源碼以下。
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ // 無限循環 for (;;) { // 保存頭結點 Node h = head; if (h != null && h != tail) { // 頭結點不爲空而且頭結點不爲尾結點 // 獲取頭結點的等待狀態 int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 狀態爲SIGNAL if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不成功就繼續 continue; // loop to recheck cases // 釋放後繼結點 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 狀態爲0而且不成功,繼續 continue; // loop on failed CAS } if (h == head) // 若頭結點改變,繼續循環 break; } }
說明:該方法在共享模式下釋放,具體的流程再以後會經過一個示例給出。
因此,對CountDownLatch的await調用大體會有以下的調用鏈。
說明:上圖給出了可能會調用到的主要方法,並不是必定會調用到,以後,會經過一個示例給出詳細的分析。
2. countDown函數
此函數將遞減鎖存器的計數,若是計數到達零,則釋放全部等待的線程
public void countDown() { sync.releaseShared(1); }
說明:對countDown的調用轉換爲對Sync對象的releaseShared(從AQS繼承而來)方法的調用。releaseShared源碼以下
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
說明:此函數會以共享模式釋放對象,而且在函數中會調用到CountDownLatch的tryReleaseShared函數,而且可能會調用AQS的doReleaseShared函數,其中,tryReleaseShared源碼以下
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 無限循環 for (;;) { // 獲取狀態 int c = getState(); if (c == 0) // 沒有被線程佔有 return false; // 下一個狀態 int nextc = c-1; if (compareAndSetState(c, nextc)) // 比較而且設置成功 return nextc == 0; } }
說明:此函數會試圖設置狀態來反映共享模式下的一個釋放。具體的流程在下面的示例中會進行分析。AQS的doReleaseShared的源碼以下
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ // 無限循環 for (;;) { // 保存頭結點 Node h = head; if (h != null && h != tail) { // 頭結點不爲空而且頭結點不爲尾結點 // 獲取頭結點的等待狀態 int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 狀態爲SIGNAL if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不成功就繼續 continue; // loop to recheck cases // 釋放後繼結點 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 狀態爲0而且不成功,繼續 continue; // loop on failed CAS } if (h == head) // 若頭結點改變,繼續循環 break; } }
說明:此函數在共享模式下釋放資源。
因此,對CountDownLatch的countDown調用大體會有以下的調用鏈。
說明:上圖給出了可能會調用到的主要方法,並不是必定會調用到,以後,會經過一個示例給出詳細的分析。
4、示例
下面給出了一個使用CountDownLatch的示例。
package com.hust.grid.leesf.cyclicbarrier; import java.util.concurrent.CountDownLatch; class MyThread extends Thread { private CountDownLatch countDownLatch; public MyThread(String name, CountDownLatch countDownLatch) { super(name); this.countDownLatch = countDownLatch; } public void run() { System.out.println(Thread.currentThread().getName() + " doing something"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " finish"); countDownLatch.countDown(); } } public class CountDownLatchDemo { public static void main(String[] args) { CountDownLatch countDownLatch = new CountDownLatch(2); MyThread t1 = new MyThread("t1", countDownLatch); MyThread t2 = new MyThread("t2", countDownLatch); t1.start(); t2.start(); System.out.println("Waiting for t1 thread and t2 thread to finish"); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " continue"); } }
運行結果(某一次):
Waiting for t1 thread and t2 thread to finish t1 doing something t2 doing something t1 finish t2 finish main continue
說明:本程序首先計數器初始化爲2。根據結果,可能會存在以下的一種時序圖。
說明:首先main線程會調用await操做,此時main線程會被阻塞,等待被喚醒,以後t1線程執行了countDown操做,最後,t2線程執行了countDown操做,此時main線程就被喚醒了,能夠繼續運行。下面,進行詳細分析。
① main線程執行countDownLatch.await操做,主要調用的函數以下。
說明:在最後,main線程就被park了,即禁止運行了。此時Sync queue(同步隊列)中有兩個節點,AQS的state爲2,包含main線程的結點的nextWaiter指向SHARED結點。
② t1線程執行countDownLatch.countDown操做,主要調用的函數以下。
說明:此時,Sync queue隊列裏的結點個數未發生變化,可是此時,AQS的state已經變爲1了。
③ t2線程執行countDownLatch.countDown操做,主要調用的函數以下。
說明:通過調用後,AQS的state爲0,而且此時,main線程會被unpark,能夠繼續運行。當main線程獲取cpu資源後,繼續運行。
④ main線程獲取cpu資源,繼續運行,因爲main線程是在parkAndCheckInterrupt函數中被禁止的,因此此時,繼續在parkAndCheckInterrupt函數運行。
說明:main線程恢復,繼續在parkAndCheckInterrupt函數中運行,以後又會回到最終達到的狀態爲AQS的state爲0,而且head與tail指向同一個結點,該節點的額nextWaiter域仍是指向SHARED結點。
5、總結
通過分析CountDownLatch的源碼可知,其底層結構仍然是AQS,對其線程所封裝的結點是採用共享模式,而ReentrantLock是採用獨佔模式。因爲採用的共享模式,因此會致使後面的操做會有所差別,經過閱讀源碼就會很容易掌握CountDownLatch實現機制。謝謝各位園友的觀看~