同步工具類—— CountDownLatch

本博客系列是學習併發編程過程當中的記錄總結。因爲文章比較多,寫的時間也比較散,因此我整理了個目錄貼(傳送門),方便查閱。html

併發編程系列博客傳送門java

CountDownLatch簡介

CountDownLatch是JDK併發包中提供的一個同步工具類。官方文檔對這個同步工具的介紹是:node

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.編程

上面的英文介紹大體意思是:CountDownLatch的主要功能是讓一個或者多個線程等待直到一組在其餘線程中執行的操做完成。併發

使用列子

觀看上面的解釋可能並不能直觀的說明CountDownLatch的做用,下面咱們經過一個簡單的列子看下CountDownLatch的使用。ide

場景:主人(主線程)請客人(子線程)吃晚飯,須要等待全部客人都到了以後纔開飯。咱們用CountDownLatch來模擬下這個場景。函數

public class CountDownLatchDemo {

    private static final int PERSON_COUNT = 5;

    private static final CountDownLatch c = new CountDownLatch(PERSON_COUNT);

    public static void main(String[] args) throws InterruptedException {
        System.out.println("l am master, waiting guests...");
        for (int i = 0; i < PERSON_COUNT; i++) {
            int finalI = i;
            new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+" l am person["+ finalI +"]");
                    TimeUnit.MILLISECONDS.sleep(500);
                    //System.out.println(Thread.currentThread().getName()+" count:"+c.getCount());
                    c.countDown();
                }
            }).start();
        }
        c.await();
        System.out.println("all guests get, begin dinner...");
    }

}

上面的列子中,主人(master線程)請了5個客人吃飯,每一個客人到了以後會將CountDownLatch的值減一,主人(master)會一直等待全部客人到來,最後輸出」開飯「。工具

CountDownLatch的使用方式很簡單,下面來看下它的實現原理。學習

原理剖析

首先咱們先看下CountDownLatch重要的APIui

- getCount():獲取當前count的值。
- wait():讓當前線程在此CountDownLatch對象上等待,能夠中斷。與notify()、notifyAll()方法對應。
- await():讓當前線程等待此CountDownLatch對象的count變爲0,能夠中斷。
- await(timeout,TimeUnit):讓當前線程等待此CountDownLatch對象的count變爲0,能夠超時、能夠中斷。
- countDown():使此CountDownLatch對象的count值減1(不管執行多少次,count最小值爲0)。

下面咱們看下具體API的源代碼

構造函數

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

在構建CountDownLatch對象時須要傳入一個int型的初始值,這個值就是計數器的初始值。從上面的代碼中能夠看出,建立CountDownLatch是new了一個Sync對象。

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

Sync對象是基於AQS機制實現的,本身實現了tryAcquireSharedtryReleaseShared方法。

await方法

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

調用await方法實際上是調用了AQS的acquireSharedInterruptibly方法。

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

acquireSharedInterruptibly中先判斷了下當前線程有沒有被中斷,假如線程已經被中斷了,直接拋出中斷異常。不然進入doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireSharedInterruptibly的處理邏輯是先判斷隊列中是否只有當前線程,若是隻有當前線程的先嚐試獲取下資源,若是獲取資源成功就直接返回了。獲取資源不成功就判斷下是否要park當前線程,若是須要park當前線程,
那麼當前線程就進入waiting狀態。不然在for循環中一直執行上面的邏輯。

countDown方法

public void countDown() {
    sync.releaseShared(1);
}

熟悉AQS機制的會知道上面的代碼其實也是調的AQS的releaseSharedreleaseShared的方法會調到Sync中的tryReleaseShared

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

上面的代碼邏輯很簡單:status的值是0的話就返回true,不然返回false。返回true的話,就會喚醒AQS隊列中全部阻塞的線程。

使用場景

  • 場景一:將任務分割成多個子任務,每一個子任務由單個線程去完成,等全部線程完成後再將結果彙總。(MapReduce)這種場景下,CountDoenLatch做爲一個完成信號來使用。
  • 場景二:多個線程等到,一直等到某個條件發生。好比多個賽跑運動員都作好了準備,就等待裁判手中的發令槍響。這種場景下,就能夠將CountdownLatch的初始值設置成1。

簡單總結

  • CountDownLatch的初始值不能重置,只能減小不能增長,最多減小到0;
  • CountDownLatch計數值沒減小到0以前,調用await方法可能會讓調用線程進組一個阻塞隊列,等待計數值減少到0;
  • 調用countDown方法會讓計數值每次都減少1,可是最多減小到0。當CountDownLatch的計數值減小到0的時候,會喚醒全部在阻塞隊列中的線程。
相關文章
相關標籤/搜索