閱讀 JDK 源碼:AQS 對 Condition 的實現

前兩篇文章分別介紹了 AQS 框架中的獨佔模式和共享模式,本篇將介紹 AQS 對 Condition 接口的實現。
在閱讀本篇以前,建議先了解 AQS 中的數據結構和獨佔模式的實現原理java

JUC 經過 Lock 和 Condition 兩個接口實現管程(Monitor),其中 Lock 用於解決互斥問題,而 Condition 用於解決同步問題,而 AQS 對 Lock 和 Condition 接口的實現提供了一個基礎的框架。node

本文基於 jdk1.8.0_91

1. Condition 接口

Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。 c#

Condition 實現能夠提供不一樣於 Object 監視器方法的行爲和語義,好比受保證的通知排序,或者在執行通知時不須要保持一個鎖。
若是某個實現提供了這樣特殊的語義,則該實現必須記錄這些語義。 segmentfault

方法摘要:數組

// 形成當前線程在接到信號或被中斷以前一直處於等待狀態。 
void await() 

// 形成當前線程在接到信號、被中斷或到達指定等待時間以前一直處於等待狀態。 
boolean await(long time, TimeUnit unit) 

// 形成當前線程在接到信號、被中斷或到達指定等待時間以前一直處於等待狀態。
long awaitNanos(long nanosTimeout) 

// 形成當前線程在接到信號以前一直處於等待狀態。
void awaitUninterruptibly() 

// 形成當前線程在接到信號、被中斷或到達指定最後期限以前一直處於等待狀態。 
boolean awaitUntil(Date deadline) 

// 喚醒一個等待線程。
void signal() 

// 喚醒全部等待線程。
void signalAll()

Condition 本質上是一個隊列(稱爲條件隊列),線程等待某個條件成立時,在隊列中阻塞,直到其餘線程檢查條件成立後來通知它。
對於同一個鎖,只會存在一個同步隊列,可是可能會有多個條件隊列,只有在使用了 Condition 纔會存在條件隊列。數據結構

AQS 中對條件隊列的使用:框架

當線程獲取鎖以後,執行 Condition.await() 會釋放鎖並進入條件隊列,阻塞等待直到被其餘線程喚醒。
當其餘線程執行 Condition.signal() 喚醒當前線程時,當前線程會從條件隊列轉移到同步隊列來等待再次獲取鎖。
當前線程再一次獲取鎖以後,須要在 while 循環中判斷條件是否成立,若不成立需從新執行 Condition.await() 去等待。less

2. Condition 使用

Condition 實例實質上被綁定到一個鎖上。要爲特定 Lock 實例得到 Condition 實例,請使用其 newCondition() 方法。 ui

Java 官方文檔提供 Condition 接口的使用示例:this

對於一個有界阻塞數組,當數組非滿時才能夠往數組中存放數據,不然阻塞;當數據非空時才能夠往數組中取元素,不然阻塞。

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    // 生產者方法,往數組裏面寫數據
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await(); // 阻塞直到非滿
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal(); // 通知非空
        } finally {
            lock.unlock();
        }
    }

    // 消費者方法,從數組裏面拿數據
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // 阻塞直到非空
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal(); // 通知非滿
            return x;
        } finally {
            lock.unlock();
        }
    }
}

在 JDK 的實現中,獨佔模式纔可以使用 Condition,共享模式不支持 Condition。
由於 AQS 的內部類 ConditionObject 只支持獨佔模式。

java.util.concurrent.locks.ReentrantLock.Sync#newCondition

final ConditionObject newCondition() {
    return new ConditionObject();
}

java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#newCondition

public Condition newCondition() {
    return sync.newCondition();
}

java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#newCondition

/**
 * Throws {@code UnsupportedOperationException} because
 * {@code ReadLocks} do not support conditions.
 *
 * @throws UnsupportedOperationException always
 */
public Condition newCondition() {
    throw new UnsupportedOperationException();
}

3. Condition 等待方法

3.1 Condition#await

代碼流程:

  1. 判斷線程是否被中斷,若是是,直接拋出InterruptedException,不然進入下一步
  2. 將當前線程封裝爲節點,存入條件隊列。
  3. 釋放當前線程已得到的所有的鎖,若無持有鎖則拋異常。
  4. 在條件隊列中,阻塞當前節點。
  5. 當前節點從阻塞中被喚醒(signalled or interrupted),則會從條件隊列轉移到同步隊列(被動或主動地)。
  6. 在同步隊列中,自旋、阻塞等待獲取鎖成功。
  7. 判斷整個過程當中是否發生過中斷,進行不一樣的處理(拋異常 或 從新中斷)。

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()

/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();    // 將當前線程封裝成節點存入條件隊列
    int savedState = fullyRelease(node); // 釋放已經持有的鎖(就是在調用 Condition#await 以前持有的 Lock#lock 鎖),並返回釋放前的鎖狀態
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {       // 檢查節點是否在同步隊列上
        LockSupport.park(this);          // 節點還在條件隊列中,則阻塞   
        // 節點從阻塞中被喚醒(condition#signal,Thread#interrupt),檢查中斷狀態,設置中斷處理模式
        // 補充:被 condition#signal 喚醒後的線程會從條件隊列轉移到同步隊列(先出隊再入隊)
        // 補充:若在條件隊列中就發生了中斷,也會被轉移到同步隊列(不出隊,只入隊,見 checkInterruptWhileWaiting -> transferAfterCancelledWait)
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  
            break;                                                    
    }                                                                 
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 在同步隊列等待獲取資源直到成功,判斷設置中斷處理模式
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)  // clean up if cancelled 
        // nextWaiter不爲空,說明當前節點是由 Thread#interrupt 喚醒的(condition#signal 喚醒阻塞節點會設置nextWaiter爲空)
        // 此時當前節點同時存在於同步隊列、條件隊列上!可是 waitStatus 不是 CONDITION
        // 須要清除條件隊列中已取消的節點
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode); // 處理中斷:拋異常,或者補上中斷狀態
}

注意:

  1. 線程被喚醒,多是執行了 Condition#signal(其中執行 LockSupport#unpark 來喚醒條件隊列的頭節點),也多是調用了 Thread#interrupt (會更新線程的中斷標識)。
  2. 若是是由 Condition#signal 喚醒的,則當前節點被喚醒後,已經位於同步隊列。
  3. 若是是由 Thread#interrupt 喚醒的,則當前節點被喚醒後,須要判斷是位於同步隊列仍是條件隊列。
    3.1 若是是位於同步隊列,說明是先獲得 Condition#signal 通知,再被 Thread#interrupt 中斷。
    3.2 若是是位於條件隊列,說明未獲得 Condition#signal 通知就被 Thread#interrupt 中斷了,須要自行加入到同步隊列中,再從條件隊列中移除。
  4. 當前節點從條件隊列轉移到同步隊列的過程當中,發生了中斷,該節點依舊會在同步隊列中自旋、阻塞直到獲取鎖,再響應中斷(拋異常或從新中斷)。

3.1.1 addConditionWaiter

將當前線程封裝爲節點(waitStatus 爲 CONDITION),添加到條件隊列尾部。
若條件隊列不存在則進行初始化,把當前節點做爲頭節點(不使用 dummy node)。

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter

/**
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out. // 清理條件隊列中已取消的尾節點
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION); // 構建節點,尾插法
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

3.1.2 fullyRelease

釋放當前線程已持有的鎖/資源,返回釋放以前的鎖/資源。
若未持有鎖,報錯。

這裏存在 BUG:報錯以前,當前線程已經加入到條件隊列之中了,會致使條件隊列存儲無效的節點數據。
應該將是否持有鎖的校驗提早到 addConditionWaiter 以前,JDK 11 中已修復該問題。

java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease

final int fullyRelease(Node node) { // 釋放當前線程已持有的鎖
    boolean failed = true;
    try {
        int savedState = getState(); // 獲取 volatile 的 state,獨佔模式下表示當前線程鎖持有的所有鎖
        if (release(savedState)) {   // 釋放所有的鎖
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException(); // 未持有鎖,報錯
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

3.1.3 isOnSyncQueue

判斷節點是否在同步隊列上。

java.util.concurrent.locks.AbstractQueuedSynchronizer#isOnSyncQueue

/**
 * Returns true if a node, always one that was initially placed on
 * a condition queue, is now waiting to reacquire on sync queue.
 * @param node the node
 * @return true if is reacquiring
 */
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node); // 從尾節點向前遍歷查找
}

/**
 * Returns true if node is on sync queue by searching backwards from tail.
 * Called only when needed by isOnSyncQueue.
 * @return true if present
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

1. 若是 waitStatus == CONDITION 說明必定是位於條件隊列上。

從條件隊列入隊,構造節點的時候默認就爲 CONDITION 狀態。
將節點從條件隊列轉移到同步隊列,首先會 CAS 設置 waitStatus 狀態爲 CONDITION,再執行入隊操做。

2. node.prev == null 說明必定是位於條件隊列上。

同步隊列只有頭節點符合 node.prev == null,可是同步隊列的頭節點是 dummy node,其 thread 爲空。
也就是說,來調用 isOnSyncQueue 方法且符合 node.prev == null 條件的節點,只多是位於條件隊列上的節點。

3. 若是 node.next != null 說明必定是處於同步隊列上。

節點加入同步隊列是個複合操做,最後一步是設置 node.next,當 node.next != null 說明入隊操做已執行完成。

4. 若是以上都沒法判斷節點是否位於同步隊列,則遍歷鏈表查找節點。

存在 node.prev != null 可是節點尚未徹底入隊成功的狀況,由於入隊操做設置 prev -> tail -> next 是非原子操做。
因此須要從 tail 向前遍歷,才能準確判斷 node 是否位於同步隊列上。
調用 findNodeFromTail 方法前,節點通常位於尾節點附近,不會遍歷過多節點。

3.1.4 checkInterruptWhileWaiting

阻塞在 Condition#await 的線程被喚醒以後,調用 checkInterruptWhileWaiting 來檢查是不是由線程中斷喚醒的。

若是是由線程中斷喚醒的,須要進一步判斷如何處理中斷:

  1. THROW_IE:throw new InterruptedException();
  2. REINTERRUPT:Thread.currentThread().interrupt();

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#checkInterruptWhileWaiting

/**
 * Checks for interrupt, returning THROW_IE if interrupted
 * before signalled, REINTERRUPT if after signalled, or
 * 0 if not interrupted.
 */
private int checkInterruptWhileWaiting(Node node) {
    // 校驗當前線程是否被中斷,並清除線程的中斷狀態
    return Thread.interrupted() ? 
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 
        0; // 若是線程未被中斷,返回0
}

3.1.5 transferAfterCancelledWait

若是阻塞在 Condition#await 的線程是被中斷喚醒的,執行 transferAfterCancelledWait 判斷髮生中斷髮生時節點所在的位置。
若是是位於條件隊列,則將其添加到同步隊列,返回 true;不然返回 false。

如何判斷中斷髮生時節點所在的位置?

  1. 已知 Condition#signal 方法會修改狀態(CONDITION -> 0),並操做節點從條件隊列出隊,從同步隊列入隊。
  2. 若是 Condition#await 執行 CAS 修改狀態成功(CONDITION -> 0),說明線程中斷髮生時 Condition#signal 還沒執行,此時節點是位於條件隊列,須要將節點加入同步隊列。
  3. 若是 Condition#await 執行 CAS 修改狀態失敗(CONDITION -> 0),說明線程中斷髮生時 Condition#signal 已經執行,當前線程須要自旋等待 Condition#signal 執行完。

java.util.concurrent.locks.AbstractQueuedSynchronizer#transferAfterCancelledWait

/**
 * Transfers node, if necessary, to sync queue after a cancelled wait.
 * Returns true if thread was cancelled before being signalled.
 *
 * @param node the node
 * @return true if cancelled before the node was signalled
 */
final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 經過 CAS 成功與否來判斷節點位置
        enq(node); // 若是CAS成功,說明節點是位於條件隊列,須要將它添加到同步隊列
        return true;
    }
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */                          // 條件隊列上的節點獲得通知(Condition#signal)以後,會添加到同步隊列中去。
    while (!isOnSyncQueue(node)) // 這裏循環檢測,直到確認節點已經成功添加到同步隊列中。
        Thread.yield();
    return false;
}

3.1.6 unlinkCancelledWaiters

在 Condition#await 方法中,當線程從阻塞中被線程中斷喚醒後,判斷節點是位於條件隊列中,除了將節點加入同步隊列以外,還須要將節點從條件隊列中移除。

官方的說明:

  1. 持有鎖時纔可調用該方法。
  2. 當前線程在 Condition#wait 中阻塞,在被 Condition#signal 喚醒以前,由線程中斷或等待超時喚醒。此時須要調用該方法清除條件隊列中的無效節點。
  3. 儘管該方法會遍歷整個隊列,可是隻有在 Condition#signal 沒有執行以前發生中斷或取消纔會調用。
  4. 該方法會遍歷整個條件隊列,一次性把全部無效節點都清除掉,當隊列中短期出現大量無效節點時(cancellation storms)可避免屢次遍歷隊列。

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#unlinkCancelledWaiters

/**
 * Unlinks cancelled waiter nodes from condition queue.
 * Called only while holding lock. This is called when
 * cancellation occurred during condition wait, and upon
 * insertion of a new waiter when lastWaiter is seen to have
 * been cancelled. This method is needed to avoid garbage
 * retention in the absence of signals. So even though it may
 * require a full traversal, it comes into play only when
 * timeouts or cancellations occur in the absence of
 * signals. It traverses all nodes rather than stopping at a
 * particular target to unlink all pointers to garbage nodes
 * without requiring many re-traversals during cancellation
 * storms.
 */ 
private void unlinkCancelledWaiters() { // 清除條件隊列中狀態不爲CONDITION的節點
    Node t = firstWaiter; // 遊標節點,記錄當前遍歷的節點
    Node trail = null;    // 遊標節點,記錄遍歷過的最後一個有效節點(狀態爲CONDITION)
    while (t != null) {   // 從條件隊列的頭節點開始遍歷(下面註釋用next表明下一個節點)
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) { // 當前t爲無效節點
            t.nextWaiter = null;
            if (trail == null)           // 首次遍歷到t爲有效節點時,纔會初始化trail
                firstWaiter = next;      // 設置t.next爲新的頭節點(下一次循環校驗t.next:若t.next無效,則把t.next.next設爲新的頭節點)
            else
                trail.nextWaiter = next; // 設置trail.next爲t.next(把t出隊,下一次循環校驗t.next:若t.next無效,則把t.next.next設爲trail.next)
            if (next == null)
                lastWaiter = trail;      // 設置trail爲新的尾節點
        }
        else      // 當前t爲有效節點
            trail = t;
        t = next; // 繼續遍歷t.next
    }
}

3.1.7 reportInterruptAfterWait

Condition#await 執行到最後,從阻塞中被喚醒且從新取得鎖,判斷 interruptMode != 0,即 Condition#await 整個過程當中發生過中斷,須要對中斷進行統一處理。

具體見設置 interruptMode 的代碼:checkInterruptWhileWaiting

  1. 若是在獲得通知以前被中斷(在條件隊列中),返回 THROW_IE
  2. 若是在獲得通知以後被中斷(在同步隊列中),返回 REINTERRUPT

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#reportInterruptAfterWait

/**
 * Throws InterruptedException, reinterrupts current thread, or
 * does nothing, depending on mode.
 */
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

java.util.concurrent.locks.AbstractQueuedSynchronizer#selfInterrupt

/**
 * Convenience method to interrupt current thread.
 */
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

3.2 Condition#awaitNanos

在 Condition 條件上阻塞,具備超時時間。

  1. Condition#await:Block until signalled or interrupted.
  2. Condition#awaitNanos:Block until signalled, interrupted, or timed out.

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#awaitNanos

/**
 * Implements timed condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled, interrupted, or timed out.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout; // 等待截止的時間戳
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) { // 已超時,檢查節點所在位置,判斷是否把節點加入同步隊列
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold) // 大於時間閾值,進行阻塞;小於時間閾值,進行自旋
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 從阻塞中喚醒後,檢查是否發生中斷,若有中斷則結束自旋
            break;
        nanosTimeout = deadline - System.nanoTime(); // 剩餘等待時間
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return deadline - System.nanoTime(); // 返回剩餘的阻塞時間
}

java.util.concurrent.locks.AbstractQueuedSynchronizer#spinForTimeoutThreshold

/**
 * The number of nanoseconds for which it is faster to spin
 * rather than to use timed park. A rough estimate suffices
 * to improve responsiveness with very short timeouts.
 */
static final long spinForTimeoutThreshold = 1000L;

3.3 Condition#awaitUninterruptibly

在 Condition 條件上阻塞,只能被 Condition#signal 喚醒。

  1. Condition#await:Block until signalled or interrupted.
  2. Condition#awaitNanos:Block until signalled, interrupted, or timed out.
  3. Condition#awaitUninterruptibly:Block until signalled.

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#awaitUninterruptibly

/**
 * Implements uninterruptible condition wait.
 * <ol>
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * </ol>
 */
public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

4. Condition 通知方法

4.1 Condition#signal

喚醒在 Condition#await 上等待最久的線程。
把條件隊列的頭節點出隊,把它加入同步隊列,並喚醒節點中的線程。
被喚醒的線程從 Condition#await 中醒來後,執行 AbstractQueuedSynchronizer#acquireQueued 等待再次獲取鎖。

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal

/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    if (!isHeldExclusively()) // 未持有獨佔鎖,報錯
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first); // 喚醒隊首節點(等待時間最長)
}

4.1.1 doSignal

  1. 找到合適的可喚醒的節點,通常是條件隊列的頭節點,將它從條件隊列出隊。
  2. 若是頭節點是無效節點,則出隊直到找到有效節點。

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal

/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) { // 把條件隊列的頭節點轉移到同步隊列
    do {
        if ( (firstWaiter = first.nextWaiter) == null) // 當前節點的後繼節點做爲新的頭節點(出隊),若爲空,說明隊列爲空
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&    // 把當前節點轉移到同步隊列(入隊),並喚醒節點上的線程(說明條件隊列的頭節點不是dummy node)
             (first = firstWaiter) != null); // 轉移失敗,取最新的firstWaiter,若不爲空則重試,若爲空,說明隊列爲空
}

4.1.2 transferForSignal

  1. 修改節點狀態:CONDITION -> 0
  2. 加入到同步隊列
  3. 喚醒節點

java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal

/**
 * Transfers a node from a condition queue onto sync queue. // 將一個節點從條件隊列轉移到同步隊列
 * Returns true if successful.
 * @param node the node
 * @return true if successfully transferred (else the node was
 * cancelled before signal)
 */
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled. // 條件隊列上的節點狀態不爲CONDITION,說明是已取消
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node); // 添加到同步隊列,返回上一個節點
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread); // 檢查上一個節點,發現不具有喚醒當前節點條件,則當即喚醒當前節點
    return true;                         // 補充:node.thread 從 Condition#await 之中被喚醒,後續執行 acquireQueued 嘗試獲取鎖
}

4.2 Condition#signalAll

遍歷條件隊列,依次喚醒全部節點。
全部節點都會遷移到同步隊列等待獲取鎖。

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll

/**
 * Moves all threads from the wait queue for this condition to
 * the wait queue for the owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignalAll

/**
 * Removes and transfers all nodes.
 * @param first (non-null) the first node on condition queue
 */
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

相關閱讀:
閱讀 JDK 源碼:AQS 中的獨佔模式
閱讀 JDK 源碼:AQS 中的共享模式
閱讀 JDK 源碼:AQS 對 Condition 的實現

做者:Sumkor

相關文章
相關標籤/搜索