Exploring the CountDownLatch Source Code (JDK 1.8)

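CountDownLatch lets threads wait until a counter counts down to zero. I used to know only how to use it, without understanding how it actually makes threads wait, and the best way to learn the underlying details is to read the source. The CountDownLatch source is not very complex, because its core functionality is built on AbstractQueuedSynchronizer (AQS below). CountDownLatch has only a few commonly used methods, but because AQS is involved the logic is somewhat convoluted, and untangling it takes some time.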

1. Inner class Sync

The core functionality of CountDownLatch is implemented by the inner class Sync, which extends AQS:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    // Constructor: initializes the AQS state field with the given count
    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    // The only job of tryAcquireShared is to check whether state equals 0
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        // Spin; the loop exits in two cases: a) state is already 0; b) this thread successfully decremented state by 1
        for (;;) {
            int c = getState();
            // If state is already 0, return false
            if (c == 0)
                return false;
            int nextc = c-1;
            // As the statement below shows, true is returned only when this decrement brings state to 0
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}
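The CAS retry loop in tryReleaseShared is what stops concurrent countDown() calls from losing decrements. The sketch below exercises this with the real java.util.concurrent.CountDownLatch. The class name ConcurrentCountDownDemo and the thread and iteration counts are illustrative assumptions, not part of the JDK. If any decrement were lost, state would never reach 0 and await() would never return.

```java
import java.util.concurrent.CountDownLatch;

// Illustrative sketch (hypothetical class): many threads call countDown()
// concurrently; each call is one CAS-based decrement of the AQS state.
final class ConcurrentCountDownDemo {

    /** Starts `threads` workers, each calling countDown() `perThread` times; returns the final count. */
    static long run(int threads, int perThread) {
        CountDownLatch latch = new CountDownLatch(threads * perThread);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    latch.countDown(); // retries its CAS internally on contention
                }
            });
            workers[i].start();
        }
        try {
            latch.await(); // returns only once state has reached 0
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(run(8, 1000)); // prints 0
    }
}
```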

2. Constructor

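CountDownLatch has only one constructor, and it initializes the sync field. Given the definition of Sync, the constructor's only job is to set the state field to the argument passed in: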

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
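Here is a quick check of what the constructor does, using only the public API. The class and method names are hypothetical helpers. A negative count is rejected, and getCount() reads back exactly the value stored as state.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical helper class: probes the constructor through the public API.
final class ConstructorDemo {

    /** Returns true if the constructor rejects the count with IllegalArgumentException. */
    static boolean rejects(int count) {
        try {
            new CountDownLatch(count);
            return false;
        } catch (IllegalArgumentException e) {
            return true; // thrown by the "count < 0" check
        }
    }

    /** getCount() simply reads back the state set by Sync(count). */
    static long initialCount(int count) {
        return new CountDownLatch(count).getCount();
    }

    public static void main(String[] args) {
        System.out.println(rejects(-1));     // true
        System.out.println(initialCount(3)); // 3
        System.out.println(initialCount(0)); // 0: a zero-count latch is already open
    }
}
```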

3. Node status: waitStatus

Waiting threads are wrapped in nodes and placed in a wait queue. A node's waitStatus can take the following values:

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate
 */
static final int PROPAGATE = -3;

Note that CountDownLatch never uses the CONDITION status, so the rest of this article ignores it. Whenever waitStatus > 0, it means CANCELLED.
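As a reading aid, here is a hypothetical helper (not part of the JDK) that maps a waitStatus value to a name. It assumes the constant values quoted above. AQS has no named constant for 0, so the label INITIAL is my own.

```java
// Hypothetical helper (not in the JDK): names a waitStatus value using the
// constants quoted from AbstractQueuedSynchronizer.Node.
final class WaitStatusNames {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2; // not used by CountDownLatch
    static final int PROPAGATE = -3;

    static String describe(int ws) {
        if (ws > 0) {
            return "CANCELLED"; // the only positive status
        }
        switch (ws) {
            case 0:
                return "INITIAL"; // every new node starts at 0 (label is an assumption)
            case SIGNAL:
                return "SIGNAL";
            case CONDITION:
                return "CONDITION";
            case PROPAGATE:
                return "PROPAGATE";
            default:
                throw new IllegalArgumentException("unknown waitStatus: " + ws);
        }
    }

    public static void main(String[] args) {
        for (int ws : new int[] {CANCELLED, 0, SIGNAL, PROPAGATE}) {
            System.out.println(ws + " -> " + describe(ws));
        }
    }
}
```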

4. Core methods

  • await()
    When the counter is not 0, await() suspends the current thread. It calls the AQS method acquireSharedInterruptibly:
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // tryAcquireShared returns 1 only when state=0, meaning the counter has reached zero; the method then returns at once and the calling thread continues without blocking
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
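The order of the checks in acquireSharedInterruptibly has a visible consequence. The interrupt flag is tested before tryAcquireShared, so an interrupted caller gets InterruptedException even if the latch is already open. The sketch below (helper names are hypothetical) shows both paths using the public API.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical helper class: observes the check order in acquireSharedInterruptibly.
final class AwaitOrderDemo {

    /** await() on a latch whose count is already 0 returns without queueing. */
    static boolean awaitOnOpenLatch() {
        try {
            new CountDownLatch(0).await(); // tryAcquireShared returns 1
            return true;
        } catch (InterruptedException e) {
            return false;
        }
    }

    /** With the interrupt flag set, await() throws even though the count is 0. */
    static boolean awaitWhileInterrupted() {
        Thread.currentThread().interrupt();
        try {
            new CountDownLatch(0).await();
            return false;
        } catch (InterruptedException e) {
            // Thread.interrupted() inside AQS has already cleared the flag
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitOnOpenLatch());      // true
        System.out.println(awaitWhileInterrupted()); // true
    }
}
```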

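Usually the counter is not yet 0 when a thread that called await() reaches acquireSharedInterruptibly, so the thread has to go through the blocking logic in doAcquireSharedInterruptibly. That method calls three main helpers: addWaiter, shouldParkAfterFailedAcquire and parkAndCheckInterrupt. Explaining them along the way would make the discussion jump around, so to avoid the reading difficulty such jumps cause, addWaiter is introduced first.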

  • addWaiter
private Node addWaiter(Node mode) {
    // Wrap the current thread in a Node
    Node node = new Node(Thread.currentThread(), mode);
    // Read the tail node
    Node pred = tail;
    // A non-null tail means the queue has already been initialized
    if (pred != null) {
        // Append node at the tail: first point node.prev at the current tail, then atomically make node the new tail; on success addWaiter is done
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            // Once the CAS succeeds, link the old tail's next pointer to node
            pred.next = node;
            return node;
        }
    }
    // Reaching here means either: 1) tail is null, i.e. the queue is not initialized yet; or 2) the queue is initialized but the CAS above failed, so node has not actually joined the queue yet
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // If the queue is not initialized, initialize it first: install an empty node as head and point tail at that same node
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // Keep spinning until node is successfully made the tail
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter appends the current thread to the tail of the wait queue and initializes the queue first if necessary. enq spins so that enqueueing never fails.
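Here is a simplified, stand-alone sketch of addWaiter/enq that makes the enqueue protocol concrete. It is a model built on assumptions, not AQS itself. MiniWaitQueue and its Node are hypothetical, and AtomicReference stands in for the Unsafe-based compareAndSetHead/compareAndSetTail. It keeps the two properties that matter: the empty head is created lazily, and prev is linked before the tail CAS while next is linked only after it.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of AQS addWaiter/enq (not the JDK class).
final class MiniWaitQueue {
    static final class Node {
        volatile Node prev;
        volatile Node next;
        final String name;
        Node(String name) { this.name = name; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    Node addWaiter(String name) {
        Node node = new Node(name);
        Node pred = tail.get();
        if (pred != null) {                 // fast path: queue already initialized
            node.prev = pred;               // prev is linked before the CAS
            if (tail.compareAndSet(pred, node)) {
                pred.next = node;           // next is linked only after the CAS
                return node;
            }
        }
        enq(node);                          // slow path: initialize, or retry after a lost CAS
        return node;
    }

    private void enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {                // lazily install the empty head node
                Node dummy = new Node("dummy");
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return;
                }
            }
        }
    }

    /** Counts real nodes by walking prev links from tail back to head. */
    int sizeViaPrev() {
        int n = 0;
        for (Node p = tail.get(); p != null && p != head.get(); p = p.prev) {
            n++;
        }
        return n;
    }

    /** Counts real nodes via next links; reliable only when no enqueue is in flight. */
    int sizeViaNext() {
        int n = 0;
        Node h = head.get();
        for (Node p = (h == null) ? null : h.next; p != null; p = p.next) {
            n++;
        }
        return n;
    }

    /** Each of `threads` threads appends `perThread` nodes concurrently. */
    static MiniWaitQueue fillConcurrently(int threads, int perThread) {
        MiniWaitQueue q = new MiniWaitQueue();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int id = i;
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    q.addWaiter("t" + id + "-" + j);
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return q;
    }
}
```

Because next is written after the CAS, a reader following next links can briefly miss the newest node, while the prev links are always complete. This is the reason unparkSuccessor scans backwards from tail.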

  • doAcquireSharedInterruptibly
    Now on to doAcquireSharedInterruptibly itself. Its source is:
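private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // Add the current thread to the wait queue; note the argument Node.SHARED, which matters later
    final Node node = addWaiter(Node.SHARED);
    // Set to false only once the acquire succeeds (state=0)
    boolean failed = true;
    try {
        // Another spin loop; it ends in one of two ways: 1) state=0, the counter finished normally and the method returns; 2) the thread is interrupted and an InterruptedException leaves the loop
        for (;;) {
            // Get node's predecessor
            final Node p = node.predecessor();
            // If the predecessor is the head, run the if block
            if (p == head) {
                // Check state: returns 1 if state=0, otherwise -1
                int r = tryAcquireShared(arg);
                // r >= 0 means the counter has finished and blocked threads must be woken
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    // On normal completion failed is set to false, so the finally block does nothing
                    failed = false;
                    return;
                }
            }
            // Reaching here means state != 0 (or node is not yet first in line); the actual blocking happens in parkAndCheckInterrupt
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // If the thread was interrupted, failed is still true, so cancelAcquire runs
        if (failed)
            cancelAcquire(node);
    }
}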
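The interrupt exit of this loop can be observed from outside. In the sketch below, a thread parked in await() is interrupted. The class and field names are hypothetical, and the comments refer to the JDK 8 internals shown above. The thread leaves with InterruptedException and the latch count stays untouched, because cancelAcquire deals only with the queue node, not with state.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical helper class: interrupts a thread that is parked in await().
final class InterruptedWaiterDemo {
    static final class Result {
        final boolean sawInterrupt;
        final long countAfter;
        Result(boolean sawInterrupt, long countAfter) {
            this.sawInterrupt = sawInterrupt;
            this.countAfter = countAfter;
        }
    }

    static Result run() {
        CountDownLatch latch = new CountDownLatch(1);
        boolean[] sawInterrupt = new boolean[1];
        Thread waiter = new Thread(() -> {
            try {
                latch.await();          // parks inside doAcquireSharedInterruptibly
            } catch (InterruptedException e) {
                sawInterrupt[0] = true; // thrown once parkAndCheckInterrupt reports the interrupt
            }
        });
        waiter.start();
        // A thread blocked in LockSupport.park reports Thread.State.WAITING.
        while (waiter.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        waiter.interrupt();
        try {
            waiter.join();              // join makes sawInterrupt[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Result(sawInterrupt[0], latch.getCount());
    }
}
```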

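doAcquireSharedInterruptibly first appends the current thread to the tail of the wait queue with addWaiter, then starts spinning. If state is not 0, execution reaches the conditional at the end of the loop: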

if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    throw new InterruptedException();

First, let's see what shouldParkAfterFailedAcquire does:

// Note: pred is node's predecessor
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // If it is already SIGNAL, return true directly
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    // ws > 0 can only mean CANCELLED; unlink those cancelled nodes from the queue by rewiring pointers
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // If the predecessor is neither SIGNAL nor CANCELLED, it can only be 0 or PROPAGATE, so set it to Node.SIGNAL. Note: 1) CONDITION is not used by CountDownLatch; 2) every node starts with status 0, and this is where it becomes SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
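The three branches are easier to follow on a tiny hand-built queue. This is a single-threaded simulation under stated assumptions: SimNode is a hypothetical stand-in for AQS's Node, and plain writes replace the CAS calls. Starting from head <- cancelled <- node, the first call unlinks the cancelled predecessor, the second sets head to SIGNAL, and only the third returns true.

```java
// Single-threaded simulation (hypothetical classes) of shouldParkAfterFailedAcquire.
final class ShouldParkSim {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;

    static final class SimNode {
        int waitStatus;          // 0 on creation, as in AQS
        SimNode prev, next;
        final String name;
        SimNode(String name) { this.name = name; }
    }

    static boolean shouldPark(SimNode pred, SimNode node) {
        int ws = pred.waitStatus;
        if (ws == SIGNAL) {
            return true;                    // predecessor will signal us: safe to park
        }
        if (ws > 0) {                       // cancelled: skip a run of cancelled predecessors
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            pred.waitStatus = SIGNAL;       // 0 or PROPAGATE: request a signal, but retry first
        }
        return false;
    }

    /** Builds head <- cancelled <- node and calls shouldPark until it returns true. */
    static String trace() {
        SimNode head = new SimNode("head");
        SimNode cancelled = new SimNode("cancelled");
        SimNode node = new SimNode("node");
        cancelled.waitStatus = CANCELLED;
        head.next = cancelled;
        cancelled.prev = head;
        cancelled.next = node;
        node.prev = cancelled;

        StringBuilder sb = new StringBuilder();
        while (true) {                      // like the acquire loop, re-read node.prev each time
            boolean park = shouldPark(node.prev, node);
            sb.append(park).append(' ');
            if (park) {
                break;
            }
        }
        return sb.toString().trim() + " | prev=" + node.prev.name + " ws=" + node.prev.waitStatus;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // false false true | prev=head ws=-1
    }
}
```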

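I used to be unclear about where a node's SIGNAL status comes from, and the code above shows that it is set in the final else branch. The source also shows that shouldParkAfterFailedAcquire returns true only when the predecessor's status is SIGNAL, and only then does parkAndCheckInterrupt get a chance to run. parkAndCheckInterrupt is where the thread is actually suspended: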

private final boolean parkAndCheckInterrupt() {
    // The thread ultimately blocks here, and resumes from here when woken
    LockSupport.park(this);
    return Thread.interrupted();
}
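parkAndCheckInterrupt relies on two behaviors. LockSupport.park() returns when a permit is available (unpark was called) or when the thread is interrupted, and Thread.interrupted() both reports and clears the interrupt flag. Below is a small sketch of both, using hypothetical helper names. Note that park() may also return spuriously, which is why AQS always calls it inside a loop.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical helper class: the park/interrupt facts parkAndCheckInterrupt depends on.
final class ParkDemo {

    /** unpark before park stores a permit, so park() returns immediately. */
    static boolean permitMakesParkReturn() {
        LockSupport.unpark(Thread.currentThread());
        LockSupport.park();              // consumes the permit
        return !Thread.interrupted();    // a "normal" wake-up: no interrupt involved
    }

    /** An interrupted thread is not blocked by park(); interrupted() then clears the flag. */
    static boolean[] interruptMakesParkReturn() {
        Thread.currentThread().interrupt();
        LockSupport.park();              // returns because the thread is interrupted
        boolean first = Thread.interrupted();  // reports the flag and clears it
        boolean second = Thread.interrupted(); // flag is already cleared
        return new boolean[] {first, second};
    }
}
```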

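parkAndCheckInterrupt blocks the thread with LockSupport. Once woken, it returns the thread's interrupt status (note that Thread.interrupted() also clears that status). The overall logic is now fairly clear: if the thread was woken normally (i.e. state=0), parkAndCheckInterrupt returns false and doAcquireSharedInterruptibly spins once more. Here is the spin loop again: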

for (;;) {
    // Get node's predecessor
    final Node p = node.predecessor();
    // If the predecessor is the head, run the if block
    if (p == head) {
        // Check state: returns 1 if state=0, otherwise -1
        int r = tryAcquireShared(arg);
        // r >= 0 means the counter has finished and blocked threads must be woken
        if (r >= 0) {
            setHeadAndPropagate(node, r);
            p.next = null; // help GC
            failed = false;
            return;
        }
    }
    // Reaching here means state != 0 (or node is not yet first in line); the actual blocking happens in parkAndCheckInterrupt
    if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        throw new InterruptedException();
}

So what does setHeadAndPropagate do? Here is its source (with most of the original comments removed):

// Recall that propagate is 1 here, and node is the node the current thread appended to the tail of the queue
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // Make node the new head
    setHead(node);
    // propagate > 0 already holds, so the if block always runs here
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // Release if there is no next node, or if the next node's isShared() returns true. Recall that addWaiter was called with Node.SHARED when the node was created; that is what this checks
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

Next, let's look at how doReleaseShared is implemented:

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        // Read the current head
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // If the head's status is SIGNAL, CAS it to 0; if the CAS fails, loop and retry until it succeeds
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // Once the head's status has been reset, wake its successor
                unparkSuccessor(h);
            }
            // If the head's status is already 0, set it to Node.PROPAGATE. As the comment above says, this ensures propagation continues upon release: a thread concurrently running setHeadAndPropagate will see h.waitStatus < 0
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

After the head's status has been reset to 0, unparkSuccessor runs. Its source is:

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // Get the successor node
    Node s = node.next;
    // If there is no successor, or the successor is CANCELLED, run the block below
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Traverse from the tail back toward the head to find the non-CANCELLED node closest to the head
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // If such a non-CANCELLED node was found, wake its thread
    if (s != null)
        LockSupport.unpark(s.thread);
}

The main job of unparkSuccessor is to wake the thread of the first non-CANCELLED node after the head.
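The backward scan can be checked on a hand-built list. In this single-threaded sketch, SuccessorScan and its Node are hypothetical stand-ins. The first case skips a cancelled head.next. The second case models a node whose tail CAS has succeeded but whose predecessor's next has not been written yet, so only the prev link can find it.

```java
// Single-threaded sketch (hypothetical classes) of unparkSuccessor's successor search.
final class SuccessorScan {
    static final class Node {
        int waitStatus;
        Node prev, next;
        final String name;
        Node(String name, int ws) { this.name = name; this.waitStatus = ws; }
    }

    /** Tries node.next first; otherwise scans backwards from tail. */
    static Node findSuccessor(Node node, Node tail) {
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;              // keep overwriting: the last hit is closest to node
                }
            }
        }
        return s;
    }

    /** head -> cancelled -> a -> b: the cancelled node is skipped and a is chosen. */
    static String cancelledNextCase() {
        Node head = new Node("head", 0);
        Node c = new Node("cancelled", 1);
        Node a = new Node("a", 0);
        Node b = new Node("b", 0);
        link(head, c);
        link(c, a);
        link(a, b);
        return findSuccessor(head, b).name;
    }

    /** a is already the tail, but head.next has not been written yet. */
    static String nextNotYetLinkedCase() {
        Node head = new Node("head", 0);
        Node a = new Node("a", 0);
        a.prev = head;                  // addWaiter sets prev before the tail CAS
        return findSuccessor(head, a).name;
    }

    private static void link(Node p, Node n) {
        p.next = n;
        n.prev = p;
    }
}
```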

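  • cancelAcquire
    So far we have not seen where the CANCELLED status is set, because one method remains to be analyzed. The finally block in doAcquireSharedInterruptibly handles an interrupted thread by running cancelAcquire, whose source is: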
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
    // After the interrupt, clear the thread stored in this node
    node.thread = null;

    // Skip cancelled predecessors
    // Move node.prev past any predecessors whose status is CANCELLED
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // This is where CANCELLED is set
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    // If this node is the tail, make its first non-CANCELLED predecessor the new tail; the nodes after pred become garbage. Note that neither of the two CAS operations below is retried, whether or not it succeeds
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            // This node is not the tail and has a successor that is not CANCELLED: unlink this node by pointing pred.next at that successor
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // Given the if condition above, execution reaches here in the following cases, and node's successor is woken:
            // 1) pred == head, i.e. every node between the head and the interrupted thread's node is CANCELLED (or there is none)
            // 2) pred != head, but pred's status is not SIGNAL and setting it to SIGNAL failed
            // 3) pred.thread is null: besides the head (setHead clears thread), this also happens when pred is itself being cancelled concurrently, since cancelAcquire clears node.thread before setting CANCELLED
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
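The sketch below checks that cancelling a node in the middle of the queue does not break the wake-up chain. It uses a hypothetical class name and assumes that a thread parked in await() reports Thread.State.WAITING, which holds for LockSupport.park. It queues t1, t2 and t3, interrupts t2, then counts down once. Both remaining waiters should still be released.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class: cancel the middle waiter, then release the latch.
final class CancelMiddleWaiterDemo {

    static int run() {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicInteger released = new AtomicInteger();
        Runnable waiter = () -> {
            try {
                latch.await();
                released.incrementAndGet();  // reached only if the latch opened
            } catch (InterruptedException e) {
                // interrupted while queued: AQS runs cancelAcquire for this node
            }
        };
        Thread t1 = new Thread(waiter);
        Thread t2 = new Thread(waiter);
        Thread t3 = new Thread(waiter);
        for (Thread t : new Thread[] {t1, t2, t3}) {
            t.start();
            waitUntilParked(t);              // one at a time, so they queue as t1, t2, t3
        }
        t2.interrupt();                      // cancel the middle node
        join(t2);
        latch.countDown();                   // the release must still reach t1 and then t3
        join(t1);
        join(t3);
        return released.get();
    }

    private static void waitUntilParked(Thread t) {
        while (t.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```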

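At this point the logic of await() has been analyzed, but analyzing the code alone is not enough. When I got this far I was still confused, mainly because I lacked a global picture. Each piece of code is understandable on its own, but why is it written this way? When the counter finishes (i.e. state=0), are the waiting threads in the queue woken all at once, or one by one? When a woken thread re-runs the spin loop in doAcquireSharedInterruptibly, what differs from its first pass? So the logic above needs to be put together as a whole.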
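Having read this source, I found that the core logic lives in doAcquireSharedInterruptibly, so now is the time to go back and sort out that method.
Suppose a thread t1 calls await. The wait queue has not been initialized yet, so an empty head node is created first, and t1 is wrapped in a node and appended. The queue now looks like this: head(ws=0) <-> t1(ws=0).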

Next, shouldParkAfterFailedAcquire changes the head's status: head(ws=-1, SIGNAL) <-> t1(ws=0).

Now another thread, t2, calls await, and the queue becomes: head(ws=-1) <-> t1(ws=-1) <-> t2(ws=0).

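In other words, each time a node is appended to the tail of the queue, its predecessor's status is updated to Node.SIGNAL (-1), and all the threads block in parkAndCheckInterrupt. Now the counter finishes: the last thread to call countDown also runs doReleaseShared, which sets the head's waitStatus to 0: head(ws=0) <-> t1(ws=-1) <-> t2(ws=0).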

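Execution continues into unparkSuccessor, which wakes t1. t1 returns from parkAndCheckInterrupt and spins again. Its predecessor is head and state=0, so t1 runs setHeadAndPropagate and makes itself the head, with setHead setting its thread and prev fields to null. The queue is now: t1 as head (thread=null, ws=-1) <-> t2(ws=0).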

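Thread t1 then runs doReleaseShared, which sets the status of the head (t1 itself) to 0 and wakes t2 so that t2 can continue its own await logic: t1 as head (ws=0) <-> t2(ws=0).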

After waking t2, t1 detaches the old head (p.next = null) and exits await. The queue is now: t1 as head (ws=0) <-> t2(ws=0).

Once t2 runs, it likewise makes itself the head: t2 as head (thread=null, ws=0), which is also the tail.

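When t2 executes setHeadAndPropagate it has no successor, but it still calls doReleaseShared. There, t2 is both the head and the tail, so the method does nothing and returns, and t2 exits await. Only a single head node remains in the queue.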
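The walkthrough above can be reproduced at a larger scale. Several threads park on one latch, a single countDown() wakes only the first of them, and the rest are woken in turn through setHeadAndPropagate/doReleaseShared. Here is a sketch using the public API. The class name and waiter count are illustrative assumptions, as is the use of Thread.State.WAITING to detect that a waiter has parked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class: one countDown() releases every parked waiter.
final class PropagationDemo {

    /** Parks n waiters on a latch, counts down once, returns how many finished. */
    static int run(int n) {
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(n);
        Thread[] waiters = new Thread[n];
        for (int i = 0; i < n; i++) {
            waiters[i] = new Thread(() -> {
                try {
                    gate.await();           // queues as a SHARED node and parks
                    finished.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiters[i].start();
        }
        for (Thread t : waiters) {          // make sure every waiter is parked in the queue
            while (t.getState() != Thread.State.WAITING) {
                Thread.yield();
            }
        }
        gate.countDown();                   // one release; the rest is woken by propagation
        try {
            finished.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return (int) (n - finished.getCount());
    }
}
```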

  • countDown
    Finally, we can look at the logic of countDown:
public void countDown() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    // As analyzed earlier, this decrements state by 1 and returns true only if state is 0 after the decrement, meaning the counter has finished
    if (tryReleaseShared(arg)) {
        // Wake the first non-CANCELLED node among the head's successors
        doReleaseShared();
        return true;
    }
    return false;
}

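When a thread brings state down to 0, it also runs doReleaseShared, which wakes the head's successor.
One detail deserves attention: doReleaseShared is called from two places in the source. One entry point is countDown, just described. The other is reached from await, through setHeadAndPropagate. The two happen in order: countDown wakes the thread at the front of the queue, and that thread then wakes the threads behind it, one after another.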
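countDown() discards the boolean returned by releaseShared. To see when doReleaseShared actually runs, the sketch below copies the two Sync overrides into a hypothetical AQS subclass and calls the public releaseShared(int) directly. With a count of 2, only the call that moves state from 1 to 0 returns true. A further call at 0 returns false and leaves state at 0.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical copy of CountDownLatch.Sync, used to expose releaseShared's result.
final class ReleaseSharedDemo {
    static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); }

        int count() { return getState(); }

        @Override
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false;           // already at zero: nothing to release
                }
                int nextc = c - 1;
                if (compareAndSetState(c, nextc)) {
                    return nextc == 0;      // true only for the 1 -> 0 transition
                }
            }
        }
    }

    /** Calls releaseShared(1) three times on a count of 2; records "result/state" per call. */
    static String run() {
        Sync sync = new Sync(2);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 3; i++) {
            boolean r = sync.releaseShared(1); // doReleaseShared runs only when r is true
            sb.append(r).append('/').append(sync.count()).append(' ');
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(run()); // false/1 true/0 false/0
    }
}
```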
