JUC之CountDownLatch源碼分析

CountDownLatch是AbstractQueuedSynchronizer中共享鎖模式的一個的實現,是一個同步工具類,用來協調多個線程之間的同步。CountDownLatch可以使一個或多個線程在等待另一些線程完成各自工做以後,再繼續執行。CountDownLatch內部使用一個計數器進行實現線程通知條件,計數器初始值爲進行通知線程的數量。當每個通知線程完成本身任務後,計數器的值就會減一。當計數器的值爲0時,表示全部的通知線程都已經完成一些任務,而後在CountDownLatch上全部等待的線程就能夠恢復執行接下來的任務。基本上CountDownLatch的原理就是這樣,下面咱們一塊兒去看看源碼。html

public class CountDownLatch {
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }
        ....
    }

    private final Sync sync;

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    ....
}

從上面簡略的源碼能夠看出,CountDownLatch和ReentrantLock同樣,在內部聲明瞭一個繼承AbstractQueuedSynchronizer的Sync內部類(重寫了父類的tryAcquireShared(int acquires)和tryReleaseShared(int releases)),並在聲明瞭一個sync屬性。CountDownLatch只有一個有參構造器,參數count就是上面說的的進行通知的線程數目,說白點也就是countDown()方法須要被調用的次數。node

CountDownLatch的主要方法是wait()和countDown(),咱們先看wait()方法源碼。多線程

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

和ReentrantLock同樣,CountDownLatch依然算是一件外衣,實際仍是靠sync進行操做。咱們接着看AQS的acquireSharedInterruptibly(int arg)方法(實際上參數在CountDownLatch裏沒什麼用)工具

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

看到先判斷當前線程是不是中斷狀態,而後調用子類重寫的tryAcquireShared(int acquires)方法去判斷是否須要進行阻塞(也便是嘗試獲取鎖),若是返回值小於0 ,就調用doAcquireSharedInterruptibly(int acquires)方法進行線程阻塞。先看tryAcquireShared(int acquires)方法oop

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

源碼很簡單,就是看下state是否等於0,等於0,就返回1,表明不須要線程阻塞,不等於0(實際上state只會大於或者等於0),就返回-1,表示須要進行線程阻塞。這裏有個伏筆就是若是CountDownLatch的計數器state被減至0時,後續再有線程調用CountDownLatch的wait()方法時,會直接往下執行調用者方法的代碼,不會形成線程阻塞源碼分析

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在doAcquireSharedInterruptibly(int acquires)方法中進行線程阻塞的步驟依然是先調用addWaiter(Node mode)方法將該線程封裝到AQS內部的CLH隊列的Node.SHARE(共享)模式的Node節點,並放入到隊尾,而後在循環中去嘗試持有鎖和進行線程阻塞。在循環體內,先獲取到前任隊尾,而後判斷前任隊尾是不是隊首head,若是是就調用tryAcquireShared(int acquires)嘗試獲取鎖,若是返回1表示獲取到了鎖,就調用setHeadAndPropagate(Node node, int propagate)方法將節點設置head而後再往下傳播,解除後續節點的線程阻塞狀態(這就是共享鎖的核心)。若是返回-1,表示沒有獲取到鎖,就調用shouldParkAfterFailedAcquire(Node pre, Node node)進行pre節點的waitStatus賦值爲Node.SIGNAL,而後在墨跡一次循環,調用parkAndCheckInterrupt()方法進行線程阻塞。咱們先看setHeadAndPropagate(Node node, int propagate)方法源碼ui

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 釋放共享鎖,這是這一步是最關鍵的一步
            doReleaseShared();
    }
}

在setHeadAndPropagate(Node node, int propagate)方法中,直接將node設置從head,由於參數propagate爲始終爲1(到這一步就表示獲取了共享鎖,state等於0,在tryAcquireShared(int acquires)方法中就只會返回1),因此也就是後面直接獲取head的next節點,若是head的next節點存在,而且是共享模式,就調用doReleaseShared()方法去釋放CLH中head的next節點。this

shouldParkAfterFailedAcquire(Node pre, Node node)和parkAndCheckInterrupt()兩個方法在JUC之ReentrantLock源碼分析一篇博客中已經講過了,這裏再也不贅述了。spa

doReleaseShared()這個方法其實也是countDown()方法的核心實現,這裏先賣個關子,後面會講到。咱們接着看doReleaseShared()方法。線程

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

 當調用wait()方法進行線程阻塞等待被別的線程解除阻塞後,對於AQS中共享鎖最核心的代碼就是doReleaseShared()這個方法。先獲取head節點,若是head節點存在而且有後續節點,就先判斷head節點的狀態,若是狀態是Node.SIGNAL(表示後續節點須要鎖釋放通知),將head節點狀態改成0,而後解除下一節點的線程阻塞狀態,而後判斷下以前獲取的head和如今的head是否仍是同一個,若是是就結束循環,若是不是,那就接着循環。其實就算是存在線程在執行完unparkSuccessor(Node node)方法後失去了CPU的執行權,一直到被解除線程阻塞的next節點坐上了head節點後纔有機會獲取到CPU執行權這種狀況,無非就是以前獲取到head和如今的head不相同了,大不了再循環一次,也算是多線程去解除當前head節點的next節點線程阻塞,影響不大;若是狀態是0,嘗試將狀態有0改成Node.PROPAGATE,而後重複循環,head節點是0的這種狀態在CountDownLatch中應該是不會出現的,多是AQS對別的類的兼容,也多是我眼拙沒看出來。若是head爲null或者head與tail相同,就結束循環。

到這裏CountDownLatch的wait()方法就分析完成了,這裏總結下wait()方法流程:
  一、若是state是0,直接往下執行調用者的代碼,不會進行線程等待阻塞
  二、將當前線程封裝到共享模式的Node節點中,而後放入到CLH隊列的隊尾
  三、將前任隊尾的waitStatus改變爲Node.SIGNAL,而後調用LockSupport.park()阻塞當前線程,等待前節點喚醒
  四、被前節點喚醒後,把本身設爲head,而後去喚醒next節點

咱們看完了wait()方法,如今在來看下countDown()方法的源碼

public void countDown() {
    sync.releaseShared(1);
}

一如既往的簡單,直接調用AQS的releaseShared(int arg)方法,咱們直接去看AQS的releaseShared(int arg)方法

AQS方法
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

CountDownLatch方法
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

在AQS的releaseShared(int arg)中先去調用必定要被子類重寫的tryReleaseShared(int releases)方法,返回值表示是否須要進行喚醒操做,若是返回true,那就調用doReleaseShared()方法去解除head節點的next節點線程阻塞狀態。是的,你沒看錯,就是咱們前面分析的doReleaseShared()方法,他們複用了同一個方法。而在CountDownLatch的tryReleaseShared(int releases)方法實現也很是簡單,就是判斷下當前state是不是0,若是是0,表示已經減完了,不須要再減了,等待線程已經在被依次喚醒了,返回false表示不須要去喚醒後續節點了。最後再看看減完後的state是不是等於0,等於0,表示須要去解除後續節點的阻塞狀態;不等於0(實際上是大於0),表示調用countDown()方法去減state的次數還不夠,暫時還不能解除後續節點的阻塞狀態。

countDown()方法比較簡單,咱們也總結下countDown()流程:
  一、若是state爲0,表示已經有線程在解除CLH隊列節點的阻塞狀態了,這裏直接結束
  二、若是state減去1後不等於0,表示還要等有線程再次調用countDown()方法進行state減1,這裏直接結束
  三、若是state減去1後等於0,表示已經線程調用countDown()方法的次數已經達到最初設定的次數,而後去解除CLH隊列上節點的阻塞狀態

相關文章
相關標籤/搜索