最近在看數據結構的時候,看到了隊列這裏,在實際的開發中咱們不多會手動的去實現一個隊列,甚至不多直接用到隊列,可是在Java的包中有一些具備特殊屬性的隊列應用的比較普遍,例如:阻塞隊列&併發隊列.java
阻塞隊列(BlockingQueue)是一個額外支持兩種操做的隊列。這兩個附加的操做是:
1、在隊列爲空時,獲取元素的線程會等待隊列變爲非空。
2、當隊列滿時,存儲元素的線程會等待隊列可用。
阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。segmentfault
阻塞隊列提供了四種處理方法:數組
拋出異常數據結構
add(e):在添加元素的時候若是隊列已滿,那麼直接拋出異常。 remove(e):移除元素,若是隊列爲空,那麼拋出異常。 element():檢查方法。 public static void test() { ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10); for (int i=0; i<blockingQueue.size() + 1; i++) { blockingQueue.add("string"+i); } } 結果:Exception in thread "main" java.lang.IllegalStateException: Queue full 源碼以下: public boolean add(E var1) { return super.add(var1); } public boolean add(E var1) { //調用offer方法 if (this.offer(var1)) { return true; } else { throw new IllegalStateException("Queue full"); } } /**offer(var1)在ArrayBlockingQueue中的實現源碼以下:*/ public boolean offer(E var1) { checkNotNull(var1); ReentrantLock var2 = this.lock; var2.lock(); boolean var3; try { //若是隊列已滿,返回false if (this.count == this.items.length) { var3 = false; return var3; } //進行入隊操做 this.enqueue(var1); var3 = true; } finally { var2.unlock(); } return var3; }
返回特殊值併發
一、offer(e) 入隊的時候返回特殊值,在不一樣的阻塞隊列中實現有必定的差異 2、poll() 出隊的時候返回特殊的值 3、peek() 測試出隊可否成功
一直阻塞函數
1、put(e) 若是隊列已滿,那麼會一直阻塞,直到成功 2、take() 若是隊列爲空,那麼出隊會一直阻塞,直到成功
阻塞,超時退出測試
1、offer(e,time,unit) 2、poll(time,unit)
Java中的阻塞隊列
JDK7提供了7個阻塞隊列。分別是ui
下面具體看一下每一種阻塞隊列的實現方式以及使用場景:this
特性:用數組實現的實現的有界阻塞隊列,默認狀況下不保證線程公平的訪問隊列(按照阻塞的前後順序訪問隊列),隊列可用的時候,阻塞的線程均可以爭奪隊列的訪問資格,固然也可使用如下的構造方法建立一個公平的阻塞隊列。
ArrayBlockingQueue<String> blockingQueue2 = new ArrayBlockingQueue<>(10, true);
下面經過源碼探究如下,這個阻塞隊列是如何實現的?若是實現公平與非公平的控制。
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); //基於數組實現 this.items = new Object[capacity]; /**公平與非公平是經過可重入鎖來實現的*/ lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /**阻塞隊列的公平與非公平是經過可重入鎖來實現的,關於爲何可重入鎖能夠實現線程訪問的公平非公平特性,咱們晚一點分析一下ReentrantLock的實現原理。
【關於ReentrantLock的實現原理】https://segmentfault.com/a/11...線程
public boolean add(E e) { return super.add(e); } /**AbstractQueue 父類的add方法*/ public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } /**經過多態調用本身的offer(e)實現*/ public boolean offer(E e) { checkNotNull(e); //加鎖 final ReentrantLock lock = this.lock; lock.lock(); try { //若是隊列滿了,那麼返回false if (count == items.length) return false; else { //入隊 enqueue(e); return true; } } finally { lock.unlock(); } } private void enqueue(E var1) { Object[] var2 = this.items; //putIndex能夠認爲是隊列的隊尾後的一個位置,數據入隊對應的位置,若是隊列滿了,那麼putIndex設置爲0 var2[this.putIndex] = var1; if (++this.putIndex == var2.length) { this.putIndex = 0; } ++this.count; //喚醒一個等待在condition上的線程 this.notEmpty.signal(); }
public void put(E var1) throws InterruptedException { checkNotNull(var1); ReentrantLock var2 = this.lock; //加鎖,可被線程中斷返回 var2.lockInterruptibly(); try { //若是隊列已經滿了,那麼阻塞 while(this.count == this.items.length) { this.notFull.await(); } //進行入隊操做 this.enqueue(var1); } finally { var2.unlock(); } } /** * 在隊列滿的狀況put操做被阻塞,那麼何時該操做能夠被喚醒呢?很顯然是隊列中出現空地的狀況下,纔會被喚醒在notFull這種條件下 * 阻塞的操做: * 因此在發生如下操做的時候,會被喚醒進行入隊的操做 * 1、dequeue()操做 2、removeAt(int var1)操做 3、clear() 4、drainTo */
/**實現比較容易,和上面的操做殊途同歸*/ public E take() throws InterruptedException { ReentrantLock var1 = this.lock; var1.lockInterruptibly(); Object var2; try { while(this.count == 0) { this.notEmpty.await(); } var2 = this.dequeue(); } finally { var1.unlock(); } return var2; }
我的總結:實現阻塞操做和核心在於線程掛起以及線程的喚醒,在Java中提供了兩種線程等待以及線程喚醒的方式。一是基於對象監視器的wait(),notify()方法。 二是經過Condition.await()和signal()方法。
基於鏈表實現的有界阻塞隊列。此隊列的默認和最大長度爲Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。這個隊列的實現原理和ArrayBlockingQueue實現基本相同。能夠看一下隊列的定義:
/**默認的構造函數*/ public LinkedBlockingQueue() { this(2147483647); } public LinkedBlockingQueue(int var1) { this.count = new AtomicInteger(); this.takeLock = new ReentrantLock(); this.notEmpty = this.takeLock.newCondition(); this.putLock = new ReentrantLock(); this.notFull = this.putLock.newCondition(); if (var1 <= 0) { throw new IllegalArgumentException(); } else { this.capacity = var1; //鏈表的頭結點和尾節點,默認是空 this.last = this.head = new LinkedBlockingQueue.Node((Object)null); } }
一個支持優先級的無界隊列。默認狀況下元素採起天然順序排列,也能夠經過比較器comparator來指定元素的排序規則。元素按照升序排列。具體是如何實現的?
/**定義和一般的阻塞隊列相同,AbstractQueue中定義了隊列的基本操做,BlockingQueue中定義可阻塞隊列的相關操做定義*/ public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> /**構造方法,默認的無參構造方法,調用的是另外一個構造方法,默認定義了一個隊列的容量,那爲何說他是無界隊列呢?接着向下*/ public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } /**全部的構造方法最後調用的構造方法, comparator是一個比較器,經過比較器能夠肯定隊列中元素的排列順序*/ public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; /**隊列是基於數組實現的*/ this.queue = new Object[initialCapacity]; }
public boolean add(E e) { return offer(e); } /** * offer操做 */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; //若是隊列已滿,那麼嘗試進行擴容(我的感受這裏使用 >= 並非很合理) while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) //使用默認的比較方法將e放到隊列中 siftUpComparable(n, e, array); else //使用指定的比較順序將數據插入到隊列中 siftUpUsingComparator(n, e, array, cmp); size = n + 1; //激活一個在notEmpty這個condition上等待的線程 notEmpty.signal(); } finally { lock.unlock(); } return true; } /**tryGrow()實現*/ private void tryGrow(Object[] array, int oldCap) { //這裏先釋放了鎖,最後須要從新獲取鎖,那麼這個時候全部的add操做都會執行下面的代碼段 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } if (newArray == null) // back off if another thread is allocating Thread.yield(); lock.lock(); if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }