Java併發編程,Condition的await和signal等待通知機制

Condition簡介
Object類是Java中全部類的父類, 在線程間實現通訊的每每會應用到Object的幾個方法: wait(),wait(long timeout),wait(long timeout, int nanos)與notify(),notifyAll() 實現等待/通知機制,一樣的, 在Java Lock體系下依然會有一樣的方法實現等待/通知機制。 從總體上來看Object的wait和notify/notify是與對象監視器配合完成線程間的等待/通知機制,Condition與Lock配合完成等待/通知機制, 前者是Java底層級別的,後者是語言級別的,具備更高的可控制性和擴展性。 二者除了在使用方式上不一樣外,在功能特性上仍是有不少的不一樣:node

Condition可以支持不響應中斷,而經過使用Object方式不支持併發

Condition可以支持多個等待隊列(new 多個Condition對象),而Object方式只能支持一個分佈式

Condition可以支持超時時間的設置,而Object不支持高併發

參照Object的wait和notify/notifyAll方法,Condition也提供了一樣的方法:學習

針對Object的wait方法ui

void await() throws InterruptedException//當前線程進入等待狀態,若是在等待狀態中被中斷會拋出被中斷異常long awaitNanos(long nanosTimeout)//當前線程進入等待狀態直到被通知,中斷或者超時boolean await(long time, TimeUnit unit)throws InterruptedException//同第二種,支持自定義時間單位boolean awaitUntil(Date deadline) throws InterruptedException//當前線程進入等待狀態直到被通知,中斷或者到了某個時間
針對Object的notify/notifyAll方法this

void signal()//喚醒一個等待在condition上的線程,將該線程從等待隊列中轉移到同步隊列中,若是在同步隊列中可以競爭到Lock則能夠從等待方法中返回。void signalAll()//與1的區別在於可以喚醒全部等待在condition上的線程
Condition實現原理分析
等待隊列
建立一個Condition對象是經過lock.newCondition(), 而這個方法其實是會建立ConditionObject對象,該類是AQS的一個內部類。 Condition是要和Lock配合使用的也就是Condition和Lock是綁定在一塊兒的,而lock的實現原理又依賴於AQS, 天然而然ConditionObject做爲AQS的一個內部類無可厚非。 咱們知道在鎖機制的實現上,AQS內部維護了一個同步隊列,若是是獨佔式鎖的話, 全部獲取鎖失敗的線程的尾插入到同步隊列, 一樣的,Condition內部也是使用一樣的方式,內部維護了一個等待隊列, 全部調用condition.await方法的線程會加入到等待隊列中,而且線程狀態轉換爲等待狀態。 另外注意到ConditionObject中有兩個成員變量:spa

/ First node of condition queue. /private transient Node firstWaiter;/ Last node of condition queue. /private transient Node lastWaiter;
ConditionObject經過持有等待隊列的頭尾指針來管理等待隊列。 注意Node類複用了在AQS中的Node類,Node類有這樣一個屬性:線程

//後繼節點Node nextWaiter;
等待隊列是一個單向隊列,而在以前說AQS時知道同步隊列是一個雙向隊列。3d

等待隊列示意圖:

clipboard.png

注意: 咱們能夠屢次調用lock.newCondition()方法建立多個Condition對象,也就是一個lLock能夠持有多個等待隊列。 利用Object的方式其實是指在對象Object對象監視器上只能擁有一個同步隊列和一個等待隊列; 併發包中的Lock擁有一個同步隊列和多個等待隊列。示意圖以下:

clipboard.png

ConditionObject是AQS的內部類, 所以每一個ConditionObject可以訪問到AQS提供的方法,至關於每一個Condition都擁有所屬同步器的引用。

await實現原理
當調用condition.await()方法後會使得當前獲取lock的線程進入到等待隊列, 若是該線程可以從await()方法返回的話必定是該線程獲取了與condition相關聯的lock。 await()方法源碼以下:

public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 1. 將當前線程包裝成Node,尾插法插入到等待隊列中

Node node = addConditionWaiter();    // 2. 釋放當前線程所佔用的lock,在釋放的過程當中會喚醒同步隊列中的下一個節點
int savedState = fullyRelease(node);    int interruptMode = 0;    while (!isOnSyncQueue(node)) {    // 3. 當前線程進入到等待狀態
    LockSupport.park(this);        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;
}    // 4. 自旋等待獲取到同步狀態(即獲取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;    if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();    // 5. 處理被中斷的狀況
if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

}
當前線程調用condition.await()方法後,會使得當前線程釋放lock而後加入到等待隊列中, 直至被signal/signalAll後會使得當前線程從等待隊列中移至到同步隊列中去, 直到得到了lock後纔會從await方法返回,或者在等待時被中斷會作中斷處理。

addConditionWaiter()將當前線程添加到等待隊列中,其源碼以下:

private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out.

if (t != null && t.waitStatus != Node.CONDITION) {
    unlinkCancelledWaiters();
    t = lastWaiter;
}    //將當前線程包裝成Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);    if (t == null) //t==null,同步隊列爲空的狀況
    firstWaiter = node;    else

//尾插法

t.nextWaiter = node;    //更新lastWaiter
lastWaiter = node;    return node;

}
這裏經過尾插法將當前線程封裝的Node插入到等待隊列中, 同時能夠看出等待隊列是一個不帶頭結點的鏈式隊列,以前咱們學習AQS時知道同步隊列是一個帶頭結點的鏈式隊列。

將當前節點插入到等待對列以後,使用fullyRelease(0)方法釋放當前線程釋放lock,源碼以下:

final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { //成功釋放同步狀態

failed = false;            return savedState;
    } else {    //不成功釋放同步狀態拋出異常
        throw new IllegalMonitorStateException();
    }
} finally {        if (failed)
        node.waitStatus = Node.CANCELLED;
}

}
調用AQS的模板方法release()方法釋放AQS的同步狀態而且喚醒在同步隊列中頭結點的後繼節點引用的線程, 若是釋放成功則正常返回,若失敗的話就拋出異常。

如何從await()方法中退出?再看await()方法有這樣一段代碼:

while (!isOnSyncQueue(node)) { // 3. 當前線程進入到等待狀態

LockSupport.park(this);    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)        break;

}
當線程第一次調用condition.await()方法時, 會進入到這個while()循環中,而後經過LockSupport.park(this)方法使得當前線程進入等待狀態, 那麼要想退出這個await方法就要先退出這個while循環,退出while循環的出口有2個:

break退出while循環

while循環中的邏輯判斷爲false

第1種狀況的條件是當前等待的線程被中斷後會走到break退出,

第2種狀況是當前節點被移動到了同步隊列中,(即另外線程調用的condition的signal或者signalAll方法), while中邏輯判斷爲false後結束while循環。

當退出while循環後就會調用acquireQueued(node, savedState),該方法的做用是 在自旋過程當中線程不斷嘗試獲取同步狀態,直至成功(線程獲取到lock)。

這樣就說明了退出await方法必須是已經得到了Condition引用(關聯)的Lock。

await方法示意圖以下:

clipboard.png

調用condition.await方法的線程必須是已經得到了lock,也就是當前線程是同步隊列中的頭結點。 調用該方法後會使得當前線程所封裝的Node尾插入到等待隊列中。

超時機制的支持

condition還額外支持了超時機制,使用者可調用方法awaitNanos,awaitUtil。 這兩個方法的實現原理,基本上與AQS中的tryAcquire方法一模一樣。

不響應中斷的支持

調用condition.awaitUninterruptibly()方法,該方法的源碼爲:

public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted())

interrupted = true;
}    if (acquireQueued(node, savedState) || interrupted)
    selfInterrupt();

}
與上面的await方法基本一致,只不過減小了對中斷的處理, 並省略了reportInterruptAfterWait方法拋被中斷的異常。

signal和signalAll實現原理
調用Condition的signal或者signalAll方法能夠將 等待隊列中等待時間最長的節點移動到同步隊列中,使得該節點可以有機會得到lock。 按照等待隊列是先進先出(FIFO)的, 因此等待隊列的頭節點必然會是等待時間最長的節點, 也就是每次調用condition的signal方法是將頭節點移動到同步隊列中。 signal()源碼以下:

public final void signal() { //1. 先檢測當前線程是否已經獲取lock

if (!isHeldExclusively())        throw new IllegalMonitorStateException();    //2. 獲取等待隊列中第一個節點,以後的操做都是針對這個節點

Node first = firstWaiter; if (first != null)

doSignal(first);

}
signal方法首先會檢測當前線程是否已經獲取lock, 若是沒有獲取lock會直接拋出異常,若是獲取的話再獲得等待隊列的頭指針引用的節點,doSignal方法也是基於該節點。 doSignal方法源碼以下:

private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null)

lastWaiter = null;    //1. 將頭結點從等待隊列中移除
    first.nextWaiter = null;    //2. while中transferForSignal方法對頭結點作真正的處理
} while (!transferForSignal(first) &&
         (first = firstWaiter) != null);

}
真正對頭節點作處理的是transferForSignal(),該方法源碼以下:

final boolean transferForSignal(Node node) { //1. 更新狀態爲0

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))        return false;    //2.將該節點移入到同步隊列中去
Node p = enq(node);    int ws = p.waitStatus;    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))        LockSupport.unpark(node.thread);    return true;

}
這段代碼主要作了兩件事情:

1.將頭結點的狀態更改成CONDITION

2.調用enq方法,將該節點尾插入到同步隊列中

調用condition的signal的前提條件是 當前線程已經獲取了lock,該方法會使得等待隊列中的頭節點(等待時間最長的那個節點)移入到同步隊列, 而移入到同步隊列後纔有機會使得等待線程被喚醒, 即從await方法中的LockSupport.park(this)方法中返回,從而纔有機會使得調用await方法的線程成功退出。

signal方法示意圖以下:

clipboard.png

signalAll

sigllAll與sigal方法的區別體如今doSignalAll方法上。doSignalAll()的源碼以下:

private void doSignalAll(Node first) {

lastWaiter = firstWaiter = null;    do {        Node next = first.nextWaiter;
    first.nextWaiter = null;
    transferForSignal(first);
    first = next;
} while (first != null);

}
doSignal方法只會對等待隊列的頭節點進行操做,而doSignalAll方法將等待隊列中的每個節點都移入到同步隊列中, 即「通知」當前調用condition.await()方法的每個線程。

await與signal和signalAll的結合
await和signal和signalAll方法就像一個開關控制着線程A(等待方)和線程B(通知方)。 它們之間的關係能夠用下面一個圖來表現得更加貼切:

clipboard.png

線程awaitThread先經過lock.lock()方法獲取鎖成功後調用了condition.await方法進入等待隊列, 而另外一個線程signalThread經過lock.lock()方法獲取鎖成功後調用了condition.signal或者signalAll方法, 使得線程awaitThread可以有機會移入到同步隊列中, 當其餘線程釋放lock後使得線程awaitThread可以有機會獲取lock, 從而使得線程awaitThread可以從await方法中退出,而後執行後續操做。 若是awaitThread獲取lock失敗會直接進入到同步隊列。

免費Java高級資料須要本身領取,涵蓋了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo高併發分佈式等教程,一共30G。

傳送門:https://mp.weixin.qq.com/s/Jz...

相關文章
相關標籤/搜索