JDK源碼分析-AbstractQueuedSynchronizer(3)

本文首發於微信公衆號【WriteOnRead】,歡迎關注。

1. 概述

前文「JDK源碼分析-AbstractQueuedSynchronizer(2)」分析了 AQS 在獨佔模式下獲取資源的流程,本文分析共享模式下的相關操做。java

其實兩者的操做大部分是相似的,理解了前面對獨佔模式的分析,再分析共享模式就相對容易了。node

2. 共享模式

2.1 方法概述

與獨佔模式相似,共享模式下也有與之相似的相應操做,分別以下:微信

  1. acquireShared(int arg): 以共享模式獲取資源,忽略中斷;
  2. acquireSharedInterruptibly(int arg): 以共享模式獲取資源,響應中斷;
  3. tryAcquireSharedNanos(int arg, long nanosTimeout): 以共享模式獲取資源,響應中斷,且有超時等待;
  4. releaseShared(int arg): 釋放資源,喚醒後繼節點,並確保傳播。

它們的操做與獨佔模式也比較相似,下面具體分析。併發

2.2 方法分析

2.2.1 共享模式獲取資源(忽略中斷)

acquireShared:app

public final void acquireShared(int arg) {
    // 返回值小於 0,表示獲取失敗
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

// 嘗試以共享模式獲取資源(返回值爲 int 類型)
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

與獨佔模式的 tryAcquire 方法相似,tryAcquireShared 方法在 AQS 中也拋出異常,由子類實現其邏輯。工具

不一樣的地方在於,tryAcquire 方法的返回結果是 boolean 類型,表示獲取成功與否;而 tryAcquireShared 的返回結果是 int 類型,分別爲:oop

  1. 負數:表示獲取失敗;
  2. 零:表示獲取成功,但後續共享模式的獲取會失敗;
  3. 正數:表示獲取成功,後續共享模式的獲取可能會成功(須要進行檢測)。

若 tryAcquireShared 獲取成功,則直接返回;不然執行 doAcquireShared 方法:源碼分析

private void doAcquireShared(int arg) {
    // 把當前線程封裝成共享模式的 Node 節點,插入主隊列末尾
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // 中斷標誌位
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 若前驅節點爲頭節點,則嘗試獲取資源
            if (p == head) {
                int r = tryAcquireShared(arg);
                // 這裏表示當前線程成功獲取到了資源
                if (r >= 0) {
                    // 設置頭節點,並傳播狀態(注意這裏與獨佔模式不一樣)
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 是否應該休眠(與獨佔模式相同,再也不贅述)
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 取消操做(與獨佔模式相同)
            cancelAcquire(node);
    }
}

doAcquireShared 方法會把當前線程封裝成一個共享模式(SHARED)的節點,並插入主隊列末尾。addWaiter(Node mode) 方法前文已經分析過,再也不贅述。ui

該方法與 acquireQueued 方法的區別在於 setHeadAndPropagate 方法,把當前節點設置爲頭節點以後,還會有傳播(propagate)行爲:this

private void setHeadAndPropagate(Node node, int propagate) {
    // 記錄舊的頭節點
    Node h = head; // Record old head for check below
    // 將 node 設置爲頭節點
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 後繼節點爲空或共享模式喚醒
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

doReleaseShared:

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        // 這裏的頭節點已是上面設置後的頭節點了
        Node h = head;
        // 因爲該方法有兩個入口(setHeadAndPropagate 和 releaseShared),需考慮併發控制
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 喚醒後繼節點
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 若頭節點不變,則跳出循環;不然繼續循環
        if (h == head)                   // loop if head changed
            break;
    }
}

該方法與獨佔模式下的獲取方法 acquire 大致類似,不一樣在於該方法中,節點獲取資源後會傳播狀態,即,有可能會繼續喚醒後繼節點。值得注意的是:該方法有兩個入口 setHeadAndPropagate 和 releaseShared,可能有多個線程操做,需考慮併發控制。

此外,本人對於將節點設置爲 PROPAGATE 狀態的理解還不是很清晰,網上說法也不止一種,待後續研究明白再補充。

2.2.2 以共享模式獲取資源(響應中斷)

該方法與 acquireShared 相似:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

tryAcquireShared 方法前面已分析,若獲取資源失敗,則會執行 doAcquireSharedInterruptly 方法:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 把當前線程封裝成共享模式節點,並插入主隊列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 與 doAcquireShared 相比,區別在於這裏拋出了異常
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

從代碼能夠看到,acquireSharedInterruptibly 方法與 acquireShared 方法幾乎徹底同樣,不一樣之處僅在於前者會拋出 InterruptedException 異常響應中斷;然後者僅記錄標誌位,獲取結束後才響應。

2.2.3 以共享模式獲取資源(響應中斷,且有超時)

代碼以下(該方法可與前文獨佔模式下的超時獲取方法比較分析):

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

doAcquireSharedNanos:

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

該方法可與獨佔模式下的超時等待方法 tryAcquireNanos(int arg, long nanosTimeout) 進行對比,兩者操做基本一致,再也不詳細分析。

2.2.4 釋放資源,喚醒節點,傳播狀態

以下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

tryReleaseShared:

protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

doReleaseShared() 方法前面已經分析過了。本方法與獨佔模式的 release 方法相似,不一樣的地方在於「傳播」二字。

3. 場景分析

爲了便於理解獨佔模式和共享模式下隊列和節點的狀態,下面簡要舉例分析。

場景以下:有 T0~T4 共 5 個線程按前後順序獲取資源,其中 T2 和 T3 爲共享模式,其餘均爲獨佔模式。

就此場景分析:T0 先獲取到資源(假設佔用時間較長),然後 T1~T4 再獲取則失敗,會依次進入主隊列。此時主隊列中各個節點的狀態示意圖以下:

image.png

以後,T0 操做完畢並釋放資源,會將 T1 喚醒。T1(獨佔模式) 會從 acquireQueued(final Node node, int arg) 方法的循環中繼續獲取資源,這時會獲取成功,並將 T1 設置爲頭節點(T 被移除)。此時主隊列節點示意圖以下:

image.png

此時,T1 獲取到資源並進行相關操做。

然後,T1 操做完釋放資源,並喚醒下一個節點 T2,T2(共享模式) 繼續從 doAcquireShared(int) 方法的循環中執行。此時 T2 獲取資源成功,將自身設爲頭節點(T1 被移除),因爲後繼節點 T3 也是共享模式,所以 T1 會繼續喚醒 T3;T3 喚醒後的操做與 T2 相同,但後繼節點 T4 不是共享模式,所以再也不繼續喚醒。此時隊列節點狀態示意圖以下:

image.png

此時,T2 和 T3 同時獲取到資源。

以後,當兩者都釋放資源後會喚醒 T4:

image.png

T4 獲取資源的與 T1 相似。

PS: 該場景僅供參考,只爲便於理解,如有不當之處敬請指正。

4. 小結

本文分析了以共享模式獲取資源的三種方式,以及釋放資源的操做。分別爲:

  1. acquireShared: 共享模式獲取資源,忽略中斷;
  2. acquireSharedInterruptibly: 共享模式獲取資源,響應中斷;
  3. tryAcquireSharedNanos: 共享模式獲取資源,響應中斷,有超時;
  4. releaseShared: 釋放資源,喚醒後繼節點,並確保傳播。

並簡要分析一個場景下主隊列中各個節點的狀態。此外,AQS 中還有嵌套類 ConditionObject 及條件隊列的相關操做,後面涉及到的時候再進行分析。

單獨去分析 AQS 的源碼比較枯燥,後文會結合 ReentrantLock、CountdownLatch 等經常使用併發工具類的源碼進行分析。

上述解析是參考其餘資料及我的理解,如有不當之處歡迎指正。

相關閱讀:

JDK源碼分析-AbstractQueuedSynchronizer(1)

JDK源碼分析-AbstractQueuedSynchronizer(2)

image.png

相關文章
相關標籤/搜索