最近在看一些java基礎的東西,看到了隊列這章,打算對複習的一些知識點作一個筆記,也算是對本身思路的一個整理,本章先聊聊java中的阻塞隊列java
參考文章:數組
http://ifeve.com/java-blocking-queue/app
https://blog.csdn.net/u014082714/article/details/52215130函數
由上圖能夠用看出java中的阻塞隊列都實現了 BlockingQueue接口,BlockingQueue又繼承自Queue性能
阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:在隊列爲空時,獲取元素的線程會等待隊列變爲非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。this
阻塞隊列提供了四種處理方法:spa
JDK7提供了7個阻塞隊列。分別是操作系統
ArrayBlockingQueue.net
ArrayBlockingQueue是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認狀況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的全部生產者線程或消費者線程,當隊列可用時,能夠按照阻塞的前後順序訪問隊列,即先阻塞的生產者線程,能夠先往隊列裏插入元素,先阻塞的消費者線程,能夠先從隊列裏獲取元素。一般狀況下爲了保證公平性會下降吞吐量。咱們能夠使用如下代碼建立一個公平的阻塞隊列:線程
ArrayBlockingQueue fairQueue =
new
ArrayBlockingQueue(
1000
,
true
);
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
經過源碼咱們能夠看到,構造器第一個參數是指定有界隊列的大小(及數組的大小),第二個參數指定是否使用公平鎖,這裏能夠看到阻塞隊列的公平訪問隊列是經過重入鎖來實現的(關於重入鎖咱們在別的章節介紹)
下邊咱們結合源碼對其提供的方法作一個簡單分析
關於構造器相關說明
/** * * 構造函數,設置隊列的初始容量 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** * 構造函數。capacity設置數組大小 ,fair設置是否爲公平鎖 * capacity and the specified access policy. */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair);//是否爲公平鎖,若是是的話,那麼先到的線程先得到鎖對象。 //不然,由操做系統調度由哪一個線程得到鎖,通常爲false,性能會比較高 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** *構造函數,帶有初始內容的隊列 */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); //要給數組設置內容,先上鎖 try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e;//依次拷貝內容 } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i;//若是putIndex大於數組大小 ,那麼從0從新開始 } finally { lock.unlock();//最後必定要釋放鎖 } }
關於方法的說明
/** * 添加一個元素,其實super.add裏面調用了offer方法 */ public boolean add(E e) { return super.add(e); }
/**
* 當調用offer方法返回false時,直接拋出異常
*/
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
}
/** *加入成功返回true,不然返回false * */ public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock();//上鎖 try { if (count == items.length) //超過數組的容量 return false; else { enqueue(e); //放入元素 return true; } } finally { lock.unlock(); } } /** * 若是隊列已滿的話,就會等待 */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly();//和lock()方法的區別是讓它在阻塞時也可拋出異常跳出 try { while (count == items.length) notFull.await(); //這裏就是阻塞了,要注意。若是運行到這裏,那麼它會釋放上面的鎖,一直等到notify enqueue(e); } finally { lock.unlock(); } } /** * 帶有超時時間的插入方法,unit表示是按秒、分、時哪種 */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos);//帶有超時等待的阻塞方法 } enqueue(e);//入隊 return true; } finally { lock.unlock(); } } //實現的方法,若是當前隊列爲空,返回null public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } //實現的方法,若是當前隊列爲空,一直阻塞 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await();//隊列爲空,阻塞方法 return dequeue(); } finally { lock.unlock(); } } //帶有超時時間的取元素方法,不然返回Null public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos);//超時等待 } return dequeue();//取得元素 } finally { lock.unlock(); } } //只是看一個隊列最前面的元素,取出是不刪除隊列中的原來元素。隊列爲空時返回null public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // 隊列爲空時返回null } finally { lock.unlock(); } } /** * 返回隊列當前元素個數 * */ public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } } /** * 返回當前隊列再放入多少個元素就滿隊 */ public int remainingCapacity() { final ReentrantLock lock = this.lock; lock.lock(); try { return items.length - count; } finally { lock.unlock(); } } /** * 從隊列中刪除一個元素的方法。刪除成功返回true,不然返回false */ public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items[i])) { removeAt(i); //真正刪除的方法 return true; } if (++i == items.length) i = 0; } while (i != putIndex);//一直不斷的循環取出來作判斷 } return false; } finally { lock.unlock(); } } /** * 是否包含一個元素 */ public boolean contains(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items[i])) return true; if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } } /** * 清空隊列 * */ public void clear() { final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int k = count; if (k > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { items[i] = null; if (++i == items.length) i = 0; } while (i != putIndex); takeIndex = putIndex; count = 0; if (itrs != null) itrs.queueIsEmpty(); for (; k > 0 && lock.hasWaiters(notFull); k--) notFull.signal(); } } finally { lock.unlock(); } } /** * 取出全部元素到集合 */ public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); } /** * 取出全部元素到集合 */ public int drainTo(Collection<? super E> c, int maxElements) { checkNotNull(c); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int n = Math.min(maxElements, count); int take = takeIndex; int i = 0; try { while (i < n) { @SuppressWarnings("unchecked") E x = (E) items[take]; c.add(x); items[take] = null; if (++take == items.length) take = 0; i++; } return n; } finally { // Restore invariants even if c.add() threw if (i > 0) { count -= i; takeIndex = take; if (itrs != null) { if (count == 0) itrs.queueIsEmpty(); else if (i > take) itrs.takeIndexWrapped(); } for (; i > 0 && lock.hasWaiters(notFull); i--) notFull.signal(); } } } finally { lock.unlock(); } }