阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:在隊列爲空時,獲取元素的線程會等待隊列變爲非空。當隊列滿 時,存儲元素的線程會等待隊列可用。阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是 生產者存放元素的容器,而消費者也只從容器裏拿元素。html
ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。java
LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。node
PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。linux
DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。編程
SynchronousQueue:一個不存儲元素的阻塞隊列。windows
LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。數組
LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。緩存
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
訪問者的公平性是使用可重入鎖實現的,代碼以下:多線程
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度爲Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。併發
PriorityBlockingQueue是一個支持優先級的無界隊列。默認狀況下元素採起天然順序排列,也能夠經過比較器comparator來指定元素的排序規則。元素按照升序排列。
DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接 口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。咱們能夠將DelayQueue運用在如下應用場景:
緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從好比TimerQueue就是使用DelayQueue實現的。
隊列中的Delayed必須實現compareTo來指定元素的順序。好比讓延時時間最長的放在隊列的末尾。實現代碼以下:
public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask x = (ScheduledFutureTask)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); }
如何實現Delayed接口
咱們能夠參考ScheduledThreadPoolExecutor裏ScheduledFutureTask類。這個類實現了Delayed接口。首先:在對象建立的時候,使用time記錄前對象何時可使用,代碼以下:
ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); }
而後使用getDelay能夠查詢當前元素還須要延時多久,代碼以下:
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), TimeUnit.NANOSECONDS); }
經過構造函數能夠看出延遲時間參數ns的單位是納秒,本身設計的時候最好使用納秒,由於getDelay時能夠指定任意單位,一旦以納秒做爲單位,而延時的時間又精確不到納秒就麻煩了。使用時請注意當time小於當前時間時,getDelay會返回負數。
如何實現延時隊列
延時隊列的實現很簡單,當消費者從隊列裏獲取元素時,若是元素沒有達到延時時間,就阻塞當前線程。
long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); else if (leader != null) available.await();
SynchronousQueue是一個不存儲元素的阻塞隊列。每個put操做必須等待一個take操做,不然不能繼續添加元素。 SynchronousQueue能夠當作是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列自己並不存儲任何元素,很是適合於傳遞性 場景,好比在一個線程中使用的數據,傳遞給另一個線程使用,SynchronousQueue的吞吐量高於LinkedBlockingQueue 和 ArrayBlockingQueue。
LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對於其餘阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。
transfer方法。若是當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法 能夠把生產者傳入的元素馬上transfer(傳輸)給消費者。若是沒有消費者在等待接收元素,transfer方法會將元素存放在隊列的tail節點, 並等到該元素被消費者消費了才返回。transfer方法的關鍵代碼以下:
Node pred = tryAppend(s, haveData); return awaitMatch(s, pred, e, (how == TIMED), nanos);
第一行代碼是試圖把存放當前元素的s節點做爲tail節點。第二行代碼是讓CPU自旋等待消費者消費元素。由於自旋會消耗CPU,因此自旋必定的次數後使用Thread.yield()方法來暫停當前正在執行的線程,並執行其餘線程。
tryTransfer方法。則是用來試探下生產者傳入的元素是否能直接傳給消費者。若是沒有消費者等待接收元素,則返回false。和 transfer方法的區別是tryTransfer方法不管消費者是否接收,方法當即返回。而transfer方法是必須等到消費者消費了才返回。
對於帶有時間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,則是試圖把生產者傳入的元素直接傳給消費者,可是若是沒有消費者消費該元素則等待指定的時間再返回,若是超時還沒消費元素,則返回 false,若是在超時時間內消費了元素,則返回true。
LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的你能夠從隊列的兩端插入和移出元素。雙端隊列由於多 了一個操做隊列的入口,在多線程同時入隊時,也就減小了一半的競爭。相比其餘的阻塞隊列,LinkedBlockingDeque多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First單詞結尾的方 法,表示插入,獲取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入,獲取或移除雙端隊列的最後一個元素。另外插入方法 add等同於addLast,移除方法remove等效於removeFirst。可是take方法卻等同於takeFirst,不知道是否是Jdk的 bug,使用時仍是用帶有First和Last後綴的方法更清楚。
在初始化LinkedBlockingDeque時能夠設置容量防止其過渡膨脹。另外雙向阻塞隊列能夠運用在「工做竊取」模式中。
若是隊列是空的,消費者會一直等待,當生產者添加元素時候,消費者是如何知道當前隊列有元素的呢?若是讓你來設計阻塞隊列你會如何設計,讓生產者和消費者可以高效率的進行通信呢?讓咱們先來看看JDK是如何實現的。
使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列裏添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素後,會通知生產者當前隊列可用。經過查看JDK源碼發現ArrayBlockingQueue使用了Condition來實現,代碼以下:
private final Condition notFull; private final Condition notEmpty; public ArrayBlockingQueue(int capacity, boolean fair) { //省略其餘代碼 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
當咱們往隊列裏插入一個元素時,若是隊列不可用,阻塞生產者主要經過LockSupport.park(this);來實現
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
繼續進入源碼,發現調用setBlocker先保存下將要阻塞的線程,而後調用unsafe.park阻塞當前線程。
public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); }
unsafe.park是個native方法,代碼以下:
public native void park(boolean isAbsolute, long time);
park這個方法會阻塞當前線程,只有如下四種狀況中的一種發生時,該方法纔會返回。
與park對應的unpark執行或已經執行時。注意:已經執行是指unpark先執行,而後再執行的park。
線程被中斷時。
若是參數中的time不是零,等待了指定的毫秒數時。
發生異常現象時。這些異常事先沒法肯定。
咱們繼續看一下JVM是如何實現park方法的,park在不一樣的操做系統使用不一樣的方式實現,在linux下是使用的是系統方法 pthread_cond_wait實現。實現代碼在JVM源碼路徑src/os/linux/vm/os_linux.cpp裏的 os::PlatformEvent::park方法,代碼以下:
void os::PlatformEvent::park() { int v ; for (;;) { v = _Event ; if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ; } guarantee (v >= 0, "invariant") ; if (v == 0) { // Do this the hard way by blocking ... int status = pthread_mutex_lock(_mutex); assert_status(status == 0, status, "mutex_lock"); guarantee (_nParked == 0, "invariant") ; ++ _nParked ; while (_Event < 0) { status = pthread_cond_wait(_cond, _mutex); // for some reason, under 2.7 lwp_cond_wait() may return ETIME ... // Treat this the same as if the wait was interrupted if (status == ETIME) { status = EINTR; } assert_status(status == 0 || status == EINTR, status, "cond_wait"); } -- _nParked ; // In theory we could move the ST of 0 into _Event past the unlock(), // but then we'd need a MEMBAR after the ST. _Event = 0 ; status = pthread_mutex_unlock(_mutex); assert_status(status == 0, status, "mutex_unlock"); } guarantee (_Event >= 0, "invariant") ; } }
pthread_cond_wait是一個多線程的條件變量函數,cond是condition的縮寫,字面意思能夠理解爲線程在等待一個條件發 生,這個條件是一個全局變量。這個方法接收兩個參數,一個共享變量_cond,一個互斥量_mutex。而unpark方法在linux下是使用 pthread_cond_signal實現的。park 在windows下則是使用WaitForSingleObject實現的。
當隊列滿時,生產者往阻塞隊列裏插入一個元素,生產者線程會進入WAITING (parking)狀態。咱們可使用jstack dump阻塞的生產者線程看到這點:
"main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0000000140559fe8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324) at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:11)
方騰飛,花名清英,併發編程網站站長。目前在阿里巴巴微貸事業部工做。併發編程網:http://ifeve.com,我的微博:http://weibo.com/kirals,歡迎經過個人微博進行技術交流。