任意一個Java對象,都擁有一組監視器方法(定義在java.lang.Object上),主要包括wait()、 wait(long timeout)、notify()以及notifyAll()方法,這些方法與synchronized同步關鍵字配合,能夠實現等待/通知模式。Condition接口也提供了相似Object的監視器方法,與Lock配合能夠實現等待/通知模式,可是這二者在使用方式以及功能特性上仍是有差異的。java
Condition接口做爲Lock接口的wait()、notify()方法,實現等待/通知模式。node
Object的監視器方法和Condition接口的對比數組
對比項 | Object監視器方法 | Codition |
---|---|---|
前置條件 | 獲取對象的鎖(隱式獲取) | 調用Lock()獲取鎖,而後調用Lock.newCondition()獲取Condition對象 |
調用方式 | 直接調用,如:object.wait() | 直接調用,如:condition.await() |
等待隊列個數 | 一個 | 多個(這裏思考一下爲何會有多個等待隊列?) |
當前線程釋放鎖進入等待狀態 | 支持 | 支持 |
當前線程釋放鎖進入等待狀態,在等待狀態中不響應中斷 | 不支持 | 支持 |
當前線程釋放鎖並進入超時等待狀態 | 支持 | 支持 |
當前線程釋放鎖並進入等待狀態到未來的某個時間 | 不支持 | 支持 |
喚醒等待隊列中的某個線程 | 支持 | 支持 |
喚醒等待隊列中的所有線程 | 支持 | 支持 |
看不懂不要緊,把下面的內容看完再回頭過來看,就會懂了。:)安全
Condition是在AQS中配合使用的wait/nofity線程通訊協調工具類,咱們能夠稱之爲等待隊列 Condition定義了等待/通知兩種類型的方法,當前線程調用這些方法時,須要提早獲取到Condition對象關聯的鎖。Condition對象是調用Lock對象的newCondition()方法建立出來的,換句話說,Condition是依賴Lock對象。bash
下面看看Condition接口是如何和Lock接口使用併發
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock;
}
}
public void conditionSignal() throws InterruptedException {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
複製代碼
如示例所示,通常都會將Condition對象做爲成員變量。當調用await()方法後,當前線程會 釋放鎖並在此等待,而其餘線程調用Condition對象的signal()方法,通知當前線程後,當前線程 才從await()方法返回,而且在返回前已經獲取了鎖。工具
下面看看Condition部分方法的詳細解析 ui
下面以一個有界隊列來深刻了解Condition的使用方式this
public class BoundedQueue<T> {
//存儲隊列元素的對象數組
private Object[] items;
//添加的下標,刪除的下標和數組當前數量
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
//定義一個非空等待隊列
private Condition notEmpty = lock.newCondition();
//定義一個非滿等待隊列
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
public void add(T t) throws InterruptedException {
lock.lock();
try {
//若是當前隊列已滿,則進入非滿等待隊列
while(count == items.length) {
//調用await()方法,進入等待隊列,只有在remove方法將元素移出有界隊列,而後進行notFull.signal()方法才能繼續進行添加操做。
notFull.await();
}
items[addIndex] = t;
if(++addIndex == items.length) {
addIndex = 0;
}
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
}
public T remove() throws InterruptedException {
lock.lock();
try {
//若是當前隊列爲空,則進入notEmpty等待隊列
while(count == 0) {
//只有調用add方法向有界隊列中添加元素,而後調用notEmpty.signal()方法後,才能從該方法返回。
notEmpty.await();
}
Object x = items[removeIndex];
if(++removeIndex == items.length) {
removeIndex = 0;
}
--count;
notFull.signal();
return (T)x;
} finally {
lock.unlock();
}
}
複製代碼
這個有界隊列是一種特殊的隊列,當隊列爲空時,隊列的獲取操做 將會阻塞獲取線程,直到隊列中有新增元素,當隊列已滿時,隊列的插入操做將會阻塞插入線程,直到隊列出現「空位」。spa
首先須要得到鎖,目的是確保數組修改的可見性和排他性。當數組數量等於數組長度時, 表示數組已滿,則調用notFull.await(),當前線程隨之釋放鎖並進入等待狀態。若是數組數量不等於數組長度,表示數組未滿,則添加元素到數組中,同時通知等待在notEmpty上的線程,數組中已經有新元素能夠獲取。 在添加和刪除方法中使用while循環而非if判斷,目的是防止過早或意外的通知,只有條件 符合纔可以退出循環。回想以前提到的等待/通知的經典範式,兩者是很是相似的。
總結:在Object的監視器模型上,一個對象擁有一個同步隊列和等待隊列,而併發包中的 Lock(更確切地說是同步器)擁有一個同步隊列和多個等待隊列。
問題:爲何每一個併發包中的同步器會有多個等待隊列呢??
不一樣於synchronized同步隊列和等待隊列只有一個,AQS的等待隊列是有多個,由於AQS能夠實現排他鎖(ReentrantLock)和非排他鎖(ReentrantReadWriteLock——讀寫鎖),讀寫鎖就是一個須要多個等待隊列的鎖。等待隊列(Condition)用來保存被阻塞的線程的。由於讀寫鎖是一對鎖,因此須要兩個等待隊列來分別保存被阻塞的讀鎖和被阻塞的寫鎖。
condition是一個接口,那它的實現類呢?它的實現類——ConditionObject定義在同步器AQS內部,由於condition的操做須要獲取相關聯的鎖,因此將其定義爲同步器內部類也較爲合理。
每一個condition對象都包含着一個隊列——等待隊列,用來保存着被阻塞的線程。隱式鎖synchronized配合着object對象的監視器方法wait()和notify()可以實現等待/通知功能,固然顯式鎖(同步組件)配合着condition對象也能夠實現等待/通知功能。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
//等待隊列的頭結點,transient的做用是不讓節點被序列化
private transient Node firstWaiter;
//等待隊列的尾結點,transient的做用是不讓節點被序列化
private transient Node lastWaiter;
...
}
複製代碼
能夠看到Node就是等待隊列中的節點。那麼這個Node節點是定義在哪裏的呢?這個Node節點是定義在同步器AQS中的。也就是說,同步隊列和等待隊列中的節點類型都是定義在同步器的靜態內部類AbstractQueuedSynchronized.Node。
對的你沒看錯,等待隊列和同步隊列都是定義在同步器中的,換句話說就是等待隊列定義在同步隊列中的。
調用condition.await()方法會發生什麼是呢?當前線程會被構形成一個節點,而後從尾部加入到等待隊裏中並釋放當前線程持有的鎖,當前先被阻塞,而後喚醒同步隊列中的後繼節點,同時當前線程的狀態變爲等待狀態(釋放同步狀態),而後就在等待隊列中一直等待着...等待着... 下一步會發生什麼?答案就在下面。
上面的過程能夠這樣看待,就是將同步器中同步隊列的首節點(獲取了鎖的節點)的線程移動到了等待隊列的尾部。
下面來看看condition.await()方法的源碼
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//將當前線程構形成節點而後添加到condition等待隊列的尾部
Node node = addConditionWaiter();
//釋放鎖(釋放同步狀態)
int savedState = fullyRelease(node);
int interruptMode = 0;
//若是當前線程在同步隊列中,isOnSyncQueue(node)返回true
while (!isOnSyncQueue(node)) {
//阻塞當前線程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//調用同步器的acquireQueued()方法加入到獲取同步狀態的競爭中
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode)
}
複製代碼
LockSupport.park(this);
複製代碼
LockSupport.park()方法很關鍵,讓當前隊列真正的阻塞起來。看LockSupport.park的源碼可知是調用了native park()方法。
public native void park(boolean var1, long var2);
複製代碼
下面用圖來展現一下上面說的稍微有點複雜但又很清晰的過程
問1:當前線程釋放的鎖被誰持有了?
答1:調用await()方法的線程得到了鎖,而後該線程就會稱爲同步隊列的首節點。
特別注意:上述節點引用更新的過程(釋放同步狀態)並無使用CAS保證,緣由在於調用await()方法的線程一定是獲取了鎖的線程,也就是說該過程是由鎖來保證線程安全的。
通知的結果,就至關於將等待隊列中等待時間最久的線程(首節點)移動到同步隊列中的隊尾。
想了解更詳細的原理就接着往下看吧
先看看源碼
//通知
public final void signal() {
//isHeldExclusively()判斷當前是不是持有鎖的線程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//獲取等待隊列中的首節點
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//將首節點斷開,而後調用transferForSignal(first)將首節點移動到同步隊列中
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//添加到同步隊列中
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//調用unpark喚醒當前線程,而後就能夠到同步隊列中去競爭鎖了
LockSupport.unpark(node.thread);
return true;
}
複製代碼
結合上面的等待過程,此時的通知過程:當前持有鎖的線程調用signal(),先獲取等待隊列中的首節點並斷開首節點,而後添加到同步隊列的隊尾,最後調用LockSupport.unpark()方法來喚醒等待線程,將從await()方法中的while循環中退出(isOnSyncQueue(Node node)方法返回true,節點已經在同步隊列中),進而調用同步器的acquireQueued()方法加入到獲取同步狀態的競爭中。若是被喚醒的線程有幸競爭成功得到了鎖(獲取同步狀態),被喚醒的線程將從先前調用的await()方法返回,此時該線程已經成功地獲取了鎖。
另外signalAll()方法,至關於對等待隊列中的每一個節點均執行一次signal()方法,效 果就是將等待隊列中全部節點所有移動到同步隊列中,並喚醒每一個節點的線程。
來一張圖整理一下整個通知過程,思路會更加清晰