CountDownLatch使用方法很是簡單,主要就是兩個方法,await()方法和countDown()方法,await()方法會使線程阻塞。countDown()會將線程同步狀態減1,當同步狀態爲0使喚醒線程。node
仍是經過源碼來理解這個類。oop
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
public class CountDownLatch { /** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
與重入鎖同樣,也是有一個內部Sync類。從代碼中能夠看出,這個類得初始化須要設置一個大於0得count,這個count用來標記線程的同步狀態。構造方法就很少說了,這裏主要講最重要的await()和countDown()方法。ui
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())
// 判斷線程是否處於阻塞狀態,若是是,拋出異常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //內部Sync類中的方法,線程同步狀態爲0,返回1,不然返回-1 doAcquireSharedInterruptibly(arg); }
而後咱們再來看doAcquireSharedInterruptibly(arg)這個方法this
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
//生成一個共享模式的新節點,而且將這個節點添加到同步隊列的隊尾 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) {
//找到這個節點的前一個節點 final Node p = node.predecessor(); if (p == head) {
// 若是前一個節點是head節點,也就是說這個節點處於同步隊列的最前面 int r = tryAcquireShared(arg); if (r >= 0) {
// 查詢線程的同步狀態,若是大於等於0,將這個節點置爲head節點。而且喚醒這個線程 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } }
// 判斷是否須要將當前線程阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed)
// 若是失敗,將這個節點從同步隊列移除 cancelAcquire(node); } }
這裏涉及到了三個方法,setHeadAndPropagate(),shouldPardAfterFailedAcquire()和cancelAcquire()方法,下面一一介紹。不過在這以前先要說一下Node的幾種等待狀態,他方便後面理解代碼。spa
(1)CANCELLED:值爲1,因爲在同步隊列中等待的線程等待超時或被中斷,須要從同步隊列中取消等待,節點進入該狀態將不會變化線程
(2)SINGAL:值爲-1,後繼節點的線程處於等待狀態,當前節點若是釋放了同步狀態,將會通知後繼節點,使後繼節點得以運行debug
(3)CONDITION:值爲-2,節點在等待隊列中(這個在Condition的博客裏會講到),節點線程等待在Condition上,當其餘線程對Condition調用了singal後,該節點會從等待隊列轉移到同步隊列,加入到對同步狀態的獲取中去code
(4)PROPAGEATE:值爲-3,表示下一次共享式同步狀態獲取將會無條件被傳播下去blog
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below // 將這個節點設置爲head節點
setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared())
// 喚醒處於同步隊列最前面的線程,這個方法後面再說 doReleaseShared(); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) {
// 若是等待狀態大於0,也就是處於CANCELLED狀態,這裏的循環是爲了過濾同步狀態中等待狀態爲CANCELLED的節點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {
// CAS操做,將前置節點的等待狀態設置爲SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // 聲明node的前置節點pred Node pred = node.prev;
// 過濾掉同步隊列中等待狀態爲CANCELLED的節點 while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 聲明pred的下一個節點predNext Node predNext = pred.next; node.waitStatus = Node.CANCELLED;
//若是node爲尾節點,設置pred爲tail節點,而且設置pred的next節點爲null if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0)
// 設置node節點的下一個節點爲pred的下一個節點 compareAndSetNext(pred, predNext, next); } else {
//喚醒線程 unparkSuccessor(node); } node.next = node; // help GC } }
整個await()方法的全部代碼就是這些。若是認真看源代碼而且仔細梳理的話會發現其實也沒那麼難懂,下面介紹countDown()方法。隊列
public void countDown() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) {
//線程同步狀態-1,若是狀態爲0返回true,不然返回false if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
相信看代碼已經很清楚了,那麼咱們再來看doReleaseShared()這個方法。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {
// 若是等待狀態爲SIGNAL,將head節點的等待狀態改成0,若是成功,喚醒線程 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS }
// 若是執行了unparkSuccessor(h)方法,head節點會變化,這樣下面的h == head就不成立了,就會繼續執行循環 if (h == head) // loop if head changed break; } }
整個CountDownLatch類,我斷斷續續看了幾天纔看完,主要是開始看AQS這個類感受比較繞,就看的不是很認真,比較浮躁。其實若是真正用心,跟着代碼一步一步去走,一步一步去理解也沒那麼難,而後經過debug跟着代碼去走,查看每一步作了什麼,變量,屬性的變化,這樣有助於深入理解代碼。