談談JUC----------Condition源碼分析

1、什麼是 Condition?

之前多線程通訊使用的是Object類提供的wait以及notify方法,Condition也有相似waitnotify的功能,且提供了當線程中斷時是否作出相應的處理方法。Condition底層實現基於FIFO隊列,它必須結合Lock鎖一塊兒使用,而且Condition實例由Lock建立。最後總結起來就是,Condition是一種多線程通訊工具,表示多線程下參與數據競爭的線程的一種狀態,主要負責多線程環境下對線程的掛起和喚醒工做。node

下面貼出一張截取自《Java併發編程的藝術》的對比圖:編程

2、Condition 源碼分析

一、Condition架構及其內部結構是怎樣的?

Condition是一個接口,jdk中提供了Condition的一個實現類ConditionObject,ConditionObjectAQS中的內部類,由於Condition的操做須要獲取相關的鎖,而AQS又是實現同步鎖的基礎。Condition提供了下面幾種操做方法:bash

  • void await() throws InterruptedException

當調用這個方法時,線程將被掛起,直到被其餘線程喚醒爲止。注意當掛起的線程被中斷時,將拋出InterruptedException異常。多線程

  • void awaitUninterruptibly()

此方法與上面的await()方法相似,區別是該方法不處理線程中斷的狀況。架構

  • long awaitNanos(long nanosTimeout) throws InterruptedException

表示能夠最長等待指定時間,除非中途被中斷或者提早喚醒了,返回值=nanosTimeout-已等待的時間。併發

  • void awaitUninterruptibly()

此方法與上面的await()方法相似,區別是該方法不處理線程中斷的狀況。工具

  • boolean await(long time, TimeUnit unit) throws InterruptedException

awaitNanos(),只不過能夠指定時間單位。源碼分析

  • boolean awaitUntil(Date deadline) throws InterruptedException

表示線程掛起,知道某個時間點喚醒該線程。post

  • void signal()以及void signalAll()

喚醒操做。ui

那麼問題來了,怎麼獲取Condition對象呢?查看Lock接口,該接口聲明瞭一個newCondition()方法,返回一個Condition對象,如Lock接口的一個實現ReentrantLock中則提供了該方法的實現,但本質上仍是建立了ConditionObject對象。

源碼以下:

public Condition newCondition() {
    return sync.newCondition();
}
複製代碼

其中sync爲繼承自AQS的同步器。

final ConditionObject newCondition() {
    return new ConditionObject();
}
複製代碼

關於AQS,能夠參考下面這篇文章:

Java 併發編程 ----- AQS(抽象隊列同步器)

二、Condition中await內部實現細節

下邊爲await()方法源代碼:

public final void await() throws InterruptedException {
    // 一、線程若是中斷,那麼拋出異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 二、將當前線程包裝成爲一個Node節點,加入FIFO隊列中
    Node node = addConditionWaiter();
    // 三、釋放鎖
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 四、判斷節點是否在同步隊列(注意非Condition隊列)中,若是沒有,則掛起當前線程,由於該線程還沒有具有數據競爭資格
    while (!isOnSyncQueue(node)) {
        // 五、掛起線程
        LockSupport.park(this);
        // 六、中斷直接返回
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 七、參與數據競爭(非中斷時執行)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
        
    // 清理條件隊列中狀態爲cancelled的節點
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    }
複製代碼

從上面代碼咱們能夠看出,由於await自己不支持中斷,所以若是當前線程被中斷了,那麼第一步直接拋出異常。第二步將當前線程包裝成一個Node節點,加入到Condition條件隊列中,NodeAQS使用的是同一個類型,查看AQS可知,每一個Node與一個當前Thread相關聯。

static final class Node {
    volatile Thread thread;
}
複製代碼

接着執行第三步釋放鎖,這裏就涉及到爲何說Condition使用時必須先得到鎖的問題了,下方爲fullyRelease()源碼:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        // 這裏直接執行釋放鎖操做
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // 釋放失敗,則報錯
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
複製代碼

第4-6步則不斷循環,直到該節點出如今同步隊列中,注意同步隊列不是條件隊列,一個屬於AQS,一個屬於Condition,那麼,這個while循環是怎樣判斷是否在同步隊列中的呢?其實很簡單,就是當把節點的狀態改爲非CONDITION就能夠了,好比調用了signal(),並且請注意,LockSupport.park(this);這句代碼執行完以後是阻塞代碼了哈,後面分析signal會提到。源碼以下:

final boolean isOnSyncQueue(Node node) {
    // 判斷節點的狀態是否是CONDITION,CONDITION表示該節點正在處於等待某個條件,此時就應該park掛起線程
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node);
}
複製代碼

第7步也很好理解,既然當前線程等待的條件都有了,被喚醒了,那麼就直接參與數據競爭就完事了,第8步清理掉一些狀態爲cancelled的節點,線程因爲中斷或超時時,節點的狀態就會被標記成cancelled

三、Condition中signal內部實現細節
public final void signal() {
    // 一、必須得到鎖
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
複製代碼
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // 將節點插入到同步隊列中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
複製代碼

調用condition的signal的前提條件是當前線程已經獲取了lock,該方法會使得等待隊列中的頭節點即等待時間最長的那個節點移入到同步隊列,而移入到同步隊列後纔有機會使得等待線程被喚醒,即從await方法中的LockSupport.park(this)方法中返回(由於上文提到這個方法是阻塞住當前代碼的),從而纔有機會使得調用await方法的線程成功退出

3、總結

使用鎖結合Condition能夠很好的解決生產者消費者問題,須要注意的就是,使用Condition時必須先獲取鎖,不然將報錯,也就是說,通常會按下面方式去調用Condition

lock.lock();
condition.await();

condition.signal();
lock.unlock();
複製代碼
相關文章
相關標籤/搜索