從源碼看JDK8併發工具類CountDownLatch的實現原理

CountDownLatch,是幾個重要的併發編程工具類之一,字面意思就是門鎖的意思,內部會維護一個計數器的常量,這個常量表明執行的線程數。java

在多線程協做完成業務功能時,有時候須要等待其餘多個線程完成任務以後,主線程才能繼續往下執行業務功能,在這種的業務場景下,一般可使用Thread類的join方法,讓主線程等待被join的線程執行完以後,主線程才能繼續往下執行。固然,使用線程間消息通訊機制也能夠完成。其實,java併發工具類中爲咱們提供了相似「倒計時」(CountDownLatch)這樣的工具類,能夠十分方便的完成所說的這種業務場景。編程

CountDownLatch容許一個或多個線程等待其餘線程完成操做,調用await()方法的線程回去判斷count的值來判斷是否會被掛起,它會等待直到count值爲0纔會繼續執行。控制檯輸出count=0最後輸出,這個時候就看cpu切換到哪一個線程上執行了,在初始化的時候咱們會設置好count的值,當每調用一次countDown()方法,會使count的值減一也就是將AQS維護同步狀態的state值減一。安全

在咱們閱讀源碼以前,若是你看過AQS源碼(www.jianshu.com/p/e0066f934…)與跟可重入鎖(www.jianshu.com/p/5d57573b0…)相關的內容,你會更加對CountDownLatch自己是如何實現的以及他的本質有一個更透徹的理解。bash

說說個人理解以前看看他的類多線程

能夠看到他一樣運用了一個繼承了AQS同步器的靜態內部類來重寫父類AQS裏面的一些方法而後再調用該父類裏面的獲取鎖的方法來實現具體的功能。

來看看構造方法中:併發

//設置初始化count的值,並傳遞給Sync類
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
複製代碼

Sync類的源碼以下:CountDownLatch的實現依賴於AQSdom

先介紹下兩個方法 countDown()每執行一次該方法,也就是將由AQS維護的同步狀態值state值減1,其通常是執行任務的線程調用。 調用countDown()釋放同步狀態,每次調用同步狀態值-1。ide

public void countDown() {
        sync.releaseShared(1);
    }

//父類AQS中
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {   //若是釋放同步狀態線程成功,若是返回false,則表示,獲取失敗同步狀態。
  //返回flase,以CountDownLatch的實現角度來說,此時還要等待N(N>0)個線程,由於state還沒減到等於0,若是返回true,表示此時已經執行N次了,此時state已經減到0了,這時候會執行doReleaseShared(),表示釋放其餘處於等待的節點。
            doReleaseShared();   //喚醒後續處於等待的節點,看下面具體的解釋。
            return true;
        }
        return false;
    }

//在CountDownLatch的靜態內部工具類Sync繼承了AQS重寫的tryReleaseShared
protected boolean tryReleaseShared(int releases) {
            // 自旋
            for (;;) {
                int c = getState();  //獲取AQS維護的state值
                if (c == 0)   //若是爲0,表示沒有一個線程在運行返回false
                    return false;
                int nextc = c-1;     //若是不等於0,這裏確定會>0的,因此減去1
                if (compareAndSetState(c, nextc))  //CAS去直接修改內存地址的偏移量去修改值,保證線程安全。
                 return nextc == 0;       //重點來了。這裏的意思是若是共享式獲取同步狀態後,state還不是爲0,則獲取失敗。返回false
            }
        }
複製代碼

下面這個類,在個人這篇文章也解析過了。 工具

await(),當執行該方法是,內部會檢查那個計數常量的值,若是不等於0,就會進入等待(waiting)狀態,直到執行了countDown使內部的值減到0的時候,就會恢復線程,同時執行,咱們來看看實現:測試

public void await() throws InterruptedException {
        //共享式獲取
        sync.acquireSharedInterruptibly(1);
    }

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())  //響應中斷
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)   
      //tryAcquireShared(arg) 返回1,此時state=0不阻塞,返回的是-1,執行doAcquireSharedInterruptibly(arg);
            doAcquireSharedInterruptibly(arg);
    }

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;   
        }
複製代碼

這裏須要解釋下doAcquireSharedInterruptibly的主要做用:一、將當前線程構形成共享模式的節點,經過自旋的方式嘗試獲取同步狀態二、若是獲取同步狀態成功,則喚醒後續處於共享模式的節點;若是沒有獲取到同步狀態,則對調用shouldParkAfterFailedAcquire(Node, Node)和parkAndCheckInterrupt()方法掛起當前線程,這樣能夠避免該線程無限循環而獲取不到共享鎖,從而形成資源浪費。這裏須要注意的是:當有多個線程調用await()方法時,這些線程都會經過addWaiter(Node.SHARED)方法被構形成節點加入到等待隊列中。當最後一個調用countDown()方法的線程執行了countDown()後(這裏有點拗口),會喚醒處於等待隊列中距離頭節點最近的一個節點,也就是說該線程被喚醒以後會繼續自旋嘗試獲取同步狀態,此時執行到tryAcquireShared(int)方法時,發現r大於0(由於state已經被置爲0了),該線程就會調用setHeadAndPropagate(Node, int)方法將喚醒傳遞下去,而且退出當前循環,開始執行awat()方法以後的代碼。

而後說說CountDownLatch的兩種用法:

1.能夠設置new CountDownLatch(1); 若是須要控制多個線程同時開始執行的時候,能夠每一個線程剛開始執行run的時候,先執行await, 進入等待狀態。當最後全部線程都準備好了,就調用countDown,減一,這時全部線程就會主動同時開始執行。 2.假設能夠設置new CountDownLatch(10),這時有10個線程,咱們須要作的是等10個線程,依次執行countDown(),等到全部線程都執行好了,這時候再執行await。全部線程都準備就緒了。

await(long timeout,TimeUtil unit) 做用使線程在指定的最大時間內,處於await狀態,超過這個時間就會自動喚醒了。 getCount()
可以獲取當前計數的值。

下面舉一個實現的例子:

默認10個運動員進行跑步比賽的全過程:

public class MyThread extends Thread{

    /**等待運動員到來*/
    private CountDownLatch comingTag;
    /**等待裁判說開始*/
    private CountDownLatch waitTag;
    /** 等待起跑*/
    private CountDownLatch waitRunTag;
    /**起跑*/
    private CountDownLatch beginTag;
    /** 全部運動員道終點*/
    private CountDownLatch endTag;

    public MyThread(CountDownLatch comingTag, CountDownLatch waitTag, CountDownLatch waitRunTag, CountDownLatch beginTag, CountDownLatch endTag) {
        super();
        this.comingTag = comingTag;
        this.waitTag = waitTag;
        this.waitRunTag = waitRunTag;
        this.beginTag = beginTag;
        this.endTag = endTag;
    }

    @Override
    public void run() {
        try {
            System.out.println("運動員正陸續入場");
            Thread.sleep((int)Math.random()*10000);
            System.out.println(Thread.currentThread().getName()+"到起跑點了");
            comingTag.countDown();
            System.out.println("等待裁判說準備");
            waitTag.await();
            System.out.println("準備。。。。。開始");
            waitRunTag.countDown();
            beginTag.await();
            System.out.println(Thread.currentThread().getName()+"開始跑,而且跑步過程不肯定");
            Thread.sleep((int)Math.random()*10000);
            endTag.countDown();
            System.out.println(Thread.currentThread().getName()+"到達終點");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
複製代碼

測試類:

public class Run {

    public static void main(String[] args) {


        CountDownLatch comingTag = new CountDownLatch(10);
        CountDownLatch waitTag=new CountDownLatch(1);
        CountDownLatch waitRunTag = new CountDownLatch(10);
        CountDownLatch beginTag=new CountDownLatch(1);
        CountDownLatch endTag = new CountDownLatch(10);

        MyThread[] threads=new MyThread[10];

        for(int i=0;i<threads.length;i++){
            threads[i]=new MyThread(comingTag,waitTag,waitRunTag,beginTag,endTag);
            threads[i].setName("運動員"+(i+1));
            threads[i].start();
        }

        try {
            System.out.println("裁判正在等待選手的到來。。。。");
            comingTag.await();
            System.out.println("全部的運動員都到齊了,準備開始,各就位。。。。預備");
            Thread.sleep(5000);
            waitTag.countDown();
            System.out.println("各就各位。。。。");
            waitRunTag.await();
            Thread.sleep(2000);
            System.out.println("命令槍,開!!!");
            beginTag.countDown();
            endTag.await();
            System.out.println("全部運動員都到得終點了。。。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
複製代碼

image.png
image.png

通常來講,都會把CountDownLatch與CyclicBarrier進行比較?

CountDownLatch通常用於某個線程A等待若干個其餘線程執行完任務以後,它才執行;而CyclicBarrier通常用於一組線程互相等待至某個狀態,而後這一組線程再同時執行;CountDownLatch強調一個線程等多個線程完成某件事情。CyclicBarrier是多個線程互等,等你們都完成,再攜手共進。 調用CountDownLatch的countDown方法後,當前線程並不會阻塞,會繼續往下執行;而調用CyclicBarrier的await方法,會阻塞當前線程,直到CyclicBarrier指定的線程所有都到達了指定點的時候,才能繼續往下執行; CountDownLatch方法比較少,操做比較簡單,而CyclicBarrier提供的方法更多,好比可以經過getNumberWaiting(),isBroken()這些方法獲取當前多個線程的狀態,而且CyclicBarrier的構造方法能夠傳入barrierAction,指定當全部線程都到達時執行的業務功能; CountDownLatch是不能複用的,而CyclicBarrier是能夠複用的。就是說,當CountDownLatch執行countDown時若是此時countDown執行的state的值減到0了,這時候再調用,不能循環執行了,而CyclicBarrier是能夠的,能夠看一下這篇文章: www.jianshu.com/p/ff6c2ef5e…

整理不易,喜歡能夠關注我

相關文章
相關標籤/搜索