Queue除了前面介紹的實現外,還有一種雙向的Queue實現Deque。這種隊列容許在隊列頭和尾部進行入隊出隊操做,所以在功能上比Queue顯然要更復雜。下圖描述的是Deque的完總體系圖。須要說明的是LinkedList也已經加入了Deque的一部分(LinkedList是從jdk1.2 開始就存在數據結構)。html
最原始的接口固然是Deque,繼承的接口爲BlockingDeque,以後因爲LikedBlockingDeque實現了BlockingDeuqe前端
Deque在Queue的基礎上增長了更多的操做方法。java
從上圖能夠看到,Deque不只具備FIFO的Queue實現,也有FILO的實現,也就是不只能夠實現隊列,也能夠實現一個堆棧。後端
同時在Deque的體系結構圖中能夠看到,實現一個Deque可使用數組(ArrayDeque),同時也可使用鏈表(LinkedList),還能夠同實現一個支持阻塞的線程安全版本隊列LinkedBlockingDeque。數組
對於數組實現的Deque來講,數據結構上比較簡單,只須要一個存儲數據的數組以及頭尾兩個索引便可。因爲數組是固定長度的,因此很容易就獲得數組的頭和尾,那麼對於數組的操做只須要移動頭和尾的索引便可。安全
特別說明的是ArrayDeque並非一個固定大小的隊列,每次隊列滿了之後就將隊列容量擴大一倍(doubleCapacity()),所以加入一個元素老是能成功,並且也不會拋出一個異常。也就是說ArrayDeque是一個沒有容量限制的隊列。數據結構
一樣繼續性能的考慮,使用System.arraycopy複製一個數組比循環設置要高效得多。併發
//數組雙端隊列ArrayDeque的源碼解析 public class ArrayDeque<E> extends AbstractCollection<E> implements Deque<E>, Cloneable, Serializable{ /** * 存放隊列元素的數組,數組的長度爲「2的指數」 */ private transient E[] elements; /** *隊列的頭部索引位置,(被remove()或pop()操做的位置),當爲空隊列時,首尾index相同 */ private transient int head; /** * 隊列的尾部索引位置,(被 addLast(E), add(E), 或 push(E)操做的位置). */ private transient int tail; /** * 隊列的最小容量(大小必須爲「2的指數」) */ private static final int MIN_INITIAL_CAPACITY = 8; // ****** Array allocation and resizing utilities ****** /** * 根據所給的數組長度,獲得一個比該長度大的最小的2^p的真實長度,並創建真實長度的空數組 */ private void allocateElements(int numElements) { int initialCapacity = MIN_INITIAL_CAPACITY; if (numElements >= initialCapacity) { initialCapacity = numElements; initialCapacity |= (initialCapacity >>> 1); initialCapacity |= (initialCapacity >>> 2); initialCapacity |= (initialCapacity >>> 4); initialCapacity |= (initialCapacity >>> 8); initialCapacity |= (initialCapacity >>> 16); initialCapacity++; if (initialCapacity < 0) // Too many elements, must back off initialCapacity >>>= 1;// Good luck allocating 2 ^ 30 elements } elements = (E[]) new Object[initialCapacity]; } /** * 當隊列首尾指向同一個引用時,擴充隊列的容量爲原來的兩倍,並對元素從新定位到新數組中 */ private void doubleCapacity() { assert head == tail; int p = head; int n = elements.length; int r = n - p; // number of elements to the right of p int newCapacity = n << 1; if (newCapacity < 0) throw new IllegalStateException("Sorry, deque too big"); Object[] a = new Object[newCapacity]; System.arraycopy(elements, p, a, 0, r); System.arraycopy(elements, 0, a, r, p); elements = (E[])a; head = 0; tail = n; } /** * 拷貝隊列中的元素到新數組中 */ private <T> T[] copyElements(T[] a) { if (head < tail) { System.arraycopy(elements, head, a, 0, size()); } else if (head > tail) { int headPortionLen = elements.length - head; System.arraycopy(elements, head, a, 0, headPortionLen); System.arraycopy(elements, 0, a, headPortionLen, tail); } return a; } /** * 默認構造隊列,初始化一個長度爲16的數組 */ public ArrayDeque() { elements = (E[]) new Object[16]; } /** * 指定元素個數的構造方法 */ public ArrayDeque(int numElements) { allocateElements(numElements); } /** * 用一個集合做爲參數的構造方法 */ public ArrayDeque(Collection<? extends E> c) { allocateElements(c.size()); addAll(c); } //插入和刪除的方法主要是: addFirst(),addLast(), pollFirst(), pollLast()。 //其餘的方法依賴於這些實現。 /** * 在雙端隊列的前端插入元素,元素爲null拋異常 */ public void addFirst(E e) { if (e == null) throw new NullPointerException(); elements[head = (head - 1) & (elements.length - 1)] = e; if (head == tail) doubleCapacity(); } /** *在雙端隊列的末端插入元素,元素爲null拋異常 */ public void addLast(E e) { if (e == null) throw new NullPointerException(); elements[tail] = e; if ( (tail = (tail + 1) & (elements.length - 1)) == head) doubleCapacity(); } /** * 在前端插入,調用addFirst實現,返回boolean類型 */ public boolean offerFirst(E e) { addFirst(e); return true; } /** * 在末端插入,調用addLast實現,返回boolean類型 */ public boolean offerLast(E e) { addLast(e); return true; } /** * 刪除前端,調用pollFirst實現 */ public E removeFirst() { E x = pollFirst(); if (x == null) throw new NoSuchElementException(); return x; } /** * 刪除後端,調用pollLast實現 */ public E removeLast() { E x = pollLast(); if (x == null) throw new NoSuchElementException(); return x; } //前端出對(刪除前端) public E pollFirst() { int h = head; E result = elements[h]; // Element is null if deque empty if (result == null) return null; elements[h] = null; // Must null out slot head = (h + 1) & (elements.length - 1); return result; } //後端出對(刪除後端) public E pollLast() { int t = (tail - 1) & (elements.length - 1); E result = elements[t]; if (result == null) return null; elements[t] = null; tail = t; return result; } /** * 獲得前端頭元素 */ public E getFirst() { E x = elements[head]; if (x == null) throw new NoSuchElementException(); return x; } /** * 獲得末端尾元素 */ public E getLast() { E x = elements[(tail - 1) & (elements.length - 1)]; if (x == null) throw new NoSuchElementException(); return x; } public E peekFirst() { return elements[head]; // elements[head] is null if deque empty } public E peekLast() { return elements[(tail - 1) & (elements.length - 1)]; } /** * 移除此雙端隊列中第一次出現的指定元素(當從頭部到尾部遍歷雙端隊列時)。 */ public boolean removeFirstOccurrence(Object o) { if (o == null) return false; int mask = elements.length - 1; int i = head; E x; while ( (x = elements[i]) != null) { if (o.equals(x)) { delete(i); return true; } i = (i + 1) & mask; } return false; } /** * 移除此雙端隊列中最後一次出現的指定元素(當從頭部到尾部遍歷雙端隊列時)。 */ public boolean removeLastOccurrence(Object o) { if (o == null) return false; int mask = elements.length - 1; int i = (tail - 1) & mask; E x; while ( (x = elements[i]) != null) { if (o.equals(x)) { delete(i); return true; } i = (i - 1) & mask; } return false; } // *** 隊列方法(Queue methods) *** /** * add方法,添加到隊列末端 */ public boolean add(E e) { addLast(e); return true; } /** * 同上 */ public boolean offer(E e) { return offerLast(e); } /** * remove元素,刪除隊列前端 */ public E remove() { return removeFirst(); } /** * 彈出前端(出對,刪除前端) */ public E poll() { return pollFirst(); } public E element() { return getFirst(); } public E peek() { return peekFirst(); } // *** 棧 方法(Stack methods) *** public void push(E e) { addFirst(e); } public E pop() { return removeFirst(); } private void checkInvariants() { …… } private boolean delete(int i) { …… } // *** 集合方法(Collection Methods) *** …… // *** Object methods *** …… } 總體來講:1個數組,2個index(head 索引和tail索引)。實現比較簡單,容易理解。
對於LinkedList自己而言,數據結構就更簡單了,除了一個size用來記錄大小外,只有head一個元素Entry。對比Map和Queue的其它數據結構能夠看到這裏的Entry有兩個引用,是雙向的隊列。框架
在示意圖中,LinkedList老是有一個「傀儡」節點,用來描述隊列「頭部」,可是並不表示頭部元素,它是一個執行null的空節點。高併發
隊列一開始只有head一個空元素,而後從尾部加入E1(add/addLast),head和E1之間創建雙向連接。而後繼續從尾部加入E2,E2就在head和E1之間創建雙向連接。最後從隊列的頭部加入E3(push/addFirst),因而E3就在E1和head之間連接雙向連接。
雙向鏈表的數據結構比較簡單,操做起來也比較容易,從事從「傀儡」節點開始,「傀儡」節點的下一個元素就是隊列的頭部,前一個元素是隊列的尾部,換句話說,「傀儡」節點在頭部和尾部之間創建了一個通道,是整個隊列造成一個循環,這樣就能夠從任意一個節點的任意一個方向能遍歷完整的隊列。
一樣LinkedList也是一個沒有容量限制的隊列,所以入隊列(不論是從頭部仍是尾部)總能成功。
上面描述的ArrayDeque和LinkedList是兩種不一樣方式的實現,一般在遍歷和節省內存上ArrayDeque更高效(索引更快,另外不須要Entry對象),可是在隊列擴容下LinkedList更靈活,由於不須要複製原始的隊列,某些狀況下可能更高效。
一樣須要注意的上述兩個實現都不是線程安全的,所以只適合在單線程環境下使用,下面章節要介紹的LinkedBlockingDeque就是線程安全的可阻塞的Deque。事實上也應該是功能最強大的Queue實現,固然了實現起來也許會複雜一點。
雙向併發阻塞隊列。所謂雙向是指能夠從隊列的頭和尾同時操做,併發只是線程安全的實現,阻塞容許在入隊出隊不知足條件時掛起線程,這裏說的隊列是指支持FIFO/FILO實現的鏈表。
首先看下LinkedBlockingDeque的數據結構。一般狀況下從數據結構上就能看出這種實現的優缺點,這樣就知道如何更好的使用工具了。
從數據結構和功能需求上能夠獲得如下結論:
要想支持阻塞功能,隊列的容量必定是固定的,不然沒法在入隊的時候掛起線程。也就是capacity是final類型的。
既然是雙向鏈表,每個結點就須要先後兩個引用,這樣才能將全部元素串聯起來,支持雙向遍歷。也即須要prev/next兩個引用。
雙向鏈表須要頭尾同時操做,因此須要first/last兩個節點,固然能夠參考LinkedList那樣採用一個節點的雙向來完成,那樣實現起來就稍微麻煩點。
既然要支持阻塞功能,就須要鎖和條件變量來掛起線程。這裏使用一個鎖兩個條件變量來完成此功能。
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { /** 包含前驅和後繼節點的雙向鏈式結構 */ static final class Node<E> { E item; Node<E> prev; Node<E> next; Node(E x, Node<E> p, Node<E> n) { item = x; prev = p; next = n; } } /** 頭節點 */ private transient Node<E> first; /** 尾節點 */ private transient Node<E> last; /** 元素個數*/ private transient int count; /** 隊列容量 */ private final int capacity; /** 鎖 */ private final ReentrantLock lock = new ReentrantLock(); /** notEmpty條件 */ private final Condition notEmpty = lock.newCondition(); /** notFull條件 */ private final Condition notFull = lock.newCondition(); /** 構造方法 */ public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); for (E e : c) add(e); } /** * 添加元素做爲新的頭節點 */ private boolean linkFirst(E e) { if (count >= capacity) return false; ++count; Node<E> f = first; Node<E> x = new Node<E>(e, null, f); first = x; if (last == null) last = x; else f.prev = x; notEmpty.signal(); return true; } /** * 添加尾元素 */ private boolean linkLast(E e) { if (count >= capacity) return false; ++count; Node<E> l = last; Node<E> x = new Node<E>(e, l, null); last = x; if (first == null) first = x; else l.next = x; notEmpty.signal(); return true; } /** * 返回並移除頭節點 */ private E unlinkFirst() { Node<E> f = first; if (f == null) return null; Node<E> n = f.next; first = n; if (n == null) last = null; else n.prev = null; --count; notFull.signal(); return f.item; } /** * 返回並移除尾節點 */ private E unlinkLast() { Node<E> l = last; if (l == null) return null; Node<E> p = l.prev; last = p; if (p == null) first = null; else p.next = null; --count; notFull.signal(); return l.item; } /** * 移除節點x */ private void unlink(Node<E> x) { Node<E> p = x.prev; Node<E> n = x.next; if (p == null) {//x是頭的狀況 if (n == null) first = last = null; else { n.prev = null; first = n; } } else if (n == null) {//x是尾的狀況 p.next = null; last = p; } else {//x是中間的狀況 p.next = n; n.prev = p; } --count; notFull.signalAll(); } //--------------------------------- BlockingDeque 雙端阻塞隊列方法實現 public void addFirst(E e) { if (!offerFirst(e)) throw new IllegalStateException("Deque full"); } public void addLast(E e) { if (!offerLast(e)) throw new IllegalStateException("Deque full"); } public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); lock.lock(); try { return linkFirst(e); } finally { lock.unlock(); } } public boolean offerLast(E e) { if (e == null) throw new NullPointerException(); lock.lock(); try { return linkLast(e); } finally { lock.unlock(); } } public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); lock.lock(); try { while (!linkFirst(e)) notFull.await(); } finally { lock.unlock(); } } public void putLast(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); lock.lock(); try { while (!linkLast(e)) notFull.await(); } finally { lock.unlock(); } } public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); lock.lockInterruptibly(); try { for (;;) { if (linkFirst(e)) return true; if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } } finally { lock.unlock(); } } public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); lock.lockInterruptibly(); try { for (;;) { if (linkLast(e)) return true; if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } } finally { lock.unlock(); } } public E removeFirst() { E x = pollFirst(); if (x == null) throw new NoSuchElementException(); return x; } public E removeLast() { E x = pollLast(); if (x == null) throw new NoSuchElementException(); return x; } public E pollFirst() { lock.lock(); try { return unlinkFirst(); } finally { lock.unlock(); } } public E pollLast() { lock.lock(); try { return unlinkLast(); } finally { lock.unlock(); } } public E takeFirst() throws InterruptedException { lock.lock(); try { E x; while ( (x = unlinkFirst()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } } public E takeLast() throws InterruptedException { lock.lock(); try { E x; while ( (x = unlinkLast()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } } public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); lock.lockInterruptibly(); try { for (;;) { E x = unlinkFirst(); if (x != null) return x; if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } } finally { lock.unlock(); } } public E pollLast(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); lock.lockInterruptibly(); try { for (;;) { E x = unlinkLast(); if (x != null) return x; if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } } finally { lock.unlock(); } } public E getFirst() { E x = peekFirst(); if (x == null) throw new NoSuchElementException(); return x; } public E getLast() { E x = peekLast(); if (x == null) throw new NoSuchElementException(); return x; } public E peekFirst() { lock.lock(); try { return (first == null) ? null : first.item; } finally { lock.unlock(); } } public E peekLast() { lock.lock(); try { return (last == null) ? null : last.item; } finally { lock.unlock(); } } public boolean removeFirstOccurrence(Object o) { if (o == null) return false; lock.lock(); try { for (Node<E> p = first; p != null; p = p.next) { if (o.equals(p.item)) { unlink(p); return true; } } return false; } finally { lock.unlock(); } } public boolean removeLastOccurrence(Object o) { if (o == null) return false; lock.lock(); try { for (Node<E> p = last; p != null; p = p.prev) { if (o.equals(p.item)) { unlink(p); return true; } } return false; } finally { lock.unlock(); } } //---------------------------------- BlockingQueue阻塞隊列 方法實現 public boolean add(E e) { addLast(e); return true; } public boolean offer(E e) { return offerLast(e); } public void put(E e) throws InterruptedException { putLast(e); } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { return offerLast(e, timeout, unit); } public E remove() { return removeFirst(); } public E poll() { return pollFirst(); } public E take() throws InterruptedException { return takeFirst(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { return pollFirst(timeout, unit); } public E element() { return getFirst(); } public E peek() { return peekFirst(); } //------------------------------------------- Stack 方法實現 public void push(E e) { addFirst(e); } public E pop() { return removeFirst(); } //------------------------------------------- Collection 方法實現 public boolean remove(Object o) { return removeFirstOccurrence(o); } public int size() { lock.lock(); try { return count; } finally { lock.unlock(); } } public boolean contains(Object o) { if (o == null) return false; lock.lock(); try { for (Node<E> p = first; p != null; p = p.next) if (o.equals(p.item)) return true; return false; } finally { lock.unlock(); } } boolean removeNode(Node<E> e) { lock.lock(); try { for (Node<E> p = first; p != null; p = p.next) { if (p == e) { unlink(p); return true; } } return false; } finally { lock.unlock(); } } …… }
有了上面的結論再來研究LinkedBlockingDeque的優缺點。
優勢固然是功能足夠強大,同時因爲採用一個獨佔鎖,所以實現起來也比較簡單。全部對隊列的操做都加鎖就能夠完成。同時獨佔鎖也可以很好的支持雙向阻塞的特性。
凡事有利必有弊。缺點就是因爲獨佔鎖,因此不能同時進行兩個操做,這樣性能上就大打折扣。從性能的角度講LinkedBlockingDeque要比LinkedBlockingQueue要低不少,比CocurrentLinkedQueue就低更多了,這在高併發狀況下就比較明顯了。
前面分析足夠多的Queue實現後,LinkedBlockingDeque的原理和實現就不值得一提了,無非是在獨佔鎖下對一個鏈表的普通操做。
有趣的是此類支持序列化,可是Node並不支持序列化,所以fist/last就不能序列化,那麼如何完成序列化/反序列化過程呢?
清單4 描述的是LinkedBlockingDeque序列化/反序列化的過程。序列化時將真正的元素寫入輸出流,最後還寫入了一個null。讀取的時候將全部對象列表讀出來,若是讀取到一個null就表示結束。這就是爲何寫入的時候寫入一個null的緣由,由於沒有將count寫入流,因此就靠null來表示結束,省一個整數空間。
集合框架 Queue篇(1)---ArrayDeque
http://hi.baidu.com/yao1111yao/item/1a1346f65a50d9c8521c266d
集合框架 Queue篇(7)---LinkedBlockingDeque
http://hi.baidu.com/yao1111yao/item/b1649cff2cf60be91a111f6d
深刻淺出 Java Concurrency (24): 併發容器 part 9 雙向隊列集合 Deque
http://www.blogjava.net/xylz/archive/2010/08/12/328587.html
深刻淺出 Java Concurrency (25): 併發容器 part 10 雙向併發阻塞隊列 BlockingDeque
http://www.blogjava.net/xylz/archive/2010/08/18/329227.html