之前多線程通訊使用的是Object類提供的wait
以及notify
方法,Condition
也有相似wait
、notify
的功能,且提供了當線程中斷時是否作出相應的處理方法。Condition
底層實現基於FIFO隊列,它必須結合Lock
鎖一塊兒使用,而且Condition
實例由Lock
建立。最後總結起來就是,Condition
是一種多線程通訊工具,表示多線程下參與數據競爭的線程的一種狀態,主要負責多線程環境下對線程的掛起和喚醒工做。node
下面貼出一張截取自《Java併發編程的藝術》的對比圖:編程
Condition
是一個接口,jdk中提供了Condition
的一個實現類ConditionObject
,ConditionObject
是AQS
中的內部類,由於Condition
的操做須要獲取相關的鎖,而AQS
又是實現同步鎖的基礎。Condition
提供了下面幾種操做方法:bash
void await() throws InterruptedException
當調用這個方法時,線程將被掛起,直到被其餘線程喚醒爲止。注意當掛起的線程被中斷時,將拋出InterruptedException
異常。多線程
void awaitUninterruptibly()
此方法與上面的await()
方法相似,區別是該方法不處理線程中斷的狀況。架構
long awaitNanos(long nanosTimeout) throws InterruptedException
表示能夠最長等待指定時間,除非中途被中斷或者提早喚醒了,返回值=nanosTimeout-已等待的時間。併發
void awaitUninterruptibly()
此方法與上面的await()
方法相似,區別是該方法不處理線程中斷的狀況。工具
boolean await(long time, TimeUnit unit) throws InterruptedException
同awaitNanos()
,只不過能夠指定時間單位。源碼分析
boolean awaitUntil(Date deadline) throws InterruptedException
表示線程掛起,知道某個時間點喚醒該線程。post
void signal()
以及void signalAll()
喚醒操做。ui
那麼問題來了,怎麼獲取Condition
對象呢?查看Lock
接口,該接口聲明瞭一個newCondition()
方法,返回一個Condition
對象,如Lock
接口的一個實現ReentrantLock
中則提供了該方法的實現,但本質上仍是建立了ConditionObject
對象。
源碼以下:
public Condition newCondition() {
return sync.newCondition();
}
複製代碼
其中sync
爲繼承自AQS
的同步器。
final ConditionObject newCondition() {
return new ConditionObject();
}
複製代碼
關於AQS
,能夠參考下面這篇文章:
下邊爲await()
方法源代碼:
public final void await() throws InterruptedException {
// 一、線程若是中斷,那麼拋出異常
if (Thread.interrupted())
throw new InterruptedException();
// 二、將當前線程包裝成爲一個Node節點,加入FIFO隊列中
Node node = addConditionWaiter();
// 三、釋放鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
// 四、判斷節點是否在同步隊列(注意非Condition隊列)中,若是沒有,則掛起當前線程,由於該線程還沒有具有數據競爭資格
while (!isOnSyncQueue(node)) {
// 五、掛起線程
LockSupport.park(this);
// 六、中斷直接返回
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 七、參與數據競爭(非中斷時執行)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清理條件隊列中狀態爲cancelled的節點
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
複製代碼
從上面代碼咱們能夠看出,由於await
自己不支持中斷,所以若是當前線程被中斷了,那麼第一步直接拋出異常。第二步將當前線程包裝成一個Node節點,加入到Condition
條件隊列中,Node
與AQS
使用的是同一個類型,查看AQS
可知,每一個Node
與一個當前Thread
相關聯。
static final class Node {
volatile Thread thread;
}
複製代碼
接着執行第三步釋放鎖,這裏就涉及到爲何說Condition
使用時必須先得到鎖的問題了,下方爲fullyRelease()
源碼:
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 這裏直接執行釋放鎖操做
if (release(savedState)) {
failed = false;
return savedState;
} else {
// 釋放失敗,則報錯
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
複製代碼
第4-6步則不斷循環,直到該節點出如今同步隊列中,注意同步隊列不是條件隊列,一個屬於AQS
,一個屬於Condition
,那麼,這個while循環是怎樣判斷是否在同步隊列中的呢?其實很簡單,就是當把節點的狀態改爲非CONDITION
就能夠了,好比調用了signal()
,並且請注意,LockSupport.park(this);
這句代碼執行完以後是阻塞代碼了哈,後面分析signal
會提到。源碼以下:
final boolean isOnSyncQueue(Node node) {
// 判斷節點的狀態是否是CONDITION,CONDITION表示該節點正在處於等待某個條件,此時就應該park掛起線程
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
複製代碼
第7步也很好理解,既然當前線程等待的條件都有了,被喚醒了,那麼就直接參與數據競爭就完事了,第8步清理掉一些狀態爲cancelled
的節點,線程因爲中斷或超時時,節點的狀態就會被標記成cancelled
public final void signal() {
// 一、必須得到鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
複製代碼
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 將節點插入到同步隊列中
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
複製代碼
調用condition的signal的前提條件是當前線程已經獲取了lock,該方法會使得等待隊列中的頭節點即等待時間最長的那個節點移入到同步隊列,而移入到同步隊列後纔有機會使得等待線程被喚醒,即從await方法中的LockSupport.park(this)方法中返回(由於上文提到這個方法是阻塞住當前代碼的),從而纔有機會使得調用await方法的線程成功退出
使用鎖結合Condition
能夠很好的解決生產者消費者問題,須要注意的就是,使用Condition
時必須先獲取鎖,不然將報錯,也就是說,通常會按下面方式去調用Condition
。
lock.lock();
condition.await();
condition.signal();
lock.unlock();
複製代碼