聊一聊 JUC 下的 ArrayBlockingQueue

本文聊一聊 JUC 下的另外一個阻塞隊列 ArrayBlockingQueue,先說一下 ArrayBlockingQueue 的特色,從全局上對 ArrayBlockingQueue 有必定的瞭解,ArrayBlockingQueue 有如下幾個特色:java

  • 由數組實現的有界阻塞隊列,容量一旦建立,後續大小沒法修改
  • 遵守先進先出規則,隊頭拿數據,隊尾取數據
  • 跟 LinkedBlockingQueue 同樣,隊列滿時,往隊列中 put 數據會被阻塞,隊列空時,往隊列中拿數據會被阻塞
  • 對數據操做時,共用一把鎖,因此不能同時讀寫操做

ArrayBlockingQueue 跟 LinkedBlockingQueue 同樣,一樣繼承了 AbstractQueue 實現 BlockingQueue 接口,因此在方法上跟 LinkedBlockingQueue 同樣,因此在這裏咱們不把方法列出來了,能夠去查看前面 LinkedBlockingQueue 的文章~數組

除了方法以外,在 ArrayBlockingQueue 中,還有兩個比較重要的參數:函數

/** items index for next take, poll, peek or remove */
// 獲取元素的位置
int takeIndex;
/** items index for next put, offer, or add */
// 新增元素時的數組下標
int putIndex;
複製代碼

因爲 ArrayBlockingQueue 底層採用的是數組,結合上面的兩個參數,ArrayBlockingQueue 的總體結構圖大概以下:學習

ArrayBlockingQueue 的總體結構

ArrayBlockingQueue 有三個構造函數:ui

public ArrayBlockingQueue(int capacity);
// fair 表示是否爲公平鎖 
public ArrayBlockingQueue(int capacity, boolean fair);

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c);
複製代碼

關於構造函數就很少說了,都大同小異,跟 LinkedBlockingQueue 同樣,一樣拿 put()take() 方法,看看 ArrayBlockingQueue 是如何實現數據的添加和拿取的~this

先從put()方法開始,看看 ArrayBlockingQueue 是如何實現的~spa

public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    // 獲取鎖
    final ReentrantLock lock = this.lock;
    // 設置可重入鎖
    lock.lockInterruptibly();
    try {
        // 當數組隊列存滿時,阻塞等待.....
        while (count == items.length)
            notFull.await();
        // 入隊操做
        enqueue(e);
    } finally {
        // 解鎖
        lock.unlock();
    }
}
// 入隊 
private void enqueue(E e) {
    final Object[] items = this.items;
    // 根據 putIndex 插入到對應的位置便可
    items[putIndex] = e;
    // 設置好下一次插入的位置,若是當前插入的位置是最後一個元素,
    // 那麼下一次插入的位置就是隊頭了
    if (++putIndex == items.length) putIndex = 0;
    count++;
    notEmpty.signal();
}
複製代碼

put() 方法的實現並不複雜,代碼也就 20 行左右,咱們來拆解一下 put 過程:指針

  • 一、先獲取鎖,對操做進行加鎖;
  • 二、判斷隊列是否隊滿,若是隊滿,則掛起等待;
  • 三、根據 putIndex 的值,直接將元素插入到 items 數組中;
  • 四、調整 putIndex 的位置,用於下一次插入使用,若是當前 putIndex 是數組的最後一個位置,則 putIndex 下一次插入的位置是數組的第一個位置,能夠把它看成是循環;
  • 五、解鎖;

put 方總體解決起來不難,跟 LinkedBlockingQueue 同樣,其餘添加方法這裏就不介紹了,大同小異~code

再來看看 ArrayBlockingQueue 是如何實現 take() 方法的,take() 方法主要源碼以下:cdn

public E take() throws InterruptedException {
    // 獲取鎖
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 判斷隊列是否爲空,爲空的話掛起等待
        while (count == 0)
            notEmpty.await();
        // 獲取數據
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // 根據 takeIndex 獲取 items 中的元素
    E e = (E) items[takeIndex];
    // 將 takeIndex 中的數據置爲空
    items[takeIndex] = null;
    // 設置下一次獲取數據的下標,
    if (++takeIndex == items.length) takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return e;
}
複製代碼

take() 方法跟 put() 方法沒有什麼太大的區別,就是一個反操做~

最後咱們再來關注一下 remove() 方法,這個方法仍是有一些學問的,主要源碼以下:

public boolean remove(Object o) {
    if (o == null) return false;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final Object[] items = this.items;
            for (int i = takeIndex, end = putIndex,
                     to = (i < end) ? end : items.length;
                 ; i = 0, to = end) {
                 // 遍歷有值的一段數據
                for (; i < to; i++)
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                if (to == end) break;
            }
        }
        return false;
    } finally {
        lock.unlock();
    }
}
// 主要看這個方法
void removeAt(final int removeIndex) {
    final Object[] items = this.items;
    // 若是要刪除的位置正好是下一次 take的位置
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // 若是刪除的位置時 takeIndex 和 putIndex 之間的位置,則被刪除的數據所有往前移動~
        for (int i = removeIndex, putIndex = this.putIndex;;) {
            int pred = i;
            if (++i == items.length) i = 0;
            if (i == putIndex) {
                items[pred] = null;
                this.putIndex = pred;
                break;
            }
            items[pred] = items[i];
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    notFull.signal();
}
複製代碼

remove 的時候分三種狀況:

  • 第一種:當 removeIndex == takeIndex 時,這種狀況就比較簡單,將該位置的元素刪除後,takeIndex +1 便可

  • 第二種:當 removeIndex + 1 == putIndex 時,直接將 putIndex -1 就好,至關於 putIndex 指針往前移動一格

  • 第三種:當 removeIndex != takeIndex && removeIndex + 1 != putIndex 時,這種狀況就比較複雜了,須要涉及到數據的移動,要將 removeIndex 後面的數據所有往前移動一個位置,putIndex 的位置也要遷移一位,具體的能夠參考源碼

以上就是 ArrayBlockingQueue 的部分源碼解析,但願對你學習或者工做有所幫助,感謝你的閱讀~

最後

歡迎關注公衆號【互聯網平頭哥】,一塊兒學習,一塊兒進步~

相關文章
相關標籤/搜索