淺談Java中的Condition條件隊列,手摸手帶你實現一個阻塞隊列!

條件隊列是什麼?可能不少人和我同樣答不出來,不過今天終於搞清楚了!java

什麼是條件隊列

條件隊列:當某個線程調用了wait方法,或者經過Condition對象調用了await相關方法,線程就會進入阻塞狀態,並加入到對應條件隊列中。node

等待喚醒機制相關文章中咱們提到了條件隊列,即當對象獲取到同步鎖以後,若是調用了wait方法,當前線程會進入到條件隊列中,並釋放鎖。併發

synchronized(對象){ // 獲取鎖失敗,線程會加入到同步隊列中 
    while(條件不知足){
        對象.wait();// 調用wait方法當前線程加入到條件隊列中
    }
}

基於synchcronized的內置條件隊列存在一些缺陷。每一個內置鎖都只能有一個相關聯的條件隊列,於是存在多個線程可能在同一個條件隊列上等待不一樣的條件謂詞,而且在最多見的加鎖模式下公開條件隊列對象。框架

Java中的鎖的實現能夠分爲兩種,一種是基於synchronized的隱式鎖,它是基於JVM層面實現的;而另外一種則是基於AQS框架在代碼層面實現的鎖,如ReentrantLock等,在進行併發控制過程當中,不少狀況下他們均可以相互替代。ide

其中同步隊列和條件隊列是AQS中兩個比較核心的概念,它們是代碼層面實現鎖的關鍵。關於同步隊列的內容,咱們已經在圖解AQS的設計與實現,手摸手帶你實現一把互斥鎖!中進行了詳細的介紹。ui

與Object配合synchronized相比,基於AQS的Lock&Condition實現的等待喚醒模式更加靈活,支持多個條件隊列,支持等待狀態中不響應中斷以及超時等待功能; 其次就是基於AQS實現的條件隊列是"肉眼可見"的,咱們能夠經過源代碼進行debug,而synchronized則是徹底隱式的。this

同步隊列和條件隊列

與條件隊列密不可分的類則是ConditionObject, 是AQS中實現了Condition接口的內部類,一般配合基於AQS實現的鎖一同使用。當線程獲取到鎖以後,能夠調用await方法進入條件隊列並釋放鎖,或者調用singinal方法喚醒對應條件隊列中等待時間最久的線程並加入到等待隊列中。線程

在AQS中,線程會被封裝成Node對象加入隊列中,而條件隊列中則複用了同步隊列中的Node對象debug

Condition相關方法和描述

Condition接口一共定義瞭如下幾個方法:設計

await(): 當前線程進入等待狀態,直到被通知(siginal)或中斷【和wait方法語義相同】。

awaitUninterruptibly(): 當前線程進入等待狀態,直到被通知,對中斷不敏感。

awaitNanos(long timeout): 當前線程進入等待狀態直到被通知(siginal),中斷或超時。

awaitUnitil(Date deadTime): 當前線程進入等待狀態直到被通知(siginal),中斷或到達某個時間。

signal(): 喚醒一個等待在Condition上的線程,該線程從等待方法返回前必須得到與Condition關聯的鎖【和notify方法語義相同】

signalAll(): 喚醒全部等待在Condition上的線程,可以從等待方法返回的線程必須得到與Condition關聯的鎖【和notifyAll方法語義相同】。

條件隊列入隊操做

當線程獲取到鎖以後,Condition對象調用await相關的方法,線程會進入到對應的條件隊列中。

/**
  * 若是當前線程被終端,拋出 InterruptedException 異常
  */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加當前線程到【條件隊列】
    Node node = addConditionWaiter();
    // 釋放已經獲取的鎖資源,並返回釋放前的同步狀態
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 若是當前節點不在【同步隊列】中, 線程進入阻塞狀態,等待被喚醒
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

條件隊出隊操做

Condition對象調用signal或者signalAll方法時,

/**
 * 將【條件隊列】中第一個有效的元素移除而且添加到【同步隊列】中
 * 所謂有效指的是非null,而且狀態嗎
 * @param first 條件隊列中第一個非空的元素
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    // 將條件隊列中等待最久的那個有效元素添加到同步隊列中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

/**
 * 將條件隊列中的節點轉換到同步隊列中
 */
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     * 若是節點的等待狀態不能被修改,說明當前線程已經被取消等待【多個線程執行siginal時會出現的狀況】
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * 加入到【同步隊列】中,而且嘗試將前驅節點設置爲可喚醒狀態
     */
    Node p = enq(node); // 將node添加到同步隊列中,並返回它的前驅節點
    int ws = p.waitStatus;
    // 若是前驅節點不須要喚醒,或者設置狀態爲‘喚醒’失敗,則喚醒線程時期從新爭奪同步狀態
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

實現阻塞隊列

  1. 自定義互斥鎖升級,增長獲取Condition對象接口。
/**
 * 自定義互斥鎖
 *
 * @author cruder
 * @time 2019/11/29 9:43
 */
public class MutexLock {

    private static final Sync STATE_HOLDER = new Sync();

    /**
     * 經過Sync內部類來持有同步狀態, 當狀態爲1表示鎖被持有,0表示鎖處於空閒狀態
     */
    private static class Sync extends AbstractQueuedSynchronizer {

        /**
         * 是否被獨佔, 有兩種表示方式
         *  1. 能夠根據狀態,state=1表示鎖被佔用,0表示空閒
         *  2. 能夠根據當前獨佔鎖的線程來判斷,即getExclusiveOwnerThread()!=null 表示被獨佔
         */
        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() != null;
        }

        /**
         * 嘗試獲取鎖,將狀態從0修改成1,操做成功則將當前線程設置爲當前獨佔鎖的線程
         */
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        /**
         * 釋放鎖,將狀態修改成0
         */
        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new UnsupportedOperationException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        //【新增代碼】
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    }

    /**
     * 下面的實現Lock接口須要重寫的方法,基本是就是調用內部內Sync的方法
     */
    public void lock() {
        STATE_HOLDER.acquire(1);
    }

    public void unlock() {
        STATE_HOLDER.release(1);
    }

    // 【新增代碼】 獲取條件隊列
    public Condition newCondition(){
        return STATE_HOLDER.newCondition();
    }

}
  1. 基於自定義互斥鎖,實現阻塞隊列。阻塞隊列具備兩個特色:
  • 添加元素到隊列中, 若是隊列已滿會使得當前線程阻塞【加入到條件隊列-隊列不滿】,直到隊列不滿爲止
  • 移除隊列中的元素,當隊列爲空時會使當前線程阻塞【加入到條件隊列-隊列不空】,直到隊列不爲空爲止
/**
 *  有界阻塞阻塞隊列
 *
 * @author Jann Lee
 * @date 2019-12-11 22:20
 **/
public class BoundedBlockingQueue<T> {
    /**
     * list做爲底層存儲結構
     */
    private List<T> dataList;
    /**
     * 隊列的大小
     */
    private int size;

    /**
     * 鎖,和條件變量
     */
    private MutexLock lock;
    /**
     * 隊列非空 條件變量
     */
    private Condition notEmpty;
    /**
     * 隊列未滿 條件變量
     */
    private Condition notFull;

    public BoundedBlockingQueue(int size) {
        dataList = new ArrayList<>();
        lock = new MutexLock();
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
        this.size = size;
    }


    /**
     * 隊列中添加元素 [只有隊列未滿時才能夠添加,不然須要等待隊列變成未滿狀態]
     */
    public void add(T data) throws InterruptedException {
        lock.lock();
        try {
            // 若是隊列已經滿了, 須要在等待「隊列未滿」條件知足
            while (dataList.size() == size) {
                notFull.await();
            }
            dataList.add(data);
            Thread.sleep(2000);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 移除隊列的第一個元素[只有隊列非空才能夠移除,不然須要等待變成隊列非空狀態]
     */
    public T remove() throws InterruptedException {
        lock.lock();
        try {
            // 若是爲空, 須要在等待「隊列非空」條件知足
            while (dataList.isEmpty()) {
                notEmpty.await();
            }
            T result = dataList.remove(0);
            notFull.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }

}

總結

  1. 條件隊列和同步隊列在Java中有兩種實現,synchronized關鍵字以及基於AQS
  2. 每一個(基於synchronized的)內置鎖都只能有一個相關聯的條件隊列,會存在多個線程可能在同一個條件隊列上等待不一樣的條件謂詞;而(基於AQS實現的)顯式鎖支持多個條件隊列
  3. 與wait,notify,notifyAll 對應的方法時Conditoin接口中的await,signal,signalAll,他們具備相同的語義

最後敬上一個關注了也沒有錯的公衆號~

相關文章
相關標籤/搜索