線程間的同步與通訊(6)——CountDownLatch源碼分析

前言

系列文章目錄 java

CountDownLatch是一個頗有用的工具,latch是門閂的意思,該工具是爲了解決某些操做只能在一組操做所有執行完成後才能執行的情景。例如,小組早上開會,只有等全部人到齊了才能開;再如,遊樂園裏的過山車,一次能夠坐10我的,爲了節約成本,一般是等夠10我的了纔開。CountDown是倒數計數,因此CountDownLatch的用法一般是設定一個大於0的值,該值即表明須要等待的總任務數,每完成一個任務後,將總任務數減一,直到最後該值爲0,說明全部等待的任務都執行完了,「門閂」此時就被打開,後面的任務能夠繼續執行。node

CountDownLatch自己是基於共享鎖實現的,若是你還不瞭解共享鎖,建議先讀一下逐行分析AQS源碼(3)——共享鎖的獲取與釋放,而後再繼續往下看。segmentfault

核心屬性

CountDownLatch主要是經過AQS的共享鎖機制實現的,所以它的核心屬性只有一個sync,它繼承自AQS,同時覆寫了tryAcquireSharedtryReleaseShared,以完成具體的實現共享鎖的獲取與釋放的邏輯。框架

private final Sync sync;
/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

構造函數

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

在構造函數中,咱們就是簡單傳入了一個不小於0的任務數,由上面Sync的構造函數可知,這個任務數就是AQS的state的初始值。函數

核心方法

CountDownLatch最核心的方法只有兩個,一個是countDown方法,每調用一次,就會將當前的count減一,當count值爲0時,就會喚醒全部等待中的線程;另外一個是await方法,它有兩種形式,一種是阻塞式,一種是帶超時機制的形式,該方法用於將當前等待「門閂」開啓的線程掛起,直到count值爲0,這一點很相似於條件隊列,至關於等待的條件就是count值爲0,然而其底層的實現並非用條件隊列,而是共享鎖。工具

countDown()

public void countDown() {
    sync.releaseShared(1);
}

前面說過,countDown()方法的目的就是將count值減一,而且在count值爲0時,喚醒全部等待的線程,它內部調用的實際上是釋放共享鎖的操做:學習

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

該方法由AQS實現,可是tryReleaseShared方法由Sync類本身實現:ui

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

該方法的實現很簡單,就是獲取當前的state值,若是已經爲0了,直接返回false;不然經過CAS操做將state值減一,以後返回的是nextc == 0,因而可知,該方法只有在count值原來不爲0,可是調用後變爲0時,纔會返回true,不然返回false,而且也能夠看出,該方法在返回true以後,後面若是再次調用,仍是會返回false。也就是說,調用該方法只有一種狀況會返回true,那就是state值從大於0變爲0值時,這時也是全部在門閂前的任務都完成了。this

tryReleaseShared返回true之後,將調用doReleaseShared方法喚醒全部等待中的線程,該方法咱們在前面的文章中已經詳細分析過了,這裏就再也不贅述了。spa

值得一提的是,咱們其實並不關心releaseShared的返回值,而只關心tryReleaseShared的返回值,或者只關心count到0了沒有,這裏更像是借了共享鎖的「殼」,來完成咱們的目的,事實上咱們徹底能夠本身設一個全局變量count來實現相同的效果,只不過對這個全局變量的操做也必須使用CAS。

await()

Condition的await()方法的語義相同,該方法是阻塞式地等待,而且是響應中斷的,只不過它不是在等待signal操做,而是在等待count值爲0:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

可見,await方法內部調用的是acquireSharedInterruptibly方法,至關於借用了獲取共享鎖的「殼」:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

咱們來回憶一下獨佔模式下對應的方法:

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

可見,二者用的是同一個框架,只是這裏:

  • tryAcquire(arg) 換成了 tryAcquireShared(arg) (子類實現)
  • doAcquireInterruptibly(arg) 換成了 doAcquireSharedInterruptibly(arg) (AQS提供)

咱們先來看看Sync子類對於tryAcquireShared的實現:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

該方法彷佛有點掛羊頭賣狗肉的感受——所謂的獲取共享鎖,事實上並非什麼搶鎖的行爲,沒有任何CAS操做,它就是判斷當前的state值是否是0,是就返回1,不是就返回-1。

值得注意的是,在逐行分析AQS源碼(3)——共享鎖的獲取與釋放中咱們特別提到過tryAcquireShared返回值的含義:

  • 若是該值小於0,則表明當前線程獲取共享鎖失敗
  • 若是該值大於0,則表明當前線程獲取共享鎖成功,而且接下來其餘線程嘗試獲取共享鎖的行爲極可能成功
  • 若是該值等於0,則表明當前線程獲取共享鎖成功,可是接下來其餘線程嘗試獲取共享鎖的行爲會失敗

因此,當該方法的返回值不小於0時,就說明搶鎖成功,能夠直接退出了,所對應的就是count值已經爲0,全部等待的事件都知足了。不然,咱們調用doAcquireSharedInterruptibly(arg)將當前線程封裝成Node,丟到sync queue中去阻塞等待:

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

前面咱們介紹共享鎖的獲取時,已經分析過了doAcquireShared方法,只是它是不拋出InterruptedException的,doAcquireSharedInterruptibly(arg)是它的可中斷版本,咱們能夠直接對比一下:
doAcquireShared-vs-doAcquireSharedInterruptibly

可見,它們僅僅是在對待中斷的處理方式上有所不一樣,其餘部分都是同樣的,因爲doAcquireShared前面的文章中咱們已經詳細分析過了,這裏就再也不贅述了。

await(long timeout, TimeUnit unit)

相較於await()方法,await(long timeout, TimeUnit unit)提供了超時等待機制:

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

注意,在tryAcquireSharedNanos方法中,咱們用到了doAcquireSharedNanos的返回值,若是該方法由於超時而退出時,則將返回false。因爲await()方法是阻塞式的,也就是說沒有獲取到鎖是不會退出的,所以它沒有返回值,換句話說,若是它正常返回了,則必定是由於獲取到了鎖而返回; 而await(long timeout, TimeUnit unit)因爲有了超時機制,它是有返回值的,返回值爲true則表示獲取鎖成功,爲false則表示獲取鎖失敗。doAcquireSharedNanos的這個返回值有助於咱們理解該方法到底是由於獲取到了鎖而返回,仍是由於超時時間到了而返回。

至於doAcquireSharedNanos的實現細節,因爲他和doAcquireSharedInterruptibly相比只是多了一個超時機制:
doAcquireSharedInterruptibly-vs-doAcquireSharedNanos

代碼自己很簡單,就不贅述了。

實戰

接下來咱們來學習一個使用CountDownLatch的實際例子,Java的官方源碼已經爲咱們提供了一個使用的示例代碼:

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        doSomethingElse();            // don't let run yet
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        doneSignal.await();           // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

在這個例子中,有兩個「閘門」,一個是CountDownLatch startSignal = new CountDownLatch(1),它開啓後,等待在這個「閘門」上的任務才能開始運行;另外一個「閘門」是CountDownLatch doneSignal = new CountDownLatch(N), 它表示等待N個任務都執行完成後,才能繼續往下。

Worker實現了Runnable接口,表明了要執行的任務,在它的run方法中,咱們先調用了startSignal.await(),等待startSignal這一「閘門」開啓,閘門開啓後,咱們就執行本身的任務,任務完成後再執行doneSignal.countDown(),將等待的總任務數減一。

代碼自己的邏輯很是簡單好懂,這裏不贅述了。

總結

  • CountDownLatch至關於一個「門栓」,一個「閘門」,只有它開啓了,代碼才能繼續往下執行。一般狀況下,若是當前線程須要等其餘線程執行完成後才能執行,咱們就可使用CountDownLatch。
  • 使用CountDownLatch#await方法阻塞等待一個「閘門」的開啓。
  • 使用CountDownLatch#countDown方法減小閘門所等待的任務數。
  • CountDownLatch基於共享鎖實現。
  • CountDownLatch是一次性的,「閘門」開啓後,沒法再重複使用,若是想重複使用,應該用[CyclicBarrier]()

(完)

系列文章目錄

相關文章
相關標籤/搜索