Java多線程進階(三二)—— J.U.C之collections框架:ArrayBlockingQueue

圖片描述

本文首發於一世流雲專欄: https://segmentfault.com/blog...

1、ArrayBlockingQueue簡介

ArrayBlockingQueue是在JDK1.5時,隨着J.U.C包引入的一種阻塞隊列,它實現了BlockingQueue接口,底層基於數組實現:java

clipboard.png

ArrayBlockingQueue是一種有界阻塞隊列,在初始構造的時候須要指定隊列的容量。具備以下特色:segmentfault

  1. 隊列的容量一旦在構造時指定,後續不能改變;
  2. 插入元素時,在隊尾進行;刪除元素時,在隊首進行;
  3. 隊列滿時,調用特定方法插入元素會阻塞線程;隊列空時,刪除元素也會阻塞線程;
  4. 支持公平/非公平策略,默認爲非公平策略。
這裏的公平策略,是指當線程從阻塞到喚醒後,以最初請求的順序(FIFO)來添加或刪除元素;非公平策略指線程被喚醒後,誰先搶佔到鎖,誰就能往隊列中添加/刪除順序,是隨機的。

2、ArrayBlockingQueue原理

構造

ArrayBlockingQueue提供了三種構造器:設計模式

/**
 * 指定隊列初始容量的構造器.
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
/**
 * 指定隊列初始容量和公平/非公平策略的構造器.
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();

    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);     // 利用獨佔鎖的策略
   notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}
/**
 * 根據已有集合構造隊列
 */
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock();    // 這裏加鎖是用於保證items數組的可見性
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);    // 不能有null元素
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;     // 若是隊列已滿,則重置puIndex索引爲0
    } finally {
        lock.unlock();
    }
}

核心就是第二種構造器,從構造器也能夠看出,ArrayBlockingQueue在構造時就指定了內部數組的大小,並經過ReentrantLock來保證併發環境下的線程安全。數組

ArrayBlockingQueue的公平/非公平策略其實就是內部ReentrantLock對象的策略,此外構造時還建立了兩個Condition對象。在隊列滿時,插入線程須要在notFull上等待;當隊列空時,刪除線程會在notEmpty上等待:安全

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    /**
     * 內部數組
     */
    final Object[] items;

    /**
     * 下一個待刪除位置的索引: take, poll, peek, remove方法使用
     */
    int takeIndex;

    /**
     * 下一個待插入位置的索引: put, offer, add方法使用
     */
    int putIndex;

    /**
     * 隊列中的元素個數
     */
    int count;

    /**
     * 全局鎖
     */
    final ReentrantLock lock;

    /**
     * 非空條件隊列:當隊列空時,線程在該隊列等待獲取
     */
    private final Condition notEmpty;

    /**
     * 非滿條件隊列:當隊列滿時,線程在該隊列等待插入
     */
    private final Condition notFull;

    //...
}

核心方法

ArrayBlockingQueue會阻塞線程的方法一共4個:put(E e)offer(e, time, unit)take()poll(time, unit),咱們先來看插入元素的方法。多線程

插入元素——put(E e)併發

插入元素的邏輯很簡單,用ReentrantLock來保證線程安全,當隊列滿時,則調用線程會在notFull條件隊列上等待,不然就調用enqueue方法入隊。高併發

/**
 * 在隊尾插入指定元素,若是隊列已滿,則阻塞線程.
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();   // 加鎖
    try {
        while (count == items.length)   // 隊列已滿。這裏必須用while,防止虛假喚醒
            notFull.await();            // 在notFull隊列上等待
        enqueue(e);                     // 隊列未滿, 直接入隊
    } finally {
        lock.unlock();
    }
}

這裏須要注意一點,隊列已滿的時候,是經過while循環判斷的,這實際上是多線程設計模式中的Guarded Suspension模式性能

while (count == items.length)   // 隊列已滿。這裏必須用while,防止虛假喚醒
    notFull.await();            // 在notFull隊列上等待

之因此這樣作,是防止線程被意外喚醒,不經再次判斷就直接調用enqueue方法。this

enqueue方法:

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)     // 隊列已滿,則重置索引爲0
        putIndex = 0;
    count++;                            // 元素個數+1
    notEmpty.signal();                  // 喚醒一個notEmpty上的等待線程(能夠來隊列取元素了)
}

刪除元素——take()

刪除元素的邏輯和插入元素相似,區別就是:刪除元素時,若是隊列空了,則線程須要在notEmpty條件隊列上等待。

/**
 * 從隊首刪除一個元素, 若是隊列爲空, 則阻塞線程
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)      // 隊列爲空, 則線程在notEmpty條件隊列等待
            notEmpty.await();
        return dequeue();       // 隊列非空,則出隊一個元素
    } finally {
        lock.unlock();
    }
}

隊列非空時,調用dequeue方法出隊一個元素:

private E dequeue() {
    final Object[] items = this.items;
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)    // 若是隊列已空
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();                   // 喚醒一個notFull上的等待線程(能夠插入元素到隊列了)
    return x;
}

環形隊列

從上面的入隊/出隊操做,能夠看出,ArrayBlockingQueue的內部數組實際上是一種環形結構。

假設ArrayBlockingQueue的容量大小爲6,咱們來看下整個入隊過程:

①初始時

clipboard.png

②插入元素「9」

clipboard.png

③插入元素「2」、「10」、「25」、「93」

clipboard.png

④插入元素「90」

注意,此時再插入一個元素「90」,則putIndex變成6,等於隊列容量6,因爲是循環隊列,因此會將tableIndex重置爲0:

clipboard.png

這是隊列已經滿了(count==6),若是再有線程嘗試插入元素,並不會覆蓋原有值,而是被阻塞。


咱們再來看下出隊過程:

①出隊元素「9」

clipboard.png

②出隊元素「2」、「10」、「25」、「93」

clipboard.png

③出隊元素「90」

注意,此時再出隊一個元素「90」,則tabeIndex變成6,等於隊列容量6,因爲是循環隊列,因此會將tableIndex重置爲0:

clipboard.png

這是隊列已經空了(count==0),若是再有線程嘗試出隊元素,則會被阻塞。

3、總結

ArrayBlockingQueue利用了ReentrantLock來保證線程的安全性,針對隊列的修改都須要加全局鎖。在通常的應用場景下已經足夠。對於超高併發的環境,因爲生產者-消息者共用一把鎖,可能出現性能瓶頸。

另外,因爲ArrayBlockingQueue是有界的,且在初始時指定隊列大小,因此若是初始時須要限定消息隊列的大小,則ArrayBlockingQueue 比較合適。後續,咱們會介紹另外一種基於單鏈表實現的阻塞隊列——LinkedBlockingQueue,該隊列的最大特色是使用了「兩把鎖」,以提高吞吐量。

相關文章
相關標籤/搜索