多線程-AQS-CountDownLatch

介紹:

CountDownLatch--發令槍
        Java1.5以後引入的Java併發工具類,談到CountDownLatch須要先介紹一個概念:閉鎖/門栓[latch]
         latch:一種同步方法,能夠延遲線程的進度直到線程到達某個終點狀態。閉鎖的狀態是一次性的,它確保在閉鎖打開以前全部特定的活動都須要在閉鎖打開以後才能完成。java

        如若掌握了AQS的實現原理,這裏的理解將會更加的水到渠成併發

==========================分割線==========================ide

應用場景
        A:如同賽跑,必須等待發令槍響後runner才能起跑同樣,在CountDownLatch的計數器歸零前,全部引用CountDownLatch閉鎖的線程都必須阻塞。總結:準備好了纔開始


        B:如同購物,只有全部商品都肯定購買了纔好結帳,在全部任務執行完[並進行countDown()]直到歸零前,當前任務必須阻塞。總結:準備好了才結束

應用A的樣例[準備好了纔開始]:工具

import java.util.concurrent.CountDownLatch;

/**
 * Created by ysma on 2018/6/7.
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch cdl = new CountDownLatch(1);
        threadX driverA = new threadX(cdl);
        threadX driverB = new threadX(cdl);
        new Thread(driverA).start();
        new Thread(driverB).start();
        System.out.println("===開始==time:"+System.currentTimeMillis()+"==count:"+cdl.getCount());
        cdl.countDown();
        Thread.sleep(10);
        System.out.println("========結束==time:"+System.currentTimeMillis()+"==count:"+cdl.getCount());

    }

    static class threadX implements Runnable {
        private CountDownLatch cdl;
        threadX(CountDownLatch cdl){
            this.cdl = cdl;
        }

        @Override
        public void run() {
            try {
                cdl.await();
                System.out.println(Thread.currentThread().getName()
                        +":running.... now:"
                + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

=============執行結果==========
===開始==time:1555310552568==count:1
Thread-0:running.... now:1555310552569
Thread-1:running.... now:1555310552569
========結束==time:1555310552579==count:0

應用B的樣例[準備好了才結束]:ui

/**
 * Created by ysma on 2018/6/7.
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch cdl = new CountDownLatch(2);
        threadX driverA = new threadX(cdl);
        threadX driverB = new threadX(cdl);
        new Thread(driverA).start();
        new Thread(driverB).start();
        System.out.println("===開始==time:"+System.currentTimeMillis()+"==count:"+cdl.getCount());
        cdl.await();
        System.out.println("========結束==time:"+System.currentTimeMillis()+"==count:"+cdl.getCount());

    }

    static class threadX implements Runnable {
        private CountDownLatch cdl;
        threadX(CountDownLatch cdl){
            this.cdl = cdl;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()
                        + ":running.... now:"
                        + System.currentTimeMillis());
                Thread.sleep(10);
                cdl.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
===========執行結果===========
Thread-0:running.... now:1555310728080
Thread-1:running.... now:1555310728080
===開始==time:1555310728080==count:2
========結束==time:1555310728090==count:0

============================分割線==================this

tips

       A:latch.countDown(); 建議放到finally語句裏。
       B:對這個計數器的操做都是原子操做,同時只能有一個線程去操做這個計數器。
       C:常與CyclicBarrier伴生討論,將在後續章節進行講述。
       D:主要功能簡述-
            public CountDownLatch(int count); //指定計數的次數,只能被設置1次[閉鎖特性]
            public void countDown();          //計數器減1
            /**await(...)會一直阻塞當前線程,直到計時器的值爲0,除非線程被中斷*/
            public void await() throws InterruptedException   
            Public boolean await(long timeout, TimeUnit unit) //and 返回false表明計數器超時。
           E:使用了AQS的狀態來表明計數count,因此CountDownLatch是基於AQS的實現不是CAS哦
spa

==================分割線=====================線程

源碼解讀及註釋

    

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        /**AQS共享模式加鎖 就必須實現tryAcquireShared和tryReleaseShared*/
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;//計數爲0時就能夠結束並解除阻塞了
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {//循環執行直到cas減一成立或者計數器歸零
                int c = getState();
                if (c == 0)//計數器歸零 退出
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))//cas減一
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    /**Constructs a {@code CountDownLatch} initialized with the given count.*/
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**當前線程阻塞知道計數歸零或者線程interrupted
     * 最終會由AbstractQueuedSynchronizer中的parkAndCheckInterrupt方法實現阻塞
     * ....LockSupport.park(this);
     * */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**同上,增長了超時,timeout納秒內計數沒有歸零返回false*/
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**計數器減1*/
    public void countDown() {
        sync.releaseShared(1);
    }
}
相關文章
相關標籤/搜索