前兩篇文章分別介紹了 AQS 框架中的獨佔模式和共享模式,本篇將介紹 AQS 對 Condition 接口的實現。
在閱讀本篇以前,建議先了解 AQS 中的數據結構和獨佔模式的實現原理。java
JUC 經過 Lock 和 Condition 兩個接口實現管程(Monitor),其中 Lock 用於解決互斥問題,而 Condition 用於解決同步問題,而 AQS 對 Lock 和 Condition 接口的實現提供了一個基礎的框架。node
本文基於 jdk1.8.0_91
Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。 c#
Condition 實現能夠提供不一樣於 Object 監視器方法的行爲和語義,好比受保證的通知排序,或者在執行通知時不須要保持一個鎖。
若是某個實現提供了這樣特殊的語義,則該實現必須記錄這些語義。 segmentfault
方法摘要:數組
// 形成當前線程在接到信號或被中斷以前一直處於等待狀態。 void await() // 形成當前線程在接到信號、被中斷或到達指定等待時間以前一直處於等待狀態。 boolean await(long time, TimeUnit unit) // 形成當前線程在接到信號、被中斷或到達指定等待時間以前一直處於等待狀態。 long awaitNanos(long nanosTimeout) // 形成當前線程在接到信號以前一直處於等待狀態。 void awaitUninterruptibly() // 形成當前線程在接到信號、被中斷或到達指定最後期限以前一直處於等待狀態。 boolean awaitUntil(Date deadline) // 喚醒一個等待線程。 void signal() // 喚醒全部等待線程。 void signalAll()
Condition 本質上是一個隊列(稱爲條件隊列),線程等待某個條件成立時,在隊列中阻塞,直到其餘線程檢查條件成立後來通知它。
對於同一個鎖,只會存在一個同步隊列,可是可能會有多個條件隊列,只有在使用了 Condition 纔會存在條件隊列。數據結構
AQS 中對條件隊列的使用:框架
當線程獲取鎖以後,執行 Condition.await()
會釋放鎖並進入條件隊列,阻塞等待直到被其餘線程喚醒。
當其餘線程執行 Condition.signal()
喚醒當前線程時,當前線程會從條件隊列轉移到同步隊列來等待再次獲取鎖。
當前線程再一次獲取鎖以後,須要在 while 循環中判斷條件是否成立,若不成立需從新執行 Condition.await()
去等待。less
Condition 實例實質上被綁定到一個鎖上。要爲特定 Lock 實例得到 Condition 實例,請使用其 newCondition() 方法。 ui
Java 官方文檔提供 Condition 接口的使用示例:this
對於一個有界阻塞數組,當數組非滿時才能夠往數組中存放數據,不然阻塞;當數據非空時才能夠往數組中取元素,不然阻塞。
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; // 生產者方法,往數組裏面寫數據 public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); // 阻塞直到非滿 items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); // 通知非空 } finally { lock.unlock(); } } // 消費者方法,從數組裏面拿數據 public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); // 阻塞直到非空 Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); // 通知非滿 return x; } finally { lock.unlock(); } } }
在 JDK 的實現中,獨佔模式纔可以使用 Condition,共享模式不支持 Condition。
由於 AQS 的內部類 ConditionObject 只支持獨佔模式。
java.util.concurrent.locks.ReentrantLock.Sync#newCondition
final ConditionObject newCondition() { return new ConditionObject(); }
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#newCondition
public Condition newCondition() { return sync.newCondition(); }
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#newCondition
/** * Throws {@code UnsupportedOperationException} because * {@code ReadLocks} do not support conditions. * * @throws UnsupportedOperationException always */ public Condition newCondition() { throw new UnsupportedOperationException(); }
代碼流程:
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); // 將當前線程封裝成節點存入條件隊列 int savedState = fullyRelease(node); // 釋放已經持有的鎖(就是在調用 Condition#await 以前持有的 Lock#lock 鎖),並返回釋放前的鎖狀態 int interruptMode = 0; while (!isOnSyncQueue(node)) { // 檢查節點是否在同步隊列上 LockSupport.park(this); // 節點還在條件隊列中,則阻塞 // 節點從阻塞中被喚醒(condition#signal,Thread#interrupt),檢查中斷狀態,設置中斷處理模式 // 補充:被 condition#signal 喚醒後的線程會從條件隊列轉移到同步隊列(先出隊再入隊) // 補充:若在條件隊列中就發生了中斷,也會被轉移到同步隊列(不出隊,只入隊,見 checkInterruptWhileWaiting -> transferAfterCancelledWait) if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 在同步隊列等待獲取資源直到成功,判斷設置中斷處理模式 interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled // nextWaiter不爲空,說明當前節點是由 Thread#interrupt 喚醒的(condition#signal 喚醒阻塞節點會設置nextWaiter爲空) // 此時當前節點同時存在於同步隊列、條件隊列上!可是 waitStatus 不是 CONDITION // 須要清除條件隊列中已取消的節點 unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); // 處理中斷:拋異常,或者補上中斷狀態 }
注意:
將當前線程封裝爲節點(waitStatus 爲 CONDITION),添加到條件隊列尾部。
若條件隊列不存在則進行初始化,把當前節點做爲頭節點(不使用 dummy node)。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter
/** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. // 清理條件隊列中已取消的尾節點 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); // 構建節點,尾插法 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
釋放當前線程已持有的鎖/資源,返回釋放以前的鎖/資源。
若未持有鎖,報錯。
這裏存在 BUG:報錯以前,當前線程已經加入到條件隊列之中了,會致使條件隊列存儲無效的節點數據。
應該將是否持有鎖的校驗提早到 addConditionWaiter 以前,JDK 11 中已修復該問題。
java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease
final int fullyRelease(Node node) { // 釋放當前線程已持有的鎖 boolean failed = true; try { int savedState = getState(); // 獲取 volatile 的 state,獨佔模式下表示當前線程鎖持有的所有鎖 if (release(savedState)) { // 釋放所有的鎖 failed = false; return savedState; } else { throw new IllegalMonitorStateException(); // 未持有鎖,報錯 } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
判斷節點是否在同步隊列上。
java.util.concurrent.locks.AbstractQueuedSynchronizer#isOnSyncQueue
/** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); // 從尾節點向前遍歷查找 } /** * Returns true if node is on sync queue by searching backwards from tail. * Called only when needed by isOnSyncQueue. * @return true if present */ private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
1. 若是 waitStatus == CONDITION
說明必定是位於條件隊列上。
從條件隊列入隊,構造節點的時候默認就爲 CONDITION 狀態。
將節點從條件隊列轉移到同步隊列,首先會 CAS 設置 waitStatus 狀態爲 CONDITION,再執行入隊操做。
2. node.prev == null
說明必定是位於條件隊列上。
同步隊列只有頭節點符合 node.prev == null
,可是同步隊列的頭節點是 dummy node,其 thread 爲空。
也就是說,來調用 isOnSyncQueue 方法且符合 node.prev == null
條件的節點,只多是位於條件隊列上的節點。
3. 若是 node.next != null
說明必定是處於同步隊列上。
節點加入同步隊列是個複合操做,最後一步是設置 node.next,當 node.next != null
說明入隊操做已執行完成。
4. 若是以上都沒法判斷節點是否位於同步隊列,則遍歷鏈表查找節點。
存在 node.prev != null
可是節點尚未徹底入隊成功的狀況,由於入隊操做設置 prev -> tail -> next 是非原子操做。
因此須要從 tail 向前遍歷,才能準確判斷 node 是否位於同步隊列上。
調用 findNodeFromTail 方法前,節點通常位於尾節點附近,不會遍歷過多節點。
阻塞在 Condition#await 的線程被喚醒以後,調用 checkInterruptWhileWaiting 來檢查是不是由線程中斷喚醒的。
若是是由線程中斷喚醒的,須要進一步判斷如何處理中斷:
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#checkInterruptWhileWaiting
/** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ private int checkInterruptWhileWaiting(Node node) { // 校驗當前線程是否被中斷,並清除線程的中斷狀態 return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; // 若是線程未被中斷,返回0 }
若是阻塞在 Condition#await 的線程是被中斷喚醒的,執行 transferAfterCancelledWait 判斷髮生中斷髮生時節點所在的位置。
若是是位於條件隊列,則將其添加到同步隊列,返回 true;不然返回 false。
如何判斷中斷髮生時節點所在的位置?
java.util.concurrent.locks.AbstractQueuedSynchronizer#transferAfterCancelledWait
/** * Transfers node, if necessary, to sync queue after a cancelled wait. * Returns true if thread was cancelled before being signalled. * * @param node the node * @return true if cancelled before the node was signalled */ final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 經過 CAS 成功與否來判斷節點位置 enq(node); // 若是CAS成功,說明節點是位於條件隊列,須要將它添加到同步隊列 return true; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ // 條件隊列上的節點獲得通知(Condition#signal)以後,會添加到同步隊列中去。 while (!isOnSyncQueue(node)) // 這裏循環檢測,直到確認節點已經成功添加到同步隊列中。 Thread.yield(); return false; }
在 Condition#await 方法中,當線程從阻塞中被線程中斷喚醒後,判斷節點是位於條件隊列中,除了將節點加入同步隊列以外,還須要將節點從條件隊列中移除。
官方的說明:
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#unlinkCancelledWaiters
/** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ private void unlinkCancelledWaiters() { // 清除條件隊列中狀態不爲CONDITION的節點 Node t = firstWaiter; // 遊標節點,記錄當前遍歷的節點 Node trail = null; // 遊標節點,記錄遍歷過的最後一個有效節點(狀態爲CONDITION) while (t != null) { // 從條件隊列的頭節點開始遍歷(下面註釋用next表明下一個節點) Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { // 當前t爲無效節點 t.nextWaiter = null; if (trail == null) // 首次遍歷到t爲有效節點時,纔會初始化trail firstWaiter = next; // 設置t.next爲新的頭節點(下一次循環校驗t.next:若t.next無效,則把t.next.next設爲新的頭節點) else trail.nextWaiter = next; // 設置trail.next爲t.next(把t出隊,下一次循環校驗t.next:若t.next無效,則把t.next.next設爲trail.next) if (next == null) lastWaiter = trail; // 設置trail爲新的尾節點 } else // 當前t爲有效節點 trail = t; t = next; // 繼續遍歷t.next } }
Condition#await 執行到最後,從阻塞中被喚醒且從新取得鎖,判斷 interruptMode != 0,即 Condition#await 整個過程當中發生過中斷,須要對中斷進行統一處理。
具體見設置 interruptMode 的代碼:checkInterruptWhileWaiting
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#reportInterruptAfterWait
/** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
java.util.concurrent.locks.AbstractQueuedSynchronizer#selfInterrupt
/** * Convenience method to interrupt current thread. */ static void selfInterrupt() { Thread.currentThread().interrupt(); }
在 Condition 條件上阻塞,具備超時時間。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#awaitNanos
/** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; // 等待截止的時間戳 int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { // 已超時,檢查節點所在位置,判斷是否把節點加入同步隊列 transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) // 大於時間閾值,進行阻塞;小於時間閾值,進行自旋 LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 從阻塞中喚醒後,檢查是否發生中斷,若有中斷則結束自旋 break; nanosTimeout = deadline - System.nanoTime(); // 剩餘等待時間 } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); // 返回剩餘的阻塞時間 }
java.util.concurrent.locks.AbstractQueuedSynchronizer#spinForTimeoutThreshold
/** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices * to improve responsiveness with very short timeouts. */ static final long spinForTimeoutThreshold = 1000L;
在 Condition 條件上阻塞,只能被 Condition#signal 喚醒。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#awaitUninterruptibly
/** * Implements uninterruptible condition wait. * <ol> * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * </ol> */ public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
喚醒在 Condition#await 上等待最久的線程。
把條件隊列的頭節點出隊,把它加入同步隊列,並喚醒節點中的線程。
被喚醒的線程從 Condition#await 中醒來後,執行 AbstractQueuedSynchronizer#acquireQueued 等待再次獲取鎖。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) // 未持有獨佔鎖,報錯 throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); // 喚醒隊首節點(等待時間最長) }
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { // 把條件隊列的頭節點轉移到同步隊列 do { if ( (firstWaiter = first.nextWaiter) == null) // 當前節點的後繼節點做爲新的頭節點(出隊),若爲空,說明隊列爲空 lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && // 把當前節點轉移到同步隊列(入隊),並喚醒節點上的線程(說明條件隊列的頭節點不是dummy node) (first = firstWaiter) != null); // 轉移失敗,取最新的firstWaiter,若不爲空則重試,若爲空,說明隊列爲空 }
java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal
/** * Transfers a node from a condition queue onto sync queue. // 將一個節點從條件隊列轉移到同步隊列 * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. // 條件隊列上的節點狀態不爲CONDITION,說明是已取消 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); // 添加到同步隊列,返回上一個節點 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); // 檢查上一個節點,發現不具有喚醒當前節點條件,則當即喚醒當前節點 return true; // 補充:node.thread 從 Condition#await 之中被喚醒,後續執行 acquireQueued 嘗試獲取鎖 }
遍歷條件隊列,依次喚醒全部節點。
全部節點都會遷移到同步隊列等待獲取鎖。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll
/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignalAll
/** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
相關閱讀:
閱讀 JDK 源碼:AQS 中的獨佔模式
閱讀 JDK 源碼:AQS 中的共享模式
閱讀 JDK 源碼:AQS 對 Condition 的實現
做者:Sumkor