囉裏吧嗦CountDownLatch

java.util.concurrent
Class CountDownLatchhtml

目錄
  • CountDownLatch 是什麼

CountDownLatch是一個同步工具類,它容許一個或多個線程一直等待,直到其餘線程的操做執行完後再執行java

  • CountDownLatch 怎麼用

CountDownLatch是經過一個計數器來實現的,計數器的初始值爲線程的數量,這個值只能被設置一次且後期沒法更改node

每當一個線程完成了本身的任務後,計數器的值就會減1緩存

當計數器值到達0時,它表示全部的線程已經完成了任務,而後在 閉鎖上等待的線程就能夠恢復執行任務安全

線程必須在啓動其餘線程後當即調用 CountDownLatch.await() 方法

這樣主線程的操做就會在這個方法上阻塞,直到其餘線程完成各自的任務,而且調用CountDownLatch實例的countDown()方法。

每調用一次這個方法,在構造函數中初始化的count值就減1

直到計數器爲0的時候, 中止阻塞
  • CountDownLatch 案例
    話很少說,直接上
import java.util.concurrent.CountDownLatch;


public class TestCountDownLatch {
    
    static int n = 0;
    
    public static void main(String[] args) {
        
        int thread_num = 10;
        
        final CountDownLatch countDown = new CountDownLatch(thread_num);
        
        long start = System.currentTimeMillis();
        
        for (int i =0; i<thread_num; i++) {
            
            //模擬多線程執行任務 ,啓動10個線程, 
            new Thread(new Runnable(){

                @Override
                public void run() {
                    // 好比你想測多線程環境下 餓漢式懶漢式 執行效率
                    // 可在裏面執行要測試的代碼,我就簡單模擬下
                    for (int i =0; i<1000; i++) {
                        n++;
                    }
                    System.out.println("線程:" + Thread.currentThread().getName()+" 任務執行完畢");
                    //計數器減一
                    countDown.countDown();
                }
                
            }).start();
        }
        
        try {
            //主線程就一直阻塞了
            countDown.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
        System.out.println("線程:" + Thread.currentThread().getName()+" 恢復,開始接着執行");
        
        long end = System.currentTimeMillis()-start;
        
        System.out.println("執行時間:" + end);
    }

}

執行結果多線程

線程:Thread-0 任務執行完畢
線程:Thread-2 任務執行完畢
線程:Thread-1 任務執行完畢
線程:Thread-4 任務執行完畢
線程:Thread-3 任務執行完畢
線程:Thread-5 任務執行完畢
線程:Thread-6 任務執行完畢
線程:Thread-8 任務執行完畢
線程:Thread-7 任務執行完畢
線程:Thread-9 任務執行完畢
線程:main 恢復,開始接着執行
執行時間:2

可見主線程以前一直被阻塞,直到全部的線程都執行完畢,再接着執行
若是不使用CountDownLatch, 那麼可能其餘線程還沒執行完, 主線程就結束了, 主線程又不是守護線程
相似這樣eclipse

線程:Thread-0 任務執行完畢
線程:Thread-1 任務執行完畢
線程:Thread-3 任務執行完畢
線程:Thread-7 任務執行完畢
線程:Thread-2 任務執行完畢
線程:Thread-5 任務執行完畢
線程:main 恢復,開始接着執行
執行時間:19
線程:Thread-4 任務執行完畢
線程:Thread-9 任務執行完畢
線程:Thread-6 任務執行完畢
線程:Thread-8 任務執行完畢

題外話,若是不使用CountDownLatch有沒有其餘的辦法,其實也有jvm

去掉 count相關代碼, 加一句

while(Thread.activeCount()>1)  //保證前面的線程都執行完
            Thread.yield();

System.out.println("線程:" + Thread.currentThread().getName()+" 恢復,開始接着執行");
  • CountDownLatch 源碼解析

首先,若是讓你實現這個工具類, 可想的辦法有哪些ide

1. 好比  在主線程執行的代碼裏 , 用 threadB.join(), 先執行 B線程的join方法,  再執行主線程
2. 好比  用object對象的 wait(),和notify() notifyAll()方法, 須要注意的是這兩個方法須要配合着 synchronized 一塊兒使用,
否則會報 java.lang.IllegalMonitorStateException  
親測, 而且使用wait 鎖住的對象  和  notify 喚醒 釋放鎖的對象必須是同一個

爲了方便eclipse步入,我寫了個測試類, 而後打斷點看的更清楚,
這裏模擬CountDown簡單寫了下函數

import java.util.concurrent.locks.AbstractQueuedSynchronizer;




public class TestCountDownLatch1 {
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            System.out.println("this" + this);
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
        
      
        
    }
    
    private final Sync sync;

      
    public TestCountDownLatch1(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
    }
    
    public void await() throws InterruptedException {
         sync.acquireSharedInterruptibly(1);//在這裏打上斷點
    }
    
    public void countDown() {
        sync.releaseShared(1);//在這裏打上斷點
    }
    
    public static void main(String[] args) {
        final TestCountDownLatch1 a = new TestCountDownLatch1(11);
        
        try {
            a.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

首先看CountDownLatch的構造方法

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

發現初始化了sync實例, 而且傳入了計數器的值
進入sync構造器,

Sync(int count) {
            setState(count);
        }

setState點進去,發現是繼承的AbstractQueuedSynchronizer類裏的方法, 簡稱AQS,給抽象類的

private volatile int state;

state賦值, 能夠猜到 此變量就是實際用來表示計數器的值, 至於爲何要用 volatile關鍵字, 有興趣的童鞋能夠去看看這篇博客
https://www.cnblogs.com/dolphin0520/p/3920373.html

簡單來講volatile關鍵字保證了其對線程的透明性, 用其修飾的代碼 jvm 保證了其的 可見性和有序性 ,相對來講更安全

具體來講就是 當此變量被修改, 會被當即刷新到主存,而且將其餘線程的緩存行置爲失效狀態
被它修飾的變量 不會被進行指令重排序

簡單的猜測下,countDown.await();就是阻塞線程, 而後不停的檢查state的值, 若是爲0, 則中止阻塞
而 countDown.countDown(); 就是將計數器的值減一

好, 如今看countDown的await方法, 將TestCountDownLatch1的斷點打好, 而後debug as 啓動該類

F5步入 sync.acquireSharedInterruptibly(1); 方法, 發現sync並無實現該方法, 使用的是AQS裏的

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

這方法的名字叫 得到 共享的 斷點? ,
方法聲明瞭一個InterruptedException異常,表示調用該方法的線程支持打斷操做,若是中斷了,清除掉, 捕獲異常,再接着往下執行


這裏先檢查了下 線程的中斷狀態 , 這裏要說下, Thread.interrupted()方法

public static void main(String[] args) {
        
        System.out.println(Thread.currentThread().getName());
        
        System.out.println(Thread.interrupted());
        
        Thread.currentThread().interrupt();
        
        System.out.println(Thread.interrupted());
        
        System.out.println(Thread.interrupted());
)

main
false
true
false

該方法是得到線程的中斷狀態,而且會清除線程的中斷


再接着往下看, 別忘了此時的arg 是1 , 雖然在CountDownLatch工具類中沒有用到, 但其餘工具類有可能會用
AQS有兩套方式獲取鎖,一個獨佔式,一個共享式
獨佔式就是隻能一個線程訪問,例如Reentrantlock,同步隊列每次也只喚醒一個線程;
共享式就是多個線程訪問,例如CountDownLatch,同步隊列喚醒頭節點,而後依次喚醒後面全部節點,實現共享狀態傳播

方法名:嘗試 得到 共享 ,

tryAcquireShared(arg) < 0 
馬後炮猜猜這個方法的做用,   
這個方法應該是判斷計數器是否爲0, 爲0 則不阻塞了, 線程接着往下走, 不爲0 , 
則繼續阻塞
返回true
接着執行doAcquireSharedInterruptibly

接着F5步入, 發現是CountDownLatch的Syn內部靜態類本身重寫了此方法
根據名字和判斷<0 這個值 , 我以爲這個方法的含義是 返回值 若是 >=0, 那麼就是 得到 共享了, 而後中止阻塞, 線程接着往下執行
返回負數 就表示 獲取失敗, 接着阻塞吧

//共享的模式下獲取狀態
protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

果真, 入參acquires,也就是 arg 是沒什麼用的, 它判斷 AQS的state 計數器是否是0 , 若是爲0 返回1 ,
那麼 1<0 爲false ,方法直接結束退出

咱們代碼裏設置的是10, 返回-1,那麼接着看 doAcquireSharedInterruptibly 方法

AQS裏的 方法名: 去作 得到 共享 中斷 --沒必要在乎 瞎解釋的

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); // 添加Node節點  不明白爲何要這樣寫  static final Node SHARED = new Node();  一個靜態的node對象
        .........
    }

這裏先看下AQS裏的一個代碼圖

*      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     *

接着進去看 addWaiter方法, 名字上看是 添加等待者,
這裏實際上要說下AQS,抽象的同步隊列, AQS裏有個 static final class Node {}, 靜態內部類,
該類裏面有 volatile Node prev; // 指向 當前節點的 前一個節點
volatile Node next; // 指向 當前節點的 後一個節點
volatile Thread thread; //放入線程 包裝線程

固然若是是頭節點,那麼它的prev爲null,同理尾節點的next爲null

而後AQS裏有
private transient volatile Node head;

private transient volatile Node tail;
用來表示同步隊列的頭節點和尾節點
---

接着F5步入addWaiter方法

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);  //包裝節點  當前節點  置入 當前線程對象 和 Node對象 
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail; // 聲明尾節點
        if (pred != null) {  // 
            node.prev = pred; // 若是尾節點不爲空,  那麼新節點 的 前一個節點 是尾節點 ,  
            if (compareAndSetTail(pred, node)) { //   畢竟是多線程操做, 1-N個線程都能被阻塞, 等待, 添加到隊列裏, 有volatile關鍵字還不夠
                 //  還須要 cas 方式替換AQS裏的 尾節點對象 compareAndSetTail , 會比較 pred 和 如今AQS的尾節點是否是一個對象
                 // 若是是 則替換 node 爲新的尾節點  替換成功 , 則以前的尾節點的 next 指向  新的尾節點
                pred.next = node;
                return node; //
            }
        }
        enq(node);  //咱們只有主線程阻塞, 並且是第一次進來, 因此尾節點 頭節點 確定都是空的, 因此走這裏
        return node;
    }

額外小芝士:
不少人不明白compareAndSetTail(pred, node) 是什麼, 這個實際上是CAS, Compare And SWAP, 先比較 , 再替換, 只有比較的和預期對象相等, 纔會替換成新的對象
模仿着寫個小荔枝

package thead1;

import sun.misc.Unsafe;

import java.lang.reflect.Field;


public class TestUnsafe {

    public static void main(String[] args) {
        Node node = new Node();
        /**
         * 經過CAS方法更新node的next屬性
         * 原子操做
         */
        Node n = new Node();
        
        boolean flag = node.casNext(null,n);// 一開始的 volatile Node next; 確實是null 
        
        System.out.println(flag); //true ,被更新成n 
        
        flag = node.casNext(new Node(),new Node()); //沒更新 , 由於如今 next 應該是n 指向的對象
        
        System.out.println(flag);//false
        
        flag = node.casNext(n,new Node()); 
        
        System.out.println(flag);//true
        
    }

    private static class Node{

        volatile Node next;

        /**
         * 使用Unsafe CAS方法
         * @param cmp 目標值與cmp比較,若是相等就更新返回true;若是不相等就不更新返回false;
         * @param val 須要更新的值;
         * @return
         */
        boolean casNext(Node cmp, Node val) {
            /**
             * compareAndSwapObject(Object var1, long var2, Object var3, Object var4)
             * var1 操做的對象
             * var2 操做的對象屬性    而這個offset只是記錄該屬性放哪  ,  比較的應該是屬性 所指的對象 的地址
             * var3 var2與var3比較,相等才更新
             * var4 更新值
             */
             System.out.println("nextOffset : " +nextOffset + " this " +this + " cmp " +cmp 
                    + " val " +val + " next " + next);
            
             Boolean a = UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
           
             System.out.println(" next " + next + " 更新結果  " + a);
           
             return a;
        }

        private static final sun.misc.Unsafe UNSAFE;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = getUnsafe();
                Class<?> k = Node.class;
                nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }

        /**
         * 獲取Unsafe的方法
         * 
         * @return
         */
        public static Unsafe getUnsafe() {
            try {
                Field f = Unsafe.class.getDeclaredField("theUnsafe");
                f.setAccessible(true);
                return (Unsafe)f.get(null);
            } catch (Exception e) {
                return null;
            }
        }
    }
    
    
    
    
    
}

會發現第一次更新成功 , 應該剛new 的node對象 next屬性爲null ,
還記得 以前的volatile 關鍵字嗎 , 因爲不保證原子性 , 若是多個線程進行更新, 就會出現問題

好比 i++ 能夠拆分紅3個動做

讀取i的原始值 i副本壓入操做數棧
對i進行+1 操做,
彈出操做數棧,寫入主存

好比線程A 讀取i 的值10, 正準備向cpu發送指令 +1時被阻塞了, 線程A因爲還沒修改, 不會致使線程B的工做內存中緩存變量inc的緩存行無效

而後線程B 也去讀, 線程A還沒修改, 線程B 讀內存的值10 , +1 , 而後把11 寫入工做內存,寫入主存 volatile雖然保證線程B修改後能夠另其餘線程緩存行失效,並當即寫入主存
但此時線程A已經讀到了i的值,

線程A已經讀取到了值, 不在涉及讀操做, 因此並無更新緩存,(個人理解是若是線程A 還須要讀, 那麼纔會發現本身的緩存失效了, 那麼才從主存讀11)
以前已經把操做數放入了本身的操做數棧中 線程A才中斷的 CPU因爲保存了上次線程A的工做狀態
所以, 輪到線程A工做時, 會繼續上次的操做, 即: 開始對操做數棧中的數進行+1操做, 而後當即刷回主存, 所以再也不涉及讀操做,不然CPU保存線程的工做狀態將毫無心義
變成11 寫入主存

兩次操做,只加了1

寫個例子證實下

package thead1;

import java.lang.reflect.Field;

import sun.misc.Unsafe;

public class TestVolatile {
    
    private  volatile int i = 0 ;
    
    private int j = 0 ;

    private  volatile int next = 0 ;
    
    private static final sun.misc.Unsafe UNSAFE;
    private static final long nextOffset;
    
    public static Unsafe getUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe)f.get(null);
        } catch (Exception e) {
            return null;
        }
    }
    
    static {
        try {
            UNSAFE = getUnsafe();
            Class<?> k = TestVolatile.class;
            nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    
    public final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return UNSAFE.compareAndSwapInt(this, nextOffset, expect, update);
    }
    
    
    public void add () {
        i++;
    }
    
    
    public void add1 () {
        synchronized(this) {
            j++;
        }
    }
    
    public void add2 () {
        int c = next;
        int nextc = c+1;
        for ( ;; ){
            if (compareAndSetState(c, nextc)){
                return;
            }
        }
        
           
    }
    
    
    public static void main(String[] args) {
        
        final TestVolatile tr = new TestVolatile();
        
        for (int i =0; i<10; i++) {
            
             new Thread(new Runnable(){

                    @Override
                    public void run() {
                        // TODO Auto-generated method stub
                        for (int i =0; i<300; i++) {
                            tr.add();
                            tr.add1();
                            //tr.add2();
                        }
                    }
                     
                 }).start();;
            
        }
        
        for (int i =0; i<10; i++) {
            new Thread(new Runnable(){

                @Override
                public void run() {
                    // TODO Auto-generated method stub
                    for (int i =0; i<300; i++) {
                        tr.add2();
                    }
                }
                 
             }).start();;
        }
        
        
        while(Thread.activeCount()>1)  //保證前面的線程都執行完
                Thread.yield();
        
        //然main方法等到他們都執行完了在打印
        System.out.println(tr.i);
        System.out.println(tr.j);
        System.out.println(tr.next);
        
    }

}

能夠看到變量i 雖然加了volite, 依然不能保證每次執行的結果是3000,
synchronized是用來對比的

線程方法裏面的循環能夠設置成10000會更明顯點, i老是低於10w的一個數

那麼用CAS原子性的方式去更改能不能保證呢, 答案是確定了, 我試了不少次
next的結果和 j的結果都同樣 ,

附: 有個小疑問, 就是當線程裏循環的次數是1w時, 很容易停住不動, 是產生死鎖了嗎
因此才用的300 200來測試


好的, 題外話說完, 在接着回到AQS,
addWaiter 方法裏, 因爲咱們是第一次進入, 因此AQS的尾節點確定是空的, 執行enq()方法

private Node enq(final Node node) {
        for (;;) {//死循環
            Node t = tail;//拿到尾節點
            if (t == null) { // Must initialize   
                if (compareAndSetHead(new Node()))//  必須初始化尾節點, 仍是cas, 判斷頭節點是空的, 那麼就new 一個節點實例給 頭節點
                    tail = head; // 頭節點 尾節點 都用一個 實例對象
            } else {
                node.prev = t; // 尾節點不爲空  將 當前節點的 prev 前一個節點執行 尾節點    head  tail <----prev---node 
                if (compareAndSetTail(t, node)) {// 只有將 尾節點 替換爲 當前節點  這個時候方法才結束 退出
                    t.next = node;  head  tail <----prev---node 就是新的尾節點
                    return t;                         -----next --->
                }
            }
        }
    }

這個方法很簡單, 就是初始化尾節點和頭節點, 而且設置 當前node 爲新的尾節點, 而後把先後關係都關聯上 ,在回到addWaiter方法
而後返回 新加的 這個尾節點
在回到doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);// 添加新的節點 爲尾節點 而且初始化節點 而且設置新的節點爲尾節點  暫時不明白爲何要包裝一下 塞一個靜態的Node對象
        boolean failed = true;
        try {
            for (;;) {//死循環 注意退出條件
                final Node p = node.predecessor();//不帶着看了, 點進去其實就是 返回 當前節點的 上一個節點 ,若是爲空拋異常,  
                if (p == head) {// 若是 當前節點的 上一個節點  就是  頭節點 , 咱們第一次進來 實際上是的,  還記得 enq裏的方法嗎 , 頭尾節點都是一個地址, 當前節點是尾節點, 指向上一個節點即頭尾
                    int r = tryAcquireShared(arg); // 不解釋了 子類重寫的方法 本身定義什麼狀況下可以得到共享 , 不在阻塞 ,  第一次進來確定是 -1
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);//在同步隊列中掛起的線程,它們自省的觀察本身是否知足條件醒來(state==0,且爲頭節點),若是成立將調用setHeadAndPropagate這個方法
                        p.next = null; // help GC
                        failed = false;
                        return;//  因此第一次進來沒法退出 , 而後我發現F6一直走 ,到了判斷下面的if 條件後, 走兩遍, eclipse的步入 下一步都置灰了, 多是判斷若是沒有新的條件, 死循環沒法退出吧, 
        //因此一直阻塞着這裏
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed) // 只有 上面第二個if條件 中斷退出 纔會執行 這個方法
                cancelAcquire(node);
        }
    }

若是計數器爲0 , tryAcquireShared 爲1大於等於0 , 設置頭節點和 傳播 共享狀態

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
       if (propagate > 0 || h == null || h.waitStatus < 0) {  // 
            Node s = node.next; //  得到 當前節點的 下一個節點  
            if (s == null || s.isShared())
                doReleaseShared();
        }
 }

設置當前節點爲頭節點 那麼很天然 當前節點的前一個節點 即原本的頭節點爲空

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

看到這裏其實 countDown的方法也能猜到大概了
其實就是 想辦法讓 state技術器的值減1 , 還得保證線程安全,
volatile其實適合一寫多讀, 若是多個線程都寫, 那麼就須要CAS去更新

因爲咱們測試代碼是阻塞一個main線程, 其實CountDownLatch能同時阻塞多個線程, 因此纔用到隊列
而後await()方法死循環裏檢測到條件知足了, 就退出死循環,退出阻塞, 接着往下執行了

以前咱們瞭解到, 當不知足tryAcquireShared(),條件時,
await()方法就一直 死循環阻塞

那麼猜countDown()方法除了讓計數器減一之外, 還須要依次喚醒被阻塞的線程

即 當前線程節點 的前一個節點 爲 頭節點 , 當它知足這個條件 , 同時計數器又爲0

猜想應該會 將該節點移除 , 將 頭節點的下一個設置爲null, p.next = null

該線程退出這個死循環

同時後面的那個 線程 應該會補上 , 它的prev 指向 頭節點

if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
  • CountDownLatch 的 countDown 源碼解析
public void countDown() {
        sync.releaseShared(1);//在countDownLatch 這個入參沒什麼用
    }

AQS裏的

名字: 可能意思就是 釋放 共享的 猜想這個方法就嘗試 釋放共享的鎖
主要就是調用同步器的tryReleaseShared方法來釋放狀態,並同時在doReleaseShared方法中喚醒其後繼節點

調用該方法釋放共享狀態,每次獲取共享狀態acquireShared都會操做狀態,
一樣在共享鎖釋放的時候,也須要將狀態釋放。好比說,一個限定必定數量訪問的同步工具,每次獲取都是共享的,
可是若是超過了必定的數量,將會阻塞後續的獲取操做,只有當以前獲取的消費者將狀態釋放纔可使阻塞的獲取操做得以運行

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {// 子類負責實現  自定義  返回true 就表示要釋放  不然無論  
            doReleaseShared(); //我設置了11個線程 每次都是 false , 直到最後一個線程執行 該方法時   state變成0 走這裏 
            return true;
        }
        return false;// 
    }

CountDownLatch 能夠把TestNode 的count 設置成1 在打斷點 看下

//共享的模式下釋放狀態
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) { //死循環
                int c = getState(); // 獲取計數器
                if (c == 0)
                    return false;// 若是計數器爲0  退出  
                int nextc = c-1;  // 第一個線程進來時 11 變10
                if (compareAndSetState(c, nextc)) //CAS 比較 c 和 AQS裏的state是否是同樣的 同樣的則更新爲10  在for死循環中 直到更新成功  
                    return nextc == 0;// 若是爲0 則退出 返回true 
            }
        }

名字的意思多是 作 釋放 共享鎖的事 state爲0 的時候執行此方法

爲了看head 頭節點的ws 什麼時候變成-1, 從新跟蹤await, 發現 直到初始化頭節點 ws都是0 ,

shouldParkAfterFailedAcquire 在此方法裏 將頭節點的ws 變成了-1

int waitStatus
表示節點的狀態。其中包含的狀態有:
CANCELLED,值爲1,表示當前的線程被取消;
SIGNAL,值爲-1,表示當前節點的後繼節點包含的線程須要運行,也就是unpark;
CONDITION,值爲-2,表示當前節點在等待condition,也就是在condition隊列中;
PROPAGATE,值爲-3,表示當前場景下後續的acquireShared可以得以執行;
值爲0,表示當前節點在sync隊列中,等待着獲取鎖。

private void doReleaseShared() {
 
        for (;;) {
            Node h = head;//頭節點  
            if (h != null && h != tail) {
                int ws = h.waitStatus;  
                if (ws == Node.SIGNAL) { // -1  若是當前節點是single 表示它等待被喚醒     而後我設置count爲1 走此方法看 ws爲-1 , 由於
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 重置waitStatus標誌位 爲0 
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); // 重置成功 喚醒下一個節點    unpark 開走停着的汽車  喚醒  successor 繼承者
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  同時把頭節點的 0-0 替換成 -3 失敗了 則接着循環
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;//  退出條件 h==head,即該線程是頭節點,且狀態爲共享狀態
        }
    }

爲啥執着的猜名字, 由於寫代碼一般講究見名知意, 若是名字起得好 , 別人看起來事半功倍

因此總結一下, CountDownLatch內部實現了一個靜態內部類syn,主要利用了AQS這個抽象的同步隊列類, 也能夠叫同步器,

調用await 是調用AQS的 acquireSharedInterruptibly (該方法提供獲取狀態能力,在沒法獲取狀態的狀況下會進入sync隊列進行排隊), 進行線程中斷和排隊

調用countDown 實際上就是調用 releaseShared 方法釋放共享狀態

  • 結束語

學無止境, 學海無涯

參考:https://www.cnblogs.com/yanphet/p/5788260.html

水平有限,歡迎討論

相關文章
相關標籤/搜索