阻塞隊列(BlockingQueue)是一種支持兩個附加操做的隊列.java
阻塞隊列經常使用於生產者和消費者的場景,生產者是向隊列裏存儲元素的線程,消費者是從隊列裏獲取元素的線程.阻塞隊列就是生產者存儲元素、消費者獲取元素的容器.node
在阻塞隊列不可用時,這兩個附加操做提供了4種處理方式
數組
注意 如果無界阻塞隊列,隊列不會出現滿的狀況,因此使用put或offer方法永遠不會被阻塞,使用offer方法時,永遠返回true緩存
BlockingQueue 不接受 null 元素。試圖 add、put 或 offer null 元素時,會拋出 NullPointerException。null 被用做指示 poll 操做失敗的警惕值(沒法經過編譯)。 安全
BlockingQueue 實現主要用於生產者-使用者隊列,但它另外還支持 Collection 接口。所以,舉例來講,使用 remove(x) 從隊列中移除任意一個元素是有可能的。然而,這種操做一般表現並不高效,只能有計劃地偶爾使用,好比在取消排隊信息時。markdown
BlockingQueue 實現是線程安全的。全部排隊方法均可以使用內置鎖或其餘形式的併發控制來自動達到它們的目的。然而,大量的Collection 操做(addAll、containsAll、retainAll 和 removeAll)沒有必要自動執行,除非在實現中特別說明。所以,舉例來講,在只添加 c 中的一些元素後,addAll(c) 有可能失敗(拋出一個異常)。併發
BlockingQueue 實質上不支持使用任何一種「close」或「shutdown」操做來指示再也不添加任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種經常使用的策略是:對於生產者,插入特殊的 end-of-stream 或 poison 對象,並根據使用者獲取這些對象的時間來對它們進行解釋。 ui
至JDK8,Java提供了7個阻塞隊列
·ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
·LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列。
·PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列。
·DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
·SynchronousQueue:一個不存儲元素的阻塞隊列。
·LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
·LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。this
是一個用數組實現的有界阻塞隊列.此隊列按FIFO的原則對元素進行排序spa
這是一個典型的「有界緩存區」,固定大小的數組在其中保持生產者插入的元素和消費者獲取的元素.一旦建立了這樣的緩存區,就不能再增長其容量.試圖向已滿隊列中放入元素會致使操做受阻塞;試圖從空隊列中提取元素將致使相似阻塞.
此類支持對等待的生產者和消費者線程進行排序的可選的公平策略.默認狀況下,不保證這種機制.然而,經過將公平性設置爲 true 而構造的隊列容許按 FIFO 順序訪問線程。公平性一般會下降吞吐量,但也減小了可變性和避免了「不平衡性」.
所謂公平訪問隊列是指阻塞的線程,能夠按照阻塞的前後順序訪問隊列,即先阻塞的線程先訪問隊列.
非公平性是對先等待的線程是非公平的,當隊列有可用空間時,阻塞的線程均可以爭奪訪問隊列的資格,有可能先阻塞的線程最後才訪問隊列.
爲保證公平性,一般會下降吞吐量.咱們可使用如下代碼建立一個公平的阻塞隊列:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
訪問者的公平性是使用可重入鎖實現的,代碼以下:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
若隊列爲空,消費者會一直等待,當生產者添加元素時,消費者是如何知道當前隊列有元素的呢?讓咱們看看JDK是如何實現的。
使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列裏添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素後,會通知生產者當前隊列可用。經過查看源碼發現ArrayBlockingQueue使用了Condition來實現,代碼以下。
private final Condition notFull; private final Condition notEmpty; public ArrayBlockingQueue(int capacity, boolean fair) { // 省略其餘代碼 notEmpty = lock.newCondition(); notFull = lock.newCondition(); public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
當往隊列裏插入一個元素時,若是隊列不可用,那麼阻塞生產者主要經過
LockSupport.park(this)來實現。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
繼續進入源碼,發現調用setBlocker先保存一下將要阻塞的線程,而後調用unsafe.park阻塞當前線程。
public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); }
unsafe.park是個native方法,代碼以下。
public native void park(boolean isAbsolute, long time);
park這個方法會阻塞當前線程,只有如下4種狀況中的一種發生時,該方法纔會返回。