併發編程中條件變量Condition的源碼分析

掃描下方二維碼或者微信搜索公衆號菜鳥飛呀飛,便可關注微信公衆號,閱讀更多Spring源碼分析Java併發編程文章。java

微信公衆號

隊列同步器AQS是經過管程模型來實現的,在管程模型中,咱們提到了兩個隊列:入口等待隊列條件變量等待隊列。在AQS中同步隊列對應管程模型中的入口等待隊列條件等待隊列對應管程模型中的條件變量等待隊列。關於AQS的同步隊列的設計原理及源碼實現能夠閱讀這兩篇文章:隊列同步器(AQS)的設計原理隊列同步器(AQS)源碼分析。今天將詳細分析AQS中等待隊列的設計原理和源碼。(關於管程的介紹能夠參考這篇文章:管程:併發編程的基石node

簡介

  • 在併發領域中須要解決的兩個問題:互斥同步,互斥指的是同一時刻只容許一個線程訪問共享資源,這一點AQS的同步隊列已經幫助咱們解決了。同步指的是線程間如何進行通訊和協做,那麼AQS又是如何來解決同步問題的呢?答案就是今天的主角:Condition
  • Condition是JUC包下的一個接口,它的一個實現類ConditionObject是隊列同步器AbstractQueuedSynchronizer(後面簡稱AQS)的一個內部類。它須要與Lock一塊兒使用,經過Lock.newCondition()來建立實例。
  • 在Object類中有三個native方法:wait()、notify()、notifyAll(),它們須要與synchronize關鍵字一塊兒使用,用它來實現線程之間的通訊。而Condition提供了await()、signal()、signalAll()三個方法,它們分別與Object類的這三個方法對應,也是用來實現線程以前的通訊的,可是它們在功能和使用法方式上存在部分差別。具體差別能夠參考下表。(表格來源於《Java併發編程的藝術》一書第5章第6節)
對比項 Object Condition
使用前提 使用synchronize獲取到鎖 使用Lock實例的lock()方法獲取到鎖
使用方式 object.wait()、object.notify()等 須要使用Lock接口的實例對象來建立,Lock.newCondition()
等待隊列個數 支持1個 能夠支持多個,使用Lock實例new多個Condition便可
線程進入等待隊列後是否響應中斷 不支持 支持
超時等待 支持 支持
線程釋放鎖後等待到未來的某個時間點 不支持 支持
喚醒等待隊列中的一個線程 支持,notify() 支持,signal()
喚醒等待隊列中的全部線程 支持,notifyAll() 支持,signalAll()

AQS中如何設計等待對列

數據結構

  • AQS中的同步隊列的數據結構是Node元素造成的一個雙向鏈表,一樣,等待隊列也是以Node元素造成的一個鏈表,只不過是單向鏈表。在 隊列同步器(AQS)的設計原理 這篇文章中已經介紹過Node的屬性了,今天再來複習一下它的屬性以及用途。
屬性名 做用
Node prev 同步隊列中,當前節點的前一個節點,若是當前節點是同步隊列的頭結點,那麼prev屬性爲null
Node next 同步隊列中,當前節點的後一個節點,若是當前節點是同步隊列的尾結點,那麼next屬性爲null
Node thread 當前節點表明的線程,若是當前線程獲取到了鎖,那麼當前線程所表明的節點必定處於同步隊列的隊首,且thread屬性爲null,至於爲何要將其設置爲null,這是AQS特地設計的。
int waitStatus 當前線程的等待狀態,有5種取值。0表示初始值,1表示線程被取消,-1表示當前線程處於等待狀態,-2表示節點處於等待隊列中,-3表示下一次共享式同步狀態獲取將會無條件地被傳播下去
Node nextWaiter 等待隊列中,該節點的下一個節點
  • AQS等待隊列的實現主要依靠的是Node節點中nextWaiter屬性和waitStatus屬性來實現的,其中waitStatus=-2時,表示線程是處於等待隊列中。(同步隊列依靠prev和next屬性來實現雙向鏈表)。Condition包含兩個屬性:firstWaiterlastWaiter,分別表示等待隊列的隊首和隊尾。等待隊列遵循先進先出的原則(FIFO)。下圖爲Condition等待隊列的示意圖。

等待隊列示意圖

實現原理

  • synchronize實現鎖的原理也是經過管程模型實現的,可是synchronize實現的鎖,只支持一個等待隊列,而Condition能夠支持多個等待隊列。在AQS中,等待隊列和同步隊列的示意圖以下。

同步隊列和等待隊列

  • 當線程經過Lock獲取鎖之後,調用Condition的await()方法時,當前線程會先將本身封裝成一個Node節點,而後將這個節點添加到等待隊列中;而後釋放鎖;最後調用LockSopport.park()方法將本身掛起。可使用以下示意圖表示。

await()示意圖

  • 當調用Condition的signal()方法時,首先會移除等待隊列的firstWaiter節點,而後將firstWaiter節點加入到同步隊列中。示意圖以下。

signal()示意圖

  • 若是是調用Condition的signalAll()方法,那麼就會將Condition等待隊列中全部Node節點移到同步隊列中

源碼分析

理解了上面的等待隊列的數據結構和實現原理,接下來就結合源碼看看具體的實現。接下來將分析await()方法和signal()方法的源碼。編程

await()方法

當調用condition.await()方法時,會調用到AbstractQueuedSynchornizer中的內部類ConditionObject的await()方法。該方法的源碼以下。設計模式

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 將當前線程加入到等待隊列中
    Node node = addConditionWaiter();
    // 徹底釋放鎖
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 判斷同步節點是否在同步隊列中,
    // 若是在當前線程釋放鎖後,鎖被其餘線程搶到了,此時當前節點就不在同步隊列中了。
    // 若是鎖沒有被搶佔到,節點就會再同步隊列中(當前線程是持有鎖的線程,因此它是頭結點)
    while (!isOnSyncQueue(node)) {
        // 若是節點不在同步隊列中,將當前線程park
        LockSupport.park(this);
        /** * 當被喚醒之後,接着從下面開始執行。醒來後會判斷本身在等待過程當中有沒有被中斷過。 * checkInterruptWhileWaiting()方法返回0表示沒有被中斷過 * 返回-1表示須要拋出異常 * 返回1表示須要重置中斷標識 */
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 若是線程在同步隊列中,那麼就嘗試去獲取鎖,若是獲取不到,就會加入到同步隊列中,並阻塞
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 當條件等待隊列中還有其餘線程在等待時,須要判斷條件等待隊列中有沒有線程被取消,若是有,則將它們清除
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
複製代碼
  • 在await()方法中,當前線程會先調用addConditionWaiter()方法,將本身封裝成一個Node後,加入到等待隊列中。而後調用fullyRelease()方法釋放鎖,接着經過判斷當前線程是否在等待隊列中,若是不在等待隊列中,就將本身park
  • 在addConditionWaiter()方法中,主要乾了兩件事。一:將等待隊列中處於取消狀態的節點刪除,即waitStatus=1的Node節點;二:將本身加入到等待隊列,令Condition的lastWaiter屬性等於當前線程所表明的節點。(注意:此時若是等待隊列沒有進行初始化時,會先進行初始化)。addConditionWaiter()方法的源碼以下。
/** * Adds a new waiter to wait queue. * @return its new wait node * 將當前線程加入到等待隊列中 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 建立一個節點,節點的waitStatus等於-2
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 若是等待隊列尚未初始化,即等跌隊列中尚未任何元素,那麼此時firstWaiter和lastWaiter均爲null
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node; // 令舊的隊尾節點nextWaiter屬性等於當前線程的節點,這樣就維護了隊列的先後關係
    lastWaiter = node;
    return node;
}
複製代碼
  • 對於fullyRelease()方法,從方法名看,就知道它的做用是徹底釋放鎖。這裏爲何要徹底釋放鎖呢?由於對於重入鎖而言,鎖可能被重入了屢次,此時同步變量state的值大於1,而調用await()方法時,要讓當前線程將鎖釋放掉,因此須要將state的值減爲0,所以這裏取名爲fullyRelease()。fullyRelease()最終仍是調用AQS的release()方法來釋放鎖。fullyRelease()方法的源碼以下。
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 獲取同步變量state的值
        int savedState = getState();
        // 釋放鎖,注意此時將同步變量的值傳入進去了,若是是重入鎖,且被重入過,那麼此時savedState的值大於1
        // 此時釋放鎖時,會將同步變量state的值減爲0。(一般可重入鎖在釋放鎖時,每次只會將state減1,重入了幾回就要釋放幾回,在這裏是一會兒所有釋放)。
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // 當線程沒有獲取到鎖時,調用該方法會釋放失敗,會拋出異常。
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
複製代碼
  • 在await()方法中,當釋放完鎖之後,緊接着經過isOnSyncQueue()判斷當前線程的節點是否是在同步隊列中,若是isOnSyncQueue()方法返回true,表示節點在同步隊列中,若是返回false,表示當前節點不在同步隊列中。當返回false時,會進入到while循環中,此時會調用LockSupport.park()方法,讓當前線程掛起。因爲當前線程在釋放鎖之後會喚醒同步隊列中的線程去搶鎖,若是有線程搶到鎖,那麼當前線程就確定不會在同步隊列了(同步隊列的首節點變化了),因此此時isOnSyncQueue()方法大機率返回的是false,所以會進入到while()方法中。若是當前線程在同步隊列中,或者從park()處醒來後(醒來後會出如今同步隊列中,由於signal()或者signalAll()方法會將節點移到同步隊列),就會執行到await()方法後面的邏輯,即acquireQueued()方法,該方法就是去嘗試獲取鎖,若是獲取到就會返回,獲取不到就阻塞。
  • isOnSyncQueue()的源碼以及註釋以下。
final boolean isOnSyncQueue(Node node) {
    // 若是節點的waitStatus=-2時,節點確定不在同步隊列中,由於只有在等待隊列時,纔會爲-2。
    // 若是節點的前驅節點爲空時,有兩種狀況:
    // 1. 當前節點是同步隊列的首節點,首節點是已經獲取到鎖的線程,能夠認爲線程不在同步隊列中
    // 2. 當前節點在等待隊列中,等待隊列中的節點在建立時,沒有給prev屬性賦值
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 在上一個判斷的前提條件下,若是後置節點不爲空,那麼當前節點確定在同步隊列中。
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */
    // 以上狀況均不屬於,那麼就從同步隊列的尾部開始遍歷,找到同步隊列中是否含有node節點
    return findNodeFromTail(node);
}
複製代碼
  • 關於await()方法,總結起來就是三個步驟:1. 將本身加到等待隊列;2. 釋放鎖;3. 將本身park()。在Condition接口中還提供了幾個重載的await()方法,它們在await()方法的基礎上添加了部分功能,例如超時等待、不響應中斷的等待等功能,但大體邏輯和await()方法相似,有興趣的朋友能夠去研究下。

signal()方法

  • 當調用condition.signal()方法時,會調用到AbstractQueuedSynchornizer中的內部類ConditionObject的signal()方法。該方法的源碼以下。
public final void signal() {
    // 先判斷當前線程有沒有獲取到鎖,若是沒有獲取到鎖就來調用signal()方法,就會拋出異常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
複製代碼
  • 在signal()方法中會判斷當期線程是不是鎖的擁有者,若是不是,就拋出異常。這就是爲何說調用await()和signal()、signalAll()方法的前提條件是:當前線程獲取到了鎖,由於這幾個方法在執行具體邏輯以前都判斷了當前線程是否等於持有鎖的線程。從代碼中能夠看到,signal()方法的具體邏輯是在doSignal()方法中實現的。doSignal()方法的源碼以下。
private void doSignal(Node first) {
    do {
        // 令firstWaiter等於條件等待隊列中的下一個節點。
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        /** *調用transferForSignal()方法,是將節點從條件等待隊列中移到同步隊列中. * 當transferForSignal()返回true時,表示節點被成功移到同步隊列了。返回false,表示移動失敗,當節點所表示的線程被取消時,會返回false * 當transferForSignal()返回true時,do...while循環結束。返回false時,繼續。爲何要這樣呢? * 由於當transferForSignal()返回false表示條件等待隊列中的,隊列的頭結點的狀態時取消狀態,不能將它移到同步隊列中,隨意須要繼續從條件等待隊列找沒有被取消的節點。 */
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
複製代碼
  • doSignal()方法的做用就是將等待隊列的頭節點從等待隊列中移除,而後調用transferForSignal()方法將其加入到同步隊列中,當transferForSignal()返回true時,表示節點被成功移到了同步隊列中,返回false表示移動失敗,只有一種狀況會移動失敗,那就是線程被取消了。transferForSignal()方法的源碼以下。
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) * 將節點從條件等待隊列移到同步隊列 */
final boolean transferForSignal(Node node) {
    /* * If cannot change waitStatus, the node has been cancelled. */
    // 將節點的waitStatus的值從-2改成-1。這裏若是出現CAS失敗,說明節點的waitStatus值被修改過,在條件等待隊列中,只有當線程被取消後,纔會去修改waitStatus
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */
    // 當加入到同步隊列後,須要將當前節點的前一個節點的waitStatus設置爲-1,表示隊列中還有線程在等待
    // 若是前驅節點的waitStatus大於0表示線程被取消,須要將當前線程喚醒
    // 或者修改前驅節點的waitStatus是失敗,也須要去喚醒當前線程
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
複製代碼
  • signal()方法總結:將等待隊列的頭節點移動到同步隊列中。細心的朋友可能會發現,調用signal()方法,並不會讓當前線程釋放鎖,因此當線程調用完signal()方法後,當前線程會將本身的業務邏輯執行完,直到在當前線程中調用了lock.unlock()方法,纔會釋放鎖。

signalAll()方法

  • signalAll()方法的做用就是喚醒等待隊列中的全部節點,而signal()方法只喚醒等待隊列的第一個節點。當調用condition.signalAll()方法時,會調用到AbstractQueuedSynchornizer中的內部類ConditionObject的signalAll()方法。該方法的源碼以下。
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        // 核心邏輯在doSignalAll()方法
        doSignalAll(first);
}
複製代碼
  • 在signalAll()方法中會調用doSignalAll()方法。doSignalAll()方法的源碼以下。
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    // 經過do...while循環,遍歷等待隊列的全部節點
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        // transferForSignal()方法將節點移到到同步隊列
        transferForSignal(first);
        first = next;
    } while (first != null);
}
複製代碼
  • 能夠發現,doSignalAll()方法經過do...while循環,遍歷等待隊列的全部節點,循環調用transferForSignal()方法,將等待隊列中的節點所有移動到同步隊列中。

總結

  • 本文主要介紹了AQS中等待隊列的功能,對比了它與Object中對應方法的異同點。而後經過分析等待隊列的數據結構,並結合示意圖展現了await()方法和signal()方法的原理。最後經過具體的源碼實現,詳細分析了await()、signal()、signalAll()這三個方法。
  • Condition使用起來十分方便,開發人員只須要經過Lock.newCondition()方法就能建立Condition實例,屢次調用Lock.newCondition()方法,那麼就會建立多個條件等待隊列
  • Condition的應用十分普遍,咱們常常接觸的有界隊列LinkedBlockingQueue就是經過Condition來實現的,有興趣的朋友能夠先去閱讀下源碼。

相關推薦

微信公衆號
相關文章
相關標籤/搜索