掃描下方二維碼或者微信搜索公衆號
菜鳥飛呀飛
,便可關注微信公衆號,閱讀更多Spring源碼分析
和Java併發編程
文章。java
隊列同步器AQS是經過管程模型來實現的,在管程模型中,咱們提到了兩個隊列:
入口等待隊列
和條件變量等待隊列
。在AQS中同步隊列
對應管程模型中的入口等待隊列
,條件等待隊列
對應管程模型中的條件變量等待隊列
。關於AQS的同步隊列的設計原理及源碼實現能夠閱讀這兩篇文章:隊列同步器(AQS)的設計原理 和 隊列同步器(AQS)源碼分析。今天將詳細分析AQS中等待隊列的設計原理和源碼。(關於管程的介紹能夠參考這篇文章:管程:併發編程的基石 )node
互斥
與同步
,互斥指的是同一時刻只容許一個線程訪問共享資源,這一點AQS的同步隊列已經幫助咱們解決了。同步指的是線程間如何進行通訊和協做,那麼AQS又是如何來解決同步問題的呢?答案就是今天的主角:Condition
。ConditionObject
是隊列同步器AbstractQueuedSynchronizer(後面簡稱AQS)的一個內部類。它須要與Lock一塊兒使用,經過Lock.newCondition()
來建立實例。await()、signal()、signalAll()
三個方法,它們分別與Object類的這三個方法對應,也是用來實現線程以前的通訊的,可是它們在功能和使用法方式上存在部分差別。具體差別能夠參考下表。(表格來源於《Java併發編程的藝術》一書第5章第6節)對比項 | Object | Condition |
---|---|---|
使用前提 | 使用synchronize獲取到鎖 | 使用Lock實例的lock()方法獲取到鎖 |
使用方式 | object.wait()、object.notify()等 | 須要使用Lock接口的實例對象來建立,Lock.newCondition() |
等待隊列個數 | 支持1個 | 能夠支持多個,使用Lock實例new多個Condition便可 |
線程進入等待隊列後是否響應中斷 | 不支持 | 支持 |
超時等待 | 支持 | 支持 |
線程釋放鎖後等待到未來的某個時間點 | 不支持 | 支持 |
喚醒等待隊列中的一個線程 | 支持,notify() | 支持,signal() |
喚醒等待隊列中的全部線程 | 支持,notifyAll() | 支持,signalAll() |
屬性名 | 做用 |
---|---|
Node prev | 同步隊列中,當前節點的前一個節點,若是當前節點是同步隊列的頭結點,那麼prev屬性爲null |
Node next | 同步隊列中,當前節點的後一個節點,若是當前節點是同步隊列的尾結點,那麼next屬性爲null |
Node thread | 當前節點表明的線程,若是當前線程獲取到了鎖,那麼當前線程所表明的節點必定處於同步隊列的隊首,且thread屬性爲null,至於爲何要將其設置爲null,這是AQS特地設計的。 |
int waitStatus | 當前線程的等待狀態,有5種取值。0表示初始值,1表示線程被取消,-1表示當前線程處於等待狀態,-2表示節點處於等待隊列中,-3表示下一次共享式同步狀態獲取將會無條件地被傳播下去 |
Node nextWaiter | 等待隊列中,該節點的下一個節點 |
等待隊列
的實現主要依靠的是Node節點中nextWaiter
屬性和waitStatus
屬性來實現的,其中waitStatus=-2
時,表示線程是處於等待隊列中
。(同步隊列依靠prev和next屬性來實現雙向鏈表)。Condition包含兩個屬性:firstWaiter
和lastWaiter
,分別表示等待隊列的隊首和隊尾。等待隊列遵循先進先出的原則(FIFO)。下圖爲Condition等待隊列的示意圖。添加到等待隊列
中;而後釋放鎖;最後調用LockSopport.park()
方法將本身掛起。可使用以下示意圖表示。firstWaiter
節點,而後將firstWaiter
節點加入到同步隊列
中。示意圖以下。全部Node節點移到同步隊列中
。理解了上面的等待隊列的數據結構和實現原理,接下來就結合源碼看看具體的實現。接下來將分析await()方法和signal()方法的源碼。編程
當調用condition.await()方法時,會調用到AbstractQueuedSynchornizer中的內部類ConditionObject的await()方法。該方法的源碼以下。設計模式
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 將當前線程加入到等待隊列中
Node node = addConditionWaiter();
// 徹底釋放鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判斷同步節點是否在同步隊列中,
// 若是在當前線程釋放鎖後,鎖被其餘線程搶到了,此時當前節點就不在同步隊列中了。
// 若是鎖沒有被搶佔到,節點就會再同步隊列中(當前線程是持有鎖的線程,因此它是頭結點)
while (!isOnSyncQueue(node)) {
// 若是節點不在同步隊列中,將當前線程park
LockSupport.park(this);
/** * 當被喚醒之後,接着從下面開始執行。醒來後會判斷本身在等待過程當中有沒有被中斷過。 * checkInterruptWhileWaiting()方法返回0表示沒有被中斷過 * 返回-1表示須要拋出異常 * 返回1表示須要重置中斷標識 */
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 若是線程在同步隊列中,那麼就嘗試去獲取鎖,若是獲取不到,就會加入到同步隊列中,並阻塞
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 當條件等待隊列中還有其餘線程在等待時,須要判斷條件等待隊列中有沒有線程被取消,若是有,則將它們清除
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
複製代碼
addConditionWaiter()
方法,將本身封裝成一個Node後,加入到等待隊列中。而後調用fullyRelease()
方法釋放鎖,接着經過判斷當前線程是否在等待隊列中,若是不在等待隊列中,就將本身park
。lastWaiter
屬性等於當前線程所表明的節點。(注意:此時若是等待隊列沒有進行初始化時,會先進行初始化)。addConditionWaiter()方法的源碼以下。/** * Adds a new waiter to wait queue. * @return its new wait node * 將當前線程加入到等待隊列中 */
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 建立一個節點,節點的waitStatus等於-2
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 若是等待隊列尚未初始化,即等跌隊列中尚未任何元素,那麼此時firstWaiter和lastWaiter均爲null
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node; // 令舊的隊尾節點nextWaiter屬性等於當前線程的節點,這樣就維護了隊列的先後關係
lastWaiter = node;
return node;
}
複製代碼
fullyRelease()
方法,從方法名看,就知道它的做用是徹底釋放鎖
。這裏爲何要徹底釋放鎖呢?由於對於重入鎖而言,鎖可能被重入了屢次,此時同步變量state的值大於1
,而調用await()方法時,要讓當前線程將鎖釋放掉,因此須要將state的值減爲0,所以這裏取名爲fullyRelease()。fullyRelease()最終仍是調用AQS的release()
方法來釋放鎖。fullyRelease()方法的源碼以下。final int fullyRelease(Node node) {
boolean failed = true;
try {
// 獲取同步變量state的值
int savedState = getState();
// 釋放鎖,注意此時將同步變量的值傳入進去了,若是是重入鎖,且被重入過,那麼此時savedState的值大於1
// 此時釋放鎖時,會將同步變量state的值減爲0。(一般可重入鎖在釋放鎖時,每次只會將state減1,重入了幾回就要釋放幾回,在這裏是一會兒所有釋放)。
if (release(savedState)) {
failed = false;
return savedState;
} else {
// 當線程沒有獲取到鎖時,調用該方法會釋放失敗,會拋出異常。
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
複製代碼
isOnSyncQueue()
判斷當前線程的節點是否是在同步隊列
中,若是isOnSyncQueue()方法返回true,表示節點在同步隊列中,若是返回false,表示當前節點不在同步隊列中。當返回false時,會進入到while循環中
,此時會調用LockSupport.park()
方法,讓當前線程掛起。因爲當前線程在釋放鎖之後會喚醒同步隊列中的線程去搶鎖,若是有線程搶到鎖,那麼當前線程就確定不會在同步隊列了(同步隊列的首節點變化了),因此此時isOnSyncQueue()方法大機率返回的是false,所以會進入到while()方法中。若是當前線程在同步隊列中,或者從park()處醒來後(醒來後會出如今同步隊列中,由於signal()或者signalAll()方法會將節點移到同步隊列),就會執行到await()方法後面的邏輯,即acquireQueued()
方法,該方法就是去嘗試獲取鎖,若是獲取到就會返回,獲取不到就阻塞。isOnSyncQueue()
的源碼以及註釋以下。final boolean isOnSyncQueue(Node node) {
// 若是節點的waitStatus=-2時,節點確定不在同步隊列中,由於只有在等待隊列時,纔會爲-2。
// 若是節點的前驅節點爲空時,有兩種狀況:
// 1. 當前節點是同步隊列的首節點,首節點是已經獲取到鎖的線程,能夠認爲線程不在同步隊列中
// 2. 當前節點在等待隊列中,等待隊列中的節點在建立時,沒有給prev屬性賦值
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 在上一個判斷的前提條件下,若是後置節點不爲空,那麼當前節點確定在同步隊列中。
if (node.next != null) // If has successor, it must be on queue
return true;
/* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */
// 以上狀況均不屬於,那麼就從同步隊列的尾部開始遍歷,找到同步隊列中是否含有node節點
return findNodeFromTail(node);
}
複製代碼
1.
將本身加到等待隊列;2.
釋放鎖;3.
將本身park()。在Condition接口中還提供了幾個重載的await()方法,它們在await()方法的基礎上添加了部分功能,例如超時等待、不響應中斷的等待等功能,但大體邏輯和await()方法相似,有興趣的朋友能夠去研究下。condition.signal()
方法時,會調用到AbstractQueuedSynchornizer中的內部類ConditionObject的signal()方法
。該方法的源碼以下。public final void signal() {
// 先判斷當前線程有沒有獲取到鎖,若是沒有獲取到鎖就來調用signal()方法,就會拋出異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
複製代碼
doSignal()
方法中實現的。doSignal()方法的源碼以下。private void doSignal(Node first) {
do {
// 令firstWaiter等於條件等待隊列中的下一個節點。
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
/** *調用transferForSignal()方法,是將節點從條件等待隊列中移到同步隊列中. * 當transferForSignal()返回true時,表示節點被成功移到同步隊列了。返回false,表示移動失敗,當節點所表示的線程被取消時,會返回false * 當transferForSignal()返回true時,do...while循環結束。返回false時,繼續。爲何要這樣呢? * 由於當transferForSignal()返回false表示條件等待隊列中的,隊列的頭結點的狀態時取消狀態,不能將它移到同步隊列中,隨意須要繼續從條件等待隊列找沒有被取消的節點。 */
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
複製代碼
從等待隊列中移除
,而後調用transferForSignal()
方法將其加入到同步隊列
中,當transferForSignal()返回true時,表示節點被成功移到了同步隊列中,返回false表示移動失敗,只有一種狀況會移動失敗,那就是線程被取消了。transferForSignal()方法的源碼以下。/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) * 將節點從條件等待隊列移到同步隊列 */
final boolean transferForSignal(Node node) {
/* * If cannot change waitStatus, the node has been cancelled. */
// 將節點的waitStatus的值從-2改成-1。這裏若是出現CAS失敗,說明節點的waitStatus值被修改過,在條件等待隊列中,只有當線程被取消後,纔會去修改waitStatus
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */
// 當加入到同步隊列後,須要將當前節點的前一個節點的waitStatus設置爲-1,表示隊列中還有線程在等待
// 若是前驅節點的waitStatus大於0表示線程被取消,須要將當前線程喚醒
// 或者修改前驅節點的waitStatus是失敗,也須要去喚醒當前線程
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
複製代碼
signalAll()
方法的做用就是喚醒等待隊列中的全部節點
,而signal()方法只喚醒等待隊列的第一個節點。當調用condition.signalAll()方法時,會調用到AbstractQueuedSynchornizer中的內部類ConditionObject的signalAll()方法。該方法的源碼以下。public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 核心邏輯在doSignalAll()方法
doSignalAll(first);
}
複製代碼
doSignalAll()
方法。doSignalAll()方法的源碼以下。private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
// 經過do...while循環,遍歷等待隊列的全部節點
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
// transferForSignal()方法將節點移到到同步隊列
transferForSignal(first);
first = next;
} while (first != null);
}
複製代碼
do...while
循環,遍歷等待隊列的全部節點,循環調用transferForSignal()
方法,將等待隊列中的節點所有移動到同步隊列
中。Lock.newCondition()
方法就能建立Condition實例,屢次調用
Lock.newCondition()方法,那麼就會建立多個條件等待隊列
。LinkedBlockingQueue
就是經過Condition來實現的,有興趣的朋友能夠先去閱讀下源碼。