ArrayBlockingQueue是一個由數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。隊列的頭部是在隊列中存在時間最長的元素。隊列的尾部是在隊列中存在時間最短的元素。新元素插入到隊列的尾部,隊列獲取操做則是從隊列頭部開始得到元素。 數組
ArrayBlockingQueue繼承自 AbstractQueue並實現 BlockingQueue接口。緩存
ArrayBlockingQueue是一個典型的「有界緩存區」,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的緩存區,就不能再增長其容量。試圖向已滿隊列中放入元素會致使操做受阻塞,試圖從空隊列中提取元素將致使相似阻塞。 多線程
ArrayBlockingQueue支持對等待的生產者線程和使用者線程進行排序的可選公平策略。默認狀況下,不保證是這種排序。然而,經過將公平性 (fairness) 設置爲 true 而構造的隊列容許按照 FIFO 順序訪問線程。公平性一般會下降吞吐量,但也減小了可變性和避免了「不平衡性」。 公平性經過建立 ArrayBlockingQueue實例時指定。學習
1.成員變量this
- private final E[] items;
- private int takeIndex;
- private int putIndex;
- private int count;
- private final ReentrantLock lock;
- private final Condition notEmpty;
- private final Condition notFull;
以前咱們已經學習了鎖相關的知識,因此幾個成員變量不難理解。spa
1)其中E[] items;是數組式的隊列實現;線程
2)takeIndex 用於記錄 take操做的次數;指針
3)putIndex 用於記錄 put操做的次數;blog
4)count 用於記錄隊列中元素數目;排序
5)ReentrantLock lock 是用於控制訪問的主鎖;
6)Condition notEmpty 獲取操做時的條;
7)Condition notFull 插入操做時的條件;
2.構造方法
ArrayBlockingQueue的構造方法有3個。
1)最簡單的構造方法:
- public ArrayBlockingQueue(int capacity) {
- this(capacity, false);
- }
此種構造方法最爲簡單也最爲經常使用,在建立 ArrayBlockingQueue實例時只需指定其大小便可。
- BlockingQueue<Object> q = new ArrayBlockingQueue<Object>(10);
2)增長訪問策略的構造方法,除了指定隊列大小外還可指定隊列的訪問策略:
- public ArrayBlockingQueue(int capacity, boolean fair) {
- if (capacity <= 0)
- throw new IllegalArgumentException();
- this.items = (E[]) new Object[capacity];
- lock = new ReentrantLock(fair);
- notEmpty = lock.newCondition();
- notFull = lock.newCondition();
- }
fair若是爲 true,則按照 FIFO 順序訪問插入或移除時受阻塞線程的隊列;若是爲 false,則訪問順序是不肯定的。
構造方法中首先初始化了 items數組,而後根據fair建立 ReentrantLock實例,最後返回 notEmpty與 notFull兩個 Condition實例,分別用於等待中的獲取與添加操做。
3)帶初始元素的構造方法:
- public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
- this(capacity, fair);
- if (capacity < c.size())
- throw new IllegalArgumentException();
-
- for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
- add(it.next());
- }
除了能夠指定容量和訪問策略外,還能夠包含給定 collection 的元素,並以 collection 迭代器的遍歷順序添加元素。
代碼也能夠觀察到,在實例化隊列以後還使用了add方法將 collection 中的元素按原有順序添加到實例中。
3.添加元素
1)add方法
ArrayBlockingQueue的add方法調用的是父類方法,而父類 add方法則調用的是 offer方法,如下是add方法的源代碼:
- public boolean add(E e) {
-
- return super.add(e);
- }
2)offer方法
offer方法將指定的元素插入到此隊列的尾部(若是當即可行且不會超過該隊列的容量),在成功時返回 true,若是此隊列已滿,則返回 false。此方法一般要優於 add(E) 方法,後者可能沒法插入元素,而只是拋出一個異常。
- public boolean offer(E e) {
-
- if (e == null)
- throw new NullPointerException();
- final ReentrantLock lock = this.lock;
-
- lock.lock();
- try {
-
- if (count == items.length)
- return false;
- else {
-
- insert(e);
- return true;
- }
- } finally {
-
- lock.unlock();
- }
- }
由於 add方法在隊列已滿時會拋出異常,因此 offer方法通常優於 add方法使用。
首先,判斷要添加的元素是否爲 null,若是爲null則拋出空指針異常。
接着,建立一個 ReentrantLock實例,ReentrantLock是可重入鎖實現。更詳細介紹參考http://286.iteye.com/blog/2296191
而後,獲取鎖。
最後,判斷隊列是否已滿,若是已滿則返回 false;若是未滿則調用insert方法插入元素,返回true。
全部操做完成後釋放鎖。
offer方法的處理流程能夠參照如下流程圖:
從代碼中就能夠看到,offer方法利用了ReentrantLock來實現隊列阻塞的功能,因此多線程操做相同隊列時會排隊等待。
offer的另外一個重載方法是 offer(E e, long timeout, TimeUnit unit),此重載方法將指定的元素插入此隊列的尾部,若是該隊列已滿,則在到達指定的等待時間以前等待可用的空間。其源代碼爲:
- public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
-
- if (e == null)
- throw new NullPointerException();
- long nanos = unit.toNanos(timeout);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- for (;;) {
- if (count != items.length) {
- insert(e);
- return true;
- }
- if (nanos <= 0)
- return false;
- try {
- nanos = notFull.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- notFull.signal();
- throw ie;
- }
- }
- } finally {
- lock.unlock();
- }
- }
與普通offer方法不一樣之處在於:offer(E e, long timeout, TimeUnit unit)方法利用循環在指定時間內不斷去嘗試添加元素,若是成功則返回true,若是指定時間已到則退出返回 false。
3)insert方法
insert方法在當前位置(putIndex)插入元素。
- private void insert(E x) {
-
- items[putIndex] = x;
-
- putIndex = inc(putIndex);
-
- ++count;
-
- notEmpty.signal();
- }
由於 ArrayBlockingQueue內部隊列實現爲數組items,而 putIndex則記錄了隊列中已添加元素的位置,因此新添加的元素就直接被添加到數組的指定位置。隨後修改 putIndex值,若是隊列未滿則+1,若是已滿則從0從新開始。最後喚醒 notEmpty中的一個線程。
4)put方法
將指定的元素插入此隊列的尾部,若是該隊列已滿,則等待可用的空間。
如下是put方法的源代碼:
- public void put(E e) throws InterruptedException {
-
- if (e == null)
- throw new NullPointerException();
- final E[] items = this.items;
- final ReentrantLock lock = this.lock;
-
- lock.lockInterruptibly();
- try {
- try {
-
- while (count == items.length)
-
- notFull.await();
- } catch (InterruptedException ie) {
- notFull.signal();
- throw ie;
- }
-
- insert(e);
- } finally {
-
- lock.unlock();
- }
- }
put方法與其餘方法相似,其中會循環判斷隊列是否已滿,若是已滿則阻塞 notFull,若是未滿則調用 insert方法添加元素。由於其中運用了循環判斷隊列是否有位置添加新元素,若是隊列已滿則產生阻塞等待,直至能夠添加元素爲止。
將本文開始時的例子修改一下,去掉消費者,只留下生產者,這樣當隊列滿了以後沒有消費者去消費產品,生產者就不會再向隊列中插入了:
- class Producer implements Runnable {
- private final ArrayBlockingQueue<Integer> queue;
- private int i;
-
- Producer(ArrayBlockingQueue<Integer> q) {
- queue = q;
- }
-
- public void run() {
- try {
- while (true) {
- int p=produce();
- queue.put(p);
- System.out.println("插入成功:"+p);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- int produce() {
- return i++;
- }
- }
-
- public class Runner {
- public static void main(String[] args) {
- ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(10);
- Producer p = new Producer(q);
- new Thread(p).start();
- }
- }
- 插入成功:0
- 插入成功:1
- 插入成功:2
- 插入成功:3
- 插入成功:4
- 插入成功:5
- 插入成功:6
- 插入成功:7
- 插入成功:8
- 插入成功:9
此時程序並不會退出,而是阻塞在那裏等待隊列有位置插入。
4.獲取元素
1)peek方法
獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null。如下是peek方法的源代碼:
- public E peek() {
- final ReentrantLock lock = this.lock;
-
- lock.lock();
- try {
-
- return (count == 0) ? null : items[takeIndex];
- } finally {
-
- lock.unlock();
- }
- }
peek代碼比較簡單,首先判斷隊列是否有元素,即count==0,若是爲空則返回null,非空則返回相應元素。
2)poll方法
獲取並移除此隊列的頭,若是此隊列爲空,則返回 null。如下是poll方法的源代碼:
- public E poll() {
- final ReentrantLock lock = this.lock;
-
- lock.lock();
- try {
-
- if (count == 0)
- return null;
-
- E x = extract();
- return x;
- } finally {
-
- lock.unlock();
- }
- }
poll方法調用的是 extract()方法來獲取頭元素。
3)extract方法
如下是extract()方法的源代碼:
- private E extract() {
- final E[] items = this.items;
-
- E x = items[takeIndex];
-
- items[takeIndex] = null;
-
- takeIndex = inc(takeIndex);
- --count;
-
- notFull.signal();
- return x;
- }
4)take方法
獲取並移除此隊列的頭部,在元素變得可用以前一直等待(若是有必要)。
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
-
- lock.lockInterruptibly();
- try {
- try {
-
- while (count == 0)
- notEmpty.await();
- } catch (InterruptedException ie) {
- notEmpty.signal();
- throw ie;
- }
-
- E x = extract();
- return x;
- } finally {
-
- lock.unlock();
- }
- }
與put方法相似,take方法也是利用循環阻塞的方式來獲取元素,若是沒有元素則等待,直至獲取元素爲止。
與put方法的例子相似,生產者只生產5個產品,消費完這5個產品後,消費者就不得不等待隊列有元素可取:
- class Producer implements Runnable {
- private final ArrayBlockingQueue<Integer> queue;
- private int i;
-
- Producer(ArrayBlockingQueue<Integer> q) {
- queue = q;
- }
-
- public void run() {
- try {
- for (int i = 0; i < 5; i++) {
- int p = produce();
- queue.put(p);
- System.out.println("插入成功:" + p);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- int produce() {
- return i++;
- }
- }
-
- class Consumer implements Runnable {
- private final ArrayBlockingQueue<Integer> queue;
-
- Consumer(ArrayBlockingQueue<Integer> q) {
- queue = q;
- }
-
- public void run() {
- try {
- while (true) {
- int p = queue.take();
- System.out.println("獲取成功:" + p);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- void consume(Object x) {
- System.out.println("消費:" + x);
- }
- }
-
- public class Runner {
- public static void main(String[] args) {
- ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(10);
- Producer p = new Producer(q);
- Consumer c1 = new Consumer(q);
- Consumer c2 = new Consumer(q);
- new Thread(p).start();
- new Thread(c1).start();
- new Thread(c2).start();
- }
- }
- 插入成功:0
- 插入成功:1
- 插入成功:2
- 插入成功:3
- 插入成功:4
- 獲取成功:0
- 獲取成功:1
- 獲取成功:2
- 獲取成功:3
- 獲取成功:4
以後程序也是會阻塞在那裏。
5.移除元素
1)remove方法
remove方法今後隊列中移除指定元素的單個實例(若是存在)。更確切地講,若是此隊列包含一個或多個知足 o.equals(e) 的元素 e,則移除該元素。若是此隊列包含指定的元素(或者此隊列因爲調用而發生更改),則返回 true。
- public boolean remove(Object o) {
-
- if (o == null)
- return false;
- final E[] items = this.items;
- final ReentrantLock lock = this.lock;
-
- lock.lock();
- try {
- int i = takeIndex;
- int k = 0;
- for (;;) {
-
- if (k++ >= count)
- return false;
-
- if (o.equals(items[i])) {
-
- removeAt(i);
- return true;
- }
-
- i = inc(i);
- }
-
- } finally {
-
- lock.unlock();
- }
- }
remove方法其中利用循環來不斷判斷該元素的位置,若是找到則調用 removeAt方法移除指定位置的數組元素。
如下是一個移除的小例子:
- ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(10);
- for (int i = 0; i < 10; i++) {
- q.add(i);
- }
- q.remove(1);
- q.remove(3);
- q.remove(5);
- q.remove(7);
- q.remove(9);
- q.remove(9);
- for (Integer i : q) {
- System.out.println(i);
- }
- 0
- 2
- 4
- 6
- 8
從結果能夠看出,從隊列中正確的移除了咱們指定的元素,在最後即便指定已經不存在的元素值,remove方法也以後返回false。
2)drainTo方法
drainTo方法用於移除此隊列中全部可用的元素,並將它們添加到給定 collection 中。此操做可能比反覆輪詢此隊列更有效。在試圖向 collection c 中添加元素沒有成功時,可能致使在拋出相關異常時,元素會同時在兩個 collection 中出現,或者在其中一個 collection 中出現,也可能在兩個 collection 中都不出現。若是試圖將一個隊列放入自身隊列中,則會致使 IllegalArgumentException 異常。此外,若是正在進行此操做時修改指定的 collection,則此操做行爲是不肯定的。
如下是 drainTo方法的源代碼:
- public int drainTo(Collection<? super E> c) {
-
- if (c == null)
- throw new NullPointerException();
-
- if (c == this)
- throw new IllegalArgumentException();
- final E[] items = this.items;
- final ReentrantLock lock = this.lock;
-
- lock.lock();
- try {
-
- int i = takeIndex;
- int n = 0;
-
- int max = count;
-
- while (n < max) {
- c.add(items[i]);
- items[i] = null;
- i = inc(i);
- ++n;
- }
-
- if (n > 0) {
- count = 0;
- putIndex = 0;
- takeIndex = 0;
- notFull.signalAll();
- }
-
- return n;
- } finally {
-
- lock.unlock();
- }
- }
代碼中並無添加失敗的相關處理,因此結果如上所說並不必定完整。如下是相關實例:
- ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(10);
- List<Integer> list = new ArrayList<Integer>();
- for (int i = 0; i < 10; i++) {
- q.add(i);
- list.add(i + 10);
- }
- q.drainTo(list);
- for (Integer i : list) {
- System.out.println(i);
- }
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 0
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
須要值得注意的是 drainTo方法是將隊列中的元素按順序添加到指定 Collection中,別弄反了。
drainTo(Collection<? super E> c, int maxElements)用法相似,只不過指定了移除元素數。
3)clear方法
移除此隊列中的全部元素。在此調用返回以後,隊列將爲空。
- public void clear() {
- final E[] items = this.items;
- final ReentrantLock lock = this.lock;
-
- lock.lock();
- try {
- int i = takeIndex;
- int k = count;
-
- while (k-- > 0) {
- items[i] = null;
- i = inc(i);
- }
-
- count = 0;
- putIndex = 0;
- takeIndex = 0;
-
- notFull.signalAll();
- } finally {
-
- lock.unlock();
- }
- }
clear方法比較簡單,就是利用循環清除數組中的元素,而後將相關參數置爲初始值。
ArrayBlockingQueue還有一些其餘方法,這些方法相對簡單這裏就不細說了。