java併發編程學習之再談CountDownLatch

以前講了AQS的獨佔鎖的源碼,這邊,講一下另一個鎖的實現,共享鎖,以CountDownLatch爲例。node

源碼分析

構造方法

Sync方法是內部內的方法,跟以前ReentrantLock同樣。構造方法須要傳入一個不小於0的整數,用於賦值給state。當其餘線程調用countDown方法的時候,state的值就減一。減到0的時候,其餘調用await方法將被喚醒。await方法能夠被多個線程調用,調用的時候,就進入了阻塞狀態,直至state爲0。後面重點講countDown和await方法。segmentfault

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
Sync(int count) {
    setState(count);
}

await

咱們先看看阻塞的方法,此時要等state爲0的時候,才被喚醒。oop

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())//中斷狀況,拋出異常
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);//state不爲0的狀況下
}
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;//當前狀態爲0,返回1
}
private void doAcquireSharedInterruptibly(int arg)//獲取共享鎖,而且可被中斷
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);//這個跟以前不一樣的是Node.SHARED,加入到隊列,不在說明
    boolean failed = true;
    try {
        for (;;) {//自旋
            final Node p = node.predecessor();//獲取前面節點
            if (p == head) {
                int r = tryAcquireShared(arg);//嘗試獲取鎖
                if (r >= 0) {//state爲0的狀況下
                    setHeadAndPropagate(node, r);//這個方法下面講
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())//這部分同以前,掛起
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

countDown

public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();//爲true,喚醒
        return true;
    }
    return false;
}
//用自旋使status-1
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {//有阻塞隊列,而且頭節點不是尾節點
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//經過cas把頭節點的waitStatus設置爲0
                    continue;            // loop to recheck cases不成功從新設置
                unparkSuccessor(h);//喚醒下一個節點,以前的內容
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;//
    }
}

剛剛wait的時候,doAcquireSharedInterruptibly中若是沒獲取到鎖,就掛起。如今status爲0,就喚醒他,繼續自旋,次數看下面的代碼源碼分析

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below獲取頭結點
    setHead(node);//當前節點設置爲頭節點
    
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;//獲取下一個節點
        if (s == null || s.isShared())
            doReleaseShared();//喚醒下一個節點
    }
}

其餘

CountDownLatch,實際上就是經過先設置state,再遞減,等於0的時候,喚醒其餘線程。
CyclicBarrier,也是先經過設置state,每一個await遞減,state不等於0的時候放入Condition,等於0的時候喚醒。
Semaphore,也是先經過設置state,每次被獲取state-1,釋放+1,等於0就等待,大於0就喚醒ui

相關文章
相關標籤/搜索