Java併發編程之CountDownLatch源碼解析

1、導語

最近在學習併發編程原理,因此準備整理一下本身學到的知識,先寫一篇CountDownLatch的源碼分析,以後但願能夠慢慢寫完整個併發編程。java

2、什麼是CountDownLatch

CountDownLatch是java的JUC併發包裏的一個工具類,能夠理解爲一個倒計時器,主要是用來控制多個線程之間的通訊。
好比有一個主線程A,它要等待其餘4個子線程執行完畢以後才能執行,此時就能夠利用CountDownLatch來實現這種功能了。node

3、簡單使用

public static void main(String[] args){
    System.out.println("主線程和他的兩個小兄弟約好去吃火鍋");
    System.out.println("主線程進入了飯店");
    System.out.println("主線程想要開始動筷子吃飯");
    //new一個計數器,初始值爲2,當計數器爲0時,主線程開始執行
    CountDownLatch latch = new CountDownLatch(2);
    
     new Thread(){
             public void run() {
                 try {
                    System.out.println("子線程1——小兄弟A 正在到飯店的路上");
                    Thread.sleep(3000);
                    System.out.println("子線程1——小兄弟A 到飯店了");
            //一個小兄弟到了,計數器-1
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
             };
         }.start();
         
     new Thread(){
             public void run() {
                 try {
                    System.out.println("子線程2——小兄弟B 正在到飯店的路上");
                    Thread.sleep(3000);
                    System.out.println("子線程2——小兄弟B 到飯店了");
            //另外一個小兄弟到了,計數器-1
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
             };
         }.start();
    
    //主線程等待,直到其餘兩個小兄弟也進入飯店(計數器==0),主線程才能吃飯
     latch.await();
     System.out.println("主線程終於能夠開始吃飯了~");
}

4、源碼分析

核心代碼:編程

CountDownLatch latch = new CountDownLatch(1);
        latch.await();
        latch.countDown();

其中構造函數的參數是計數器的值;
await()方法是用來阻塞線程,直到計數器的值爲0
countDown()方法是執行計數器-1操做併發

一、首先來看構造函數的代碼

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

這段代碼很簡單,首先if判斷傳入的count是否<0,若是小於0直接拋異常。
而後new一個類Sync,這個Sync是什麼呢?咱們一塊兒來看下函數

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }
    //嘗試獲取共享鎖
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
    //嘗試釋放共享鎖
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

能夠看到Sync是一個內部類,繼承了AQS,AQS是一個同步器,以後咱們會詳細講。
其中有幾個核心點:工具

  1. 變量 state是父類AQS裏面的變量,在這裏的語義是計數器的值
  2. getState()方法也是父類AQS裏的方法,很簡單,就是獲取state的值
  3. tryAcquireShared和tryReleaseShared也是父類AQS裏面的方法,在這裏CountDownLatch對他們進行了重寫,先有個印象,以後詳講。

二、瞭解了CountDownLatch的構造函數以後,咱們再來看它的核心代碼,首先是await()。

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

能夠看到,實際上是經過內部類Sync調用了父類AQS的acquireSharedInterruptibly()方法。oop

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    //判斷線程是不是中斷狀態
        if (Thread.interrupted())
            throw new InterruptedException();
    //嘗試獲取state的值
        if (tryAcquireShared(arg) < 0)//step1
            doAcquireSharedInterruptibly(arg);//step2
    }

tryAcquireShared(arg)這個方法就是咱們剛纔在Sync內看到的重寫父類AQS的方法,意思就是判斷是否getState() == 0,若是state爲0,返回1,則step1處不進入if體內acquireSharedInterruptibly(int arg)方法執行完畢。若state!=0,則返回-1,進入if體內step2處。 源碼分析

下面咱們來看acquireSharedInterruptibly(int arg)方法:學習

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //step一、把當前線程封裝爲共享類型的Node,加入隊列尾部
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
        //step二、獲取當前node的前一個元素
                final Node p = node.predecessor();
        //step三、若是前一個元素是隊首
                if (p == head) {
            //step四、再次調用tryAcquireShared()方法,判斷state的值是否爲0
                    int r = tryAcquireShared(arg);
            //step五、若是state的值==0
                    if (r >= 0) {
            //step六、設置當前node爲隊首,並嘗試釋放共享鎖
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
        //step七、是否能夠安心掛起當前線程,是就掛起;而且判斷當前線程是否中斷
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
    //step八、若是出現異常,failed沒有更新爲false,則把當前node從隊列中取消
            if (failed)
                cancelAcquire(node);
        }
    }

按照代碼中的註釋,咱們能夠大概瞭解該方法的內容,下面咱們來仔細看下其中調用的一些方法是幹什麼的。
一、首先看addWaiter()ui

//step1
private Node addWaiter(Node mode) {
    //把當前線程封裝爲node
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
    //獲取當前隊列的隊尾tail,並賦值給pred
        Node pred = tail;
    //若是pred!=null,即當前隊尾不爲null
        if (pred != null) {
    //把當前隊尾tail,變成當前node的前繼節點
            node.prev = pred;
        //cas更新當前node爲新的隊尾
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
    //若是隊尾爲空,走enq方法
        enq(node);//step1.1
        return node;
    }

-----------------------------------------------------------------
//step1.1
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
        //若是隊尾tail爲null,初始化隊列
            if (t == null) { // Must initialize
        //cas設置一個新的空node爲隊首
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
        //cas把當前node設置爲新隊尾,把前隊尾設置成當前node的前繼節點
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

二、接下來咱們在來看setHeadAndPropagate()方法,看其內部實現

//step6
private void setHeadAndPropagate(Node node, int propagate) {
    //獲取隊首head
        Node h = head; // Record old head for check below
    //設置當前node爲隊首,並取消node所關聯的線程
        setHead(node);
    //
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
        //若是當前node的後繼節點爲null或者是shared類型的
            if (s == null || s.isShared())
        //釋放鎖,喚醒下一個線程
                doReleaseShared();//step6.1
        }
    }
--------------------------------------------------------------------
//step6.1
private void doReleaseShared() {
        for (;;) {
        //找到頭節點
            Node h = head;
            if (h != null && h != tail) {
        //獲取頭節點狀態
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
            //喚醒head節點的next節點
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

三、接下來咱們來看countDown()方法。

public void countDown() {
        sync.releaseShared(1);
    }

能夠看到調用的是父類AQS的releaseShared 方法

public final boolean releaseShared(int arg) {
    //state-1
        if (tryReleaseShared(arg)) {//step1
        //喚醒等待線程,內部調用的是LockSupport.unpark方法
            doReleaseShared();//step2
            return true;
        }
        return false;
    }
------------------------------------------------------------------
//step1
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
        //獲取當前state的值
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
        //cas操做來進行原子減1
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

5、總結

CountDownLatch主要是經過計數器state來控制是否能夠執行其餘操做,若是不能就經過LockSupport.park()方法掛起線程,直到其餘線程執行完畢後喚醒它。
下面咱們經過一個簡單的圖來幫助咱們理解一下:
jucPS:本人也是還在學習的路上,理解的也不是特別透徹,若有錯誤,願傾聽教誨。^_^

相關文章
相關標籤/搜索