Queue: 基本上,一個隊列就是一個先入先出(FIFO)的數據結構java
Queue接口與List、Set同一級別,都是繼承了Collection接口。LinkedList實現了Deque接 口。數組
Queue的實現安全
一、沒有實現的阻塞接口的LinkedList: 實現了java.util.Queue接口和java.util.AbstractQueue接口
內置的不阻塞隊列: PriorityQueue 和 ConcurrentLinkedQueue
PriorityQueue 和 ConcurrentLinkedQueue 類在 Collection Framework 中加入兩個具體集合實現。
PriorityQueue 類實質上維護了一個有序列表。加入到 Queue 中的元素根據它們的自然排序(經過其 java.util.Comparable 實現)或者根據傳遞給構造函數的 java.util.Comparator 實現來定位。
ConcurrentLinkedQueue 是基於連接節點的、線程安全的隊列。併發訪問不須要同步。由於它在隊列的尾部添加元素並從頭部刪除它們,因此只要不須要知道隊列的大 小, ConcurrentLinkedQueue 對公共集合的共享訪問就能夠工做得很好。收集關於隊列大小的信息會很慢,須要遍歷隊列。數據結構
二、實現阻塞接口的:
java.util.concurrent 中加入了 BlockingQueue 接口和五個阻塞隊列類。它實質上就是一種帶有一點扭曲的 FIFO 數據結構。不是當即從隊列中添加或者刪除元素,線程執行操做阻塞,直到有空間或者元素可用。
五個隊列所提供的各有不一樣:
* ArrayBlockingQueue :一個由數組支持的有界隊列。
* LinkedBlockingQueue :一個由連接節點支持的可選有界隊列。
* PriorityBlockingQueue :一個由優先級堆支持的無界優先級隊列。
* DelayQueue :一個由優先級堆支持的、基於時間的調度隊列。
* SynchronousQueue :一個利用 BlockingQueue 接口的簡單彙集(rendezvous)機制。多線程
下表顯示了jdk1.5中的阻塞隊列的操做:併發
add 增長一個元索 若是隊列已滿,則拋出一個IIIegaISlabEepeplian異常
remove 移除並返回隊列頭部的元素 若是隊列爲空,則拋出一個NoSuchElementException異常
element 返回隊列頭部的元素 若是隊列爲空,則拋出一個NoSuchElementException異常
offer 添加一個元素並返回true 若是隊列已滿,則返回false
poll 移除並返問隊列頭部的元素 若是隊列爲空,則返回null
peek 返回隊列頭部的元素 若是隊列爲空,則返回null
put 添加一個元素 若是隊列滿,則阻塞
take 移除並返回隊列頭部的元素 若是隊列爲空,則阻塞函數
remove、element、offer 、poll、peek 實際上是屬於Queue接口。 性能
阻塞隊列的操做能夠根據它們的響應方式分爲如下三類:aad、removee和element操做在你試圖爲一個已滿的隊列增長元素或從空隊列取得元素時 拋出異常。固然,在多線程程序中,隊列在任什麼時候間均可能變成滿的或空的,因此你可能想使用offer、poll、peek方法。這些方法在沒法完成任務時 只是給出一個出錯示而不會拋出異常。this
注意:poll和peek方法出錯進返回null。所以,向隊列中插入null值是不合法的spa
最後,咱們有阻塞操做put和take。put方法在隊列滿時阻塞,take方法在隊列空時阻塞。
LinkedBlockingQueue的容量是沒有上限的(說的不許確,在不指定時容量爲Integer.MAX_VALUE,不要然的話在put時怎麼會受阻呢),可是也能夠選擇指定其最大容量,它是基於鏈表的隊列,此隊列按 FIFO(先進先出)排序元素。
ArrayBlockingQueue在構造時須要指定容量, 並能夠選擇是否須要公平性,若是公平參數被設置true,等待時間最長的線程會優先獲得處理(其實就是經過將ReentrantLock設置爲true來 達到這種公平性的:即等待時間最長的線程會先操做)。一般,公平性會使你在性能上付出代價,只有在的確很是須要的時候再使用它。它是基於數組的阻塞循環隊 列,此隊列按 FIFO(先進先出)原則對元素進行排序。
PriorityBlockingQueue是一個帶優先級的 隊列,而不是先進先出隊列。元素按優先級順序被移除,該隊列也沒有上限(看了一下源碼,PriorityBlockingQueue是對 PriorityQueue的再次包裝,是基於堆數據結構的,而PriorityQueue是沒有容量限制的,與ArrayList同樣,因此在優先阻塞 隊列上put時是不會受阻的。雖然此隊列邏輯上是無界的,可是因爲資源被耗盡,因此試圖執行添加操做可能會致使 OutOfMemoryError),可是若是隊列爲空,那麼取元素的操做take就會阻塞,因此它的檢索操做take是受阻的。另外,往入該隊列中的元 素要具備比較能力。
DelayQueue(基於PriorityQueue來實現的)是一個存放Delayed 元素的無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部是延遲期滿後保存時間最長的 Delayed 元素。若是延遲都尚未期滿,則隊列沒有頭部,而且poll將返回null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於或等於零的值時,則出現期滿,poll就以移除這個元素了。此隊列不容許使用 null 元素。
ArrayBlockingQueue類的結構以下:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; final Object[] items; //用數據來存儲元素的容器 int takeIndex; //下一次讀取或移除的位置(remove、poll、take ) int putIndex; //下一次存放元素的位置(add、offer、put) int count; //隊列中元素的總數 final ReentrantLock lock; //全部訪問的保護鎖 private final Condition notEmpty; //等待獲取元素的條件 private final Condition notFull; //等待存放元素的條件 略...
能夠看出ArrayBlockingQueue內部使用final修飾的對象數組來存儲元素,一旦初始化數組,數組的大小就不可改變。使用ReentrantLock鎖來保證鎖競爭,使用Condition來控制插入或獲取元素時,線程是否阻塞。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; //得到支持響應中斷的鎖 lock.lockInterruptibly(); try { //使用while循環來判斷隊列是否已滿,防止假喚醒 while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
首先得到鎖,而後判斷隊列是否已滿,若是已滿則阻塞當前生產線程,直到隊列中空閒時,被喚醒操做。隊列空閒則調用enqueue 插入元素。
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; //把當前元素插入到數組中去 items[putIndex] = x; //這裏能夠看出這個數組是個環形數組 if (++putIndex == items.length) putIndex = 0; count++; // 喚醒在notEmpty條件上等待的線程 notEmpty.signal(); }
把元素插入到隊列中去,能夠看出這個隊列中的數組是環形數組結構,這樣每次插入、移除的時候不須要複製移動數組中的元素。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //得到可響應中斷鎖 lock.lockInterruptibly(); try { //使用while循環來判斷隊列是否已滿,防止假喚醒 while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
消費者線程從阻塞隊列中獲取元素,若是隊列中元素爲空,則阻塞當前的消費者線程直到有數據時才調用dequeue方法獲取元素。不然直接調用dequeue方法獲取元素
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") //獲取元素 E x = (E) items[takeIndex]; //將當前位置的元素設置爲null items[takeIndex] = null; //這裏能夠看出這個數組是個環形數組 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) //修改迭代器參數 itrs.elementDequeued(); // 喚醒在notFull條件上等待的線程 notFull.signal(); return x; }
直接從數據中獲取items[takeIndex]的元素,並設置當前位置的元素爲null,並設置下一次takeIndex的座標(++takeIndex),隊列元素總數-1等操做。
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; //得到不可響應中斷的鎖 lock.lock(); try { if (count == items.length) return false; else { // enqueue(e); return true; } } finally { lock.unlock(); } }
首先判斷隊列中的元素是否已滿,若是已滿則直接返回false,不然調用enqueue方法向隊列中插入元素,插入成功返回true。
public E poll() { final ReentrantLock lock = this.lock; //得到不可響應中斷的鎖 lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
判斷隊列是否爲空,若是爲空返回null,不然調用dequeue方法返回元素。
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
首先調用offer方法插入元素,插入成功返回true,不然拋出IllegalStateException異常。
public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); }
首先調用poll方法獲取元素,若是不爲空則直接返回,不然拋出NoSuchElementException異常。
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); //獲得超時的時間 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; //得到可響應中斷的鎖 lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
首先判斷隊列是否已滿,若是已滿再循環判斷超時時間是否超時,超時則直接返回false,不然阻塞該生產線程nanos時間,若是nanos時間以內喚醒則調用enqueue方法插入元素。若是隊列不滿則直接調用enqueue方法插入元素,並返回true。
public E poll(long timeout, TimeUnit unit) throws InterruptedException { //獲得超時的時間 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; //得到可響應中斷的鎖 lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
首先循環判斷隊列是否爲空,若是爲空再判斷是否超時,超時則返回null。不超時則等待,在nanos時間喚醒則調用dequeue方法獲取元素。
public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); }
調用peek方法獲取元素,元素不爲空則返回,不然拋出NoSuchElementException異常。
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } } final E itemAt(int i) { return (E) items[i]; }
調用itemAt方法獲取元素。
** 其它的阻塞隊列實現原理都相似,都是使用ReentrantLock和Condition來完成併發控制、阻塞的。 **