前面學習了基於數組的非阻塞雙端隊列
ArrayDeque
,其內部維護一個數組和指向隊列頭和隊列尾索引的兩個成員變量;本篇則探究下基於數組的阻塞隊列是什麼樣的數據結構,又有什麼特性,相較於ArrayDeque
又有什麼異同;而後就是使用場景了java
先看內部成員變量定義, 和 ArrayDequeue
相比,差異不大,一個數組,兩個索引;此外多了一個鎖和兩個斷定條件數組
/** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
注意安全
count
直接表示隊列的元素個數(注意DelayQueue是經過遍從來獲取隊列長度,且併發修改會有問題,那麼這個是如何保證併發的?)數據結構以下圖數據結構
![aryBlockQueueStruct.jpeg](quiver-image-url/5AEE167065A2E49EB3DDBC449BCF991E.jpg =374x165)併發
分析阻塞原理以前,先經過註釋解釋下ArrayBlockingQueue
的使用場景源碼分析
通用的進隊方法以下,是非阻塞的方式,當數組滿時,直接返回false,爲保證併發安全,進隊操做是加鎖實現學習
public boolean offer(E e) { // 非空校驗 checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); // 進隊加鎖 try { if (count == items.length) // 隊列滿,則直接返回false return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } // 直接將元素塞入數組 private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
阻塞方式的進隊實現以下ui
public void put(E e) throws InterruptedException { checkNotNull(e); // 非空判斷 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 獲取鎖 try { while (count == items.length) { // 一直阻塞,知道隊列非滿時,被喚醒 notFull.await(); } enqueue(e); // 進隊 } finally { lock.unlock(); } } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { // 阻塞,知道隊列不滿 // 或者超時時間已過,返回false if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
源碼分析,阻塞入隊的邏輯比較清晰,小結一下this
offer(e)
put(e)
或 offer(e, timeout, unit)
notEmpty.signal()
喚起被阻塞的出隊線程非阻塞出隊方法以下url
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; // 直接將隊頭扔出去,並置空數組中該位置 // 並移動隊列頭到下一位 @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) // 保障在遍歷時,能夠進行出隊操做 itrs.elementDequeued(); notFull.signal(); return x; }
阻塞的實現,邏輯比較清晰,首先競爭鎖,判斷是否爲空,是阻塞直到非空;不然彈出隊列頭元素
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
小結
poll()
方法take()
或 poll(long,TimeUnit)
方法建立線程池時,一般會用到 ArrayBlockingQueue
或者LinkedBlockingQueue
,如
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
延遲隊列也是併發安全,ArrayBlockingQueue
相比較 DelayQueue
應用場景的區別主要在
getDelay()
返回值小於0)基於數組阻塞隊列ArrayBlockingQueue
take()
或 poll(long, TimeUnit)
poll()
offer(E, long, TimeUnit)
或put(E)
offer(E)
add(E)