系列傳送門:java
Contition是一種廣義上的條件隊列,它利用await()和signal()爲線程提供了一種更爲靈活的等待/通知模式。node
圖源:《Java併發編程的藝術》編程
Condition必需要配合Lock一塊兒使用,由於對共享狀態變量的訪問發生在多線程環境下。c#
一個Condition的實例必須與一個Lock綁定,所以await和signal的調用必須在lock和unlock之間,有鎖以後,才能使用condition嘛。以ReentrantLock爲例,簡單使用以下:多線程
public class ConditionTest { public static void main(String[] args) { final ReentrantLock lock = new ReentrantLock(); final Condition condition = lock.newCondition(); Thread thread1 = new Thread(() -> { String name = Thread.currentThread().getName(); lock.lock(); System.out.println(name + " <==成功獲取到鎖" + lock); try { System.out.println(name + " <==進入條件隊列等待"); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + " <==醒了"); lock.unlock(); System.out.println(name + " <==釋放鎖"); }, "等待線程"); thread1.start(); Thread thread2 = new Thread(() -> { String name = Thread.currentThread().getName(); lock.lock(); System.out.println(name + " ==>成功獲取到鎖" + lock); try { System.out.println("========== 這裏演示await中的線程沒有被signal的時候會一直等着 ==========="); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + " ==>通知等待隊列的線程"); condition.signal(); lock.unlock(); System.out.println(name + " ==>釋放鎖"); }, "通知線程"); thread2.start(); } } 等待線程 <==成功獲取到鎖java.util.concurrent.locks.ReentrantLock@3642cea8[Locked by thread 等待線程] 等待線程 <==進入條件隊列等待 通知線程 ==>成功獲取到鎖java.util.concurrent.locks.ReentrantLock@3642cea8[Locked by thread 通知線程] ========== 這裏演示await中的線程沒有被signal的時候會一直等着 =========== 通知線程 ==>通知等待隊列的線程 通知線程 ==>釋放鎖 等待線程 <==醒了 等待線程 <==釋放鎖
接下來咱們將從源碼的角度分析上面這個流程,理解所謂條件隊列的內涵。併發
AQS,Lock,Condition,ConditionObject
之間的關係:less
ConditionObject是AQS的內部類,實現了Condition接口,Lock中提供newCondition()方法,委託給內部AQS的實現Sync來建立ConditionObject對象,享受AQS對Condition的支持。oop
// ReentrantLock#newCondition public Condition newCondition() { return sync.newCondition(); } // Sync#newCondition final ConditionObject newCondition() { // 返回Contition的實現,定義在AQS中 return new ConditionObject(); }
ConditionObject用來結合鎖實現線程同步,ConditionObject能夠直接訪問AQS對象內部的變量,好比state狀態值和AQS隊列。源碼分析
ConditionObject是條件變量,每一個條件變量對應一個條件隊列(單向鏈表隊列),其用來存放調用條件變量的await方法後被阻塞的線程,ConditionObject維護了首尾節點,沒錯這裏的Node就是咱們以前在學習AQS的時候見到的那個Node,咱們會在下面回顧:post
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** 條件隊列的第一個節點. */ private transient Node firstWaiter; /** 條件隊列的最後一個節點. */ private transient Node lastWaiter; }
看到這裏咱們須要明確這裏的條件隊列和咱們以前說的AQS同步隊列是不同的:
這裏着重說明一下,接下來的源碼學習部分,咱們會將兩個隊列進行區分,涉及到同步隊列和阻塞隊列的描述,意味着是AQS的同步隊列,而條件隊列指的是Condition隊列,望讀者知曉。
這裏咱們針對上面的demo來分析一下會更好理解一些:
爲了簡化,接下來我將用D表示等待線程,用T表示通知線程。
lock.lock()
方法,此時無競爭,【D】被加入到AQS同步隊列中。condition.await()
方法,此時【D】被構建爲等待節點並加入到condition對應的條件等待隊列中,並從AQS同步隊列中移除。condition.signal()
方法,這時condition對應的條件隊列中只有一個節點【D】,因而【D】被取出,並被再次加入AQS的等待隊列中。此時【D】並無被喚醒,只是單純換了個位置。lock.unlock()
,釋放鎖鎖以後,會喚醒AQS隊列中的【D】,此時【D】真正被喚醒且執行。OK,lock -> await -> signal -> unlock
這一套流程相信已經大概可以理解,接下來咱們試着看看源碼吧。
咱們這裏再簡單回顧一下AQS中Node類與Condition相關的字段:
// 記錄當前線程的等待狀態, volatile int waitStatus; // 前驅節點 volatile Node prev; // 後繼節點 volatile Node next; // node存儲的線程 volatile Thread thread; // 當前節點在Condition中等待隊列上的下一個節點 Node nextWaiter;
waitStatus能夠取五種狀態:
固然,除了-2這個condition狀態,其餘的等待狀態咱們以前都或多或少分析過,今天着重學習condition這個狀態的意義。
咱們還能夠看到一個Node類型的nextWaiter,它表示條件隊列中當前節點的下一個節點,能夠看出用以實現條件隊列的單向鏈表。
調用Condition的await()方法,會使當前線程進入等待隊列並釋放鎖,同時線程狀態變爲等待狀態。
其實就是從AQS同步隊列的首節點,注意不是head,而是獲取了鎖的節點,移動到Condition的等待隊列中。
瞭解這些以後,咱們直接來看看具體方法的源碼:
public final void await() throws InterruptedException { // 這個方法是響應中斷的 if (Thread.interrupted()) throw new InterruptedException(); // 添加到條件隊列中 Node node = addConditionWaiter(); // 釋放同步資源,也就是釋放鎖 int savedState = fullyRelease(node); int interruptMode = 0; // 若是這個節點的線程不在同步隊列中,說明該線程還不具有競爭鎖的資格 while (!isOnSyncQueue(node)) { // 掛起線程 LockSupport.park(this); // 若是線程中斷,退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 上面的循環退出有兩種狀況: // 1. isOnSyncQueue(node) 爲true,即當前的node已經轉移到阻塞隊列了 // 2. checkInterruptWhileWaiting != 0, 表示線程中斷 // 退出循環,被喚醒以後,進入阻塞隊列,等待獲取鎖 acquireQueued if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
addConditionWaiter() 是將當前節點加入到條件隊列中:
private Node addConditionWaiter() { Node t = lastWaiter; // 若是lastWaiter被取消了,將其清除 if (t != null && t.waitStatus != Node.CONDITION) { // 遍歷整個條件隊列,將已取消的全部節點清除出列 unlinkCancelledWaiters(); // t從新賦值一下,由於last可能改變了 t = lastWaiter; } //注意這裏,node在初始化的時候,會指定ws爲CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); // t == null 表示隊列此時爲空,初始化firstWaiter if (t == null) firstWaiter = node; else t.nextWaiter = node;// 入隊尾 lastWaiter = node;// 將尾指針指向新建的node return node; }
unlinkCancelledWaiters用於清除隊列中已經取消等待的節點。
private void unlinkCancelledWaiters() { Node t = firstWaiter; // trail這裏表示取消節點的前驅節點 Node trail = null; // t會從頭至尾遍歷這個單鏈表 while (t != null) { // next用於保存下一個 Node next = t.nextWaiter; // 若是發現當前這個節點 不是 condition了, 那麼考慮移除它 // 下面是單鏈表的移除節點操做 簡單來講就是 trail.next = t.next if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; // 說明first就是否是condition了 if (trail == null) firstWaiter = next; else //trail.next = t.next trail.nextWaiter = next; // trail後面沒東西,天然trail就是lastWaiter了 if (next == null) lastWaiter = trail; } // 當前節點是一直跟到不是condition節點的上一個 else trail = t; // 向後遍歷 t = t.next t = next; } }
總結一下addConditionWaiter的過程:
將節點加入等待隊列中後,就須要徹底釋放線程擁有的獨佔鎖了,徹底釋放針對重入鎖的狀況。咱們能夠拉到await()方法中看看,將會調用:int savedState = fullyRelease(node);
,這句話有什麼內涵呢?
咱們看到這個方法返回了一個savedState變量,簡單的理解就是保存狀態。咱們知道重入鎖的state由重入的次數,若是一個state爲N,咱們能夠認爲它持有N把鎖。
await()方法必須將state置0,也就是徹底釋放鎖,後面的線程才能獲取到這把鎖,置0以後,咱們須要用個變量標記一下,也就是這裏的savedState。
這樣它被從新喚醒的時候,咱們就知道,他須要獲取savedState把鎖。
final int fullyRelease(Node node) { boolean failed = true; try { // 獲取當前的state值,重入次數 int savedState = getState(); // 釋放N = savedState資源 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { // 若是獲取失敗,將會將節點設置爲取消狀態,並拋出異常 if (failed) node.waitStatus = Node.CANCELLED; } }
這裏其實咱們就會明白開頭說的:若是某個線程沒有獲取lock,就直接調用condition的await()方法,結果是什麼呢,在release的時候拋出異常,而後節點被取消,以後節點進來的時候,將它清理掉。
ok,徹底釋放鎖以後,將會來到這幾步,若是這個節點的線程不在同步隊列中,說明該線程還不具有競爭鎖的資格,將被一直掛起,這裏的同步隊列指的是AQS的阻塞隊列。
int interruptMode = 0; // 若是這個節點的線程不在同步隊列中,說明該線程還不具有競爭鎖的資格,會一直掛起 while (!isOnSyncQueue(node)) { // 掛起線程 LockSupport.park(this); // 若是線程中斷,退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
下面這個方法會判斷節點是否是已經到阻塞隊列中了,若是是的話,就直接返回true,這個方法的必要性是什麼呢?
其實啊,這裏須要提早說一下signal()方法,signal的做用和await()方法,將在等待隊列中阻塞的節點移動到AQS同步隊列中,這個方法就是說判斷一下這個節點是否是移過去了。
final boolean isOnSyncQueue(Node node) { // 1. 節點的等待狀態仍是condition表示還在等待隊列中 // 2. node.prev == null 表示還沒移到阻塞隊列中[prev和next都是阻塞隊列中用的] if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 若是node已經有了後繼節點,表示已經在阻塞隊列中了 if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ // 來到這裏的狀況:ws != condition && node.prev != null && node.next == null // 想一想:爲何node.prev != null不能做爲判斷不在阻塞隊列的依據呢? // CAS首先設置node.prev 指向tail,這個時候node.prev 是不爲null的,但CAS可能會失敗 return findNodeFromTail(node); }
爲何node.prev != null不能做爲判斷不在阻塞隊列的依據呢?
官方給出瞭解答: 由於CAS的入隊操做中,首先設置node.prev 指向tail,這個時候node.prev 是不爲null的。你可以說他入隊成功必定成功嗎?不必定,由於CAS可能會失敗,因此要findNodeFromTail(node)。
從阻塞隊列的尾部向前遍歷,若是找到這個node,表示它已經在了,那就返回true。
private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { // 已經有了 if (t == node) return true; // 尾都沒有,找啥呢,返回false if (t == null) return false; // 一直往前找 t = t.prev; } }
因爲以前節點被加入等待隊列將會一直阻塞,爲了連貫性,咱們來看看喚醒它的signal方法吧:
以前說到,若是這個線程會在等待隊列中等待,那麼喚醒它的signal方法的流程是怎麼樣的呢,前面其實已經說了一丟丟了,咱們猜想,signal會將isOnSyncQueue方法的循環打破,接下來看看是否是這樣子的。
public final void signal() { // 同樣的,必須佔有當前這個鎖才能用signal方法 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
該方法會從頭至尾遍歷條件隊列,找到須要移到同步隊列的節點。
private void doSignal(Node first) { do { // firstWaiter 指向first的下一個 if ( (firstWaiter = first.nextWaiter) == null) // 若是first是最後一個且要被移除了,就將last置null lastWaiter = null; // first斷絕與條件隊列的鏈接 first.nextWaiter = null; // fisrt轉移失敗,就看看後面是否是須要的 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
這裏的while循環表示,若是first沒有轉移成功,就接着判斷first後面的節點是否是須要轉移。
該方法將節點從條件隊列轉移到阻塞隊列。
final boolean transferForSignal(Node node) { /* * CAS操做嘗試將Condition的節點的ws改成0 * 若是失敗,意味着:節點的ws已經不是CONDITION,說明節點已經被取消了 * 若是成功,則該節點的狀態ws被改成0了 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * 經過enq方法將node自旋的方式加入同步隊列隊尾 * 這裏放回的p是node在同步隊列的前驅節點 */ Node p = enq(node); int ws = p.waitStatus; // ws大於0 的狀況只有 cancenlled,表示node的前驅節點取消了爭取鎖,那直接喚醒node線程 // ws <= 0 會使用cas操做將前驅節點的ws置爲signal,若是cas失敗也會喚醒node if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } // 自旋的方式入隊 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; // 返回的是node的前驅節點 return t; } } } }
ok,一旦signal以後,節點被成功轉移到同步隊列後,這時下面這個循環就會退出了,繼續回到這裏:
int interruptMode = 0; // 若是這個節點的線程不在同步隊列中,說明該線程還不具有競爭鎖的資格,會一直掛起 while (!isOnSyncQueue(node)) { // 掛起線程 LockSupport.park(this); // 若是線程中斷,退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
interruptMode能夠有如下幾種取值:
/** await 返回的時候,須要從新設置中斷狀態 */ private static final int REINTERRUPT = 1; /** await 返回的時候,須要拋出 InterruptedException 異常 */ private static final int THROW_IE = -1; /** interruptMode取0的時候表示在await()期間,沒有發生中斷 */
說到這裏咱們須要明白,LockSupport.park(this)
掛起的線程是何時喚醒的:
LockSupport.unpark(node.thread);
方法。喚醒以後,咱們能夠看到調用checkInterruptWhileWaiting方法檢查等待期間是否發生了中斷,若是不爲0表示確實在等待期間發生了中斷。
但其實這個方法的返回結果用interruptMode變量接收,擁有更加豐富的內涵,它還可以判斷中斷的時機是否在signal以前。
該方法用於判斷該線程是否在掛起期間發生了中斷。
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ?// 若是處於中斷狀態,返回true,且將重置中斷狀態 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :// 若是中斷了,判斷什麼時候中斷 0; // 沒有中斷, 返回0 }
該方法判斷什麼時候中斷,是否在signal以前。
final boolean transferAfterCancelledWait(Node node) { // 嘗試使用CAS操做將node 的ws設置爲0 // 若是成功,說明在signal方法以前中斷就已經發生: // 緣由在於:signal若是在此以前發生,必然已經cas操做將ws設置爲0了,這裏不可能設置成功 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 就算中斷了,也將節點入隊 enq(node); return true; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. * 這裏就是signal以後發生的中斷 * 可是signal可能還在進行轉移中,這邊自旋等一下它完成 */ while (!isOnSyncQueue(node)) Thread.yield(); return false; }
這裏的話,咱們仍是稍微總結一下:
接下來三個部分我將一一說明:
// 第一部分 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 第二部分 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 清除取消的節點 // 第三部分 if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
第一部分
signal喚醒的線程並不會當即獲取到資源,從while循環退出後,會經過acquireQueued方法加入獲取同步狀態的競爭中。
// 第一部分 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
acquireQueued(node, savedState)
中node此時已經被加入同步隊列了,savedState是以前存儲的state。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; // } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued方法返回時,表示已經獲取到了鎖,且返回的是interrupted值,若是返回true,表示已經被中斷。
接着判斷interruptMode != THROW_IE
表示是在signal以後發生的中斷,須要從新中斷當前線程,將interruptMode設置爲REINTERRUPT。
第二部分
// 第二部分 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 清除取消的節點
前面說了,signal會將節點移到同步隊列中,最後一步須要和條件隊列斷開關係,也就是:node.nextWaiter = null
,但這是想象中比較正常的狀況,若是在signal以前被中斷,節點也會被加入同步隊列中,這時實際上是沒有調用這個斷開關係的。
所以這邊作一點處理, unlinkCancelledWaiters()
邏輯上面也說過了,能夠回過頭去看看,主要是清除隊列中已經取消等待的節點。
第三部分
最後一個部分,就是對兩種interruptMode的狀況進行處理,看看代碼就知道了:
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { // signal 以前的中斷, 須要拋出異常 if (interruptMode == THROW_IE) throw new InterruptedException(); // signal 以後發生的中斷, 須要從新中斷 else if (interruptMode == REINTERRUPT) selfInterrupt(); }
帶超時機制的await()方法有如下幾個,簡單看下便可:
咱們選最後一個來看看,主要看看和以前await()方法不同的地方:
public final boolean await(long time, TimeUnit unit) throws InterruptedException { // 計算等待的時間 long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); // 截止時間 final long deadline = System.nanoTime() + nanosTimeout; // 表示是否超時 boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { // 等待時間到了 if (nanosTimeout <= 0L) { // 這個方法返回true表示在這個方法內,已經將節點轉移到阻塞隊列中 // 返回false,表示signal已經發生,表示沒有超時 timedout = transferAfterCancelledWait(node); break; } //spinForTimeoutThreshold 是AQS中的一個字段,若是超過1ms,使用parkNonos if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; // 更新一下還須要等待多久 nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } // 相比await() 針對中斷少了拋出異常的操做,而是直接進行中斷 if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
最後以一個Java doc給的例子結尾吧:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class BoundedBuffer { final Lock lock = new ReentrantLock(); // condition 依賴於 lock 來產生 final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; // 生產 public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); // 隊列已滿,等待,直到 not full 才能繼續生產 items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); // 生產成功,隊列已經 not empty 了,發個通知出去 } finally { lock.unlock(); } } // 消費 public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); // 隊列爲空,等待,直到隊列 not empty,才能繼續消費 Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); // 被我消費掉一個,隊列 not full 了,發個通知出去 return x; } finally { lock.unlock(); } } }
其實這個以前也說過,ArrayBlockingQueue就是採用了這種方式實現的生產者-消費者模式,若是你感興趣,能夠看看具體的實現細節哦。
isOnSyncQueue(Node node)
,若是在等待隊列中,就一直等着,若是signal將它移到AQS隊列中,則退出循環。