CountDownLatch源碼解析

一 概述
   二 總覽
   三 await方法
   四 countDown方法
   五 應用
   六 總結
複製代碼

一 概述

CountDownLatch是一個同步計數工具類,它能夠用於控制一個或多個線程等待其餘線程任務完成。初始一個計數count後,每當線程完成任務則調用countDown方法使計數count減1,當調用await方法時則會阻塞等待count爲0(也就是全部線程任務完成)後纔會取消阻塞繼續執行。java

二 總覽

public class CountDownLatch {
    // 繼承AQS的內部同步控制器
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        // 初始計數值,複用的AQS中的state屬性
        Sync(int count) { setState(count); }
        // 獲取計數器
        int getCount() { return getState(); }
        // 初始嘗試獲取共享鎖方法
        protected int tryAcquireShared(int acquires) { …… }
        // 釋放共享鎖方法
        protected boolean tryReleaseShared(int releases) { …… }
    }
    // 內部同步器屬性
    private final Sync sync;
    // 構造方法
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    // 響應中斷的阻塞等待全部任務完成的方法
    public void await() throws InterruptedException {……}
    // 響應中斷且超時結束的阻塞等待全部任務完成的方法
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { …… }
    // 完成一個任務,計數器減1,當計數爲0時則從AQS隊列中喚醒全部等待的線程
    public void countDown() { …… }
    // 獲取當前計數值
    public long getCount() { …… }
    // 重寫的toString
    public String toString() { …… }
}
複製代碼

總的來看CountDownLatch的內部構造很簡單,內部維護了一個繼承於AQS的同步控制器sync,使用AQS中的state屬性做爲計數值,並提供瞭如下幾種操做方法:node

方法 說明
await() 調用該方法的線程阻塞等待全部線程任務完成後返回,支持響應中斷
await(long timeout, TimeUnit unit) 調用該方法的線程阻塞等待全部線程任務完成後返回,支持響應中斷及超時後直接返回
countDown() 完成一個任務,計數器減1,當計數爲0時則從AQS隊列中喚醒全部等待的線程
getCount() 獲取當前計數值
toString() 重寫的toString,輸出當前計數器值

三 await方法

主方法體微信

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
複製代碼

實際調用的是AQS中的acquireSharedInterruptibly方法:工具

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
複製代碼

先嚐試一次檢查當前計數值是否爲0,爲0則說明全部任務都完成了,則直接返回成功oop

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
複製代碼

不然建立同步等待節點並加入到AQS同步等待隊列中進行阻塞等待:ui

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 在同步隊列中增長等待節點
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 獲取當前節點的前驅節點
            final Node p = node.predecessor();
            // 若是前驅節點爲head節點,表示當前節點是同步等待隊列中的第一個,故繼續嘗試一次獲取鎖
            if (p == head) {
                // 嘗試獲取令牌,此時會跳轉到semaphore中(由於重寫了該方法)
                int r = tryAcquireShared(arg);
                // 返回大於0則表示成功獲取到令牌了
                if (r >= 0) {
                    // 將當前節點設爲head節點
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 自旋幾回後爲避免強佔CPU,則對該線程進行休眠處理
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 因中斷請求則取消排隊請求
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

四 countDown方法

主方法體this

public void countDown() {
    sync.releaseShared(1);
}
複製代碼

調用AQS中的releaseShared方法:spa

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
複製代碼

先對計數器進行減1操做,採用AQS方式進行,當計數器爲0則返回true;線程

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
複製代碼

計數器返回0則須要去喚醒在AQS同步隊列中休眠的線程code

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
複製代碼

五 應用

countDownLatch經常使用於一個線程須要等待其餘幾個線程的結果才執行的場景,這個場景與FutureTask中的get方法阻塞等待任務返回結果相似。簡單demo以下:

CountDownLatch countDownLatch = new CountDownLatch(10);
try {
    for (int i = 0; i < 10; i++){
        new Thread(() -> {
            try {
                // 執行任務
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 任務執行完後釋放
                countDownLatch.countDown();
            }
        });
    }
    // 等全部的線程跑完,並設置20分鐘超時
    countDownLatch.await(20, TimeUnit.MINUTES);
    // 繼續執行其餘業務
}catch (Exception e){
    e.printStackTrace();
}
複製代碼

六 總結

CountDownLatch內部構造比較簡單,基本都是徹底複用的AQS的功能來實現的,好比採用state屬性做爲計數值,依賴計數器值個數任務完成的線程會加入到AQS同步等待隊列中等待。每一個持有計數值的線程任務完成後對計數器減1,減爲0的時候去同步隊列中喚醒等待的線程。


更多原創文章請關注微信公衆號
👇👇👇
嘮吧嗑吧

相關文章
相關標籤/搜索