由於在工做中常常會用到阻塞隊列,有的時候還要根據業務場景獲取重寫阻塞隊列中的方法,因此學習一下阻塞隊列的實現原理仍是頗有必要的。(PS:不深刻了解的話,很容易使用出錯,形成沒有技術深度的樣子)html
要想了解阻塞隊列,先了解一下隊列是啥,簡單的說隊列就是一種先進先出的數據結構。(具體的內容去數據結構裏學習一下)因此阻塞隊列就是一種可阻塞的隊列。和普通的隊列的不一樣就體如今 」阻塞「兩個字上。阻塞是啥意思?java
百度看一下程序員
在軟件工程裏阻塞通常指的是阻塞調用,即調用結果返回以前,當前線程會被掛起。函數只有在獲得結果以後纔會返回。編程
阻塞隊列其實就是普通的隊列根據須要將某些方法改成阻塞調用。因此阻塞隊裏和普通隊裏的不一樣主要體如今兩個方面數組
那麼爲何要使用阻塞隊列?阻塞隊列又能完成什麼特殊的任務嗎?緩存
阻塞隊列的經典使用 場景就是「生產者」和「消費者」模型,生產者生產數據,放入隊列,而後消費從隊列中獲取數據,這個在通常狀況下天然沒有問題,但若是生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的狀況呢?安全
在出現消費者速度遠大於生產者速度,消費者在數據消費至必定程度的狀況下,暫停等待一下(阻塞消費者)來等待生產者,以保證生產者可以生產出新的數據;反之亦然。數據結構
阻塞隊列在java中的一種典型使用場景是線程池,在線程池中,當提交的任務不能被當即獲得執行的時候,線程池就會將提交的任務放到一個阻塞的任務隊列中來(線程池的具體使用參見以前寫的一篇文章《java併發之線程池的淺析》)多線程
然而,在阻塞隊列發佈之前,在多線程環境下,咱們每一個程序員都必須去本身控制這些細節,尤爲還要兼顧效率和線程安全,而這會給咱們的程序帶來不小的複雜度。在這裏要感謝一下concurrent包,減輕了咱們不少工做併發
下面分別簡單介紹一下:
ArrayBlockingQueue:是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。構造時必須傳入的參數是數組大小此外還能夠指定是否公平性。【注:每個線程在獲取鎖的時候可能都會排隊等待,若是在等待時間上,先獲取鎖的線程的請求必定先被知足,那麼這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖】;在插入或刪除元素時不會產生或銷燬任何額外的對象實例
LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。隊列頭部和尾部均可以添加和移除元素,多線程併發時,能夠將鎖的競爭最多降到一半。
阻塞對隊列的核心方法主要是插入操做操做和取出操做,以下
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
前面介紹了非阻塞隊列和阻塞隊列中經常使用的方法,下面來探討阻塞隊列的實現原理,本文以比較經常使用的ArrayBlockingQueue爲例,其餘阻塞隊列實現原理根據特性會和ArrayBlockingQueue有一些差異,可是大致思路應該相似,有興趣的朋友可自行查看其餘阻塞隊列的實現源碼。
首先看一下ArrayBlockingQueue的幾個關鍵成員變量
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; }
從上邊能夠明顯的看出ArrayBlockingQueue用一個數組來存儲數據,takeIndex和putIndex分別表示隊首元素和隊尾元素的下標,count表示隊列中元素的個數。 lock是一個可重入鎖,notEmpty和notFull是等待條件。
而後看它的一個關鍵方法的實現:put()
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); }
}
enqueue(E x)將元素插入到數組啊item中
/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
該方法內部經過putIndex索引直接將元素添加到數組items中
這裏思考一個問題 爲何當putIndex索引大小等於數組長度時,須要將putIndex從新設置爲0?
這是由於當隊列是先進先出的 因此獲取元素老是從隊列頭部獲取,而添加元素從中從隊列尾部獲取。因此當隊列索引(從0開始)與數組長度相等時,因此下次咱們就須要從數組頭部開始添加了;
最後當插入成功後,經過notEmpty喚醒正在等待取元素的線程
阻塞隊列中和put對應的就是take了
下邊是take方法的實現
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); finally { lock.unlock(); } }
take方法其實很簡單,隊列中有數據就刪除沒有就阻塞,注意這個阻塞是能夠中斷的,若是隊列沒有數據那麼就加入notEmpty條件隊列等待(有數據就直接取走,方法結束),若是有新的put線程添加了數據,那麼put操做將會喚醒take線程;
能夠看到take的實現跟put方法實現很相似,只不過put方法等待的是notFull信號,而take方法等待的是notEmpty信號。(等的就是上文的put中的信號)當數組的數量爲空時,也就是無任何數據能夠被取出來的時候,notEmpty這個Condition就會進行阻塞,直到被notEmpty喚醒
dequeue的實現以下
private E dequeue() { final Object[] items = this.items; E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
take方法主要是從隊列頭部取元素,能夠看到takeIndex是取元素的時候的偏移值,而put中是putIndex控制添加元素的偏移量,因而可知,put和take操做的偏移量分別是由putIndex和takeIndex控制的。其實仔細觀察put和take的實現思路是有不少類似之處。
模擬食堂的經歷,食堂窗口端出一道菜放在臺面,而後等待顧客消費。寫到代碼裏就是食堂窗口就是一個生產者線程,顧客就是消費者線程,檯面就是阻塞隊列。
public class TestBlockingQueue { /** * 生產和消費業務操做 * * */ protected class WorkDesk { BlockingQueue<String> desk = new LinkedBlockingQueue<String>(8); public void work() throws InterruptedException { Thread.sleep(1000); desk.put("端出一道菜"); } public String eat() throws InterruptedException { Thread.sleep(4000); return desk.take(); } } /** * 生產者類 * * */ class Producer implements Runnable { private String producerName; private WorkDesk workDesk; public Producer(String producerName, WorkDesk workDesk) { this.producerName = producerName; this.workDesk = workDesk; } @Override public void run() { try { for (;;) { workDesk.work(); System.out.println(producerName + "端出一道菜" +",Data:"+System.currentTimeMillis()); } } catch (Exception e) { e.printStackTrace(); } } } /** * 消費者類 * * */ class Consumer implements Runnable { private String consumerName; private WorkDesk workDesk; public Consumer(String consumerName, WorkDesk workDesk) { this.consumerName = consumerName; this.workDesk = workDesk; } @Override public void run() { try { for (;;) { workDesk.eat(); System.out.println(consumerName + "端走了一個菜"+",Data:"+System.currentTimeMillis()); } } catch (Exception e) { e.printStackTrace(); } } } public static void main(String args[]) throws InterruptedException { TestBlockingQueue testQueue = new TestBlockingQueue(); WorkDesk workDesk = testQueue.new WorkDesk(); ExecutorService service = Executors.newFixedThreadPool(6); //四個生產者線程 for (int i=1;i<=4;++i) { service.submit(testQueue.new Producer("食堂窗口-"+ i+"-", workDesk)); } //兩個消費者線程 Consumer consumer1 = testQueue.new Consumer("顧客-1-", workDesk); Consumer consumer2 = testQueue.new Consumer("顧客-2-", workDesk); service.submit(consumer1); service.submit(consumer2); service.shutdown(); } }
結果部分以下
能夠看到當生產者產生的數據達到阻塞隊列的容量時,生成者線程會阻塞,等待消費者線程進行消費,上述案例中最大容量爲8個盤子,因此當食堂作好了8個菜後了8會等待顧客進行消費,消費後繼續生產。上述案例使用阻塞隊列,看起來代碼要簡單得多,不須要再單獨考慮同步和線程間通訊的問題。
在併發編程中,通常推薦使用阻塞隊列,這樣實現能夠儘可能地避免程序出現意外的錯誤。
阻塞隊列使用最經典的場景就是socket客戶端數據的讀取和解析,讀取數據的線程不斷將數據放入隊列,而後解析線程不斷從隊列取數據解析。還有其餘相似的場景,如線程池中就使用了阻塞隊列,其實只要符合生產者-消費者模型的均可以使用阻塞隊列。
參考資料:
《Java編程思想》
https://www.cnblogs.com/dolphin0520/p/3932906.html
https://www.cnblogs.com/superfj/p/7757876.html