本文首發於一世流雲專欄: https://segmentfault.com/blog...
ArrayBlockingQueue
是在JDK1.5時,隨着J.U.C包引入的一種阻塞隊列,它實現了BlockingQueue接口,底層基於數組實現:java
ArrayBlockingQueue是一種有界阻塞隊列,在初始構造的時候須要指定隊列的容量。具備以下特色:segmentfault
這裏的公平策略,是指當線程從阻塞到喚醒後,以最初請求的順序(FIFO)來添加或刪除元素;非公平策略指線程被喚醒後,誰先搶佔到鎖,誰就能往隊列中添加/刪除順序,是隨機的。
ArrayBlockingQueue提供了三種構造器:設計模式
/** * 指定隊列初始容量的構造器. */ public ArrayBlockingQueue(int capacity) { this(capacity, false); }
/** * 指定隊列初始容量和公平/非公平策略的構造器. */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); // 利用獨佔鎖的策略 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
/** * 根據已有集合構造隊列 */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // 這裏加鎖是用於保證items數組的可見性 try { int i = 0; try { for (E e : c) { checkNotNull(e); // 不能有null元素 items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; // 若是隊列已滿,則重置puIndex索引爲0 } finally { lock.unlock(); } }
核心就是第二種構造器,從構造器也能夠看出,ArrayBlockingQueue在構造時就指定了內部數組的大小,並經過ReentrantLock來保證併發環境下的線程安全。數組
ArrayBlockingQueue的公平/非公平策略其實就是內部ReentrantLock對象的策略,此外構造時還建立了兩個Condition對象。在隊列滿時,插入線程須要在notFull上等待;當隊列空時,刪除線程會在notEmpty上等待:安全
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * 內部數組 */ final Object[] items; /** * 下一個待刪除位置的索引: take, poll, peek, remove方法使用 */ int takeIndex; /** * 下一個待插入位置的索引: put, offer, add方法使用 */ int putIndex; /** * 隊列中的元素個數 */ int count; /** * 全局鎖 */ final ReentrantLock lock; /** * 非空條件隊列:當隊列空時,線程在該隊列等待獲取 */ private final Condition notEmpty; /** * 非滿條件隊列:當隊列滿時,線程在該隊列等待插入 */ private final Condition notFull; //... }
ArrayBlockingQueue會阻塞線程的方法一共4個:put(E e)
、offer(e, time, unit)
和take()
、poll(time, unit)
,咱們先來看插入元素的方法。多線程
插入元素——put(E e)併發
插入元素的邏輯很簡單,用ReentrantLock來保證線程安全,當隊列滿時,則調用線程會在notFull條件隊列上等待,不然就調用enqueue方法入隊。高併發
/** * 在隊尾插入指定元素,若是隊列已滿,則阻塞線程. */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 加鎖 try { while (count == items.length) // 隊列已滿。這裏必須用while,防止虛假喚醒 notFull.await(); // 在notFull隊列上等待 enqueue(e); // 隊列未滿, 直接入隊 } finally { lock.unlock(); } }
這裏須要注意一點,隊列已滿的時候,是經過while循環判斷的,這實際上是多線程設計模式中的Guarded Suspension模式:性能
while (count == items.length) // 隊列已滿。這裏必須用while,防止虛假喚醒 notFull.await(); // 在notFull隊列上等待
之因此這樣作,是防止線程被意外喚醒,不經再次判斷就直接調用enqueue方法。this
enqueue方法:
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) // 隊列已滿,則重置索引爲0 putIndex = 0; count++; // 元素個數+1 notEmpty.signal(); // 喚醒一個notEmpty上的等待線程(能夠來隊列取元素了) }
刪除元素——take()
刪除元素的邏輯和插入元素相似,區別就是:刪除元素時,若是隊列空了,則線程須要在notEmpty條件隊列上等待。
/** * 從隊首刪除一個元素, 若是隊列爲空, 則阻塞線程 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 隊列爲空, 則線程在notEmpty條件隊列等待 notEmpty.await(); return dequeue(); // 隊列非空,則出隊一個元素 } finally { lock.unlock(); } }
隊列非空時,調用dequeue方法出隊一個元素:
private E dequeue() { final Object[] items = this.items; E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) // 若是隊列已空 takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); // 喚醒一個notFull上的等待線程(能夠插入元素到隊列了) return x; }
從上面的入隊/出隊操做,能夠看出,ArrayBlockingQueue的內部數組實際上是一種環形結構。
假設ArrayBlockingQueue的容量大小爲6,咱們來看下整個入隊過程:
①初始時
②插入元素「9」
③插入元素「2」、「10」、「25」、「93」
④插入元素「90」
注意,此時再插入一個元素「90」,則putIndex變成6,等於隊列容量6,因爲是循環隊列,因此會將tableIndex重置爲0:
這是隊列已經滿了(count==6),若是再有線程嘗試插入元素,並不會覆蓋原有值,而是被阻塞。
咱們再來看下出隊過程:
①出隊元素「9」
②出隊元素「2」、「10」、「25」、「93」
③出隊元素「90」
注意,此時再出隊一個元素「90」,則tabeIndex變成6,等於隊列容量6,因爲是循環隊列,因此會將tableIndex重置爲0:
這是隊列已經空了(count==0),若是再有線程嘗試出隊元素,則會被阻塞。
ArrayBlockingQueue利用了ReentrantLock來保證線程的安全性,針對隊列的修改都須要加全局鎖。在通常的應用場景下已經足夠。對於超高併發的環境,因爲生產者-消息者共用一把鎖,可能出現性能瓶頸。
另外,因爲ArrayBlockingQueue是有界的,且在初始時指定隊列大小,因此若是初始時須要限定消息隊列的大小,則ArrayBlockingQueue 比較合適。後續,咱們會介紹另外一種基於單鏈表實現的阻塞隊列——LinkedBlockingQueue,該隊列的最大特色是使用了「兩把鎖」,以提高吞吐量。