CountDownLatch理解

  CountDownLatch使用方法很是簡單,主要就是兩個方法,await()方法和countDown()方法,await()方法會使線程阻塞。countDown()會將線程同步狀態減1,當同步狀態爲0使喚醒線程。node

仍是經過源碼來理解這個類。oop

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

與重入鎖同樣,也是有一個內部Sync類。從代碼中能夠看出,這個類得初始化須要設置一個大於0得count,這個count用來標記線程的同步狀態。構造方法就很少說了,這裏主要講最重要的await()和countDown()方法。ui

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
       // 判斷線程是否處於阻塞狀態,若是是,拋出異常
throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //內部Sync類中的方法,線程同步狀態爲0,返回1,不然返回-1 doAcquireSharedInterruptibly(arg); }

而後咱們再來看doAcquireSharedInterruptibly(arg)這個方法this

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
     //生成一個共享模式的新節點,而且將這個節點添加到同步隊列的隊尾
final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) {
          //找到這個節點的前一個節點
final Node p = node.predecessor(); if (p == head) {
            // 若是前一個節點是head節點,也就是說這個節點處於同步隊列的最前面
int r = tryAcquireShared(arg); if (r >= 0) {
              // 查詢線程的同步狀態,若是大於等於0,將這個節點置爲head節點。而且喚醒這個線程 setHeadAndPropagate(node, r); p.next
= null; // help GC failed = false; return; } }
// 判斷是否須要將當前線程阻塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed)
// 若是失敗,將這個節點從同步隊列移除 cancelAcquire(node); } }

這裏涉及到了三個方法,setHeadAndPropagate(),shouldPardAfterFailedAcquire()和cancelAcquire()方法,下面一一介紹。不過在這以前先要說一下Node的幾種等待狀態,他方便後面理解代碼。spa

(1)CANCELLED:值爲1,因爲在同步隊列中等待的線程等待超時或被中斷,須要從同步隊列中取消等待,節點進入該狀態將不會變化線程

(2)SINGAL:值爲-1,後繼節點的線程處於等待狀態,當前節點若是釋放了同步狀態,將會通知後繼節點,使後繼節點得以運行debug

(3)CONDITION:值爲-2,節點在等待隊列中(這個在Condition的博客裏會講到),節點線程等待在Condition上,當其餘線程對Condition調用了singal後,該節點會從等待隊列轉移到同步隊列,加入到對同步狀態的獲取中去code

(4)PROPAGEATE:值爲-3,表示下一次共享式同步狀態獲取將會無條件被傳播下去blog

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 將這個節點設置爲head節點
     setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared())
// 喚醒處於同步隊列最前面的線程,這個方法後面再說 doReleaseShared(); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
// 若是等待狀態大於0,也就是處於CANCELLED狀態,這裏的循環是爲了過濾同步狀態中等待狀態爲CANCELLED的節點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {
// CAS操做,將前置節點的等待狀態設置爲SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;
        // 聲明node的前置節點pred
        Node pred = node.prev;
// 過濾掉同步隊列中等待狀態爲CANCELLED的節點
while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 聲明pred的下一個節點predNext Node predNext = pred.next; node.waitStatus = Node.CANCELLED;
//若是node爲尾節點,設置pred爲tail節點,而且設置pred的next節點爲null
if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0)
// 設置node節點的下一個節點爲pred的下一個節點 compareAndSetNext(pred, predNext, next); }
else {
//喚醒線程 unparkSuccessor(node); } node.next
= node; // help GC } }

整個await()方法的全部代碼就是這些。若是認真看源代碼而且仔細梳理的話會發現其實也沒那麼難懂,下面介紹countDown()方法。隊列

public void countDown() {
        sync.releaseShared(1);
    }
public final boolean releaseShared(int arg) {
//線程同步狀態-1,若是狀態爲0返回true,不然返回false
if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

相信看代碼已經很清楚了,那麼咱們再來看doReleaseShared()這個方法。

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
// 若是等待狀態爲SIGNAL,將head節點的等待狀態改成0,若是成功,喚醒線程
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS }
// 若是執行了unparkSuccessor(h)方法,head節點會變化,這樣下面的h == head就不成立了,就會繼續執行循環
if (h == head) // loop if head changed break; } }

整個CountDownLatch類,我斷斷續續看了幾天纔看完,主要是開始看AQS這個類感受比較繞,就看的不是很認真,比較浮躁。其實若是真正用心,跟着代碼一步一步去走,一步一步去理解也沒那麼難,而後經過debug跟着代碼去走,查看每一步作了什麼,變量,屬性的變化,這樣有助於深入理解代碼。

相關文章
相關標籤/搜索