阻塞隊列中目前還剩下一個比較特殊的隊列實現,相比較前面講解過的隊列,本文中要講的LinkedBlockingDeque比較容易理解了,可是與以前講解過的阻塞隊列又有些不一樣,從命名上你應該能看出一些端倪,接下來就一塊兒看看這個特殊的阻塞隊列java
JDK版本號:1.8.0_171
LinkedBlockingDeque在結構上有別於以前講解過的阻塞隊列,它不是Queue而是Deque,中文翻譯成雙端隊列,雙端隊列指能夠從任意一端入隊或者出隊元素的隊列,實現了在隊列頭和隊列尾的高效插入和移除node
LinkedBlockingDeque是鏈表實現的線程安全的無界的同時支持FIFO、LIFO的雙端阻塞隊列,能夠回顧下以前的LinkedBlockingQueue阻塞隊列特色,本質上是相似的,可是又有些不一樣:設計模式
Queue和Deque的關係有點相似於單鏈表和雙向鏈表,LinkedBlockingQueue和LinkedBlockingDeque的內部結點實現就是單鏈表和雙向鏈表的區別,具體可參考源碼安全
在第二點中可能有些人有些疑問,兩個互斥鎖和一個互斥鎖的區別在哪裏?咱們能夠考慮如下場景:多線程
A線程先進行入隊操做,B線程隨後進行出隊操做,若是是LinkedBlockingQueue,A線程入隊過程還未結束(已得到鎖還未釋放),B線程出隊操做不會被阻塞等待(鎖不一樣),若是是LinkedBlockingDeque則B線程會被阻塞等待(同一把鎖)A線程完成操做才繼續執行併發
LinkedBlockingQueue通常的操做是獲取一把鎖就能夠,但有些操做例如remove操做,則須要同時獲取兩把鎖,以前的LinkedBlockingQueue講解曾經說明過,這裏就不詳細講解了源碼分析
實現BlockingDeque接口,其中定義了雙端隊列應該實現的方法,具體方法不說明了,主要是每一個方法都分爲了First和Last兩種方式,從頭部或者尾部進行隊列操做性能
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable
/** * 頭結點 */ transient Node<E> first; /** * 尾結點 */ transient Node<E> last; /** 雙端隊列實際結點個數 */ private transient int count; /** 雙端隊列容量 */ private final int capacity; /** 互斥重入鎖 */ final ReentrantLock lock = new ReentrantLock(); /** 非空條件對象 */ private final Condition notEmpty = lock.newCondition(); /** 非滿條件對象 */ private final Condition notFull = lock.newCondition();
爲了實現雙端隊列,內部使用了雙向鏈表,不像LinkedBlockingQueue使用的是單鏈表,前驅和後繼指針的特殊狀況須要注意this
/** 雙向鏈表Node */ static final class Node<E> { /** * 節點數據值,移除則爲null */ E item; /** * 下列狀況之一: * - 前驅節點 * - 前驅是尾節點,則prev指向當前節點 * - 無前驅節點則爲null */ Node<E> prev; /** * 下列狀況之一: * - 後繼節點 * - 後繼是頭節點,則next指向當前節點 * - 無後繼節點則爲null */ Node<E> next; Node(E x) { item = x; } }
構造方法比較簡單,可設置容量上限,不設置默認Integer.MAX_VALUE,相似以前的LinkedBlockingQueuespa
public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock lock = this.lock; // 確保可見性 lock.lock(); // Never contended, but necessary for visibility try { for (E e : c) { if (e == null) throw new NullPointerException(); if (!linkLast(new Node<E>(e))) throw new IllegalStateException("Deque full"); } } finally { lock.unlock(); } }
在入隊操做中,將每種方法都分紅了隊首入隊和隊尾入隊兩種,在獲取鎖以後最終調用方法是linkFirst/linkLast。在putFirst中若是隊列已滿則經過notFull.await()阻塞操做,比較容易理解
入隊方法以下:
類別 | 失敗返回特殊值 | 失敗拋異常 | 阻塞等待 | 超時阻塞等待 |
---|---|---|---|---|
隊首 | offerFirst | push/addFirst | putFirst | offerFirst(E e, long timeout, TimeUnit unit) |
隊尾 | offer/offerLast | add/addLast | put/putLast | offer/offerLast(E e, long timeout, TimeUnit unit) |
public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkFirst(node); } finally { lock.unlock(); } } public boolean offerLast(E e) { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkLast(node); } finally { lock.unlock(); } }
將結點添加到頭部或者尾部,若是隊列已滿則返回false,在調用方法以前已經獲取互斥鎖才進行操做
/** * Links node as first element, or returns false if full. */ private boolean linkFirst(Node<E> node) { // assert lock.isHeldByCurrentThread(); // 隊列已滿 if (count >= capacity) return false; // 更新first Node<E> f = first; node.next = f; first = node; // last爲空則表示原隊列爲空,當前隊列僅有node則last更新指向node便可 if (last == null) last = node; else // 原隊列非空,更新原first的前驅指針 f.prev = node; ++count; // 經過條件對象喚醒出隊操做阻塞線程 notEmpty.signal(); return true; } /** * Links node as last element, or returns false if full. */ private boolean linkLast(Node<E> node) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) return false; // 更新last Node<E> l = last; node.prev = l; last = node; // first爲空則表示原隊列爲空,當前隊列僅有node則first更新指向node便可 if (first == null) first = node; else // 原隊列非空,更新原last的後繼指針 l.next = node; // 實際節點數量 + 1 ++count; // 經過條件對象喚醒出隊操做阻塞線程 notEmpty.signal(); return true; }
在出隊操做中,一樣將每種方法都分紅了隊首出隊和隊尾出隊兩種,在獲取鎖以後最終調用方法是unlinkFirst/unlinkLast。在takeFirst中若是隊列爲空則經過notEmpty.await()阻塞操做,比較容易理解
出隊方法以下:
類別 | 失敗返回特殊值 | 失敗拋異常 | 阻塞等待 | 超時阻塞等待 |
---|---|---|---|---|
隊首 | poll/pollFirst | pop/remove/removeFirst | take/takeFirst | poll/pollFirst(long timeout, TimeUnit unit) |
隊尾 | pollLast | removeLast | takeLast | pollLast(long timeout, TimeUnit unit) |
public E pollFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkFirst(); } finally { lock.unlock(); } } public E pollLast() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkLast(); } finally { lock.unlock(); } }
移除返回隊頭元素或隊尾元素,假如隊列爲空則返回null
/** * Removes and returns first element, or null if empty. */ private E unlinkFirst() { // assert lock.isHeldByCurrentThread(); Node<E> f = first; // 判空 if (f == null) return null; // 更新first Node<E> n = f.next; E item = f.item; // item置空,next指向本身,方便gc回收 f.item = null; f.next = f; // help GC first = n; // 當前隊列爲空,last指向置空 if (n == null) last = null; else // 當前隊列非空,新的頭結點前驅置空 n.prev = null; // 實際結點數量 - 1 --count; // 經過條件對象喚醒入隊操做阻塞線程 notFull.signal(); return item; } /** * Removes and returns last element, or null if empty. */ private E unlinkLast() { // assert lock.isHeldByCurrentThread(); Node<E> l = last; if (l == null) return null; // 更新last Node<E> p = l.prev; // item置空,prev指向本身,方便gc回收 E item = l.item; l.item = null; l.prev = l; // help GC last = p; // 當前隊列爲空,last指向置空 if (p == null) first = null; else // 當前隊列非空,新的尾結點後繼置空 p.next = null; --count; // 經過條件對象喚醒入隊操做阻塞線程 notFull.signal(); return item; }
將隊列中匹配的結點刪除,鏈表中進行解除先後關聯便可,注意下若是x處於隊列中間(非頭和尾結點),則x自己的前驅和後繼指針不會被更新修改,爲的是防止迭代器循環到x找不到先後結點,避免迭代器異常
/** * Unlinks x. */ void unlink(Node<E> x) { // assert lock.isHeldByCurrentThread(); // 前驅結點和後繼結點 Node<E> p = x.prev; Node<E> n = x.next; // 前驅爲空,至關於刪除頭結點 if (p == null) { unlinkFirst(); } else if (n == null) { // 後繼爲空,至關於刪除尾結點 unlinkLast(); } else { // 前驅後繼都不爲空,解除刪除結點與先後結點的關係,item置空 // 注意,這裏x自己的前驅和後繼沒有被更新修改,爲的是防止迭代器循環到x找不到先後結點,避免迭代器異常 p.next = n; n.prev = p; x.item = null; --count; // 經過條件對象喚醒入隊操做阻塞線程 notFull.signal(); } }
peek操做直接經過首尾節點指向得到對應的item便可,不會刪除節點
public E peekFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { return (first == null) ? null : first.item; } finally { lock.unlock(); } } public E peekLast() { final ReentrantLock lock = this.lock; lock.lock(); try { return (last == null) ? null : last.item; } finally { lock.unlock(); } }
刪除第一個/最後一個知足條件的隊列結點,removeFirstOccurrence從前向後進行匹配,removeLastOccurrence從後向前進行匹配,找到第一個知足條件的結點進行刪除操做
public boolean removeFirstOccurrence(Object o) { if (o == null) return false; final ReentrantLock lock = this.lock; lock.lock(); try { // 從前向後循環判斷相等則經過unlink移除 for (Node<E> p = first; p != null; p = p.next) { if (o.equals(p.item)) { unlink(p); return true; } } return false; } finally { lock.unlock(); } } public boolean removeLastOccurrence(Object o) { if (o == null) return false; final ReentrantLock lock = this.lock; lock.lock(); try { // 從後向前循環判斷相等則經過unlink移除 for (Node<E> p = last; p != null; p = p.prev) { if (o.equals(p.item)) { unlink(p); return true; } } return false; } finally { lock.unlock(); } }
轉移隊列操做
public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = Math.min(maxElements, count); for (int i = 0; i < n; i++) { // 先添加防止未添加到新隊列中時原隊列結點出隊 c.add(first.item); // In this order, in case add() throws. // 解除關聯 unlinkFirst(); } return n; } finally { lock.unlock(); } }
清空隊列操做,比較簡單,很好理解
public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { for (Node<E> f = first; f != null; ) { f.item = null; Node<E> n = f.next; f.prev = null; f.next = null; f = n; } first = last = null; count = 0; notFull.signalAll(); } finally { lock.unlock(); } }
因爲其雙向鏈表的實現,迭代器可分爲升序迭代器(Itr)和倒序迭代器(DescendingItr),經過AbstractItr封裝公共操做方法,Itr和DescendingItr分別實現對應不一樣的方法,模板方法設計模式寫法可借鑑,一個從頭節點開始向後進行遍歷,一個從尾節點向後進行遍歷
private abstract class AbstractItr implements Iterator<E> { /** * next方法返回的node */ Node<E> next; /** * 保存next的item,防止hasNext爲true後節點被刪除再調用next獲取不到值的狀況 */ E nextItem; /** * 最近一次調用next返回的節點,若是經過調用remove刪除了此元素,則重置爲null */ private Node<E> lastRet; // 兩個不一樣迭代器實現不一樣 abstract Node<E> firstNode(); abstract Node<E> nextNode(Node<E> n); // 構造方法初始化,設置next和nextItem AbstractItr() { // set to initial position final ReentrantLock lock = LinkedBlockingDeque.this.lock; lock.lock(); try { next = firstNode(); nextItem = (next == null) ? null : next.item; } finally { lock.unlock(); } } /** * 返回後繼節點 */ private Node<E> succ(Node<E> n) { for (;;) { Node<E> s = nextNode(n); if (s == null) return null; else if (s.item != null) return s; else if (s == n) return firstNode(); else n = s; } } /** * 設置下一次執行next應該返回的值 */ void advance() { final ReentrantLock lock = LinkedBlockingDeque.this.lock; lock.lock(); try { // assert next != null; next = succ(next); nextItem = (next == null) ? null : next.item; } finally { lock.unlock(); } } public boolean hasNext() { return next != null; } public E next() { if (next == null) throw new NoSuchElementException(); lastRet = next; E x = nextItem; advance(); return x; } // 移除操做,注意,這裏直接在原隊列中移除了lastRet對應的節點 public void remove() { Node<E> n = lastRet; if (n == null) throw new IllegalStateException(); lastRet = null; final ReentrantLock lock = LinkedBlockingDeque.this.lock; lock.lock(); try { if (n.item != null) unlink(n); } finally { lock.unlock(); } } } /** Forward iterator */ // 從頭節點向後遍歷迭代器 private class Itr extends AbstractItr { Node<E> firstNode() { return first; } Node<E> nextNode(Node<E> n) { return n.next; } } /** Descending iterator */ // 從尾節點向後遍歷迭代器 private class DescendingItr extends AbstractItr { Node<E> firstNode() { return last; } Node<E> nextNode(Node<E> n) { return n.prev; } }
源碼分析完畢,整理LinkedBlockingDeque有以下特色:
以上內容若有問題歡迎指出,筆者驗證後將及時修正,謝謝