本文首發於一世流雲專欄: https://segmentfault.com/blog...
LinkedBlockingDeque
和ConcurrentLinkedDeque相似,都是一種雙端隊列的結構,只不過LinkedBlockingDeque同時也是一種阻塞隊列,它是在JDK1.5時隨着J.U.C包引入的,實現了BlockingDueue
接口,底層基於雙鏈表實現:java
注意:LinkedBlockingDeque底層利用ReentrantLock實現同步,並不像ConcurrentLinkedDeque那樣採用無鎖算法。
另外,LinkedBlockingDeque是一種近似有界阻塞隊列,爲何說近似?由於LinkedBlockingDeque既能夠在初始構造時就指定隊列的容量,也能夠不指定,若是不指定,那麼它的容量大小默認爲Integer.MAX_VALUE
。node
截止目前爲止,咱們介紹的阻塞隊列都是實現了BlockingQueue接口。和普通雙端隊列接口——Deque同樣,J.U.C中也有一種阻塞的雙端隊列接口——BlockingDeque
。BlockingDeque是JDK1.6時,J.U.C包新增的一個接口:算法
BlockingDeque的類繼承關係圖:segmentfault
咱們知道,BlockingQueue中阻塞方法一共有4個:put(e)
、take()
;offer(e, time, unit)
、poll(time, unit)
,忽略限時等待的阻塞方法,一共就兩個:
隊尾入隊:put(e)
隊首出隊:take()安全
BlockingDeque相對於BlockingQueue,最大的特色就是增長了在隊首入隊/隊尾出隊的阻塞方法。下面是兩個接口的比較:併發
阻塞方法 | BlockingQueue | BlockingDeque |
---|---|---|
隊首入隊 | / | putFirst(e) |
隊首出隊 | take() | takeFirst() |
隊尾入隊 | put(e) | putLast(e) |
隊尾出隊 | / | takeLast() |
LinkedBlockingDeque一共三種構造器,不指定容量時,默認爲Integer.MAX_VALUE
:this
/** * 默認構造器. */ public LinkedBlockingDeque() { this(Integer.MAX_VALUE); }
/** * 指定容量的構造器. */ public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; }
/** * 從已有集合構造隊列. */ public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock lock = this.lock; lock.lock(); // Never contended, but necessary for visibility try { for (E e : c) { if (e == null) throw new NullPointerException(); if (!linkLast(new Node<E>(e))) throw new IllegalStateException("Deque full"); } } finally { lock.unlock(); } }
LinkedBlockingDeque內部是雙鏈表的結構,結點Node
的定義以下:spa
/** * 雙鏈表結點定義 */ static final class Node<E> { /** * 結點值, null表示該結點已被移除. */ E item; /** * 前驅結點指針. */ Node<E> prev; /** * 後驅結點指針. */ Node<E> next; Node(E x) { item = x; } }
字段first
指向隊首結點,字段last指向隊尾結點。另外LinkedBlockingDeque利用ReentrantLock來保證線程安全,全部對隊列的修改操做都須要先獲取這把全局鎖:線程
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { /** * 隊首結點指針. */ transient Node<E> first; /** * 隊尾結點指針. */ transient Node<E> last; /** * 隊列元素個數. */ private transient int count; /** * 隊列容量. */ private final int capacity; /** * 全局鎖 */ final ReentrantLock lock = new ReentrantLock(); /** * 出隊線程條件隊列(隊列爲空時,出隊線程在此等待) */ private final Condition notEmpty = lock.newCondition(); /** * 入隊線程條件隊列(隊列爲滿時,入隊線程在此等待) */ private final Condition notFull = lock.newCondition(); //... }
先來看下,LinkedBlockingDeque是如何實現正常的從隊尾入隊的:3d
/** * 在隊尾入隊元素e. * 若是隊列已滿, 則阻塞線程. */ public void put(E e) throws InterruptedException { putLast(e); } public void putLast(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 隊列不能包含null元素 Node<E> node = new Node<E>(e); // 建立入隊結點 final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkLast(node)) // 隊列已滿, 則阻塞線程 notFull.await(); } finally { lock.unlock(); } }
put方法內部調用了putLast方法,這是Deque接口獨有的方法。上述入隊操做的關鍵是linkLast方法:
/** * 將結點node連接到隊尾, 若是失敗, 則返回false. */ private boolean linkLast(Node<E> node) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) // 隊列已滿, 直接返回 return false; // 如下是雙鏈表的"尾插"操做 Node<E> l = last; node.prev = l; last = node; if (first == null) first = node; else l.next = node; ++count; // 隊列元素加1 notEmpty.signal(); // 喚醒一個等待的出隊線程 return true; }
linkLast方法在隊尾插入一個結點,插入失敗(隊列已滿的狀況)則返回false。插入成功,則喚醒一個正在等待的出隊線程:
初始:
隊尾插入結點node:
隊首入隊就是雙鏈表的「頭插法」插入一個結點,若是隊列已滿,則阻塞調用線程:
/** * 在隊首入隊元素e. * 若是隊列已滿, 則阻塞線程. */ public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkFirst(node)) // 隊列已滿, 則阻塞線程 notFull.await(); } finally { lock.unlock(); } }
/** * 在隊首插入一個結點, 插入失敗則返回null. */ private boolean linkFirst(Node<E> node) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) // 隊列已滿 return false; // 如下是雙鏈表的「頭插」操做 Node<E> f = first; node.next = f; first = node; if (last == null) last = node; else f.prev = node; ++count; // 隊列元素數量加1 notEmpty.signal(); // 喚醒一個等待的出隊線程 return true; }
初始:
隊首插入結點node:
隊首出隊的邏輯很簡單,若是隊列爲空,則阻塞調用線程:
/** * 從隊首出隊一個元素, 若是隊列爲空, 則阻塞線程. */ public E take() throws InterruptedException { return takeFirst(); } public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ((x = unlinkFirst()) == null) // 隊列爲空, 則阻塞線程 notEmpty.await(); return x; } finally { lock.unlock(); } }
實際的出隊由unlinkFirst方法執行:
/** * 從隊首刪除一個元素, 失敗則返回null. */ private E unlinkFirst() { // assert lock.isHeldByCurrentThread(); Node<E> f = first; if (f == null) // 隊列爲空 return null; // 如下是雙鏈表的頭部刪除過程 Node<E> n = f.next; E item = f.item; f.item = null; f.next = f; // help GC first = n; if (n == null) last = null; else n.prev = null; --count; // 隊列元素個數減1 notFull.signal(); // 喚醒一個等待的入隊線程 return item; }
初始:
刪除隊首結點:
隊尾出隊的邏輯很簡單,若是隊列爲空,則阻塞調用線程:
/** * 從隊尾出隊一個元素, 若是隊列爲空, 則阻塞線程. */ public E takeLast() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ((x = unlinkLast()) == null) // 隊列爲空, 阻塞線程 notEmpty.await(); return x; } finally { lock.unlock(); } }
實際的出隊由unlinkLast方法執行:
/** * 刪除隊尾元素, 若是失敗, 則返回null. */ private E unlinkLast() { // assert lock.isHeldByCurrentThread(); Node<E> l = last; if (l == null) // 隊列爲空 return null; // 如下爲雙鏈表的尾部刪除過程 Node<E> p = l.prev; E item = l.item; l.item = null; l.prev = l; // help GC last = p; if (p == null) first = null; else p.next = null; --count; // 隊列元素個數減1 notFull.signal(); // 喚醒一個等待的入隊線程 return item; }
初始:
刪除隊尾結點:
LinkedBlockingDeque做爲一種阻塞雙端隊列,提供了隊尾刪除元素和隊首插入元素的阻塞方法。該類在構造時通常須要指定容量,若是不指定,則最大容量爲Integer.MAX_VALUE
。另外,因爲內部經過ReentrantLock來保證線程安全,因此LinkedBlockingDeque的總體實現時比較簡單的。
另外,雙端隊列相比普通隊列,主要是多了【隊尾出隊元素】/【隊首入隊元素】的功能。
阻塞隊列咱們知道通常用於「生產者-消費者」模式,而雙端阻塞隊列在「生產者-消費者」就能夠利用「雙端」的特性,從隊尾出隊元素。
考慮下面這樣一種場景:有多個消費者,每一個消費者有本身的一個消息隊列,生產者不斷的生產數據扔到隊列中,消費者消費數據有快又慢。爲了提高效率,速度快的消費者能夠從其它消費者隊列的隊尾出隊元素放到本身的消息隊列中,因爲是從其它隊列的隊尾出隊,這樣能夠減小併發衝突(其它消費者從隊首出隊元素),又能提高整個系統的吞吐量。這實際上是一種「工做竊取算法」的思路。