逐行分析AQS源碼(4)——Condition接口實現

前言

本篇文章是基於線程間的同步與通訊(4)——Lock 和 Condtion 這篇文章寫的,在那篇文章中,咱們分析了Condition接口所定義的方法,本篇咱們就來看看AQS對於Condition接口的這些接口方法的具體實現。java

下文中筆者將假設讀者已經閱讀過那篇文章,或者已經瞭解了相關的背景知識。node

系列文章目錄segmentfault

概述

咱們在前面介紹Conditon的時候說過,Condition接口的await/signal機制是設計用來代替監視器鎖的wait/notify機制 的,所以,與監視器鎖的wait/notify機制對照着學習有助於咱們更好的理解Conditon接口:數組

Object 方法 Condition 方法 區別
void wait() void await()
void wait(long timeout) long awaitNanos(long nanosTimeout) 時間單位,返回值
void wait(long timeout, int nanos) boolean await(long time, TimeUnit unit) 時間單位,參數類型,返回值
void notify() void signal()
void notifyAll() void signalAll()
- void awaitUninterruptibly() Condition獨有
- boolean awaitUntil(Date deadline) Condition獨有

這裏先作一下說明,本文說wait方法時,是泛指wait()wait(long timeout)wait(long timeout, int nanos) 三個方法,當須要指明某個特定的方法時,會帶上相應的參數。一樣的,說notify方法時,也是泛指notify(),notifyAll()方法,await方法和signal方法以此類推。數據結構

首先,咱們經過wait/notify機制來類比await/signal機制:併發

  1. 調用wait方法的線程首先必須是已經進入了同步代碼塊,即已經獲取了監視器鎖;與之相似,調用await方法的線程首先必須得到lock鎖
  2. 調用wait方法的線程會釋放已經得到的監視器鎖,進入當前監視器鎖的等待隊列(wait set)中;與之相似,調用await方法的線程會釋放已經得到的lock鎖,進入到當前Condtion對應的條件隊列中。
  3. 調用監視器鎖的notify方法會喚醒等待在該監視器鎖上的線程,這些線程將開始參與鎖競爭,並在得到鎖後,從wait方法處恢復執行;與之相似,調用Condtion的signal方法會喚醒對應的條件隊列中的線程,這些線程將開始參與鎖競爭,並在得到鎖後,從await方法處開始恢復執行。

實戰

因爲前面咱們已經學習過了監視器鎖的wait/notify機制,await/signal的用法基本相似。在正式分析源碼以前,咱們先來看一個使用condition的實例:less

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    // 生產者方法,往數組裏面寫數據
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await(); //數組已滿,沒有空間時,掛起等待,直到數組「非滿」(notFull)
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            // 由於放入了一個數據,數組確定不是空的了
            // 此時喚醒等待這notEmpty條件上的線程
            notEmpty.signal(); 
        } finally {
            lock.unlock();
        }
    }

    // 消費者方法,從數組裏面拿數據
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // 數組是空的,沒有數據可拿時,掛起等待,直到數組非空(notEmpty)
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            // 由於拿出了一個數據,數組確定不是滿的了
            // 此時喚醒等待這notFull條件上的線程
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

這是java官方文檔提供的例子,是一個典型的生產者-消費者模型。這裏在同一個lock鎖上,建立了兩個條件隊列notFull, notEmpty。當數組已滿,沒有存儲空間時,put方法在notFull條件上等待,直到數組「not full」;當數組空了,沒有數據可讀時,take方法在notEmpty條件上等待,直到數組「not empty」,而notEmpty.signal()notFull.signal()則用來喚醒等待在這個條件上的線程。函數

注意,上面所說的,在notFull notEmpty條件上等待事實上是指線程在條件隊列(condition queue)上等待,當該線程被相應的signal方法喚醒後,將進入到咱們前面三篇介紹的sync queue中去爭鎖,爭到鎖後才能能await方法處返回。這裏接牽涉到兩種隊列了——condition queuesync queue,它們都定義在AQS中。源碼分析

爲了防止你們被AQS中的隊列弄暈,這裏咱們先理理清:學習

同步隊列 vs 條件隊列

sync queue

首先,在逐行分析AQS源碼(1)——獨佔鎖的獲取這篇中咱們說過,全部等待鎖的線程都會被包裝成Node扔到一個同步隊列中。該同步隊列以下:
dummy head

sync queue是一個雙向鏈表,咱們使用prevnext屬性來串聯節點。可是在這個同步隊列中,咱們一直沒有用到nextWaiter屬性,即便是在共享鎖模式下,這一屬性也只做爲一個標記,指向了一個空節點,所以,在sync queue中,咱們不會用它來串聯節點。

condtion queue

每建立一個Condtion對象就會對應一個Condtion隊列,每個調用了Condtion對象的await方法的線程都會被包裝成Node扔進一個條件隊列中,就像這樣:
Conditon queue
可見,每個Condition對象對應一個Conditon隊列,每一個Condtion隊列都是獨立的,互相不影響的。在上圖中,若是咱們對當前線程調用了notFull.await(), 則當前線程就會被包裝成Node加到notFull隊列的末尾。

值得注意的是,condition queue是一個單向鏈表,在該鏈表中咱們使用nextWaiter屬性來串聯鏈表。可是,就像在sync queue中不會使用nextWaiter屬性來串聯鏈表同樣,在condition queue中,也並不會用到prev, next屬性,它們的值都爲null。也就是說,在條件隊列中,Node節點真正用到的屬性只有三個:

  • thread:表明當前正在等待某個條件的線程
  • waitStatus:條件的等待狀態
  • nextWaiter:指向條件隊列中的下一個節點

既然這裏又提到了waitStatus,咱們這裏再回顧一下它的取值範圍:

volatile int waitStatus;
static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

在條件隊列中,咱們只須要關注一個值便可——CONDITION。它表示線程處於正常的等待狀態,而只要waitStatus不是CONDITION,咱們就認爲線程再也不等待了,此時就要從條件隊列中出隊。

sync queue 和 conditon queue的聯繫

通常狀況下,等待鎖的sync queue和條件隊列condition queue是相互獨立的,彼此之間並無任何關係。可是,當咱們調用某個條件隊列的signal方法時,會將某個或全部等待在這個條件隊列中的線程喚醒,被喚醒的線程和普通線程同樣須要去爭鎖,若是沒有搶到,則一樣要被加到等待鎖的sync queue中去,此時節點就從condition queue中被轉移到sync queue中:
condtion queue to sync queue

可是,這裏尤爲要注意的是,node是被一個一個轉移過去的,哪怕咱們調用的是signalAll()方法也是一個一個轉移過去的,而不是將整個條件隊列接在sync queue的末尾。

同時要注意的是,咱們在sync queue中只使用prevnext來串聯鏈表,而不使用nextWaiter;咱們在condition queue中只使用nextWaiter來串聯鏈表,而不使用prevnext.事實上,它們就是兩個使用了一樣的Node數據結構的徹底獨立的兩種鏈表。所以,將節點從condition queue中轉移到sync queue中時,咱們須要斷開原來的連接(nextWaiter),創建新的連接(prev, next),這某種程度上也是須要將節點一個一個地轉移過去的緣由之一。

入隊時和出隊時的鎖狀態

sync queue是等待鎖的隊列,當一個線程被包裝成Node加到該隊列中時,必然是沒有獲取到鎖;當處於該隊列中的節點獲取到了鎖,它將從該隊列中移除(事實上移除操做是將獲取到鎖的節點設爲新的dummy head,並將thread屬性置爲null)。

condition隊列是等待在特定條件下的隊列,由於調用await方法時,必然是已經得到了lock鎖,因此在進入condtion隊列線程必然是已經獲取了鎖;在被包裝成Node扔進條件隊列中後,線程將釋放鎖,而後掛起;當處於該隊列中的線程被signal方法喚醒後,因爲隊列中的節點在以前掛起的時候已經釋放了鎖,因此必須先去再次的競爭鎖,所以,該節點會被添加到sync queue中。所以,條件隊列在出隊時,線程並不持有鎖。

因此事實上,這兩個隊列的鎖狀態正好相反:

  • condition queue:入隊時已經持有了鎖 -> 在隊列中釋放鎖 -> 離開隊列時沒有鎖 -> 轉移到sync queue
  • sync queue:入隊時沒有鎖 -> 在隊列中爭鎖 -> 離開隊列時得到了鎖

經過上面的介紹,咱們對條件隊列已經有了感性的認識,接下來就讓咱們進入到本篇的重頭戲——源碼分析:

CondtionObject

AQS對Condition這個接口的實現主要是經過ConditionObject,上面已經說個,它的核心實現就是是一個條件隊列,每個在某個condition上等待的線程都會被封裝成Node對象扔進這個條件隊列。

核心屬性

它的核心屬性只有兩個:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

這兩個屬性分別表明了條件隊列的隊頭和隊尾,每當咱們新建一個conditionObject對象,都會對應一個條件隊列。

構造函數

public ConditionObject() { }

構造函數啥也沒幹,可見,條件隊列是延時初始化的,在真正用到的時候纔會初始化。

Condition接口方法實現

await()第一部分分析

public final void await() throws InterruptedException {
    // 若是當前線程在調動await()方法前已經被中斷了,則直接拋出InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
    // 將當前線程封裝成Node添加到條件隊列
    Node node = addConditionWaiter();
    // 釋放當前線程所佔用的鎖,保存當前的鎖狀態
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 若是當前隊列不在同步隊列中,說明剛剛被await, 尚未人調用signal方法,則直接將當前線程掛起
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 線程將在這裏被掛起,中止運行
        // 能執行到這裏說明要麼是signal方法被調用了,要麼是線程被中斷了
        // 因此檢查下線程被喚醒的緣由,若是是由於中斷被喚醒,則跳出while循環
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 第一部分就分析到這裏,下面的部分咱們到第二部分再看, 先把它註釋起來
    /*
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    */
}

咱們已經在上面的代碼註釋中描述了大致的流程,接下來咱們詳細來看看await方法中所調用方法的具體實現。

首先是將當前線程封裝成Node扔進條件隊列中的addConditionWaiter方法:

addConditionWaiter

/**
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 若是尾節點被cancel了,則先遍歷整個鏈表,清除全部被cancel的節點
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 將當前線程包裝成Node扔進條件隊列
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    /*
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
    */
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

首先咱們要思考的是,存在兩個不一樣的線程同時入隊的狀況嗎?不存在。爲何呢?由於前面說過了,能調用await方法的線程必然是已經得到了鎖,而得到了鎖的線程只有一個,因此這裏不存在併發,所以不須要CAS操做。

在這個方法中,咱們就是簡單的將當前線程封裝成Node加到條件隊列的末尾。這和將一個線程封裝成Node加入等待隊列略有不一樣:

  1. 節點加入sync queuewaitStatus的值爲0,但節點加入condition queuewaitStatus的值爲Node.CONDTION
  2. sync queue的頭節點爲dummy節點,若是隊列爲空,則會先建立一個dummy節點,再建立一個表明當前節點的Node添加在dummy節點的後面;而condtion queue 沒有dummy節點,初始化時,直接將firstWaiterlastWaiter直接指向新建的節點就好了。
  3. sync queue是一個雙向隊列,在節點入隊後,要同時修改當前節點的前驅前驅節點的後繼;而在condtion queue中,咱們只修改了前驅節點的nextWaiter,也就是說,condtion queue是做爲單向隊列來使用的。

若是入隊時發現尾節點已經取消等待了,那麼咱們就不該該接在它的後面,此時須要調用unlinkCancelledWaiters來剔除那些已經取消等待的線程:

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

該方法將從頭節點開始遍歷整個隊列,剔除其中waitStatus不爲Node.CONDTION的節點,這裏使用了兩個指針firstWaitertrail來分別記錄第一個和最後一個waitStatus不爲Node.CONDTION的節點,這些都是基礎的鏈表操做,很容易理解,這裏再也不贅述了。

fullyRelease

在節點被成功添加到隊列的末尾後,咱們將調用fullyRelease來釋放當前線程所佔用的鎖:

/**
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 * @param node the condition node for this wait
 * @return previous sync state
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

首先,當咱們調用這個方法時,說明當前線程已經被封裝成Node扔進條件隊列了。在該方法中,咱們經過release方法釋放鎖,還記得release方法嗎,咱們在逐行分析AQS源碼(2)——獨佔鎖的釋放中已經詳細講過了,這裏再也不贅述了。

值得注意的是,這是一次性釋放了全部的鎖,即對於可重入鎖而言,不管重入了幾回,這裏是一次性釋放完的,這也就是爲何該方法的名字叫fullyRelease。但這裏尤爲要注意的是release(savedState)方法是有可能拋出IllegalMonitorStateException的,這是由於當前線程可能並非持有鎖的線程。可是咱前面不是說,只有持有鎖的線程才能調用await方法嗎?既然fullyRelease方法在await方法中,爲啥當前線程還有可能並非持有鎖的線程呢?

雖然話是這麼說,可是在調用await方法時,咱們其實並無檢測Thread.currentThread() == getExclusiveOwnerThread(),換句話說,也就是執行到fullyRelease這一步,咱們纔會檢測這一點,而這一點檢測是由AQS子類實現tryRelease方法來保證的,例如,ReentrantLock對tryRelease方法的實現以下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

當發現當前線程不是持有鎖的線程時,咱們就會進入finally塊,將當前Node的狀態設爲Node.CANCELLED,這也就是爲何上面的addConditionWaiter在添加新節點前每次都會檢查尾節點是否已經被取消了。

在當前線程的鎖被徹底釋放了以後,咱們就能夠調用LockSupport.park(this)把當前線程掛起,等待被signal了。可是,在掛起當前線程以前咱們先用isOnSyncQueue確保了它不在sync queue中,這是爲何呢?當前線程不是在一個和sync queue無關的條件隊列中嗎?怎麼可能會出如今sync queue中的狀況?

/**
 * Returns true if a node, always one that was initially placed on
 * a condition queue, is now waiting to reacquire on sync queue.
 * @param node the node
 * @return true if is reacquiring
 */
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    return findNodeFromTail(node);
}
/**
 * Returns true if node is on sync queue by searching backwards from tail.
 * Called only when needed by isOnSyncQueue.
 * @return true if present
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

爲了解釋這一問題,咱們先來看看signal方法

signalAll()

在看signalAll以前,咱們首先要區分調用signalAll方法的線程與signalAll方法要喚醒的線程(等待在對應的條件隊列裏的線程):

  • 調用signalAll方法的線程自己是已經持有了鎖,如今準備釋放鎖了;
  • 在條件隊列裏的線程是已經在對應的條件上掛起了,等待着被signal喚醒,而後去爭鎖。

首先,與調用notify時線程必須是已經持有了監視器鎖相似,在調用condition的signal方法時,線程也必須是已經持有了lock鎖:

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

該方法首先檢查當前調用signal方法的線程是否是持有鎖的線程,這是經過isHeldExclusively方法來實現的,該方法由繼承AQS的子類來實現,例如,ReentrantLock對該方法的實現爲:

protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

由於exclusiveOwnerThread保存了當前持有鎖的線程,這裏只要檢測它是否是等於當前線程就好了。
接下來先經過firstWaiter是否爲空判斷條件隊列是否爲空,若是條件隊列不爲空,則調用doSignalAll方法:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

首先咱們經過lastWaiter = firstWaiter = null;將整個條件隊列清空,而後經過一個do-while循環,將原先的條件隊列裏面的節點一個一個拿出來(令nextWaiter = null),再經過transferForSignal方法一個一個添加到sync queue的末尾:

/**
 * Transfers a node from a condition queue onto sync queue.
 * Returns true if successful.
 * @param node the node
 * @return true if successfully transferred (else the node was
 * cancelled before signal)
 */
final boolean transferForSignal(Node node) {
    // 若是該節點在調用signal方法前已經被取消了,則直接跳過這個節點
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 若是該節點在條件隊列中正常等待,則利用enq方法將該節點添加至sync queue隊列的尾部
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread); 
    return true;
}

transferForSignal方法中,咱們先使用CAS操做將當前節點的waitStatus狀態由CONDTION設爲0,若是修改不成功,則說明該節點已經被CANCEL了,則咱們直接返回,操做下一個節點;若是修改爲功,則說明咱們已經將該節點從等待的條件隊列中成功「喚醒」了,但此時該節點對應的線程並無真正被喚醒,它還要和其餘普通線程同樣去爭鎖,所以它將被添加到sync queue的末尾等待獲取鎖。

咱們這裏經過enq方法將該節點添加進sync queue的末尾。關於該方法,咱們在逐行分析AQS源碼(1)——獨佔鎖的獲取中已經詳細講過了,這裏再也不贅述。不過這裏尤爲注意的是,enq方法將node節點添加進隊列時,返回的是node的前驅節點

在將節點成功添加進sync queue中後,咱們獲得了該節點在sync queue中的前驅節點。咱們前面說過,在sync queque中的節點都要靠前驅節點去喚醒,因此,這裏咱們要作的就是將前驅節點的waitStatus設爲Node.SIGNAL, 這一點和shouldParkAfterFailedAcquire所作的工做相似:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

所不一樣的是,shouldParkAfterFailedAcquire將會向前查找,跳過那些被cancel的節點,而後將找到的第一個沒有被cancel的節點的waitStatus設成SIGNAL,最後再掛起。而在transferForSignal中,當前Node所表明的線程自己就已經被掛起了,因此這裏作的更像是一個複合操做——只要前驅節點處於被取消的狀態或者沒法將前驅節點的狀態修成Node.SIGNAL,那咱們就將Node所表明的線程喚醒,但這個條件並不意味着當前lock處於可獲取的狀態,有可能線程被喚醒了,可是鎖仍是被佔有的狀態,不過這樣作至少是無害的,由於咱們在線程被喚醒後還要去爭鎖,若是搶不到鎖,則大不了再次被掛起。

值得注意的是,transferForSignal是有返回值的,可是咱們在這個方法中並無用到,它將在signal()方法中被使用。

在繼續往下看signal()方法以前,這裏咱們再總結一下signalAll()方法:

  1. 將條件隊列清空(只是令lastWaiter = firstWaiter = null,隊列中的節點和鏈接關係仍然還存在)
  2. 將條件隊列中的頭節點取出,使之成爲孤立節點(nextWaiter,prev,next屬性都爲null)
  3. 若是該節點處於被Cancelled了的狀態,則直接跳過該節點(因爲是孤立節點,則會被GC回收)
  4. 若是該節點處於正常狀態,則經過enq方法將它添加到sync queue的末尾
  5. 判斷是否須要將該節點喚醒(包括設置該節點的前驅節點的狀態爲SIGNAL),若有必要,直接喚醒該節點
  6. 重複2-5,直到整個條件隊列中的節點都被處理完

signal()

signalAll()方法不一樣,signal()方法只會喚醒一個節點,對於AQS的實現來講,就是喚醒條件隊列中第一個沒有被Cancel的節點,弄懂了signalAll()方法,signal()方法就很容易理解了,由於它們大同小異:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

首先依然是檢查調用該方法的線程(即當前線程)是否是已經持有了鎖,這一點和上面的signalAll()方法同樣,所不同的是,接下來調用的是doSignal方法:

private void doSignal(Node first) {
    do {
        // 將firstWaiter指向條件隊列隊頭的下一個節點
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 將條件隊列原來的隊頭從條件隊列中斷開,則此時該節點成爲一個孤立的節點
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

這個方法也是一個do-while循環,目的是遍歷整個條件隊列,找到第一個沒有被cancelled的節點,並將它添加到條件隊列的末尾。若是條件隊列裏面已經沒有節點了,則將條件隊列清空(firstWaiter=lasterWaiter=null)。

在這裏,咱們用的依然用的是transferForSignal方法,可是用到了它的返回值,只要節點被成功添加到sync queue中,transferForSignal就返回true, 此時while循環的條件就不知足了,整個方法就結束了,即調用signal()方法,只會喚醒一個線程。

總結: 調用signal()方法會從當前條件隊列中取出第一個沒有被cancel的節點添加到sync隊列的末尾。

await()第二部分分析

前面咱們已經分析了signal方法,它會將節點添加進sync queue隊列中,並要麼當即喚醒線程,要麼等待前驅節點釋放鎖後將本身喚醒,不管怎樣,被喚醒的線程要從哪裏恢復執行呢?固然是被掛起的地方呀,咱們在哪裏被掛起的呢?還記得嗎?固然是調用了await方法的地方,以await()方法爲例:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 咱們在這裏被掛起了,被喚醒後,將從這裏繼續往下運行
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

這裏值得注意的是,當咱們被喚醒時,其實並不知道是由於什麼緣由被喚醒,有多是由於其餘線程調用了signal方法,也有多是由於當前線程被中斷了。

可是,不管是被中斷喚醒仍是被signal喚醒,被喚醒的線程最後都將離開condition queue,進入到sync queue中,這一點咱們在下面分析源代碼的時候詳細說。

隨後,線程將在sync queue中利用進行acquireQueued方法進行「阻塞式」爭鎖,搶到鎖就返回,搶不到鎖就繼續被掛起。所以,當await()方法返回時,必然是保證了當前線程已經持有了lock鎖。

另外有一點這裏咱們提早說明一下,這一點對於咱們下面理解源碼很重要,那就是:

若是從線程被喚醒,到線程獲取到鎖這段過程當中發生過中斷,該怎麼處理?

咱們前面分析中斷的時候說過,中斷對於當前線程只是個建議,由當前線程決定怎麼對其作出處理。在acquireQueued方法中,咱們對中斷是不響應的,只是簡單的記錄搶鎖過程當中的中斷狀態,並在搶到鎖後將這個中斷狀態返回,交於上層調用的函數處理,而這裏「上層調用的函數」就是咱們的await()方法。

那麼await()方法是怎麼對待這個中斷的呢?這取決於:

中斷髮生時,線程是否已經被signal過?

若是中斷髮生時,當前線程並無被signal過,則說明當前線程還處於條件隊列中,屬於正常在等待中的狀態,此時中斷將致使當前線程的正常等待行爲被打斷,進入到sync queue中搶鎖,所以,在咱們從await方法返回後,須要拋出InterruptedException,表示當前線程由於中斷而被喚醒。

若是中斷髮生時,當前線程已經被signal過了,則說明這個中斷來的太晚了,既然當前線程已經被signal過了,那麼就說明在中斷髮生前,它就已經正常地被從condition queue中喚醒了,因此隨後即便發生了中斷(注意,這個中斷能夠發生在搶鎖以前,也能夠發生在搶鎖的過程當中),咱們都將忽略它,僅僅是在await()方法返回後,再自我中斷一下,補一下這個中斷。就好像這個中斷是在await()方法調用結束以後才發生的同樣。這裏之因此要「補一下」這個中斷,是由於咱們在用Thread.interrupted()方法檢測是否發生中斷的同時,會將中斷狀態清除,所以若是選擇了忽略中斷,則應該在await()方法退出後將它設成原來的樣子。

關於「這個中斷來的太晚了」這一點若是你們不太容易理解的話,這裏打個比方:這就比如咱們去飯店吃飯,都快吃完了,有一個菜到如今尚未上,因而咱們經常會把服務員叫來問:這個菜有沒有在作?要是還沒作咱們就不要了。而後服務員會跑到廚房去問,以後跑回來講:對不起,這個菜已經下鍋在炒了,請再耐心等待一下。這裏,這個「這個菜咱們不要了」(發起的中斷)就來的太晚了,由於菜已經下鍋了(已經被signal過了)。

理清了上面的概念,咱們再來看看await()方法是怎麼作的,它用中斷模式interruptMode這個變量記錄中斷事件,該變量有三個值:

  1. 0 : 表明整個過程當中一直沒有中斷髮生。
  2. THROW_IE : 表示退出await()方法時須要拋出InterruptedException,這種模式對應於中斷髮生在signal以前
  3. REINTERRUPT : 表示退出await()方法時只須要再自我中斷如下,這種模式對應於中斷髮生在signal以後,即中斷來的太晚了。
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT =  1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE    = -1;

接下來咱們就從線程被喚醒的地方繼續往下走,一步步分析源碼:

狀況1:中斷髮生時,線程尚未被signal過

線程被喚醒後,咱們將首先使用checkInterruptWhileWaiting方法檢測中斷的模式:

/**
 * Checks for interrupt, returning THROW_IE if interrupted
 * before signalled, REINTERRUPT if after signalled, or
 * 0 if not interrupted.
 */
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

這裏假設已經發生過中斷,則Thread.interrupted()方法必然返回true,接下來就是用transferAfterCancelledWait進一步判斷是否發生了signal:

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

上面已經說過,判斷一個node是否被signal過,一個簡單有效的方法就是判斷它是否離開了condition queue, 進入到sync queue中。

換句話說,只要一個節點的waitStatus仍是Node.CONDITION,那就說明它尚未被signal過。
因爲如今咱們分析狀況1,則當前節點的waitStatus必然是Node.CONDITION,則會成功執行compareAndSetWaitStatus(node, Node.CONDITION, 0),將該節點的狀態設置成0,而後調用enq(node)方法將當前節點添加進sync queue中,而後返回true
這裏值得注意的是,咱們此時並無斷開node的nextWaiter,因此最後必定不要忘記將這個連接斷開。

再回到transferAfterCancelledWait調用處,可知,因爲transferAfterCancelledWait將返回true,如今checkInterruptWhileWaiting將返回THROW_IE,這表示咱們在離開await方法時應當要拋出THROW_IE異常。

再回到checkInterruptWhileWaiting的調用處:

public final void await() throws InterruptedException {
    /*
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    */
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); 
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 咱們如今在這裏!!!
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

interruptMode如今爲THROW_IE,則咱們將執行break,跳出while循環。

接下來咱們將執行acquireQueued(node, savedState)進行爭鎖,注意,這裏傳入的須要獲取鎖的重入數量是savedState,即以前釋放了多少,這裏就須要再次獲取多少:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // 若是線程獲取不到鎖,則將在這裏被阻塞住
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued咱們在前面的文章中已經詳細分析過了,它是一個阻塞式的方法,獲取到鎖則退出,獲取不到鎖則會被掛起。該方法只有在最終獲取到了鎖後,纔會退出,而且退出時會返回當前線程的中斷狀態,若是咱們在獲取鎖的過程當中又被中斷了,則會返回true,不然會返回false。可是其實這裏返回true仍是false已經不重要了,由於前面已經發生過中斷了,咱們就是由於中斷而被喚醒的不是嗎?因此不管如何,咱們在退出await()方法時,必然會拋出InterruptedException

咱們這裏假設它獲取到了鎖了,則它將回到上面的調用處,因爲咱們這時的interruptMode = THROW_IE,則會跳過if語句。
接下來咱們將執行:

if (node.nextWaiter != null) 
    unlinkCancelledWaiters();

上面咱們說過,當前節點的nextWaiter是有值的,它並無和原來的condition隊列斷開,這裏咱們已經獲取到鎖了,根據逐行分析AQS源碼(1)——獨佔鎖的獲取中的分析,咱們經過setHead方法已經將它的thread屬性置爲null,從而將當前線程從sync queue"移除"了,接下來應當將它從condition隊列裏面移除。因爲condition隊列是一個單向隊列,咱們沒法獲取到它的前驅節點,因此只能從頭開始遍歷整個條件隊列,而後找到這個節點,再移除它。

然而,事實上呢,咱們並無這麼作。由於既然已經必須從頭開始遍歷鏈表了,咱們就乾脆一次性把鏈表中全部沒有在等待的節點都拿出去,因此這裏調用了unlinkCancelledWaiters方法,該方法咱們在前面await()第一部分的分析的時候已經講過了,它就是簡單的遍歷鏈表,找到全部waitStatus不爲CONDITION的節點,並把它們從隊列中移除。

節點被移除後,接下來就是最後一步了——彙報中斷狀態:

if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

這裏咱們的interruptMode=THROW_IE,說明發生了中斷,則將調用reportInterruptAfterWait

/**
 * Throws InterruptedException, reinterrupts current thread, or
 * does nothing, depending on mode.
 */
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

能夠看出,在interruptMode=THROW_IE時,咱們就是簡單的拋出了一個InterruptedException

至此,狀況1(中斷髮生於signal以前)咱們就分析完了,這裏咱們簡單總結一下:

  1. 線程由於中斷,從掛起的地方被喚醒
  2. 隨後,咱們經過transferAfterCancelledWait確認了線程的waitStatus值爲Node.CONDITION,說明並無signal發生過
  3. 而後咱們修改線程的waitStatus爲0,並經過enq(node)方法將其添加到sync queue
  4. 接下來線程將在sync queue中以阻塞的方式獲取,若是獲取不到鎖,將會被再次掛起
  5. 線程在sync queue中獲取到鎖後,將調用unlinkCancelledWaiters方法將本身從條件隊列中移除,該方法還會順便移除其餘取消等待的鎖
  6. 最後咱們經過reportInterruptAfterWait拋出了InterruptedException

由此能夠看出,一個調用了await方法掛起的線程在被中斷後不會當即拋出InterruptedException,而是會被添加到sync queue中去爭鎖,若是爭不到,仍是會被掛起;
只有爭到了鎖以後,該線程才得以從sync queuecondition queue中移除,最後拋出InterruptedException

因此說,一個調用了await方法的線程,即便被中斷了,它依舊仍是會被阻塞住,直到它獲取到鎖以後才能返回,並在返回時拋出InterruptedException。中斷對它意義更多的是體如今將它從condition queue中移除,加入到sync queue中去爭鎖,從這個層面上看,中斷和signal的效果其實很像,所不一樣的是,在await()方法返回後,若是是由於中斷被喚醒,則await()方法須要拋出InterruptedException異常,表示是它是被非正常喚醒的(正常喚醒是指被signal喚醒)。

狀況2:中斷髮生時,線程已經被signal過了

這種狀況對應於「中斷來的太晚了」,即REINTERRUPT模式,咱們在拿到鎖退出await()方法後,只須要再自我中斷一下,不須要拋出InterruptedException。

值得注意的是這種狀況其實包含了兩個子狀況:

  1. 被喚醒時,已經發生了中斷,但此時線程已經被signal過了
  2. 被喚醒時,並無發生中斷,可是在搶鎖的過程當中發生了中斷

下面咱們分別來分析:

狀況2.1:被喚醒時,已經發生了中斷,但此時線程已經被signal過了

對於這種狀況,與前面中斷髮生於signal以前的主要差異在於transferAfterCancelledWait方法:

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //線程A執行到這裏,CAS操做將會失敗
        enq(node);
        return true;
    }
    // 因爲中斷髮生前,線程已經被signal了,則這裏只須要等待線程成功進入sync queue便可
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

在這裏,因爲signal已經發生過了,則由咱們以前分析的signal方法可知,此時當前節點的waitStatus一定不爲Node.CONDITION,他將跳過if語句。此時當前線程可能已經在sync queue中,或者正在進入到sync queue的路上

爲何這裏會出現「正在進入到sync queue的路上」的狀況呢? 這裏咱們解釋下:

假設當前線程爲線程A, 它被喚醒以後檢測到發生了中斷,來到了transferAfterCancelledWait這裏,而另外一個線程B在這以前已經調用了signal方法,該方法會調用transferForSignal將當前線程添加到sync queue的末尾:

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 線程B執行到這裏,CAS操做將會成功
        return false; 
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

由於線程A和線程B是併發執行的,而這裏咱們分析的是「中斷髮生在signal以後」,則此時,線程B的compareAndSetWaitStatus先於線程A執行。這時可能出現線程B已經成功修改了node的waitStatus狀態,可是還沒來得及調用enq(node)方法,線程A就執行到了transferAfterCancelledWait方法,此時它發現waitStatus已經不是Condition,可是其實當前節點尚未被添加到sync node隊列中,所以,它接下來將經過自旋,等待線程B執行完transferForSignal方法。

線程A在自旋過程當中會不斷地判斷節點有沒有被成功添加進sync queue,判斷的方法就是isOnSyncQueue

/**
 * Returns true if a node, always one that was initially placed on
 * a condition queue, is now waiting to reacquire on sync queue.
 * @param node the node
 * @return true if is reacquiring
 */
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node);
}

該方法很好理解,只要waitStatus的值還爲Node.CONDITION,則它必定還在condtion隊列中,天然不可能在sync裏面;而每個調用了enq方法入隊的線程:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) { //即便這一步失敗了next.prev必定是有值的
                t.next = node; // 若是t.next有值,說明上面的compareAndSetTail方法必定成功了,則當前節點成爲了新的尾節點
                return t; // 返回了當前節點的前驅節點
            }
        }
    }
}

哪怕在設置compareAndSetTail這一步失敗了,它的prev必然也是有值的,所以這兩個條件只要有一個知足,就說明節點必然不在sync queue隊列中。

另外一方面,若是node.next有值,則說明它不只在sync queue中,而且在它後面還有別的節點,則只要它有值,該節點必然在sync queue中。
若是以上都不知足,說明這裏出現了尾部分叉(關於尾部分叉,參見這裏)的狀況,咱們就從尾節點向前尋找這個節點:

/**
 * Returns true if node is on sync queue by searching backwards from tail.
 * Called only when needed by isOnSyncQueue.
 * @return true if present
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

這裏固然仍是有可能出現從尾部反向遍歷找不到的狀況,可是不用擔憂,咱們還在while循環中,不管如何,節點最後總會入隊成功的。最終,transferAfterCancelledWait將返回false。

再回到transferAfterCancelledWait調用處:

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

則這裏,因爲transferAfterCancelledWait返回了false,則checkInterruptWhileWaiting方法將返回REINTERRUPT,這說明咱們在退出該方法時只須要再次中斷。

再回到checkInterruptWhileWaiting方法的調用處:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //咱們在這裏!!!
            break;
    }
    //當前interruptMode=REINTERRUPT,不管這裏是否進入if體,該值不變
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

此時,interruptMode的值爲REINTERRUPT,咱們將直接跳出while循環。

接下來就和上面的狀況1同樣了,咱們依然仍是去爭鎖,這一步依然是阻塞式的,獲取到鎖則退出,獲取不到鎖則會被掛起。

另外因爲如今interruptMode的值已經爲REINTERRUPT,所以不管在爭鎖的過程當中是否發生過中斷interruptMode的值都仍是REINTERRUPT

接着就是將節點從condition queue中剔除,與狀況1不一樣的是,在signal方法成功將node加入到sync queue時,該節點的nextWaiter已是null了,因此這裏這一步不須要執行。

再接下來就是報告中斷狀態了:

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

注意,這裏並無拋出中斷異常,而只是將當前線程再中斷一次。

至此,狀況2.1(被喚醒時,已經發生了中斷,但此時線程已經被signal過了)咱們就分析完了,這裏咱們簡單總結一下:

  1. 線程從掛起的地方被喚醒,此時既發生過中斷,又發生過signal
  2. 隨後,咱們經過transferAfterCancelledWait確認了線程的waitStatus值已經不爲Node.CONDITION,說明signal發生於中斷以前
  3. 而後,咱們經過自旋的方式,等待signal方法執行完成,確保當前節點已經被成功添加到sync queue
  4. 接下來線程將在sync queue中以阻塞的方式獲取鎖,若是獲取不到,將會被再次掛起
  5. 最後咱們經過reportInterruptAfterWait將當前線程再次中斷,可是不會拋出InterruptedException
狀況2.2:被喚醒時,並無發生中斷,可是在搶鎖的過程當中發生了中斷

這種狀況就比上面的狀況簡單一點了,既然被喚醒時沒有發生中斷,那基本能夠確信線程是被signal喚醒的,可是不要忘記還存在「假喚醒」這種狀況,所以咱們依然仍是要檢測被喚醒的緣由。

那麼怎麼區分究竟是假喚醒仍是由於是被signal喚醒了呢?

若是線程是由於signal而被喚醒,則由前面分析的signal方法可知,線程最終都會離開condition queue 進入sync queue中,因此咱們只須要判斷被喚醒時,線程是否已經在sync queue中便可:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);  // 咱們在這裏,線程將在這裏被喚醒
        // 因爲如今沒有發生中斷,因此interruptMode目前爲0
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

線程被喚醒時,暫時尚未發生中斷,因此這裏interruptMode = 0, 表示沒有中斷髮生,因此咱們將繼續while循環,這時咱們將經過isOnSyncQueue方法判斷當前線程是否已經在sync queue中了。因爲已經發生過signal了,則此時node必然已經在sync queue中了,因此isOnSyncQueue將返回true,咱們將退出while循環。

不過這裏插一句,若是isOnSyncQueue檢測到當前節點不在sync queue中,則說明既沒有發生中斷,也沒有發生過signal,則當前線程是被「假喚醒」的,那麼咱們將再次進入循環體,將線程掛起。

退出while循環後接下來仍是利用acquireQueued爭鎖,由於前面沒有發生中斷,則interruptMode=0,這時,若是在爭鎖的過程當中發生了中斷,則acquireQueued將返回true,則此時interruptMode將變爲REINTERRUPT

接下是判斷node.nextWaiter != null,因爲在調用signal方法時已經將節點移出了隊列,全部這個條件也不成立。

最後就是彙報中斷狀態了,此時interruptMode的值爲REINTERRUPT,說明線程在被signal後又發生了中斷,這個中斷髮生在搶鎖的過程當中,這個中斷來的太晚了,所以咱們只是再次自我中斷一下。

至此,狀況2.2(被喚醒時,並無發生中斷,可是在搶鎖的過程當中發生了中斷)咱們就分析完了,這種狀況和2.1很像,區別就是一個是在喚醒後就被發現已經發生了中斷,一個個喚醒後沒有發生中斷,可是在搶鎖的過成中發生了中斷,但不管如何,這兩種狀況都會被歸結爲「中斷來的太晚了」,中斷模式爲REINTERRUPT,狀況2.2的總結以下:

  1. 線程被signal方法喚醒,此時並無發生過中斷
  2. 由於沒有發生過中斷,咱們將從checkInterruptWhileWaiting處返回,此時interruptMode=0
  3. 接下來咱們回到while循環中,由於signal方法保證了將節點添加到sync queue中,此時while循環條件不成立,循環退出
  4. 接下來線程將在sync queue中以阻塞的方式獲取,若是獲取不到鎖,將會被再次掛起
  5. 線程獲取到鎖返回後,咱們檢測到在獲取鎖的過程當中發生過中斷,而且此時interruptMode=0,這時,咱們將interruptMode修改成REINTERRUPT
  6. 最後咱們經過reportInterruptAfterWait將當前線程再次中斷,可是不會拋出InterruptedException

這裏咱們再總結如下狀況2(中斷髮生時,線程已經被signal過了),這種狀況對應於中斷髮生signal以後,咱們無論這個中斷是在搶鎖以前就已經發生了仍是搶鎖的過程當中發生了,只要它是在signal以後發生的,咱們就認爲它來的太晚了,咱們將忽略這個中斷。所以,從await()方法返回的時候,咱們只會將當前線程從新中斷一下,而不會拋出中斷異常。

狀況3: 一直沒有中斷髮生

這種狀況就更簡單了,它的大致流程和上面的狀況2.2差很少,只是在搶鎖的過程當中也沒有發生異常,則interruptMode爲0,沒有發生過中斷,所以不須要彙報中斷。則線程就從await()方法處正常返回。

await()總結

至此,咱們總算把await()方法完整的分析完了,這裏咱們對整個方法作出總結:

  1. 進入await()時必須是已經持有了鎖
  2. 離開await()時一樣必須是已經持有了鎖
  3. 調用await()會使得當前線程被封裝成Node扔進條件隊列,而後釋放所持有的鎖
  4. 釋放鎖後,當前線程將在condition queue中被掛起,等待signal或者中斷
  5. 線程被喚醒後會將會離開condition queue進入sync queue中進行搶鎖
  6. 若在線程搶到鎖以前發生過中斷,則根據中斷髮生在signal以前仍是以後記錄中斷模式
  7. 線程在搶到鎖後進行善後工做(離開condition queue, 處理中斷異常)
  8. 線程已經持有了鎖,從await()方法返回

await() 方法流程

在這一過程當中咱們尤爲要關注中斷,如前面所說,中斷和signal所起到的做用都是將線程從condition queue中移除,加入到sync queue中去爭鎖,所不一樣的是,signal方法被認爲是正常喚醒線程,中斷方法被認爲是非正常喚醒線程,若是中斷髮生在signal以前,則咱們在最終返回時,應當拋出InterruptedException;若是中斷髮生在signal以後,咱們就認爲線程自己已經被正常喚醒了,這個中斷來的太晚了,咱們直接忽略它,並在await()返回時再自我中斷一下,這種作法至關於將中斷推遲至await()返回時再發生。

awaitUninterruptibly()

在前面咱們分析的await()方法中,中斷起到了和signal一樣的效果,可是中斷屬於將一個等待中的線程非正常喚醒,可能即便線程被喚醒後,也搶到了鎖,可是卻發現當前的等待條件並無知足,則仍是得把線程掛起。所以咱們有時候並不但願await方法被中斷,awaitUninterruptibly()方法即實現了這個功能:

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true; // 發生了中斷後線程依舊留在了condition queue中,將會再次被掛起
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

首先,從方法簽名上就能夠看出,這個方法不會拋出中斷異常,咱們拿它和await()方法對比一下:

public final void await() throws InterruptedException {
    if (Thread.interrupted())  // 不一樣之處
        throw new InterruptedException(); // 不一樣之處
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;  // 不一樣之處
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  // 不一樣之處
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  // 不一樣之處
        interruptMode = REINTERRUPT;  // 不一樣之處
    if (node.nextWaiter != null)  // 不一樣之處
        unlinkCancelledWaiters(); // 不一樣之處
    if (interruptMode != 0) // 不一樣之處
        reportInterruptAfterWait(interruptMode); // 不一樣之處
}

因而可知,awaitUninterruptibly()全程忽略中斷,即便是當前線程由於中斷被喚醒,該方法也只是簡單的記錄中斷狀態,而後再次被掛起(由於並無並無任何操做將它添加到sync queue中)

要使當前線程離開condition queue去爭鎖,則必須是發生了signal事件。

最後,當線程在獲取鎖的過程當中發生了中斷,該方法也是不響應,只是在最終獲取到鎖返回時,再自我中斷一下。能夠看出,該方法和「中斷髮生於signal以後的」REINTERRUPT模式的await()方法很像。

至此,該方法咱們就分析完了,若是你以前await()方法已經弄懂了,這個awaitUninterruptibly()方法就很容易理解了。它的核心思想是:

  1. 中斷雖然會喚醒線程,可是不會致使線程離開condition queue,若是線程只是由於中斷而被喚醒,則他將再次被掛起
  2. 只有signal方法會使得線程離開condition queue
  3. 調用該方法時或者調用過程當中若是發生了中斷,僅僅會在該方法結束時再自我中斷如下,不會拋出InterruptedException

awaitNanos(long nanosTimeout)

前面咱們看的方法,不管是await()仍是awaitUninterruptibly(),它們在搶鎖的過程當中都是阻塞式的,即一直到搶到了鎖才能返回,不然線程仍是會被掛起,這樣帶來一個問題就是線程若是長時間搶不到鎖,就會一直被阻塞,所以咱們有時候更須要帶超時機制的搶鎖,這一點和帶超時機制的wait(long timeout)是很像的,咱們直接來看源碼:

public final long awaitNanos(long nanosTimeout) throws InterruptedException {
    /*if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);*/
    final long deadline = System.nanoTime() + nanosTimeout;
    /*int interruptMode = 0;
    while (!isOnSyncQueue(node)) */{
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        /*if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;*/
        nanosTimeout = deadline - System.nanoTime();
    }
    /*if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);*/
    return deadline - System.nanoTime();
}

該方法幾乎和await()方法同樣,只是多了超時時間的處理,咱們上面已經把和await()方法相同的部分註釋起來了,只留下了不一樣的部分,這樣它們的區別就變得更明顯了。

該方法的主要設計思想是,若是設定的超時時間還沒到,咱們就將線程掛起;超過等待的時間了,咱們就將線程從condtion queue轉移到sync queue中。注意這裏對於超時時間有一個小小的優化——當設定的超時時間很短時(小於spinForTimeoutThreshold的值),咱們就是簡單的自旋,而不是將線程掛起,以減小掛起線程和喚醒線程所帶來的時間消耗。

不過這裏還有一處值得注意,就是awaitNanos(0)的意義,咱們在線程間的同步與通訊(2)——wait, notify, notifyAll曾經提到過,wait(0)的含義是無限期等待,而咱們在awaitNanos(long nanosTimeout)方法中是怎麼處理awaitNanos(0)的呢?

if (nanosTimeout <= 0L) {
    transferAfterCancelledWait(node);
    break;
}

從這裏能夠看出,若是設置的等待時間自己就小於等於0,當前線程是會直接從condition queue中轉移到sync queue中的,並不會被掛起,也不須要等待signal,這一點確實是更復合邏輯。若是須要線程只有在signal發生的條件下才會被喚醒,則應該用上面的awaitUninterruptibly()方法。

await(long time, TimeUnit unit)

看完awaitNanos(long nanosTimeout)再看await(long time, TimeUnit unit)方法就更簡單了,它就是在awaitNanos(long nanosTimeout)的基礎上多了對於超時時間的時間單位的設置,可是在內部實現上仍是會把時間轉成納秒去執行,這裏咱們直接拿它和上面的awaitNanos(long nanosTimeout)方法進行對比,只給出不一樣的部分:

public final boolean await(long time, TimeUnit unit) throws InterruptedException {
    long nanosTimeout = unit.toNanos(time);
    /*if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;*/
    /*boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {*/
            timedout = transferAfterCancelledWait(node);
            /*break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);*/
    return !timedout;
}

能夠看出,這兩個方法主要的差異就體如今返回值上面,awaitNanos(long nanosTimeout)的返回值是剩餘的超時時間,若是該值大於0,說明超時時間還沒到,則說明該返回是由signal行爲致使的,而await(long time, TimeUnit unit)的返回值就是transferAfterCancelledWait(node)的值,咱們知道,若是調用該方法時,node尚未被signal過則返回true,node已經被signal過了,則返回false。所以當await(long time, TimeUnit unit)方法返回true,則說明在超時時間到以前就已經發生過signal了,該方法的返回是由signal方法致使的而不是超時時間。

綜上,調用await(long time, TimeUnit unit)其實就等價於調用awaitNanos(unit.toNanos(time)) > 0 方法,關於這一點,咱們在介紹condition接口的時候也已經提過了。

awaitUntil(Date deadline)

awaitUntil(Date deadline)方法與上面的幾種帶超時的方法也基本相似,所不一樣的是它的超時時間是一個絕對的時間,咱們直接拿它來和上面的await(long time, TimeUnit unit)方法對比:

public final boolean awaitUntil(Date deadline) throws InterruptedException {
    long abstime = deadline.getTime();
    /*if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) */{
        if (System.currentTimeMillis() > abstime) {
            /*timedout = transferAfterCancelledWait(node);
            break;
        }*/
        LockSupport.parkUntil(this, abstime);
        /*if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;*/
    }
    /*
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;*/
}

可見,這裏大段的代碼都是重複的,區別就是在超時時間的判斷上使用了絕對時間,其實這裏的deadline就和awaitNanos(long nanosTimeout)以及await(long time, TimeUnit unit)內部的deadline變量是等價的,另外就是在這個方法中,沒有使用spinForTimeoutThreshold進行自旋優化,由於通常調用這個方法,目的就是設定一個較長的等待時間,不然使用上面的相對時間會更方便一點。

至此,AQS對於Condition接口的實現咱們就所有分析完了。

(完)

系列文章目錄

相關文章
相關標籤/搜索