條件隊列是什麼?可能不少人和我同樣答不出來,不過今天終於搞清楚了!java
條件隊列:當某個線程調用了wait方法,或者經過Condition對象調用了await相關方法,線程就會進入阻塞狀態,並加入到對應條件隊列中。node
在等待喚醒機制相關文章中咱們提到了條件隊列,即當對象獲取到同步鎖以後,若是調用了wait方法,當前線程會進入到條件隊列中,並釋放鎖。併發
synchronized(對象){ // 獲取鎖失敗,線程會加入到同步隊列中 while(條件不知足){ 對象.wait();// 調用wait方法當前線程加入到條件隊列中 } }
基於synchcronized的內置條件隊列存在一些缺陷。每一個內置鎖都只能有一個相關聯的條件隊列,於是存在多個線程可能在同一個條件隊列上等待不一樣的條件謂詞,而且在最多見的加鎖模式下公開條件隊列對象。框架
Java中的鎖的實現能夠分爲兩種,一種是基於synchronized的隱式鎖,它是基於JVM層面實現的;而另外一種則是基於AQS框架在代碼層面實現的鎖,如ReentrantLock等,在進行併發控制過程當中,不少狀況下他們均可以相互替代。ide
其中同步隊列和條件隊列是AQS中兩個比較核心的概念,它們是代碼層面實現鎖的關鍵。關於同步隊列的內容,咱們已經在圖解AQS的設計與實現,手摸手帶你實現一把互斥鎖!中進行了詳細的介紹。ui
與Object配合synchronized相比,基於AQS的Lock&Condition實現的等待喚醒模式更加靈活,支持多個條件隊列,支持等待狀態中不響應中斷以及超時等待功能; 其次就是基於AQS實現的條件隊列是"肉眼可見"的,咱們能夠經過源代碼進行debug,而synchronized則是徹底隱式的。this
與條件隊列密不可分的類則是ConditionObject, 是AQS中實現了Condition接口的內部類,一般配合基於AQS實現的鎖一同使用。當線程獲取到鎖以後,能夠調用await方法進入條件隊列並釋放鎖,或者調用singinal方法喚醒對應條件隊列中等待時間最久的線程並加入到等待隊列中。線程
在AQS中,線程會被封裝成Node對象加入隊列中,而條件隊列中則複用了同步隊列中的Node對象。debug
Condition接口一共定義瞭如下幾個方法:設計
await(): 當前線程進入等待狀態,直到被通知(siginal)或中斷【和wait方法語義相同】。
awaitUninterruptibly(): 當前線程進入等待狀態,直到被通知,對中斷不敏感。
awaitNanos(long timeout): 當前線程進入等待狀態直到被通知(siginal),中斷或超時。
awaitUnitil(Date deadTime): 當前線程進入等待狀態直到被通知(siginal),中斷或到達某個時間。
signal(): 喚醒一個等待在Condition上的線程,該線程從等待方法返回前必須得到與Condition關聯的鎖【和notify方法語義相同】
signalAll(): 喚醒全部等待在Condition上的線程,可以從等待方法返回的線程必須得到與Condition關聯的鎖【和notifyAll方法語義相同】。
當線程獲取到鎖以後,Condition對象調用await相關的方法,線程會進入到對應的條件隊列中。
/** * 若是當前線程被終端,拋出 InterruptedException 異常 */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 添加當前線程到【條件隊列】 Node node = addConditionWaiter(); // 釋放已經獲取的鎖資源,並返回釋放前的同步狀態 int savedState = fullyRelease(node); int interruptMode = 0; // 若是當前節點不在【同步隊列】中, 線程進入阻塞狀態,等待被喚醒 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
Condition對象調用signal或者signalAll方法時,
/** * 將【條件隊列】中第一個有效的元素移除而且添加到【同步隊列】中 * 所謂有效指的是非null,而且狀態嗎 * @param first 條件隊列中第一個非空的元素 */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 將條件隊列中等待最久的那個有效元素添加到同步隊列中 } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * 將條件隊列中的節點轉換到同步隊列中 */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. * 若是節點的等待狀態不能被修改,說明當前線程已經被取消等待【多個線程執行siginal時會出現的狀況】 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * 加入到【同步隊列】中,而且嘗試將前驅節點設置爲可喚醒狀態 */ Node p = enq(node); // 將node添加到同步隊列中,並返回它的前驅節點 int ws = p.waitStatus; // 若是前驅節點不須要喚醒,或者設置狀態爲‘喚醒’失敗,則喚醒線程時期從新爭奪同步狀態 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
/** * 自定義互斥鎖 * * @author cruder * @time 2019/11/29 9:43 */ public class MutexLock { private static final Sync STATE_HOLDER = new Sync(); /** * 經過Sync內部類來持有同步狀態, 當狀態爲1表示鎖被持有,0表示鎖處於空閒狀態 */ private static class Sync extends AbstractQueuedSynchronizer { /** * 是否被獨佔, 有兩種表示方式 * 1. 能夠根據狀態,state=1表示鎖被佔用,0表示空閒 * 2. 能夠根據當前獨佔鎖的線程來判斷,即getExclusiveOwnerThread()!=null 表示被獨佔 */ @Override protected boolean isHeldExclusively() { return getExclusiveOwnerThread() != null; } /** * 嘗試獲取鎖,將狀態從0修改成1,操做成功則將當前線程設置爲當前獨佔鎖的線程 */ @Override protected boolean tryAcquire(int arg) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 釋放鎖,將狀態修改成0 */ @Override protected boolean tryRelease(int arg) { if (getState() == 0) { throw new UnsupportedOperationException(); } setExclusiveOwnerThread(null); setState(0); return true; } //【新增代碼】 final ConditionObject newCondition() { return new ConditionObject(); } } /** * 下面的實現Lock接口須要重寫的方法,基本是就是調用內部內Sync的方法 */ public void lock() { STATE_HOLDER.acquire(1); } public void unlock() { STATE_HOLDER.release(1); } // 【新增代碼】 獲取條件隊列 public Condition newCondition(){ return STATE_HOLDER.newCondition(); } }
/** * 有界阻塞阻塞隊列 * * @author Jann Lee * @date 2019-12-11 22:20 **/ public class BoundedBlockingQueue<T> { /** * list做爲底層存儲結構 */ private List<T> dataList; /** * 隊列的大小 */ private int size; /** * 鎖,和條件變量 */ private MutexLock lock; /** * 隊列非空 條件變量 */ private Condition notEmpty; /** * 隊列未滿 條件變量 */ private Condition notFull; public BoundedBlockingQueue(int size) { dataList = new ArrayList<>(); lock = new MutexLock(); notEmpty = lock.newCondition(); notFull = lock.newCondition(); this.size = size; } /** * 隊列中添加元素 [只有隊列未滿時才能夠添加,不然須要等待隊列變成未滿狀態] */ public void add(T data) throws InterruptedException { lock.lock(); try { // 若是隊列已經滿了, 須要在等待「隊列未滿」條件知足 while (dataList.size() == size) { notFull.await(); } dataList.add(data); Thread.sleep(2000); notEmpty.signal(); } finally { lock.unlock(); } } /** * 移除隊列的第一個元素[只有隊列非空才能夠移除,不然須要等待變成隊列非空狀態] */ public T remove() throws InterruptedException { lock.lock(); try { // 若是爲空, 須要在等待「隊列非空」條件知足 while (dataList.isEmpty()) { notEmpty.await(); } T result = dataList.remove(0); notFull.signal(); return result; } finally { lock.unlock(); } } }
最後敬上一個關注了也沒有錯的公衆號~