CountDownLatch 源碼解析—— await()

上一篇文章說了一下CountDownLatch的使用方法。這篇文章就從源碼層面說一下await() 的原理。html

咱們已經知道await 可以讓當前線程處於阻塞狀態,直到鎖存器計數爲零(或者線程中斷)。java

下面是它的源碼。node

end.await();  
    ↓
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

sync 是CountDownLatch的內部類。下面是它的定義。框架

private static final class Sync extends AbstractQueuedSynchronizer {
  ...
}

它繼承了AbstractQueuedSynchronizer。AbstractQueuedSynchronizer 這個類在java線程中屬於一個很是重要的類。函數

它提供了一個框架來實現阻塞鎖,以及依賴FIFO等待隊列的相關同步器(好比信號、事件等)。ui

繼續走下去,就跳到 AbstractQueuedSynchronizer 這個類中。this

sync.acquireSharedInterruptibly(1);  
    ↓
public final void acquireSharedInterruptibly(int arg)  //AbstractQueuedSynchronizer
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

這裏有兩個判斷,首先判斷線程是否中斷,而後再進行下一個判斷,這裏咱們主要看看第二個判斷。spa

 

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

須要注意的是 tryAcquireShared 這個方法是在Sync 中實現的。線程

AbstractQueuedSynchronizer 中雖然也有對它的實現,可是默認的實現是拋一個異常。指針

tryAcquireShared 這個方法是用來查詢當前對象的狀態是否可以被容許獲取鎖。

咱們能夠看到Sync 中是經過判斷state 是否爲0 來返回對應的 int 值的。

那麼 state 又表明什麼?  

/**
 * The synchronization state.
 */
   private volatile int state;

上面代碼很清楚的代表 state 是表示同步的狀態 。

須要注意的是 state 使用 volatile 關鍵字修飾。

volatile 關鍵字可以保證 state 的修改當即被更新到主存,當有其餘線程須要讀取時,會去內存中讀取新值。

也就是保證了state的可見性。是最新的數據。

走到這裏 state 是多少呢?

這裏咱們就須要看一看CountDownLatch 的 構造函數了。

CountDownLatch end = new CountDownLatch(2);
    ↓
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
    ↓
Sync(int count) {
    setState(count);
}

原來構造函數中的數字就是這個做用啊,用來set state 。

因此咱們這裏state == 2 了。tryAcquireShared 就返回 -1。進入到下面

doAcquireSharedInterruptibly(arg);
    ↓
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

OK,這段代碼有點長,裏面還調用了幾個函數。咱們一行一行的看。

第一行 出現了一個新的類 Node。

Node 是AQS(AbstractQueuedSynchronizer)類中的內部類,定義了一種鏈式結構。以下所示。

     +------+  prev +-----+       +-----+
head |      | <---- |     | <---- |     |  tail
     +------+       +-----+       +-----+

千萬記住這個結構。

第一行代碼中還有一個方法 addWaiter(Node.SHARED) 。

addWaiter(Node.SHARED)  //Node.SHARED  表示該結點處於共享模式
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail; // private transient volatile Node tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

首先是構造了一個Node,將當前的線程存進去了,模式是共享模式。

tail 表示 這個等待隊列的隊尾,此刻是null. 因此 pred == null ,進入到enq(node) ;

enq(node)
    ↓
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

一樣tail 爲 null , 進入到 compareAndSetHead 。

compareAndSetHead(new Node())
    ↓
/**
 * CAS head field. Used only by enq.
 */
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

這是一個CAS操做,若是head 是 null 的話,等待隊列的 head 就會被設置爲 update 的值,也就是一個新的結點。

 tail = head;  那麼此時 tail 也再也不是null了。進入下一次的循環。

此次首先將node 的 prev 指針指向 tail ,而後經過一個CAS 操做將node 設置爲尾部,並返回了隊列的 tail ,也就是 node 。

等待隊列的模型變化以下

           +------+  prev      +----------------+
head(tail) |      | <---- node | currentThread  |
           +------+            +----------------++------+  prev            +----------------+
head   |      | <---- node(tail) | currentThread  |
       +------+                  +----------------+

ok,到了這裏await 方法 就返回了,是一個 thread 等於當前線程的Node。

返回到 doAcquireSharedInterruptibly(int arg) 中,進入下面循環。

for (;;) {
    final Node p = node.predecessor();
    if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
            setHeadAndPropagate(node, r);
            p.next = null; // help GC
            failed = false;
            return;
        }
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        throw new InterruptedException();
}

這個時候假設state 仍然大於0,那麼此時 r < 0,因此進入到 shouldParkAfterFailedAcquire 這個方法 。

shouldParkAfterFailedAcquire(p, node)
    ↓
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)  //static final int SIGNAL    = -1;
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
    ↓
/**
 * CAS waitStatus field of a node.
 */
private static final boolean compareAndSetWaitStatus(Node node,
                                                     int expect,
                                                     int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                    expect, update);
}
    
    

能夠看到 shouldParkAfterFailedAcquire  也是一路走,走到 compareAndSetWaitStatus。

compareAndSetWaitStatus 將 prev 的 waitStatus 設置爲 Node.SIGNAL 。

Node.SIGNAL 表示後續結點中的線程須要被unparking(相似被喚醒的意思)。該方法返回false。

通過這輪循環,隊列模型變成下面狀態

       +--------------------------+   prev           +------------------+
head   | waitStatus = Node.SIGNAL | <---- node(tail) | currentThread    |
       +--------------------------+                  +------------------+

由於shouldParkAfterFailedAcquire返回的是false,因此後面這個條件就再也不看了。繼續 for (;;)  中的循環。

若是state仍然大於0,再次進入到 shouldParkAfterFailedAcquire。

此次由於head 中的waitStatus 爲 Node.SIGNAL ,因此 shouldParkAfterFailedAcquire 返回true。

此次就須要看parkAndCheckInterrupt 這個方法了。

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

ok,線程沒有被中斷,因此,返回false。繼續 for (;;)  中的循環。

若是state 一直大於0,而且線程一直未被中斷,那麼就一直在這個循環中。也就是咱們上篇文章說的裁判一直不肯意宣佈比賽結束的狀況。

那麼什麼狀況下跳出循環呢?也就是什麼狀況下state 會 小於0呢? 下一篇文章 我將說明。

總結一下,await()  方法 其實就是初始化一個隊列,將須要等待的線程(state > 0)加入一個隊列中,並用waitStatus 標記後繼結點的線程狀態。

相關文章
相關標籤/搜索