j.u.c系列(08)---之併發工具類:CountDownLatch

寫在前面

  CountDownLatch所描述的是」在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待「:用給定的計數 初始化 CountDownLatch。因爲調用了 countDown() 方法,因此在當前計數到達零以前,await 方法會一直受阻塞。以後,會釋放全部等待的線程,await 的全部後續調用都將當即返回。CountDownLatch的本質也是一個"共享鎖"node

\併發

CountDownLatch(int count)
構造一個用給定計數初始化的 CountDownLatch。

// 使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷。
void await()
// 使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷或超出了指定的等待時間。
boolean await(long timeout, TimeUnit unit)
// 遞減鎖存器的計數,若是計數到達零,則釋放全部等待的線程。
void countDown()
// 返回當前計數。
long getCount()
// 返回標識此鎖存器及其狀態的字符串。
String toString()

  CountDownLatch是經過一個計數器來實現的,當咱們在new 一個CountDownLatch對象的時候須要帶入該計數器值,該值就表示了線程的數量。每當一個線程完成本身的任務後,計數器的值就會減1。當計數器的值變爲0時,就表示全部的線程均已經完成了任務,而後就能夠恢復等待的線程繼續執行了。ide

  雖然,CountDownlatch與CyclicBarrier(後續會接受。另一併發工具類)區別:工具

  1. CountDownLatch的做用是容許1或N個線程等待其餘線程完成執行;而CyclicBarrier則是容許N個線程相互等待
  1. CountDownLatch的計數器沒法被重置;CyclicBarrier的計數器能夠被重置後使用,所以它被稱爲是循環的barrier

 

實現分析

  經過上面的結構圖咱們能夠看到,CountDownLatch內部依賴Sync實現,而Sync繼承AQS。CountDownLatch僅提供了一個構造方法:ui

  CountDownLatch(int count) : 構造一個用給定計數初始化的 CountDownLatchthis

 

   public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

  sync爲CountDownLatch的一個內部類,其定義以下:spa

 private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        //獲取同步狀態
        int getCount() {
            return getState();
        }

        //獲取同步狀態
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        //釋放同步狀態
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

 

   經過這個內部類Sync咱們能夠清楚地看到CountDownLatch是採用共享鎖來實現的。線程

  CountDownLatch提供await()方法來使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷,定義以下:code

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

 

   await其內部使用AQS的acquireSharedInterruptibly(int arg):對象

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

 

   在內部類Sync中重寫了tryAcquireShared(int arg)方法:

 protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

  getState()獲取同步狀態,其值等於計數器的值,從這裏咱們能夠看到若是計數器值不等於0,則會調用doAcquireSharedInterruptibly(int arg),該方法爲一個自旋方法會嘗試一直去獲取同步狀態:

private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    /**
                     * 對於CountDownLatch而言,若是計數器值不等於0,那麼r 會一直小於0
                     */
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //等待
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 

   CountDownLatch提供countDown() 方法遞減鎖存器的計數,若是計數到達零,則釋放全部等待的線程。

public void countDown() {
        sync.releaseShared(1);
    }

 

   內部調用AQS的releaseShared(int arg)方法來釋放共享鎖同步狀態:

 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

 

   tryReleaseShared(int arg)方法被CountDownLatch的內部類Sync重寫:

protected boolean tryReleaseShared(int releases) {
        for (;;) {
            //獲取鎖狀態
            int c = getState();
            //c == 0 直接返回,釋放鎖成功
            if (c == 0)
                return false;
            //計算新「鎖計數器」
            int nextc = c-1;
            //更新鎖狀態(計數器)
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }

 

 

總結

  CountDownLatch內部經過共享鎖實現。在建立CountDownLatch實例時,須要傳遞一個int型的參數:count,該參數爲計數器的初始值,也能夠理解爲該共享鎖能夠獲取的總次數。當某個線程調用await()方法,程序首先判斷count的值是否爲0,若是不會0的話則會一直等待直到爲0爲止。當其餘線程調用countDown()方法時,則執行釋放共享鎖狀態,使count值 - 1。當在建立CountDownLatch時初始化的count參數,必需要有count線程調用countDown方法纔會使計數器count等於0,鎖纔會釋放,前面等待的線程纔會繼續運行。注意CountDownLatch不能回滾重置。

應用示例

示例仍然使用開會案例。老闆進入會議室等待5我的所有到達會議室纔會開會。因此這裏有兩個線程老闆等待開會線程、員工到達會議室:

public class CountDownLatchTest {
    private volatile static CountDownLatch countDownLatch = new CountDownLatch(5);

    /**
     * Boss線程,等待員工到達開會
     */
    static class BossThread extends Thread{
        
        BossThread(String name){
            super(name);
        }
        
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ":Boss在會議室等待,總共有" + countDownLatch.getCount() + "我的開會...");
            try {
                //Boss等待
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + ":全部人都已經到齊了,開會吧...");
        }
    }

    //員工到達會議室
    static class EmpleoyeeThread  extends Thread{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ",到達會議室....");
            //員工到達會議室 count - 1
            countDownLatch.countDown();
        }
    }
    
    public static void main(String[] args) throws InterruptedException{
        //Boss線程啓動
        new BossThread("張總").start();
        new BossThread("李總").start();
        new BossThread("王總").start();
        Thread.sleep(1000);
        for(int i = 0 ; i < 5 ; i++){
            new EmpleoyeeThread().start();
        }
    }
}
張總:Boss在會議室等待,總共有5我的開會...
李總:Boss在會議室等待,總共有5我的開會...
王總:Boss在會議室等待,總共有5我的開會...
Thread-0,到達會議室....
Thread-1,到達會議室....
Thread-2,到達會議室....
Thread-3,到達會議室....
Thread-4,到達會議室....
張總:全部人都已經到齊了,開會吧...
王總:全部人都已經到齊了,開會吧...
李總:全部人都已經到齊了,開會吧...
相關文章
相關標籤/搜索