在前面咱們接觸的隊列都是非阻塞隊列,好比PriorityQueue、LinkedList(LinkedList是雙向鏈表,它實現了Dequeue接口)。java
使用非阻塞隊列的時候有一個很大問題就是:它不會對當前線程產生阻塞,那麼在面對相似消費者-生產者的模型時,就必須額外地實現同步策略以及線程間喚醒策略,這個實現起來就很是麻煩。可是有了阻塞隊列就不同了,它會對當前線程產生阻塞,好比一個線程從一個空的阻塞隊列中取元素,此時線程會被阻塞直到阻塞隊列中有了元素。當隊列中有元素後,被阻塞的線程會自動被喚醒(不須要咱們編寫代碼去喚醒)。這樣提供了極大的方便性。數組
1.非阻塞隊列中的幾個主要方法:緩存
add(E e):將元素e插入到隊列末尾,若是插入成功,則返回true;若是插入失敗(即隊列已滿),則會拋出異常;多線程
remove():移除隊首元素,若移除成功,則返回true;若是移除失敗(隊列爲空),則會拋出異常;併發
offer(E e):將元素e插入到隊列末尾,若是插入成功,則返回true;若是插入失敗(即隊列已滿),則返回false;函數
poll():移除並獲取隊首元素,若成功,則返回隊首元素;不然返回null;高併發
peek():獲取隊首元素,若成功,則返回隊首元素;不然返回null性能
對於非阻塞隊列,通常狀況下建議使用offer、poll和peek三個方法,不建議使用add和remove方法。由於使用offer、poll和peek三個方法能夠經過返回值判斷操做成功與否,而使用add和remove方法卻不能達到這樣的效果。注意,非阻塞隊列中的方法都沒有進行同步措施。this
2.阻塞隊列中的幾個主要方法:spa
阻塞隊列包括了非阻塞隊列中的大部分方法,上面列舉的5個方法在阻塞隊列中都存在,可是要注意這5個方法在阻塞隊列中都進行了同步措施。除此以外,阻塞隊列提供了另外4個很是有用的方法:
put(E e):put方法用來向隊尾存入元素,若是隊列滿,則等待;
take():take方法用來從隊首取元素,若是隊列爲空,則等待;
offer(E e,long timeout, TimeUnit unit):offer方法用來向隊尾存入元素,若是隊列滿,則等待必定的時間,當時間期限達到時,若是尚未插入成功,則返回false;不然返回true;
poll(long timeout, TimeUnit unit):poll方法用來從隊首取元素,若是隊列空,則等待必定的時間,當時間期限達到時,若是取到,則返回null;不然返回取得的元素;
二.七種主要的阻塞隊列
自從Java 1.5以後,在java.util.concurrent包下提供了若干個阻塞隊列,主要有如下幾個:
1.ArrayBlockingQueue:基於數組實現的一個有界阻塞隊列,該隊列內部維持着一個定長的數據緩衝隊列(該隊列由數組構成),此隊列按照先進先出(FIFO)的原則對元素進行排序,在建立ArrayBlockingQueue對象時必須指定容量大小。ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。
而且還能夠指定公平性與非公平性,默認狀況下爲非公平的。所謂公平訪問隊列是指阻塞的線程,能夠按照阻塞的前後順序訪問隊列,即先阻塞線程先訪問隊列。非公平性是對先等待的線程是非公平的,當隊列可用時,阻塞的線程均可以爭奪訪問隊列的資格,有可能先阻塞的線程最後才訪問隊列。爲了保證公平性,一般會下降吞吐量。咱們可使用如下代碼建立一個公平的阻塞隊列。
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true); public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0) throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair); //能夠看出訪問者的公平性是使用可重入鎖實現的
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
2.LinkedBlockingQueue:基於鏈表實現的一個有界阻塞隊列,內部維持着一個數據緩衝隊列(該隊列由鏈表構成),此隊列按照先進先出的原則對元素進行排序。當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者當即返回;只有當隊列緩衝區達到最大值緩存容量時(能夠經過LinkedBlockingQueue的構造函數指定該值),纔會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程將會被喚醒,反之對於消費者這端的處理也基於一樣的原理。在建立LinkedBlockingQueue對象時若是不指定容量大小,則默認大小爲Integer.MAX_VALUE。這樣的話,若是生產者的速度一旦大於消費者的速度,也許尚未等到隊列滿阻塞產生,系統內存就有可能已經被消耗殆盡了。
LinkedBlockingQueue之因此可以高效的處理併發數據,是由於其對於生產者端和消費者端分別採用了獨立的鎖來控制數據同步,這也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能。
3.PriorityBlockingQueue:支持優先級排序的無界阻塞隊列,以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優先級對元素進行排序,默認狀況下元素採起天然順序排列,也能夠經過構造函數傳入的Compator對象來決定。而且也是按照優先級順序出隊,每次出隊的元素都是優先級最高的元素。在實現PriorityBlockingQueue時,內部控制線程同步的鎖採用的是公平鎖。須要注意的是PriorityBlockingQueue並不會阻塞數據生產者,而只是在沒有可消費的數據時阻塞數據的消費者,所以使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,不然時間一長,會最終耗盡全部的可用堆內存空間。注意,此阻塞隊列爲無界阻塞隊列,即容量沒有上限(經過源碼就能夠知道,它沒有容器滿的信號標誌)。
4.DelayQueue:基於PriorityQueue,一種支持延時的獲取元素的無界阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,纔可以從隊列中獲取到該元素。DelayQueue也是一個無界隊列,所以往隊列中插入數據的操做(生產者)永遠不會被阻塞,而只有獲取數據的操做(消費者)纔會被阻塞。
transfer()方法:若是當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法),transfer()方法能夠把生產者傳入的元素馬上傳輸給消費者;若是沒有消費者在等待接收元素,transfer()方法會將元素存放到隊列的tail節點,並等到該元素被消費者消費了才返回。
transfer()方法的關鍵代碼以下:
Node pred = tryAppend(s, haveData); return awaitMatch(s, pred, e, (how == TIMED), nanos);
第一行代碼是試圖把存放當前元素的s節點做爲tail節點,第二行代碼是讓CPU自旋等待消費者消費元素。由於自旋會消耗CPU,因此自旋必定的次數後使用Thread.yield()方法來暫停當前正在執行的線程,並執行其餘線程。
tryTransfer()方法:該方法是用來試探生產者傳入的元素是否能直接傳給消費者,若是沒有消費者等待接收元素,則返回false。與transfer()方法的區別:tryTransfer()方法是當即返回(不管消費者是否接收),transfer()方法是必須等到消費者消費了才返回。對於帶有時間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,則是試圖把生產者傳入的元素直接傳給消費者,可是若是沒有消費者消費該元素則等待指定的時間以後再返回,若是超時還沒消費元素,則返回false,若是在超時時間內消費了元素,則返回true。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * Serialization ID. This class relies on default serialization * even for the items array, which is default-serialized, even if * it is empty. Otherwise it could not be declared final, which is * necessary here. */ private static final long serialVersionUID = -817911632652898426L; /** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; transient Itrs itrs = null;
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly();//操做以前先上鎖 try { while (count == items.length)//當隊列滿了 notFull.await(); //則生產者不繼續添加,而是將本身阻塞,直到有消費者來消費並將本身喚醒後,才能夠繼續執行 enqueue(e); } finally { lock.unlock(); //釋放鎖 } } private void enqueue(E x) {//至關於add()方法 final Object[] items = this.items; items[putIndex] = x;//在隊尾添加元素 if (++putIndex == items.length)//索引自增,若是已經是最後一個位置,從新設置 putIndex = 0 putIndex = 0; count++; notEmpty.signal(); } public E take() throws InterruptedException {//因爲此時併發容器已滿,因此生產者生產失敗,釋放了鎖,輪到消費者執行 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //操做前先上鎖 try { while (count == 0)//判斷容器不爲空 notEmpty.await(); return dequeue();//調用該方法 } finally { lock.unlock(); } } private E dequeue() {//至關於remove() final Object[] items = this.items;//獲取數組容器 E x = (E) items[takeIndex];//獲取隊首元素,由於ArrayBlockingQueue是先進先出隊列 items[takeIndex] = null;//將該位置置空 if (++takeIndex == items.length)//索引自增,若是已經是最後一個位置,從新設置 putIndex = 0 takeIndex = 0; count--;//將容器中元素個數減一 if (itrs != null) itrs.elementDequeued(); notFull.signal();//喚醒其餘被阻塞的線程,因爲剛纔生產者因容器已滿而被阻塞掉,這時候就會被該線程喚醒了,喚醒以後就可繼續它的生產工做。 return x; }