Java 併發編程之 Condition 接口


本文部分摘自《Java 併發編程的藝術》node


概述

任意一個 Java 對象,都擁有一個監視器方法,主要包括 wait()、wait(long timeout)、notify() 以及 notifyAll() 方法,這些方法與 synchronized 同步關鍵字配合,能夠實現等待 - 通知模式。Condition 接口也提供了相似 Object 的監視器方法,與 Lock 配合能夠實現等待 - 通知模式編程

Object 的監視器方法與 Condition 接口的對比:數組

對比項 Object 監視器方法 Condition
前置條件 獲取對象的監視器鎖 調用 Lock.lock() 獲取鎖調用 Lock.newCondition() 獲取 Condition 對象
調用方法 直接調用如:object.wait() 直接調用如:condition.await()
等待隊列個數 一個 多個
當前線程釋放鎖並進入等待隊列 支持 支持
當前線程釋放鎖並進入等待隊列,在等待狀態中不響應中斷 不支持 支持
當前線程釋放鎖並進入超時等待狀態 支持 支持
當前線程釋放鎖並進入等待狀態到未來的某個時間 不支持 支持
喚醒等待隊列中的一個線程 支持 支持
喚醒等待隊列中的所有線程 支持 支持


接口示例

Condition 定義了等待 - 通知兩種類型的方法,當前線程調用這些方法時,須要提早獲取到 Condition 對象關聯的鎖。Condition 對象是由 Lock 對象(調用 Lock 對象的 newCondition() 方法)建立,換句話說,Condition 是依賴 Lock 對象的安全

public class ConditionUserCase {

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    public void conditionWait() throws InterruptedException {
        lock.lock();
        try {
            condition.await();
        } finally {
            lock.unlock();
        }
    }

    public void conditionSignal() {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

當調用 await() 方法後,當前線程會釋放鎖並在此等待,而其餘線程調用 Condition 對象的 signal() 方法,通知當前線程後,當前線程才從 await() 方法返回,而且在返回前已經獲取了鎖併發

Condition 的部分方法以及描述:ide

方法名稱 描 述
void await() throws InterruptedException 當前線程進入等待狀態直到被通知(signal)或中斷。
void awaitUninterruptibly() 當前線程進入等待狀態直到被通知,該方法不響應中斷。
long awaitNanos(long nanosTimeout) throws InterruptedException 當前線程進入等待狀態直到被通知、中斷或者超時,返回值表示剩餘超時時間。
boolean awaitUntil(Date deadline) throws InterruptedException 當前線程進入等待狀態直到被通知、中斷或者到某個時間。若是沒有到指定時間就被通知,方法返回 true,不然,表示到了指定時間,返回 false。
void signal() 喚醒一個等待在 Condition 上的線程,該線程從等待方法返回前必須得到與 Condition 相關聯的鎖。
void signalAll() 喚醒全部等待在 Condition 上的線程,可以從等待方法返回的線程必須得到與 Condition 相關聯的鎖。

下面經過一個有界隊列的示例來深刻理解 Condition 的使用方式ui

public class BoundedQueue<T> {

    private Object[] items;
    // 添加的下標,刪除的下標和數據當前數量
    private int addIndex, removeIndex, count;
    private Lock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public BoundedQueue(int size) {
        items = new Object[size];
    }

    /**
     * 添加一個元素,若是數組滿,則添加線程進入等待狀態,直到有空位
     */
    public void add(T t) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[addIndex] = t;
            if (++addIndex == items.length) {
                addIndex = 0;
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 由頭部刪除一個元素,若是數組空,則刪除線程進入等待狀態,直到有新元素添加
     */
    @SuppressWarnings("unchecked")
    public T remove() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            Object x = items[removeIndex];
            if (++removeIndex == items.length) {
                removeIndex = 0;
            }
            --count;
            notFull.signal();
            return (T) x;
        } finally {
            lock.unlock();
        }
    }
}


實現分析

ConditionObject 是同步器 AbstractQueuedSynchronizer 的內部類,每一個 Condition 對象都包含着一個隊列(等待隊列),該隊列是 Condition 對象實現等待 - 通知功能的關鍵this

1. 等待隊列

等待隊列是一個 FIFO 隊列,在隊列中的每一個節點都包含了一個線程引用,該線程就是在 Condition 對象上等待的線程,若是一個線程調用了 Condition.await() 方法,那麼該線程就會釋放鎖,構形成節點並加入等待隊列並進入等待狀態線程

一個 Condition 包含一個等待隊列,Condition 擁有首尾節點的引用,新增節點只須要將原有的尾節點 nextWaiter 指向它,並更新尾節點便可。節點引用更新的過程並無使用 CAS 來保證,緣由在於調用 await() 方法的線程一定是獲取了鎖的線程,也就是該過程是由鎖來保證線程安全的對象

在 Object 的監視器模型上,一個對象擁有一個同步隊列和等待隊列,而併發包中的 Lock 擁有一個同步隊列和多個等待隊列,其對應關係如圖所示:

2. 等待

調用 Condition 的 await() 方法,會使當前線程進入等待隊列並釋放鎖,同時線程狀態變爲等待狀態。當從 await() 方法返回時,當前線程必定獲取了 Condition 相關聯的鎖

Condition 的 await() 方法以下所示:

public final void await() throws InterruptedException {
    // 檢測線程中斷狀態
    if (Thread.interrupted())
        throw new InterruptedException();
    // 當前線程包裝爲 Node 並加入等待隊列
    Node node = addConditionWaiter();
    // 釋放同步狀態,也就是釋放鎖
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 檢測該節點是否在同步隊列中,若是不在,則繼續等待
    while (!isOnSyncQueue(node)) {
        // 掛起線程
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 競爭同步狀態
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清理條件隊列中的不是在等待條件的節點
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    // 對等待線程中斷,會拋出異常
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

3. 通知

調用 Condition 的 signal() 方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點以前,會將節點移到同步隊列中

Condition 的 signal() 方法代碼以下所示:

public final void signal() {
    // 檢查當前線程是否獲取了鎖
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 獲取等待隊列首節點,移動到同步隊列並喚醒
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

Condition 的 signAll() 方法,至關於對等待隊列中的每一個結點均執行一個 signal() 方法,效果就是將等待隊列中全部節點所有移動到同步隊列中,並喚醒每一個節點的線程

相關文章
相關標籤/搜索