JDK容器學習之Queue: ArrayBlockingQueue

基於數組阻塞隊列 ArrayBlockingQueue

前面學習了基於數組的非阻塞雙端隊列ArrayDeque,其內部維護一個數組和指向隊列頭和隊列尾索引的兩個成員變量;本篇則探究下基於數組的阻塞隊列是什麼樣的數據結構,又有什麼特性,相較於ArrayDeque又有什麼異同;而後就是使用場景了java

I. 底層數據結構

先看內部成員變量定義, 和 ArrayDequeue相比,差異不大,一個數組,兩個索引;此外多了一個鎖和兩個斷定條件數組

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

注意安全

  1. 底層數據結構依然是數組,註釋上並無說明要求容量是2的n次方(ArrayDequeue有這個強制限定)
  2. 初始化時必須指定隊列的容量(即數組的長度,構造方法中的必選參數)
  3. count直接表示隊列的元素個數(注意DelayQueue是經過遍從來獲取隊列長度,且併發修改會有問題,那麼這個是如何保證併發的?)
  4. 留一個疑問,塞入隊列超過數組容量,是否會出現擴容

數據結構以下圖數據結構

![aryBlockQueueStruct.jpeg](quiver-image-url/5AEE167065A2E49EB3DDBC449BCF991E.jpg =374x165)併發

II. 阻塞實現原理

0. Prefer

分析阻塞原理以前,先經過註釋解釋下ArrayBlockingQueue的使用場景源碼分析

  • 先進先出隊列(隊列頭的是最早進隊的元素;隊列尾的是最後進隊的元素)
  • 有界隊列(即初始化時指定的容量,就是隊列最大的容量,不會出現擴容,容量滿,則阻塞進隊操做;容量空,則阻塞出隊操做)
  • 隊列不支持空元素

1. 進隊

通用的進隊方法以下,是非阻塞的方式,當數組滿時,直接返回false,爲保證併發安全,進隊操做是加鎖實現學習

public boolean offer(E e) {
    // 非空校驗
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock(); // 進隊加鎖
    try {
        if (count == items.length) 
        // 隊列滿,則直接返回false
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

// 直接將元素塞入數組
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

阻塞方式的進隊實現以下ui

public void put(E e) throws InterruptedException {
    checkNotNull(e); // 非空判斷
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 獲取鎖
    try {
        while (count == items.length) {
        // 一直阻塞,知道隊列非滿時,被喚醒
            notFull.await();
        }
        
        enqueue(e); // 進隊
    } finally {
        lock.unlock();
    }
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
        // 阻塞,知道隊列不滿
        // 或者超時時間已過,返回false
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

源碼分析,阻塞入隊的邏輯比較清晰,小結一下this

  • 非阻塞調用方式 offer(e)
  • 阻塞調用方式 put(e)offer(e, timeout, unit)
  • 阻塞調用時,喚醒條件爲超時或者隊列非滿(所以,要求在出隊時,要發起一個喚醒操做)
  • 進隊成功以後,執行notEmpty.signal()喚起被阻塞的出隊線程

2. 出隊

非阻塞出隊方法以下url

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    // 直接將隊頭扔出去,並置空數組中該位置
    // 並移動隊列頭到下一位
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
    // 保障在遍歷時,能夠進行出隊操做
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

阻塞的實現,邏輯比較清晰,首先競爭鎖,判斷是否爲空,是阻塞直到非空;不然彈出隊列頭元素

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public E poll(long timeout, TimeUnit unit) 
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

小結

  • 非阻塞出隊調用 poll()方法
  • 阻塞出隊調用 take()poll(long,TimeUnit)方法
  • 出隊以後,會喚醒因隊列滿被阻塞的入隊線程
  • 出隊count計數減1(由於每次只能一個線程出隊,因此count的值能夠保證併發準確性)
  • 支持在遍歷隊列時,出隊

III. 應用場景及case

建立線程池時,一般會用到 ArrayBlockingQueue或者LinkedBlockingQueue,如

new ThreadPoolExecutor(1, 1,
          0L, TimeUnit.MILLISECONDS,
          new ArrayBlockingQueue<Runnable>(2));

延遲隊列也是併發安全,ArrayBlockingQueue 相比較 DelayQueue應用場景的區別主要在

  • 有界和無界(ArrayBlockingQueue不會擴容,而DelayQueue會出現擴容)
  • 前者隊列非空就能夠出隊;後者則須要隊列頭生效(getDelay()返回值小於0)
  • 前者隊列滿,則沒法入隊;後者一直均可以入隊
  • 前者FIFO;後者優先級隊列,延遲時間小的優先出隊

IV. 小結

基於數組阻塞隊列ArrayBlockingQueue

  • 有界數組阻塞隊列,FIFO,先進先出,初始化時指定數組長度
  • 阻塞出隊方法: take()poll(long, TimeUnit)
  • 非阻塞出隊方法: poll()
  • 阻塞入隊方法: offer(E, long, TimeUnit)put(E)
  • 非阻塞入隊方法: offer(E) add(E)
  • 隊列爲空,出隊會被阻塞;隊列滿時,進隊會被阻塞
  • 根據內部計數count,能夠直接獲取隊列長度(count的併發安全是由進隊出隊上鎖,保證同一時刻只有一個線程修改count值保證的)

更多java分享

https://static.oschina.net/uploads/img/201710/13203703_6IVg.jpg

相關文章
相關標籤/搜索