通俗易懂的JUC源碼剖析-CountDownLatch

前言

在實際開發中,有時會遇到這樣的場景:主任務須要等待若干子任務完成後,再進行後續的操做。這時能夠用join或者本文的CountDownLatch實現。它們的區別在於CountDownLatch更加靈活。好比,子任務的工做分爲兩個階段,主任務只需子任務完成第一個階段便可開始主任務,無需等第二個階段完成。這種場景join就沒法作到,CountDownLatch就能夠實現。下面是實例代碼。java

import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Worker worker1 = new Worker("worker1", countDownLatch);
        Worker worker2 = new Worker("worker2", countDownLatch);
        worker1.start();
        worker2.start();
        System.out.println("main task wait for work1 and work2 finish their stage 1");
        countDownLatch.await();
        System.out.println("main task begin to work");
        Thread.sleep(3000);
        System.out.println("main task finished");
    }
    
    static class Worker extends Thread {
        private final CountDownLatch count;
        public Worker(String name, CountDownLatch count) {
            super.setName(name);
            this.count = count;
        }
        @Override
        public void run() {
            try {
               Thread.sleep(5000);
               System.out.println(Thread.currentThread().getName() + " stage 1 finished");
               count.countDown();
               Thread.sleep(5000);
               System.out.println(Thread.currentThread().getName() + " stage 2 finished");
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }
}

運行結果以下:
image.png
主線程等待work1和work2完成它們的第一個階段任務後,就開始工做,無需等待第二個階段也完成。而join只能等待子線程整個run()執行完畢才能日後執行,所以CountDownLatch更加靈活。編程

實現原理

從CountDownLatch的命名可猜想,它內部應該用了一個計數器,每當子線程調用countDown()方法時,計數器就減1,減到0時,主線程就會從調用await()阻塞處甦醒返回。併發

先來看看構造方法:ide

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

其中Sync是它的內部類,實現了AQS接口。ui

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
    Sync(int count) {
        setState(count);
    }
    int getCount() {
        return getState();
    }
    protected int tryAcquireShared(int acquires) {
        // 計數器爲0,則獲取鎖成功,能夠從await()返回
        // 不然須要等待
        return (getState() == 0) ? 1 : -1;
    }
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            // 計數器減1
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                // 減到0時會unpark喚醒阻塞在await()的線程
                return nextc == 0;
        }
    }
}

能夠看到,它是一個共享鎖實現,多個線程經過Sync來同步計數器count的值。this

再來看經常使用的await()和countDown()方法:spa

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

await()調用的是AQS中的模板方法:線程

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 調用子類Sync的tryAcquireShared方法,若是共享式獲取鎖失敗,doAcquireSharedInterruptibly裏面會讓當前線程在隊列裏阻塞等待獲取鎖。
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
public void countDown() {
    sync.releaseShared(1);
}

countDown調用的也是AQS中的模板方法:code

public final boolean releaseShared(int arg) {
    // 調用子類Sync的tryReleaseShared()共享式地釋放鎖,
    // 計數器減爲0時,doReleaseShared裏面會喚醒等待在await()方法處的線程。
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true; 
    }
    return false;
}

參考資料:
《Java併發編程之美》blog

相關文章
相關標籤/搜索