JDK源碼分析(11)之 BlockingQueue 相關

本文將主要結合源碼對 JDK 中的阻塞隊列進行分析,並比較其各自的特色;java

1、BlockingQueue 概述

說到阻塞隊列想到的第一個應用場景可能就是生產者消費者模式了,如圖所示;node

blocking-queue

根據上圖所示,明顯在入隊和出隊的時候,會發生競爭;因此一種很天然的想法就是使用鎖,而在 JDK 中也的確是經過鎖來實現的;因此 BlockingQueue 的源碼其實能夠當成鎖的應用示例來查看;同時 JDK 也爲咱們提供了多種不一樣功能的隊列:數組

  • ArrayBlockingQueue :基於數組的有界隊列;
  • LinkedBlockingQueue :基於鏈表的無界隊列(能夠設置容量);
  • PriorityBlockingQueue :基於二叉堆的無界優先級隊列;
  • DelayQueue :基於 PriorityBlockingQueue 的無界延遲隊列;
  • SynchronousQueue :無容量的阻塞隊列(Executors.newCachedThreadPool() 中使用的隊列);
  • LinkedTransferQueue :基於鏈表的無界隊列;

接下來咱們就對最經常使用的 ArrayBlockingQueueLinkedBlockingQueue 進行分析;源碼分析


2、 ArrayBlockingQueue 源碼分析

1. 結構概述

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    final Object[] items;               // 容器數組
    int takeIndex;                      // 出隊索引
    int putIndex;                       // 入隊索引
    int count;                          // 排隊個數
    final ReentrantLock lock;           // 全局鎖
    private final Condition notEmpty;   // 出隊條件隊列
    private final Condition notFull;    // 入隊條件隊列
    ...
}

ArrayBlockingQueue 的結構如圖所示:
this

arrayblockingqueue

如圖所示,線程

  • ArrayBlockingQueue 的數組實際上是一個邏輯上的環狀結構,在添加、取出數據的時候,並無像 ArrayList 同樣發生數組元素的移動(固然除了 removeAt(final int removeIndex));
  • 而且由 takeIndexputIndex 指示讀寫位置;
  • 在讀寫的時候還有兩個讀寫條件隊列;

下面咱們就讀寫操做,對源碼簡單分析:指針


2. 入隊

public void put(E e) throws InterruptedException {
  checkNotNull(e);
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    while (count == items.length)  // 當隊列已滿的時候放入 putCondition 條件隊列
      notFull.await();   
    enqueue(e);  // 入隊
  } finally {
    lock.unlock();
  }
}
private void enqueue(E x) {
  // assert lock.getHoldCount() == 1;
  // assert items[putIndex] == null;
  final Object[] items = this.items;
  items[putIndex] = x;  // 插入隊列
  if (++putIndex == items.length) putIndex = 0;  // 指針走一圈的時候復位
  count++;
  notEmpty.signal();  // 喚醒 takeCondition 條件隊列中等待的線程
}


3. 出隊

public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    while (count == 0)  // 當隊列爲空的時候,放入 takeCondition 條件
      notEmpty.await();  
    return dequeue();   // 出隊
  } finally {
    lock.unlock();
  }
}
private E dequeue() {
  // assert lock.getHoldCount() == 1;
  // assert items[takeIndex] != null;
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  E x = (E) items[takeIndex];  // 取出元素
  items[takeIndex] = null;
  if (++takeIndex == items.length)
    takeIndex = 0;
  count--;
  if (itrs != null)
    itrs.elementDequeued();
  notFull.signal();  // 取出元素後,隊列空出一位,因此喚醒 putCondition 中的線程
  return x;
}


3、LinkedBlockingQueue 源碼分析

1. 結構概述

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
  
  private final int capacity; // 默認 Integer.MAX_VALUE
  private final AtomicInteger count = new AtomicInteger(); // 容量
  transient Node<E> head;          // 頭結點 head.item == null
  private transient Node<E> last;  // 尾節點 last.next == null
  private final ReentrantLock takeLock = new ReentrantLock();  // 出隊鎖
  private final Condition notEmpty = takeLock.newCondition();  // 出隊條件
  private final ReentrantLock putLock = new ReentrantLock();   // 入隊鎖
  private final Condition notFull = putLock.newCondition();    // 入隊條件
  
  static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
  }
}

LinkedBlockingQueue 的結構如圖所示:
code

LinkedBlockingQueue

如圖所示,blog

  • LinkedBlockingQueue 其實就是一個簡單的單向鏈表,其中頭部元素的數據爲空,尾部元素的 next 爲空;
  • 由於讀寫都有競爭,因此在頭部和尾部分別有一把鎖;同時還有對應的兩個條件隊列;

下面咱們就讀寫操做,對源碼簡單分析:索引


2. 入隊

public boolean offer(E e) {
  if (e == null) throw new NullPointerException();
  final AtomicInteger count = this.count;
  if (count.get() == capacity) return false;  // 若是隊列已滿,直接返回失敗
  int c = -1;
  Node<E> node = new Node<E>(e);              // 將數據封裝爲節點
  final ReentrantLock putLock = this.putLock;
  putLock.lock();
  try {
    if (count.get() < capacity) {
      enqueue(node);                          // 入隊
      c = count.getAndIncrement();
      if (c + 1 < capacity)                   // 若是隊列未滿,則繼續喚醒 putCondition 條件隊列
        notFull.signal();
    }
  } finally {
    putLock.unlock();
  }
  if (c == 0)           // 若是添加以前的容量爲0,說明在出隊的時候有競爭,則喚醒 takeCondition
    signalNotEmpty();   // 由於是兩把鎖,因此在喚醒 takeCondition的時候,還須要獲取 takeLock
  return c >= 0;
}
private void enqueue(Node<E> node) {
  // assert putLock.isHeldByCurrentThread();
  // assert last.next == null;
  last = last.next = node;  // 鏈接節點,並設置尾節點
}


3. 出隊

public E take() throws InterruptedException {
  E x;
  int c = -1;
  final AtomicInteger count = this.count;
  final ReentrantLock takeLock = this.takeLock;
  takeLock.lockInterruptibly();
  try {
    while (count.get() == 0) {   // 若是隊列爲空,則加入 takeCondition 條件隊列
      notEmpty.await();
    }
    x = dequeue();               // 出隊
    c = count.getAndDecrement();
    if (c > 1)
      notEmpty.signal();         // 若是隊列還有剩餘,則繼續喚醒 takeCondition 條件隊列
  } finally {
    takeLock.unlock();
  }
  if (c == capacity)             // 若是取以前隊列是滿的,說明入隊的時候有競爭,則喚醒 putCondition
    signalNotFull();             // 一樣注意是兩把鎖
  return x;
}
private E dequeue() {
  // assert takeLock.isHeldByCurrentThread();
  // assert head.item == null;
  Node<E> h = head;
  Node<E> first = h.next;
  h.next = h; // help GC   // 將next引用指向本身,則該節點不可達,在下一次GC的時候回收
  head = first;
  E x = first.item;
  first.item = null;
  return x;
}


4、ABQ、LBQ 對比

根據以上的講解,咱們能夠逐步分析出一些不一樣,以及在不一樣場景隊列的選擇:

  1. 結構不一樣
    • ABQ:基於數組,有界,一把鎖;
    • LBQ:基於鏈表,無界,兩把鎖;
  2. 內存分配
    • ABQ:隊列空間預先初始化,受堆空間影響小,穩定性高;
    • LBQ:隊列空間動態變化,受對空間影響大,穩定性差;
  3. 入隊、出隊效率
    • ABQ:數據直接賦值,移除;隊列空間重複使用,效率高;
    • LBQ:數據須要包裝爲節點;需開闢新空間,效率低;
  4. 競爭方面
    • ABQ:出入隊共用一把鎖,相互影響;競爭嚴重時效率低;
    • LBQ:出入隊分用兩把鎖,互不影響;競爭嚴重時效率影響小;

因此在這裏並不能簡單的給出詳細的數據,證實哪一個隊列更適合什麼場景,最好是結合實際使用場景分析;

相關文章
相關標籤/搜索