BlockingQueue介紹與經常使用方法
BlockingQueue是一個阻塞隊列。在高併發場景是用得很是多的,在線程池中。若是運行線程數目大於核心線程數目時,也會嘗試把新加入的線程放到一個BlockingQueue中去。隊列的特性就是先進先出很容易理解,在Java裏頭它的實現類主要有下圖的幾種,其中最經常使用到的是ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue這三種。html
![](http://static.javashuo.com/static/loading.gif)
它主要的方法有java
![](http://static.javashuo.com/static/loading.gif)
BlockingQueue的核心方法:
一、放入數據android
(1) add(object)數組
隊列沒滿的話,放入成功。不然拋出異常。緩存
(2)offer(object):安全
表示若是可能的話,將object加到BlockingQueue裏,即若是BlockingQueue能夠容納,則返回true,不然返回false.(本方法不阻塞當前執行方法的線程)
(3)offer(E o, long timeout, TimeUnit unit)併發
能夠設定等待的時間,若是在指定的時間內,還不能往隊列中加入BlockingQueue,則返回失敗。
(4)put(object)app
把object加到BlockingQueue裏,若是BlockQueue沒有空間,則調用此方法的線程阻塞。直到BlockingQueue裏面有空間再繼續.
二、獲取數據
(1)poll(time)ide
取走BlockingQueue裏排在首位的對象,若不能當即取出,則能夠等time參數規定的時間,取不到時返回null;
(2)poll(long timeout, TimeUnit unit)函數
從BlockingQueue取出一個隊首的對象,若是在指定時間內,隊列一旦有數據可取,則當即返回隊列中的數據。不然知道時間超時尚未數據可取,返回失敗。
(3)take()
取走BlockingQueue裏排在首位的對象,若BlockingQueue爲空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入;
(4)drainTo()
一次性從BlockingQueue獲取全部可用的數據對象(還能夠指定獲取數據的個數),經過該方法,能夠提高獲取數據效率;不須要屢次分批加鎖或釋放鎖。
ArrayBlockingQueue
一個由數組支持的有界阻塞隊列。它的本質是一個基於數組的BlockingQueue的實現。
它的容納大小是固定的。此隊列按 FIFO(先進先出)原則對元素進行排序。
隊列的頭部 是在隊列中存在時間最長的元素。隊列的尾部 是在隊列中存在時間最短的元素。
新元素插入到隊列的尾部,隊列檢索操做則是從隊列頭部開始得到元素。
這是一個典型的「有界緩存區」,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。
一旦建立了這樣的緩存區,就不能再增長其容量。
試圖向已滿隊列中放入元素會致使放入操做受阻塞,直到BlockingQueue裏有新的喚空間纔會被醒繼續操做;
試圖從空隊列中檢索元素將致使相似阻塞,直到BlocingkQueue進了新貨纔會被喚醒。
此類支持對等待的生產者線程和使用者線程進行排序的可選公平策略。
默認狀況下,不保證是這種排序。然而,經過在構造函數將公平性 (fairness) 設置爲 true 而構造的隊列容許按照 FIFO 順序訪問線程。
公平性一般會下降吞吐量,但也減小了可變性和避免了「不平衡性」。
此類及其迭代器實現了 Collection 和 Iterator 接口的全部可選 方法。
注意1:它是有界阻塞隊列。它是數組實現的,是一個典型的「有界緩存區」。數組大小在構造函數指定,並且今後之後不可改變。
注意2:是它線程安全的,是阻塞的,具體參考BlockingQueue的「注意4」。
注意3:不接受 null 元素
注意4:公平性 (fairness)能夠在構造函數中指定。
Public Constructors |
|
ArrayBlockingQueue(int capacity) Creates an ArrayBlockingQueue with the given (fixed) capacity and default access policy. |
|
ArrayBlockingQueue(int capacity, boolean fair) Creates an ArrayBlockingQueue with the given (fixed) capacity and the specified access policy. |
|
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) Creates an ArrayBlockingQueue with the given (fixed) capacity, the specified access policy and initially containing the elements of the given collection, added in traversal order of the collection's iterator. |
若是爲true,則按照 FIFO 順序訪問插入或移除時受阻塞線程的隊列;若是爲 false,則訪問順序是不肯定的。
注意5:它實現了BlockingQueue接口。
注意6:此類及其迭代器實現了 Collection 和 Iterator 接口的全部可選 方法。
注意7:其容量在構造函數中指定。容量不能夠自動擴展,也沒提供手動擴展的接口。
注意8:在JDK5/6中,LinkedBlockingQueue和ArrayBlocingQueue等對象的poll(long timeout, TimeUnit unit)存在內存泄露
Leak的對象是AbstractQueuedSynchronizer.Node,
據稱JDK5會在Update12裏Fix,JDK6會在Update2裏Fix。
源碼分析:
一個基本數組的阻塞隊列。能夠設置列隊的大小。
它的基本原理實際仍是數組,只不過存、取、刪時都要作隊列是否滿或空的判斷。而後加鎖訪問。
[java] view plain copy
![在CODE上查看代碼片](http://static.javashuo.com/static/loading.gif)
![派生到個人代碼片](http://static.javashuo.com/static/loading.gif)
- package java.util.concurrent;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReentrantLock;
- import java.util.AbstractQueue;
- import java.util.Collection;
- import java.util.Iterator;
- import java.util.NoSuchElementException;
- import java.lang.ref.WeakReference;
- import java.util.Spliterators;
- import java.util.Spliterator;
-
-
- public class ArrayBlockingQueue<E> extends AbstractQueue<E>
- implements BlockingQueue<E>, java.io.Serializable {
-
- private static final long serialVersionUID = -817911632652898426L;
-
- /** 真正存入數據的數組*/
- final Object[] items;
-
- /** take, poll, peek or remove的下一個索引 */
- int takeIndex;
-
- /** put, offer, or add的下一個索引 */
- int putIndex;
-
- /**隊列中元素個數*/
- int count;
-
-
- /**可重入鎖 */
- final ReentrantLock lock;
-
- /** 隊列不爲空的條件 */
- private final Condition notEmpty;
-
- /** 隊列未滿的條件 */
- private final Condition notFull;
-
- transient Itrs itrs = null;
-
-
- /**
- *當前元素個數-1
- */
- final int dec(int i) {
- return ((i == 0) ? items.length : i) - 1;
- }
-
- /**
- * 返回對應索引上的元素
- */
- @SuppressWarnings("unchecked")
- final E itemAt(int i) {
- return (E) items[i];
- }
-
- /**
- * 非空檢查
- *
- * @param v the element
- */
- private static void checkNotNull(Object v) {
- if (v == null)
- throw new NullPointerException();
- }
-
- /**
- * 元素放入隊列,注意調用這個方法時都要先加鎖
- *
- */
- private void enqueue(E x) {
- final Object[] items = this.items;
- items[putIndex] = x;
- if (++putIndex == items.length)
- putIndex = 0;
- count++;//當前擁有元素個數加1
- notEmpty.signal();//有一個元素加入成功,那確定隊列不爲空
- }
-
- /**
- * 元素出隊,注意調用這個方法時都要先加鎖
- *
- */
- private E dequeue() {
- final Object[] items = this.items;
- @SuppressWarnings("unchecked")
- E x = (E) items[takeIndex];
- items[takeIndex] = null;
- if (++takeIndex == items.length)
- takeIndex = 0;
- count--;/當前擁有元素個數減1
- if (itrs != null)
- itrs.elementDequeued();
- notFull.signal();//有一個元素取出成功,那確定隊列不滿
- return x;
- }
-
- /**
- * 指定刪除索引上的元素
- *
- */
- void removeAt(final int removeIndex) {
- final Object[] items = this.items;
- if (removeIndex == takeIndex) {
- items[takeIndex] = null;
- if (++takeIndex == items.length)
- takeIndex = 0;
- count--;
- if (itrs != null)
- itrs.elementDequeued();
- } else {
- final int putIndex = this.putIndex;
- for (int i = removeIndex;;) {
- int next = i + 1;
- if (next == items.length)
- next = 0;
- if (next != putIndex) {
- items[i] = items[next];
- i = next;
- } else {
- items[i] = null;
- this.putIndex = i;
- break;
- }
- }
- count--;
- if (itrs != null)
- itrs.removedAt(removeIndex);
- }
- notFull.signal();//有一個元素刪除成功,那確定隊列不滿
- }
-
- /**
- *
- * 構造函數,設置隊列的初始容量
- */
- public ArrayBlockingQueue(int capacity) {
- this(capacity, false);
- }
-
- /**
- * 構造函數。capacity設置數組大小 ,fair設置是否爲公平鎖
- * capacity and the specified access policy.
- */
- public ArrayBlockingQueue(int capacity, boolean fair) {
- if (capacity <= 0)
- throw new IllegalArgumentException();
- this.items = new Object[capacity];
- lock = new ReentrantLock(fair);//是否爲公平鎖,若是是的話,那麼先到的線程先得到鎖對象。
- //不然,由操做系統調度由哪一個線程得到鎖,通常爲false,性能會比較高
- notEmpty = lock.newCondition();
- notFull = lock.newCondition();
- }
-
- /**
- *構造函數,帶有初始內容的隊列
- */
- public ArrayBlockingQueue(int capacity, boolean fair,
- Collection<? extends E> c) {
- this(capacity, fair);
-
- final ReentrantLock lock = this.lock;
- lock.lock(); //要給數組設置內容,先上鎖
- try {
- int i = 0;
- try {
- for (E e : c) {
- checkNotNull(e);
- items[i++] = e;//依次拷貝內容
- }
- } catch (ArrayIndexOutOfBoundsException ex) {
- throw new IllegalArgumentException();
- }
- count = i;
- putIndex = (i == capacity) ? 0 : i;//若是putIndex大於數組大小 ,那麼從0從新開始
- } finally {
- lock.unlock();//最後必定要釋放鎖
- }
- }
-
- /**
- * 添加一個元素,其實super.add裏面調用了offer方法
- */
- public boolean add(E e) {
- return super.add(e);
- }
-
- /**
- *加入成功返回true,不然返回false
- *
- */
- public boolean offer(E e) {
- checkNotNull(e);
- final ReentrantLock lock = this.lock;
- lock.lock();//上鎖
- try {
- if (count == items.length) //超過數組的容量
- return false;
- else {
- enqueue(e); //放入元素
- return true;
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * 若是隊列已滿的話,就會等待
- */
- public void put(E e) throws InterruptedException {
- checkNotNull(e);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();//和lock()方法的區別是讓它在阻塞時也可拋出異常跳出
- try {
- while (count == items.length)
- notFull.await(); //這裏就是阻塞了,要注意。若是運行到這裏,那麼它會釋放上面的鎖,一直等到notify
- enqueue(e);
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * 帶有超時時間的插入方法,unit表示是按秒、分、時哪種
- */
- public boolean offer(E e, long timeout, TimeUnit unit)
- throws InterruptedException {
-
- checkNotNull(e);
- long nanos = unit.toNanos(timeout);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == items.length) {
- if (nanos <= 0)
- return false;
- nanos = notFull.awaitNanos(nanos);//帶有超時等待的阻塞方法
- }
- enqueue(e);//入隊
- return true;
- } finally {
- lock.unlock();
- }
- }
-
- //實現的方法,若是當前隊列爲空,返回null
- public E poll() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return (count == 0) ? null : dequeue();
- } finally {
- lock.unlock();
- }
- }
- //實現的方法,若是當前隊列爲空,一直阻塞
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == 0)
- notEmpty.await();//隊列爲空,阻塞方法
- return dequeue();
- } finally {
- lock.unlock();
- }
- }
- //帶有超時時間的取元素方法,不然返回Null
- public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- long nanos = unit.toNanos(timeout);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == 0) {
- if (nanos <= 0)
- return null;
- nanos = notEmpty.awaitNanos(nanos);//超時等待
- }
- return dequeue();//取得元素
- } finally {
- lock.unlock();
- }
- }
- //只是看一個隊列最前面的元素,取出是不刪除隊列中的原來元素。隊列爲空時返回null
- public E peek() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return itemAt(takeIndex); // 隊列爲空時返回null
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * 返回隊列當前元素個數
- *
- */
- public int size() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return count;
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * 返回當前隊列再放入多少個元素就滿隊
- */
- public int remainingCapacity() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return items.length - count;
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * 從隊列中刪除一個元素的方法。刪除成功返回true,不然返回false
- */
- public boolean remove(Object o) {
- if (o == null) return false;
- final Object[] items = this.items;
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- if (count > 0) {
- final int putIndex = this.putIndex;
- int i = takeIndex;
- do {
- if (o.equals(items[i])) {
- removeAt(i); //真正刪除的方法
- return true;
- }
- if (++i == items.length)
- i = 0;
- } while (i != putIndex);//一直不斷的循環取出來作判斷
- }
- return false;
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * 是否包含一個元素
- */
- public boolean contains(Object o) {
- if (o == null) return false;
- final Object[] items = this.items;
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- if (count > 0) {
- final int putIndex = this.putIndex;
- int i = takeIndex;
- do {
- if (o.equals(items[i]))
- return true;
- if (++i == items.length)
- i = 0;
- } while (i != putIndex);
- }
- return false;
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * 清空隊列
- *
- */
- public void clear() {
- final Object[] items = this.items;
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- int k = count;
- if (k > 0) {
- final int putIndex = this.putIndex;
- int i = takeIndex;
- do {
- items[i] = null;
- if (++i == items.length)
- i = 0;
- } while (i != putIndex);
- takeIndex = putIndex;
- count = 0;
- if (itrs != null)
- itrs.queueIsEmpty();
- for (; k > 0 && lock.hasWaiters(notFull); k--)
- notFull.signal();
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * 取出全部元素到集合
- */
- public int drainTo(Collection<? super E> c) {
- return drainTo(c, Integer.MAX_VALUE);
- }
-
- /**
- * 取出全部元素到集合
- */
- public int drainTo(Collection<? super E> c, int maxElements) {
- checkNotNull(c);
- if (c == this)
- throw new IllegalArgumentException();
- if (maxElements <= 0)
- return 0;
- final Object[] items = this.items;
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- int n = Math.min(maxElements, count);
- int take = takeIndex;
- int i = 0;
- try {
- while (i < n) {
- @SuppressWarnings("unchecked")
- E x = (E) items[take];
- c.add(x);
- items[take] = null;
- if (++take == items.length)
- take = 0;
- i++;
- }
- return n;
- } finally {
- // Restore invariants even if c.add() threw
- if (i > 0) {
- count -= i;
- takeIndex = take;
- if (itrs != null) {
- if (count == 0)
- itrs.queueIsEmpty();
- else if (i > take)
- itrs.takeIndexWrapped();
- }
- for (; i > 0 && lock.hasWaiters(notFull); i--)
- notFull.signal();
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- }
使用實例:
生產者-消費者模型
大量的實現ArrayBlockingQueue已經作掉了,包括判空,線程掛起等操做都封裝在ArrayBlockingQueue中。生產者只須要關心生產,消費者只須要關心消費。而若是不使用ArrayBlockingQueue的話,具體的生產者還須要去通知消費者,還須要關心整個容器是否滿了。從這裏能夠看出ArrayBlockingQueue是一種比較好的實現方式,高度的內聚。
Producer.java
[java] view plain copy
-
- public class Producer implements Runnable{
-
- //容器
- private final ArrayBlockingQueue<Bread> queue;
-
- public Producer(ArrayBlockingQueue<Bread> queue){
- this.queue = queue;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Runnable#run()
- */
- @Override
- public void run() {
- while(true){
- produce();
- }
- }
-
- public void produce(){
- /**
- * put()方法是若是容器滿了的話就會把當前線程掛起
- * offer()方法是容器若是滿的話就會返回false。
- */
- try {
- Bread bread = new Bread();
- queue.put(bread);
- System.out.println("Producer:"+bread);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
Consumer.java
[java] view plain copy
-
- public class Consumer implements Runnable{
-
- //容器
- private final ArrayBlockingQueue<Bread> queue;
-
- public Consumer(ArrayBlockingQueue<Bread> queue){
- this.queue = queue;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Runnable#run()
- */
- @Override
- public void run() {
- while(true){
- consume();
- }
- }
-
- public void consume(){
- /**
- * take()方法和put()方法是對應的,從中拿一個數據,若是拿不到線程掛起
- * poll()方法和offer()方法是對應的,從中拿一個數據,若是沒有直接返回null
- */
- try {
- Bread bread = queue.take();
- System.out.println("consumer:"+bread);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
Client.java
[java] view plain copy
-
- public class Client {
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- int capacity = 10;
- ArrayBlockingQueue<Bread> queue = new ArrayBlockingQueue<Bread>(capacity);
-
- new Thread(new Producer(queue)).start();
- new Thread(new Producer(queue)).start();
- new Thread(new Consumer(queue)).start();
- new Thread(new Consumer(queue)).start();
- new Thread(new Consumer(queue)).start();
- }
-
- }
參考:http://blog.csdn.net/evankaka/article/details/51706109
http://blog.csdn.net/hudashi/article/details/7076745