CountDownLatch是併發包中提供的一個同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。 用給定的計數值初始化CountDownLatch。調用countDown()方法將計數減一,因此在當前計數到達零以前,調用await()方法會一直受阻塞, 直到這個CountDownLatch對象的計數值減到0爲止。計數值等於0後,會釋放全部等待的線程,await 的全部後續調用都將當即返回。 計數沒法被重置,若是CountDownLatch的計數減爲0時,後續有線程調用await()方法會直接經過。CountDownLatch也是經過AQS的共享模式進行實現。若是須要重置計數, 請考慮使用CyclicBarrier,能夠看個人另外一篇CyclicBarrier源碼分析juejin.im/post/5d3bf8…。基於Java8。java
//CountDownLatch是經過AbstractQueuedSynchronizer的實現類Sync進行實現,能夠看下下面對Sync的介紹
private final Sync sync;複製代碼
//Sync實現AbstractQueuedSynchronizer,AQS不清楚的能夠看下個人另外一篇AbstractQueuedSynchronizer源碼分析https://juejin.im/post/5d0b3b55f265da1bc23f7dca
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//傳入計數值(即AQS的狀態,表示調用await方法的線程須要等待調用count次的countDown方法)
Sync(int count) {
//設置AQS的屬性state值
setState(count);
}
//獲取當前的計數值,即AQS中的state屬性
int getCount() {
//返回AQS中的state屬性
return getState();
}
//重寫AQS中的tryAcquireShared方法,tryAcquireShared在AQS中是模板方法,子類進行實現,不然拋出UnsupportedOperationException異常,AQS的tryAcquireSharedNanos方法中進行調用,await調用AQS的tryAcquireSharedNanos方法,返回1表示await方法無需等待,返回-1調用await方法線程須要等待
protected int tryAcquireShared(int acquires) {
//若是當前計數值等於0,返回1,表示獲取共享鎖成功,不然返回-1,獲取共享鎖失敗,返回1即調用await方法的線程能夠直接執行,無需等待
return (getState() == 0) ? 1 : -1;
}
//重寫AQS中的tryReleaseShared方法,tryReleaseShared在AQS中是模板方法,子類進行實現,不然拋出UnsupportedOperationException異常,AQS的releaseShared方法中進行調用,countDown調用AQS的releaseShared方法,返回true表示調用線程將其計數值減爲0,若是原先就等於0,直接返回false
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
//使用循環的將計數值減1,由於tryReleaseShared方法會被多個線程進行調用,循環進行遞減成功,或者計數值已經被遞減爲0
for (;;) {
//獲取計數值,即AQS的屬性值
int c = getState();
//若是計數值已經等於0
if (c == 0)
//直接返回失敗
return false;
//將計數值作減1操做
int nextc = c-1;
//使用CAS將計數值,從c修改爲nextc
if (compareAndSetState(c, nextc))
//若是cas更改計數值成功,而且計數值減到0,返回true
return nextc == 0;
}
}
}複製代碼
//傳入count計數值構造CountDownLatch實例
public CountDownLatch(int count) {
//若是count小於0,拋出IllegalArgumentException異常
if (count < 0) throw new IllegalArgumentException("count < 0");
//調用上面的介紹的Sync的構造函數,將計數值count設置爲sync從AQS繼承下來的state屬性,不清楚的能夠看下上面對Sync的介紹
this.sync = new Sync(count);
}複製代碼
//調用await方法的線程等待CountDownLatch的計數值等於0,在等待CountDownLatch的計數值等於0的時候等待過程當中被中斷,拋出中斷異常,支持中斷
public void await() throws InterruptedException {
//調用sync從AQS中繼承下來的acquireSharedInterruptibly方法進行判斷計數值是否等於0,即AQS的屬性state是否等於0
sync.acquireSharedInterruptibly(1);
}
//Sync從AbstractQueuedSynchronizer繼承下來的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
//檢查調用線程是否有被中斷
if (Thread.interrupted())
//有的話直接拋出中斷異常
throw new InterruptedException();
//調用Sync重寫AQS的tryAcquireShared方法,判斷計數值是否等於0,若是等於0返回1,就無需調用doAcquireSharedInterruptibly方法等待計數值等於0,詳細的能夠看下上面對Sync的tryAcquireShared方法的介紹
if (tryAcquireShared(arg) < 0)
//doAcquireSharedInterruptibly是線程在等待CountDownLatch的計數值等於0的時候過程當中被中斷,線程等待被喚醒時,若是有中斷請求直接拋出中斷異常
doAcquireSharedInterruptibly(arg);
}
//Sync從AbstractQueuedSynchronizer繼承下來的doAcquireSharedInterruptibly方法
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
//往同步隊列中從尾節點加入一個模式爲共享鎖的節點,看下面對addWaiter方法的介紹
final Node node = addWaiter(Node.SHARED);
//執行是否失敗的標誌位,若是失敗會將此節點從同步隊列中移除
boolean failed = true;
try {
//死循環的判斷CountDownLatch的計數值是否等於0,即AQS的state屬性是否等於0,除非CountDownLatch的計數值等於0,獲取線程在等待的過程當中,被中斷,拋出中斷異常
for (;;) {
//獲取當前節點的前一個節點,前一個節點不存在拋出空指針異常,CLH(同步隊列必定會有個有效的前置節點),predecessor方法會在下面進行介紹
final Node p = node.predecessor();
//若是當前節點的前置節點爲表頭
if (p == head) {
//使用上面介紹的Sync重寫的tryAcquireShared嘗試判斷CountDownLatch的計數值是否等於0,若是等於0表示線程await方法能夠直接執行
int r = tryAcquireShared(arg);
//若是r大於0,即r等於1,CountDownLatch的計數值等於0
if (r >= 0) {
//設置當前節點爲頭節點,而且喚醒等待CountDownLatch的計數值等於0的下一個節點,能夠看下面對setHeadAndPropagate方法的介紹
setHeadAndPropagate(node, r);
//將當前節點的前置節點下一個節點置爲空,即當前節點的前置節點從同步隊列中移除
p.next = null; // help GC
//CountDownLatch的計數值等於0,線程執行成功
failed = false;
//直接返回
return;
}
}
//判斷當前節點的前置節點的狀態值是否爲SIGNAL,若是是調用parkAndCheckInterrupt方法阻塞當前線程,shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法能夠看下面對這兩個方法的詳細介紹
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//若是線程在等待CountDownLatch的計數值等於0的過程當中,有其餘線程調用當前線程的interrupt方法,中斷請求,拋出中斷異常
throw new InterruptedException();
}
} finally {
//若是線程等待CountDownLatch的計數值等於0失敗,即等待線程被中斷
if (failed)
//將當前節點從同步隊列中移除,能夠看下面對cancelAcquire方法的介紹
cancelAcquire(node);
}
}
//Sync從AbstractQueuedSynchronizer繼承下來的addWaiter方法
//@param mode 要建立節點的模式,是要等待CountDownLatch的計數值等於0
private Node addWaiter(Node mode) {
//根據當前線程和傳入的節點模式建立新節點
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//獲取同步隊列(CLH)的尾節點
Node pred = tail;
//若是尾節點不爲空
if (pred != null) {
//將新建節點的前置節點設置爲尾節點
node.prev = pred;
//使用CAS將新建節點設置爲尾節點
if (compareAndSetTail(pred, node)) {
//若是CAS成功,尾節點的下一節點爲新建節點
pred.next = node;
//返回新建節點
return node;
}
}
//不然調用enq方法進行循環的將新建節點加入同步隊列中,作爲同步隊列的尾節點,詳細的能夠看下面enq方法的介紹
enq(node);
//返回新建節點
return node;
}
//Sync從AbstractQueuedSynchronizer繼承下來的enq方法
private Node enq(final Node node) {
//死循環的將傳入節點加入到同步隊列中,作爲同步隊列的尾節點,直到節點加入隊列成功爲止
for (;;) {
//獲取尾節點
Node t = tail;
//若是尾節點爲空,代表同步隊列不存在節點
if (t == null) {
//新建個節點作爲同步隊列的頭節點,使用CAS進行頭節點的設置
if (compareAndSetHead(new Node()))
//若是頭節點設置成功,將尾節點設置爲頭節點
tail = head;
} else {//不然隊列不爲空
//將新建節點的前置節點設置爲尾節點
node.prev = t;
//使用CAS將新建節點設置爲尾節點
if (compareAndSetTail(t, node)) {
//若是CAS成功,尾節點的下一節點爲新建節點
t.next = node;
//返回新建節點
return t;
}
}
}
}
//AbstractQueuedSynchronizer中的內部類Node的predecessor方法
//獲取當前節點的前驅節點,若是調用節點的前置節點爲null,則拋出空指針異常
final Node predecessor() throws NullPointerException {
//當前節點的前驅節點
Node p = prev;
//若是前驅節點爲空
if (p == null)
//拋出空指針異常
throw new NullPointerException();
else
//返回當前節點的前驅節點
return p;
}
//Sync從AbstractQueuedSynchronizer繼承下來的setHeadAndPropagate方法,從新設置CLH隊列頭,若是CLH隊列頭的下一個節點爲null或者頭節點的下一節點模式爲共享模式,那麼就要喚醒同步隊列的下一等待的線程
private void setHeadAndPropagate(Node node, int propagate) {
//獲取同步隊列的頭節點
Node h = head; // Record old head for check below
//將傳入進來的節點設置爲同步隊列的表頭,將傳入進來的前置節點和線程都置爲空
setHead(node);
//當propagate等於1,CountDownLatch的計數值等於0,或者頭節點爲空,或者頭節點的狀態爲SIGNAL
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 獲取新的CLH隊列頭的下一個節點s
Node s = node.next;
// 若是節點s是空或者共享模式節點,那麼就要喚醒同步隊列上等待的線程
if (s == null || s.isShared())
//喚醒下一個等待隊列的線程,在下面對此方法進行介紹
doReleaseShared();
}
}
//Sync從AbstractQueuedSynchronizer繼承下來的doReleaseShared方法,喚醒同步隊列頭節點的下一等待CountDownLatch的計數值等於0的節點
private void doReleaseShared() {
for (;;) {
// 將同步隊列頭賦值給節點h
Node h = head;
// 若是節點h不爲null,且不等於同步隊列尾
if (h != null && h != tail) {
//獲得節點h的狀態
int ws = h.waitStatus;
//若是狀態是Node.SIGNAL,就要喚醒節點h後繼節點的線程
if (ws == Node.SIGNAL) {
// 將節點h的狀態設置成0,若是設置失敗,就繼續循環,再試一次。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒節點h後繼節點的線程
unparkSuccessor(h);
}
//若是節點h的狀態是0,就設置ws的狀態是PROPAGATE,對AQS的內部類Node節點的狀態不清楚的,能夠看下個人另外一篇對AQS源碼的分析
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 若是同步隊列頭head節點發生改變,繼續循環,
// 若是沒有改變,就跳出循環
if (h == head) // loop if head changed
break;
}
}
/** * Sync從AbstractQueuedSynchronizer繼承下來的shouldParkAfterFailedAcquire方法,根據前一個節點pred的狀態,來判斷當前節點對應的線程是否應該被阻塞 * @param pred : node節點的前一個節點 * @param node : 等待CountDownLatch的計數值等於0的節點 * @return 返回true 表示當前線程應該被阻塞,而後調用parkAndCheckInterrupt方法來阻塞當前線程 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取前置節點的狀態值
int ws = pred.waitStatus;
//若是前置節點的狀態值爲SIGNAL
if (ws == Node.SIGNAL)
// 若是前一個pred的狀態是Node.SIGNAL,那麼直接返回true,當前線程應該被阻塞
return true;
//若是前置節點已經取消,循環獲取不是取消的前置節點
if (ws > 0) {
// 若是前一個節點狀態是Node.CANCELLED(大於0就是CANCELLED),
// 表示前一個節點所在線程已經被喚醒了,要從CLH隊列中移除CANCELLED的節點。
// 因此從pred節點一直向前查找直到找到不是CANCELLED狀態的節點,並把它賦值給node.prev,
// 表示node節點的前一個節點已經改變
do {
//從新賦值當前節點的前置節點
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//不是取消的前置節點的下一節點從新賦值爲當前節點
pred.next = node;
} else {
// 此時前一個節點pred的狀態只能是0或者PROPAGATE,不多是CONDITION狀態
// CONDITION(只在condition條件隊列中節點存在,CLH同步隊列中沒有此狀態的節點)
// 將前一個節點pred的狀態設置成Node.SIGNAL,這樣在下一次循環時,就是直接阻塞當前線程
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//返回要等待CountDownLatch的計數值等於0的節點對應的線程不會阻塞
return false;
}
//Sync從AbstractQueuedSynchronizer繼承下來的parkAndCheckInterrupt方法,阻塞當前線程,而且喚醒時檢驗當前線程在等待的過程當中是否有其餘線程發起中斷請求
private final boolean parkAndCheckInterrupt() {
//阻塞當前線程
LockSupport.park(this);
//當前線程被喚醒後,返回當前線程的中斷標誌位
return Thread.interrupted();
}
//將傳入進來的節點從同步隊列中移除,將傳入節點對應的線程置爲空,狀態置爲CANCELLED
private void cancelAcquire(Node node) {
//若是當前節點爲空,直接退出
if (node == null)
//直接退出
return;
//將當前節點對應的線程置爲空
node.thread = null;
//獲取當前要取消節點的前置節點
Node pred = node.prev;
//循環跳過前置節點狀態爲CANNELLED的值
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//獲取狀態不是取消的前置的節點的下一個節點,在設置前置節點的下一個節點使用到
Node predNext = pred.next;
//將當前要取消的節點狀態賦值爲CANCELLED
node.waitStatus = Node.CANCELLED;
//若是要取消節點爲尾節點,將尾節點設置爲要取消節點的前一個節點
if (node == tail && compareAndSetTail(node, pred)) {
//若是設置成功,將要取消節點的前置節點的下一個節點設置爲空
compareAndSetNext(pred, predNext, null);
} else {
int ws;
//若是前置不是頭節點,而且前置節點的狀態值爲SIGNAL,或者將前置節點的狀態值設置爲SIGNAL,而且前置節點的線程不爲空
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//獲取要取消節點的下一個節點
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//不然的話喚醒node節點的下一個節點
unparkSuccessor(node);
}
//將要取消節點的下一節點設置爲自身,加快gc
node.next = node;
}
}
//調用await方法的線程,在一段時間內等待CountDownLatch的計數值等於0,在等待CountDownLatch的計數值等於0的時候等待過程當中被中斷,拋出中斷異常,支持中斷
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
//調用sync從AQS中繼承下來的tryAcquireSharedNanos方法在必定時間內進行判斷計數值是否等於0,即AQS的屬性state是否等於0,可能阻塞當前線程,在必定時間內CountDownLatch的計數值不等於0,返回失敗,不然CountDownLatch的計數值等於0,喚醒當前線程
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//Sync從AbstractQueuedSynchronizer繼承下來的tryAcquireSharedNanos方法
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
//檢查線程是否有被中斷
if (Thread.interrupted())
//有的話直接拋出中斷異常
throw new InterruptedException();
//調用Sync重寫AQS的tryAcquireShared方法,若是CountDownLatch的計數值等於0,返回1,不然返回-1,若是返回-1調用doAcquireSharedNanos方法,詳細的能夠看下Sync的tryAcquireShared方法
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
//Sync從AbstractQueuedSynchronizer繼承下來的doAcquireSharedNanos方法,死循環的判斷CountDownLatch的計數值是否等於0,直到超時、或者獲取到寫鎖
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
//若是傳入的超時時間小於等於0
if (nanosTimeout <= 0L)
//返回調用await(long timeout, TimeUnit unit)方法的線程等待CountDownLatch的計數值等於0失敗
return false;
//當前時間加上超時時間獲得死亡時間
final long deadline = System.nanoTime() + nanosTimeout;
//根據當前線程和共享模式建立新節點,addWaiter方法能夠看上面的介紹
final Node node = addWaiter(Node.SHARED);
//在等待CountDownLatch的計數值等於0是否失敗,若是失敗在finally中將節點從同步隊列中移除
boolean failed = true;
try {
for (;;) {
//獲取新建節點的前置節點
final Node p = node.predecessor();
//新建節點的前置節點若是爲頭節點
if (p == head) {
//調用Sync重寫AQS的tryAcquireShared方法,若是CountDownLatch的計數值等於0,返回1,不然返回-1,若是返回-1調用doAcquireSharedNanos方法,詳細的能夠看下Sync的tryAcquireShared方法
int r = tryAcquireShared(arg);
//若是r大於0,即r等於1,CountDownLatch的計數值等於0
if (r >= 0) {
//設置當前節點爲頭節點,而且喚醒等待CountDownLatch的計數值等於0的下一個節點,能夠看下面對setHeadAndPropagate方法的介紹
setHeadAndPropagate(node, r);
//新建節點的前置節點的下一節點設置爲空
p.next = null; // help GC
//等待CountDownLatch的計數值等於0標誌位成功
failed = false;
//返回等待CountDownLatch的計數值等於0成功
return true;
}
}
//死亡時間減去當前時間,獲得超時時間
nanosTimeout = deadline - System.nanoTime();
//若是超時時間小於等於0,直接返回等待CountDownLatch的計數值等於0失敗
if (nanosTimeout <= 0L)
//返回等待CountDownLatch的計數值等於0失敗
return false;
//shouldParkAfterFailedAcquire方法在上一次等待CountDownLatch的計數值等於0失敗時,是否須要阻塞,根據當前節點的前置節點狀態來判斷,詳細的能夠看上面的介紹
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold) //超時時間大於必定的閾值,纔會阻塞等待CountDownLatch的計數值等於0的線程
//超時阻塞要等待CountDownLatch的計數值等於0的線程
LockSupport.parkNanos(this, nanosTimeout);
//在等待CountDownLatch的計數值等於0的線程被喚醒時,等待的過程當中有其餘線程發起中斷請求,直接拋出中斷異常
if (Thread.interrupted())
//拋出中斷異常
throw new InterruptedException();
}
} finally {
//若是等待CountDownLatch的計數值等於0標誌位失敗
if (failed)
//若是等待CountDownLatch的計數值等於0標誌位失敗,從同步隊列中移除當前節點,根據當前節點的前置節點狀態是否喚醒當前節點的不爲空的下一節點線程,cancelAcquire方法能夠看上面詳細介紹
cancelAcquire(node);
}
}
複製代碼
//CountDownLatch的計數值減1操做,即AQS的屬性state值作減1操做,使用Sync對AQS的屬性state值作減1操做
public void countDown() {
sync.releaseShared(1);
}
//Sync從AbstractQueuedSynchronizer繼承下來的releaseShared方法,參數arg沒有使用到
public final boolean releaseShared(int arg) {
//調用Sync重寫AQS的tryReleaseShared方法,嘗試將CountDownLatch的計數值減1,若是計數值減一操做成功,而且減1操做後計數值等於0,表示CountDownLatch的計數值等於0,喚醒等待CountDownLatch的計數值等於0的全部線程
if (tryReleaseShared(arg)) {
//喚醒等待CountDownLatch的計數值等於0的全部線程
doReleaseShared();
//返回計數值減一操做,而且減1操做後計數值等於0,喚醒等待CountDownLatch的計數值等於0的全部線程成功
return true;
}
//返回失敗,可能計數值減一操做成功,但減1操做後計數值不等於0,或者計數值原先就等於0
return false;
}
//Sync從AbstractQueuedSynchronizer繼承下來的doReleaseShared方法,喚醒等待CountDownLatch的計數值等於0的全部線程
private void doReleaseShared() {
for (;;) {
// 將同步隊列頭賦值給節點h
Node h = head;
// 若是節點h不爲null,且不等於同步隊列尾
if (h != null && h != tail) {
// 獲得節點h的狀態
int ws = h.waitStatus;
//若是頭節點的狀態是Node.SIGNAL,就要喚醒節點h後繼節點的線程
if (ws == Node.SIGNAL) {
// 將節點h的狀態設置成0,若是設置失敗,就繼續循環,再試一次。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 喚醒頭節點h後繼節點的線程
unparkSuccessor(h);
}
// 若是節點h的狀態是0,就設置ws的狀態是PROPAGATE。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 若是同步隊列頭head節點發生改變,繼續循環,
// 若是沒有改變,就跳出循環
if (h == head)
break;
}
}
複製代碼
//獲取CountDownLatch當前的計數值
public long getCount() {
//直接調用sync的getCount方法,獲取計數值,即AQS的state屬性
return sync.getCount();
}
//CountDownLatch重寫Object的toString方法
public String toString() {
//返回Object的toString方法和CountDownLatch的計數值組合
return super.toString() + "[Count = " + sync.getCount() + "]";
}
複製代碼