本文將主要結合源碼對 JDK 中的阻塞隊列進行分析,並比較其各自的特色;java
說到阻塞隊列想到的第一個應用場景可能就是生產者消費者模式了,如圖所示;node
根據上圖所示,明顯在入隊和出隊的時候,會發生競爭;因此一種很天然的想法就是使用鎖,而在 JDK 中也的確是經過鎖來實現的;因此 BlockingQueue
的源碼其實能夠當成鎖的應用示例來查看;同時 JDK 也爲咱們提供了多種不一樣功能的隊列:數組
接下來咱們就對最經常使用的 ArrayBlockingQueue
和 LinkedBlockingQueue
進行分析;源碼分析
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { final Object[] items; // 容器數組 int takeIndex; // 出隊索引 int putIndex; // 入隊索引 int count; // 排隊個數 final ReentrantLock lock; // 全局鎖 private final Condition notEmpty; // 出隊條件隊列 private final Condition notFull; // 入隊條件隊列 ... }
ArrayBlockingQueue
的結構如圖所示:
this
如圖所示,線程
ArrayBlockingQueue
的數組實際上是一個邏輯上的環狀結構,在添加、取出數據的時候,並無像 ArrayList
同樣發生數組元素的移動(固然除了 removeAt(final int removeIndex)
);takeIndex
和 putIndex
指示讀寫位置;下面咱們就讀寫操做,對源碼簡單分析:指針
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 當隊列已滿的時候放入 putCondition 條件隊列 notFull.await(); enqueue(e); // 入隊 } finally { lock.unlock(); } }
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; // 插入隊列 if (++putIndex == items.length) putIndex = 0; // 指針走一圈的時候復位 count++; notEmpty.signal(); // 喚醒 takeCondition 條件隊列中等待的線程 }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 當隊列爲空的時候,放入 takeCondition 條件 notEmpty.await(); return dequeue(); // 出隊 } finally { lock.unlock(); } }
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; // 取出元素 items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); // 取出元素後,隊列空出一位,因此喚醒 putCondition 中的線程 return x; }
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private final int capacity; // 默認 Integer.MAX_VALUE private final AtomicInteger count = new AtomicInteger(); // 容量 transient Node<E> head; // 頭結點 head.item == null private transient Node<E> last; // 尾節點 last.next == null private final ReentrantLock takeLock = new ReentrantLock(); // 出隊鎖 private final Condition notEmpty = takeLock.newCondition(); // 出隊條件 private final ReentrantLock putLock = new ReentrantLock(); // 入隊鎖 private final Condition notFull = putLock.newCondition(); // 入隊條件 static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } }
LinkedBlockingQueue
的結構如圖所示:
code
如圖所示,blog
LinkedBlockingQueue
其實就是一個簡單的單向鏈表,其中頭部元素的數據爲空,尾部元素的 next 爲空;下面咱們就讀寫操做,對源碼簡單分析:索引
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; // 若是隊列已滿,直接返回失敗 int c = -1; Node<E> node = new Node<E>(e); // 將數據封裝爲節點 final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); // 入隊 c = count.getAndIncrement(); if (c + 1 < capacity) // 若是隊列未滿,則繼續喚醒 putCondition 條件隊列 notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) // 若是添加以前的容量爲0,說明在出隊的時候有競爭,則喚醒 takeCondition signalNotEmpty(); // 由於是兩把鎖,因此在喚醒 takeCondition的時候,還須要獲取 takeLock return c >= 0; }
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; // 鏈接節點,並設置尾節點 }
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 若是隊列爲空,則加入 takeCondition 條件隊列 notEmpty.await(); } x = dequeue(); // 出隊 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); // 若是隊列還有剩餘,則繼續喚醒 takeCondition 條件隊列 } finally { takeLock.unlock(); } if (c == capacity) // 若是取以前隊列是滿的,說明入隊的時候有競爭,則喚醒 putCondition signalNotFull(); // 一樣注意是兩把鎖 return x; }
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC // 將next引用指向本身,則該節點不可達,在下一次GC的時候回收 head = first; E x = first.item; first.item = null; return x; }
根據以上的講解,咱們能夠逐步分析出一些不一樣,以及在不一樣場景隊列的選擇:
因此在這裏並不能簡單的給出詳細的數據,證實哪一個隊列更適合什麼場景,最好是結合實際使用場景分析;