JUC鎖框架——CountDownLatch

CountDownLatch簡單介紹

CountDownLatch是同步工具類之一,能夠指定一個計數值,在併發環境下由線程進行減1操做,當計數值變爲0以後,被await方法阻塞的線程將會喚醒,實現線程間的同步。java

CountDownLatch是一次性的,計數器的值只能在構造方法中初始化一次,以後沒有任何機制再次對其設置值,當CountDownLatch使用完畢後,它不能再次被使用。node

CountDownLatch的簡單應用

public static void main(String[] args) {
        int threadNum = 10;
        final CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            final int tNum = i + 1;
            new Thread(() -> {
                System.out.println("thread " + tNum + " start");
                Random random = new Random();
                try {
                    Thread.sleep(random.nextInt(10000) + 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("thread " + tNum + " finish");
                countDownLatch.countDown();//每個線程執行完就會減一
            }).start();
        }
        //主線程啓動10個子線程後阻塞在await方法,須要等子線程都執行完畢,主線程才能喚醒繼續執行。
        try {
            countDownLatch.await();//當前線程被阻塞
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(threadNum + " thread finish");
    }

CountDownLatch的源碼分析

CountDownLatch和ReentrantLock同樣,內部使用Sync繼承AQS。Sync的構造函數接收了計數值並設置爲state。咱們知道AQS的state是一個由子類決定含義的「狀態」。對於ReentrantLock來講,state是線程獲取鎖的次數;對於CountDownLatch來講,則表示計數值的大小。併發

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
Sync(int count) {
    setState(count);//設置state,表示計數值的大小
}

CountDownLatch的await方法

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())//判斷當前線程是否被中斷
        throw new InterruptedException();
    //tryAcquireShared:是否須要阻塞當前線程
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);//由CountDownLatch實現阻塞當前線程的邏輯。
}
/**
* tryAcquireShared方法其實就是判斷一下當前計數器的值,是否爲0。
* 爲0則返回1:表示當前線程不會阻塞在countDownLatch.await()
*/
protected int tryAcquireShared(int acquires) {
   return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);//建立共享的node節點,並加入等待隊列
    boolean failed = true;
    try {
        for (;;) {//死循環
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);//判斷state是否爲0
                if (r >= 0) {
                    setHeadAndPropagate(node, r);//設置本身爲head,並通知其餘等待的線程
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

以上對於countDownLatch.await()方法是怎麼「阻塞」當前線程的咱們已經很明白了,即當咱們調用了countDownLatch.await()方法後,你當前線程就會進入了一個死循環當中,在這個死循環裏面,會不斷的進行判斷,經過調用tryAcquireShared方法,不斷判斷咱們上面說的那個計數器,看看它的值是否爲0了(爲0的時候,其實就是咱們調用了足夠屢次數的countDownLatch.countDown()方法的時候),若是是爲0的話,tryAcquireShared就會返回1,代碼會執行if(r>=0){...},而後跳出了循環,也就再也不「阻塞」當前線程了。須要注意的是,說是在不停的循環,其實也並不是在不停的執行for循環裏面的內容,由於在後面調用parkAndCheckInterrupt()方法時,在這個方法裏面是會調用 LockSupport.park(this);,來禁用當前線程的。dom

CountDownLatch的countDown方法

//釋放鎖的操做,每調用一次,計數值減小1
public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//嘗試釋放鎖,具體實如今CountDownLatch中
        doReleaseShared();
        return true;
    }
    return false;
}
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))//state的減1操做
            return nextc == 0;//當計數值等於0,表明全部子線程都執行完畢,被await阻塞的線程就能夠被喚醒
    }
}
//喚醒被await阻塞的線程
private void doReleaseShared() {
   for (;;) {
       Node h = head;
       if (h != null && h != tail) {
           int ws = h.waitStatus;
           if (ws == Node.SIGNAL) {
               //頭節點狀態若是SIGNAL,則狀態重置爲0,
               if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue;
               unparkSuccessor(h);//喚醒下個節點
           }
           //被喚醒的節點狀態會重置成0,在下一次循環中被設置成PROPAGATE狀態,
           //表明狀態要向後傳播。
           else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
               continue;
       }
       if (h == head)
           break;
   }
}
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0) compareAndSetWaitStatus(node, ws, 0);//設置非CANCELLED節點的等待狀態爲0
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//喚醒節點
}
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);

    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())//若是節點爲共享類型
            doReleaseShared();//釋放,下個節點被喚醒後,再次喚醒後續的等待節點,達到共享狀態向後傳播。
    }
}

參考地址:函數

相關文章
相關標籤/搜索