AbstractQueuedSynchronizer 原理分析 - Condition 實現原理

1. 簡介

Condition是一個接口,AbstractQueuedSynchronizer 中的ConditionObject內部類實現了這個接口。Condition聲明瞭一組等待/通知的方法,這些方法的功能與Object中的wait/notify/notifyAll等方法類似。這二者相同的地方在於,它們所提供的等待/通知方法均是爲了協同線程的運行秩序。只不過,Object 中的方法須要配合 synchronized 關鍵字使用,而 Condition 中的方法則要配合鎖對象使用,並經過newCondition方法獲取實現類對象。除此以外,Condition 接口中聲明的方法功能上更爲豐富一些。好比,Condition 聲明瞭具備不響應中斷和超時功能的等待接口,這些都是 Object wait 方法所不具有的。html

本篇文章是上一篇文章AbstractQueuedSynchronizer 原理分析 - 獨佔/共享模式的續篇,在學習 Condition 的原理前,建議你們先去了解 AbstractQueuedSynchronizer 同步隊列相關原理。本篇文章會涉及到同步隊列相關知識,這些知識在上一篇文章分析過。java

關於Condition的簡介這裏先說到這,接下來分析一下Condition實現類ConditionObject的原理。node

2. 實現原理

ConditionObject是經過基於單鏈表的條件隊列來管理等待線程的。線程在調用await方法進行等待時,會釋放同步狀態。同時線程將會被封裝到一個等待節點中,並將節點置入條件隊列尾部進行等待。當有線程在獲取獨佔鎖的狀況下調用signalsingalAll方法時,隊列中的等待線程將會被喚醒,從新競爭鎖。另外,須要說明的是,一個鎖對象可同時建立多個 ConditionObject 對象,這意味着多個競爭同一獨佔鎖的線程可在不一樣的條件隊列中進行等待。在喚醒時,可喚醒指定條件隊列中的線程。其大體示意圖以下:併發

以上就是 ConditionObject 所實現的等待/通知機制的大體原理,並非很難理解。固然,在具體的實現中,則考慮的更爲細緻一些。相關細節將會在接下來一章中進行說明,繼續往下看吧。源碼分析

3. 源碼解析

3.1 等待

ConditionObject 中實現了幾種不一樣的等待方法,每種方法均有它本身的特色。好比await()會響應中斷,而awaitUninterruptibly()則不響應中斷。await(long, TimeUnit)則會在響應中斷的基礎上,新增了超時功能。除此以外,還有一些等待方法,這裏就不一一列舉了。學習

在本節中,我將主要分析await()的方法實現。其餘的等待方法大同小異,就不一一分析了,有興趣的朋友能夠本身看一下。好了,接下來進入源碼分析階段。ui

/**
 * await 是一個響應中斷的等待方法,主要邏輯流程以下:
 * 1. 若是線程中斷了,拋出 InterruptedException 異常
 * 2. 將線程封裝到節點對象裏,並將節點添加到條件隊列尾部
 * 3. 保存並徹底釋放同步狀態,保存下來的同步狀態在從新競爭鎖時會用到
 * 4. 線程進入等待狀態,直到被通知或中斷纔會恢復運行
 * 5. 使用第3步保存的同步狀態去競爭獨佔鎖
 */
public final void await() throws InterruptedException {
    // 線程中斷,則拋出中斷異常,對應步驟1
    if (Thread.interrupted())
        throw new InterruptedException();
    
    // 添加等待節點到條件隊列尾部,對應步驟2
    Node node = addConditionWaiter();
    
    // 保存並徹底釋放同步狀態,對應步驟3。此方法的意義會在後面詳細說明。
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    
    /*
     * 判斷節點是否在同步隊列上,若是不在則阻塞線程。
     * 循環結束的條件:
     * 1. 其餘線程調用 singal/singalAll,node 將會被轉移到同步隊列上。node 對應線程將
     *    會在獲取同步狀態的過程當中被喚醒,並走出 while 循環。
     * 2. 線程在阻塞過程當中產生中斷
     */ 
    while (!isOnSyncQueue(node)) {
        // 調用 LockSupport.park 阻塞當前線程,對應步驟4
        LockSupport.park(this);
        
        /*
         * 檢測中斷模式,這裏有兩種中斷模式,以下:
         * THROW_IE:
         *     中斷在 node 轉移到同步隊列「前」發生,須要當前線程自行將 node 轉移到同步隊
         *     列中,並在隨後拋出 InterruptedException 異常。
         *     
         * REINTERRUPT:
         *     中斷在 node 轉移到同步隊列「期間」或「以後」發生,此時代表有線程正在調用 
         *     singal/singalAll 轉移節點。在該種中斷模式下,再次設置線程的中斷狀態。
         *     向後傳遞中斷標誌,由後續代碼去處理中斷。
         */
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    /*
     * 被轉移到同步隊列的節點 node 將在 acquireQueued 方法中從新獲取同步狀態,注意這裏
     * 的這裏的 savedState 是上面調用 fullyRelease 所返回的值,與此對應,能夠把這裏的 
     * acquireQueued 做用理解爲 fullyAcquire(並不存在這個方法)。
     * 
     * 若是上面的 while 循環沒有產生中斷,則 interruptMode = 0。但 acquireQueued 方法
     * 可能會產生中斷,產生中斷時返回 true。這裏仍將 interruptMode 設爲 REINTERRUPT,
     * 目的是繼續向後傳遞中斷,acquireQueued 不會處理中斷。
     */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    
    /*
     * 正常經過 singal/singalAll 轉移節點到同步隊列時,nextWaiter 引用會被置空。
     * 若發生線程產生中斷(THROW_IE)或 fullyRelease 方法出現錯誤等異常狀況,
     * 該引用則不會被置空
     */ 
    if (node.nextWaiter != null) // clean up if cancelled
        // 清理等待狀態非 CONDITION 的節點
        unlinkCancelledWaiters();
        
    if (interruptMode != 0)
        /*
         * 根據 interruptMode 以爲中斷的處理方式:
         *   THROW_IE:拋出 InterruptedException 異常
         *   REINTERRUPT:從新設置線程中斷標誌
         */ 
        reportInterruptAfterWait(interruptMode);
}

/** 將當先線程封裝成節點,並將節點添加到條件隊列尾部 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    /*
     * 清理等待狀態爲 CANCELLED 的節點。fullyRelease 內部調用 release 發生異常或釋放同步狀
     * 態失敗時,節點的等待狀態會被設置爲 CANCELLED。因此這裏要清理一下已取消的節點
     */
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    
    // 建立節點,並將節點置於隊列尾部
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

/** 清理等待狀態爲 CANCELLED 的節點 */ 
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    // 指向上一個等待狀態爲非 CANCELLED 的節點
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            /*
             * trail 爲 null,代表 next 以前的節點等待狀態均爲 CANCELLED,此時更新 
             * firstWaiter 引用的指向。
             * trail 不爲 null,代表 next 以前有節點的等待狀態爲 CONDITION,這時將 
             * trail.nextWaiter 指向 next 節點。
             */
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            // next 爲 null,代表遍歷到條件隊列尾部了,此時將 lastWaiter 指向 trail
            if (next == null)
                lastWaiter = trail;
        }
        else
            // t.waitStatus = Node.CONDITION,則將 trail 指向 t
            trail = t;
        t = next;
    }
}
   
/**
 * 這個方法用於徹底釋放同步狀態。這裏解釋一下徹底釋放的緣由:爲了不死鎖的產生,鎖的實現上
 * 通常應該支持重入功能。對應的場景就是一個線程在不釋放鎖的狀況下能夠屢次調用同一把鎖的 
 * lock 方法進行加鎖,且不會加鎖失敗,如失敗必然致使致使死鎖。鎖的實現類可經過 AQS 中的整型成員
 * 變量 state 記錄加鎖次數,每次加鎖,將 state++。每次 unlock 方法釋放鎖時,則將 state--,
 * 直至 state = 0,線程徹底釋放鎖。用這種方式便可實現了鎖的重入功能。
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 獲取同步狀態數值
        int savedState = getState();
        // 調用 release 釋放指定數量的同步狀態
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        // 若是 relase 出現異常或釋放同步狀態失敗,此處將 node 的等待狀態設爲 CANCELLED
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

/** 該方法用於判斷節點 node 是否在同步隊列上 */
final boolean isOnSyncQueue(Node node) {
    /*
     * 節點在同步隊列上時,其狀態可能爲 0、SIGNAL、PROPAGATE 和 CANCELLED 其中之一,
     * 但不會爲 CONDITION,因此可已經過節點的等待狀態來判斷節點所處的隊列。
     * 
     * node.prev 僅會在節點獲取同步狀態後,調用 setHead 方法將本身設爲頭結點時被置爲 
     * null,因此只要節點在同步隊列上,node.prev 必定不會爲 null
     */
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
        
    /*
     * 若是節點後繼被爲 null,則代表節點在同步隊列上。由於條件隊列使用的是 nextWaiter 指
     * 向後繼節點的,條件隊列上節點的 next 指針均爲 null。但僅以 node.next != null 條
     * 件判定節點在同步隊列是不充分的。節點在入隊過程當中,是先設置 node.prev,後設置 
     * node.next。若是設置完 node.prev 後,線程被切換了,此時 node.next 仍然爲 
     * null,但此時 node 確實已經在同步隊列上了,因此這裏還須要進行後續的判斷。
     */
    if (node.next != null)
        return true;
        
    // 在同步隊列上,從後向前查找 node 節點
    return findNodeFromTail(node);
}

/** 因爲同步隊列上的的節點 prev 引用不會爲空,因此這裏從後向前查找 node 節點 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

/** 檢測線程在等待期間是否發生了中斷 */
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

/** 
 * 判斷中斷髮生的時機,分爲兩種:
 * 1. 中斷在節點被轉移到同步隊列前發生,此時返回 true
 * 2. 中斷在節點被轉移到同步隊列期間或以後發生,此時返回 false
 */
final boolean transferAfterCancelledWait(Node node) {

    // 中斷在節點被轉移到同步隊列前發生,此時自行將節點轉移到同步隊列上,並返回 true
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 調用 enq 將節點轉移到同步隊列中
        enq(node);
        return true;
    }
    
    /*
     * 若是上面的條件分支失敗了,則代表已經有線程在調用 signal/signalAll 方法了,這兩個
     * 方法會先將節點等待狀態由 CONDITION 設置爲 0 後,再調用 enq 方法轉移節點。下面判斷節
     * 點是否已經在同步隊列上的緣由是,signal/signalAll 方法可能僅設置了等待狀態,還沒
     * 來得及轉移節點就被切換走了。因此這裏用自旋的方式判斷 signal/signalAll 是否已經完
     * 成了轉移操做。這種狀況代表了中斷髮生在節點被轉移到同步隊列期間。
     */
    while (!isOnSyncQueue(node))
        Thread.yield();
    }
    
    // 中斷在節點被轉移到同步隊列期間或以後發生,返回 false
    return false;
}

/**
 * 根據中斷類型作出相應的處理:
 * THROW_IE:拋出 InterruptedException 異常
 * REINTERRUPT:從新設置中斷標誌,向後傳遞中斷
 */
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

/** 中斷線程 */   
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

3.2 通知

/** 將條件隊列中的頭結點轉移到同步隊列中 */
public final void signal() {
    // 檢查線程是否獲取了獨佔鎖,未獲取獨佔鎖調用 signal 方法是不容許的
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    
    Node first = firstWaiter;
    if (first != null)
        // 將頭結點轉移到同步隊列中
        doSignal(first);
}
    
private void doSignal(Node first) {
    do {
        /*
         * 將 firstWaiter 指向 first 節點的 nextWaiter 節點,while 循環將會用到更新後的 
         * firstWaiter 做爲判斷條件。
         */ 
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 將頭結點從條件隊列中移除
        first.nextWaiter = null;
    
    /*
     * 調用 transferForSignal 將節點轉移到同步隊列中,若是失敗,且 firstWaiter
     * 不爲 null,則再次進行嘗試。transferForSignal 成功了,while 循環就結束了。
     */
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

/** 這個方法用於將條件隊列中的節點轉移到同步隊列中 */
final boolean transferForSignal(Node node) {
    /*
     * 若是將節點的等待狀態由 CONDITION 設爲 0 失敗,則代表節點被取消。
     * 由於 transferForSignal 中不存在線程競爭的問題,因此下面的 CAS 
     * 失敗的惟一緣由是節點的等待狀態爲 CANCELLED。
     */ 
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 調用 enq 方法將 node 轉移到同步隊列中,並返回 node 的前驅節點 p
    Node p = enq(node);
    int ws = p.waitStatus;
    
    /*
     * 若是前驅節點的等待狀態 ws > 0,則代表前驅節點處於取消狀態,此時應喚醒 node 對應的
     * 線程去獲取同步狀態。若是 ws <= 0,這裏經過 CAS 將節點 p 的等待設爲 SIGNAL。
     * 這樣,節點 p 在釋放同步狀態後,纔會喚醒後繼節點 node。若是 CAS 設置失敗,則應當即
     * 喚醒 node 節點對應的線程。以避免因 node 沒有被喚醒致使同步隊列掛掉。關於同步隊列的相關的
     * 知識,請參考個人另外一篇文章「AbstractQueuedSynchronizer 原理分析 - 獨佔/共享模式」,
     * 連接爲:http://t.cn/RuERpHl
     */
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

看完了 signal 方法的分析,下面再來看看 signalAll 的源碼分析,以下:this

public final void signalAll() {
    // 檢查線程是否獲取了獨佔鎖
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    /*
     * 將條件隊列中全部的節點轉移到同步隊列中。與 doSignal 方法略有不一樣,主要區別在 
     * while 循環的循環條件上,下的循環只有在條件隊列中沒節點後才終止。
     */ 
    do {
        Node next = first.nextWaiter;
        // 將 first 節點從條件隊列中移除
        first.nextWaiter = null;
        // 轉移節點到同步隊列上
        transferForSignal(first);
        first = next;    
    } while (first != null);
}

4. 其餘

在我閱讀 ConditionObject 源碼時發現了一個問題 - await 方法居然沒有作同步控制。而在 signal 和 signalAll 方法開頭都會調用 isHeldExclusively 檢測線程是否已經獲取了獨佔鎖,未獲取獨佔鎖調用這兩個方法會拋出異常。但在 await 方法中,卻沒有進行相關的檢測。若是在正確的使用方式下調用 await 方法是不會出現問題的,所謂正確的使用方式指的是在獲取鎖的狀況下調用 await 方法。但若是沒獲取鎖就調用該方法,就會產生線程競爭的狀況,這將會對條件隊列的結構形成破壞。這裏再來看一下新增節點的方法源碼,以下:spa

private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);

    // 存在競爭時將會致使節點入隊出錯
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

假如如今有線程 t1 和 t2,對應節點 node1 和 node2。線程 t1 獲取了鎖,而 t2 未獲取鎖,此時條件隊列爲空,即 firstWaiter = lastWaiter = null。演繹一下會致使條件隊列被破壞的場景,以下:線程

  1. 時刻1:線程 t1 和 t2 同時執行到 if (t == null),兩個線程都認爲 if 條件知足
  2. 時刻2:線程 t1 初始化 firstWaiter,即將 firstWaiter 指向 node1
  3. 時刻3:線程 t2 再次修改 firstWaiter 的指向,此時 firstWaiter 指向 node2

如上,若是線程是按照上面的順序執行,這會致使隊列被破壞。firstWaiter 本應該指向 node1,但結果卻指向了 node2,node1 被排擠出了隊列。這樣會致使什麼問題呢?這樣可能會致使線程 t1 一直阻塞下去。由於 signal/signalAll 是從條件隊列頭部轉移節點的,但 node1 不在隊列中,因此 node1 沒法被轉移到同步隊列上。在不出現中斷的狀況下,node1 對應的線程 t1 會被永久阻塞住。

這裏未對 await 方法進行同步控制,致使條件隊列出現問題,應該算 ConditionObject 實現上的一個缺陷了。關於這個缺陷,博客園博主 活在夢裡 在他的文章 AbstractQueuedSynchronizer源碼解讀--續篇之Condition 中也提到了。並向 JDK 開發者提了一個 BUG,BUG 連接爲 JDK-8187408,有興趣的同窗能夠去看看。

5. 總結

到這裏,Condition 的原理就分析完了。分析完 Condition 原理,關於 AbstractQueuedSynchronizer 的分析也就結束了。整體來講,經過分析 AQS 並寫成博客,使我對 AQS 的原理有了更深入的認識。AQS 是 JDK 中鎖和其餘併發組件實現的基礎,弄懂 AQS 原理對後續在分析各類鎖和其餘同步組件大有裨益。

AQS 自己實現比較複雜,要處理各類各樣的狀況。做爲類庫,AQS 要考慮和處理各類可能的狀況,實現起來可謂很是複雜。不只如此,AQS 還很好的封裝了同步隊列的管理,線程的阻塞與喚醒等基礎操做,大大下降了繼承類實現同步控制功能的複雜度。因此,在本文的最後,再次向 AQS 的做者,Java 大師Doug Lea致敬。

好了,本文到此結束,謝謝你們閱讀。

參考

本文在知識共享許可協議 4.0 下發布,轉載需在明顯位置處註明出處
做者:coolblog
本文同步發佈在個人我的博客: http://www.coolblog.xyz

cc
本做品採用知識共享署名-非商業性使用-禁止演繹 4.0 國際許可協議進行許可。

相關文章
相關標籤/搜索