阻塞隊列BlockingQueue是JDK1.5併發新特性中的內容,阻塞隊列首先是一個隊列,一樣實現了Collection接口。阻塞隊列提供了可阻塞的put和take方法,以及支持定時的poll和offer方法。html
阻塞隊列跟普通隊列相比,首頁它是線程安全的,另外還提供了兩個附加操做:當隊列爲空時,從隊列中獲取元素的操做將被阻塞;當隊列填盡是,向隊列添加元素將被阻塞。這兩個附加操做分別由BlockingQueue提供的兩個take和put方法支持。若是隊列已經滿了,那麼put方法將被阻塞直到有空間可用;若是隊列爲空,那麼take方法將被阻塞直到有元素可用。隊列能夠是有界的也能夠是無界的,無界隊列永遠不會充滿,所以在無界隊列上面put方法也永遠不會被阻塞。java
BlockingQueue提供了4中類型的處理方法:算法
方法\處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
隊列 | 有界性 | 鎖 | 數據結構 |
---|---|---|---|
ArrayBlockingQueue | 有界 | 加鎖 | arraylist |
LinkedBlockingQueue | 可選有界 | 加鎖 | 單向linkedlist |
PriorityBlockingQueue | 無界 | 加鎖 | Heap |
DelayQueue | 無界 | 加鎖 | Heap |
SynchronousQueue | 有界 | 無鎖(JDK1.6) | ~ |
LinkedTransferQueue | 無界 | 無鎖 | 單向linkedlist |
LinkedBlockingDeque | 可選有界 | 加鎖 | 雙向linkedlist |
在多線程環境中,經過隊列能夠很容易的實現數據共享。在基於隊列的生產者-消費者模型中,數據生產時,生產者就把數據放入隊列,當消費者準備使用數據時就從隊列中取出數據。生產者不須要知道消費者的標識或者數量,或者他們是惟一的生產者。一樣,消費者也不須要知道生產者來自何處。BlockingQqueue簡化了生產者-消費者的過程,它支持任意數量的生產者-消費者。一種最多見的生產者-消費者模式就是線程池與工做隊列的組合,在Executor執行框架中就體現了這種模式。編程
ArrayBlockingQueue數組
ArrayBlockingQueue是一個基於數組的阻塞隊列實現,內部維護了一個定長數組,以便緩存數據。一旦建立了這樣的緩存區,就不能再增長其容量。試圖向已滿隊列中放入元素會致使放入操做受阻塞;試圖從空隊列中檢索元素將致使相似阻塞。ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。緩存
因爲ArrayBlockingQueue內部只維護一個ReentrantLock類型的lock鎖對象,因此在生成者-消費者模型中,並不能真正的實現並行,這一點不一樣於LinkedBlockingQueue,LinkedBlockingQueue內部維護了兩個鎖。事實上ArrayBlockingQueue徹底能夠採用分離鎖,從而實現生產者和消費者操做的徹底並行運行。Doug Lea之因此沒這樣去作,也許是由於ArrayBlockingQueue的數據寫入和獲取操做已經足夠輕巧,以致於引入獨立的鎖機制,除了給代碼帶來額外的複雜性外,其在性能上徹底佔不到任何便宜。安全
ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不一樣之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的對象實例,然後者則會生成一個額外的Node對象。這在長時間內須要高效併發地處理大批量數據的系統中,其對於GC的影響仍是存在必定的區別。而在建立ArrayBlockingQueue時,咱們還能夠控制對象的內部鎖是否採用公平鎖,默認採用非公平鎖。數據結構
LinkedBlockingQueue多線程
LinkedBlockingQueue是一個單向鏈表實現的阻塞隊列,支持真正的並行操做,由於內部使用ReentrantLock實現插入鎖(putLock)和取出鎖(takeLock),維護了兩個所對象。其內部也維持着一個數據緩衝隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者當即返回;只有當隊列緩衝區達到最大值緩存容量時(LinkedBlockingQueue能夠經過構造函數指定該值),纔會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於一樣的原理。而LinkedBlockingQueue之因此可以高效的處理併發數據,還由於其對於生產者端和消費者端分別採用了獨立的鎖來控制數據同步,這也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能。併發
在開發中新建LinkedBlockingQueue實例的時候,通常要指定其大小,若是沒有指定大小,大小默認是Integer.MAX_VALUE,這樣的話,若是生產者的速度一旦大於消費者的速度,也許尚未等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。在線程池框架Executors中newSingleThreadExecutor和newFixedThreadPool方法內部維護的都是LinkedBlockingQueue。
PriorityBlockingQueue
PriorityBlockingQueue是一個按照優先級排序的隊列,若是想要某個隊列不是按照FIFO的順序來處理元素,該隊列很是有用,內部維護一個堆的數據結構。PriorityBlockingQueue既能夠根據元素的天然順序進行排序,若是元素實現了Comparable接口,也能夠根據Comparator進行比較。該隊列看似有界隊列,實際上它會自動擴容,所以是無界隊列,所以在生產者-消費者模型中,生產者並不會真正的阻塞,而只會在沒有可消費的數據時,阻塞數據的消費者。所以使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,不然時間一長,會最終耗盡全部的可用堆內存空間。在實現PriorityBlockingQueue時,內部控制線程同步的鎖採用的是非公平鎖。
DelayQueue
DelayQueue是一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部 是延遲期滿後保存時間最長的 Delayed 元素。若是延遲都尚未期滿,則隊列沒有頭部,而且 poll 將返回null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於等於 0 的值時,將發生到期。即便沒法使用 take 或 poll移除未到期的元素,也不會將這些元素做爲正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此隊列不容許使用 null 元素。
DelayQueue使用場景較少,但都至關巧妙,常見的例子好比使用一個DelayQueue來管理一個超時未響應的鏈接隊列。
SynchronousQueue
SynchronousQueue是這樣一種阻塞隊列,其中每一個 put 必須等待一個take,反之亦然。同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有,它不會爲隊列中的元素維護存儲空間。與其它隊列不一樣的是,它維護一組線程,這些線程在等待着元素加入或者移除隊列。不能在同步隊列上進行peek,由於僅在試圖要取得元素時,該元素才存在;除非另外一個線程試圖移除某個元素,不然也不能(使用任何方法)添加元素;也不能迭代隊列,由於其中沒有元素可用於迭代。隊列的頭是嘗試添加到隊列中的首個已排隊線程元素; 若是沒有已排隊線程,則不添加元素而且頭爲null。SynchronousQueue相似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿着產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,若是一方沒有找到合適的目標,那麼對不起,你們都在集市等待。
SynchronousQueue的一個使用場景是在線程池裏。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據須要(新任務到來時)建立新的線程,若是有空閒線程則會重複使用,線程空閒了60秒後會被回收。
建立SynchronousQueue有兩種構造方法,一種時SynchronousQueue(),默認採用非公平的形式,從JDK1.6開始SynchronousQueue的實現採用了一種性能更好的無鎖算法。競爭機制支持公平和非公平兩種:非公平競爭模式使用的數據結構是後進先出棧(LIFO Stack);公平競爭模式則使用先進先出隊列(FIFO),性能上二者是至關的,通常狀況下,FIFO一般能夠支持更大的吞吐量,但LIFO能夠更大程度的保持線程的本地化。另一種SynchronousQueue(boolean fair),能夠本身指定訪問方式是否採用公平方式。
LinkedTransferQueue
LinkedTransferQueue是JDK1.7中新引入的隊列,該隊列的實現基於CAS無鎖機制,它也是一個基於鏈表實現的無界隊列。相比前面隊列它多transfer和tryTransfer方法。
LinkedBlockingDeque
LinkedBlockingDeque一個基於已連接節點的、任選範圍的阻塞雙端隊列。可選的容量範圍構造方法參數是一種防止過分膨脹的方式。若是未指定容量,那麼容量將等於 Integer.MAX_VALUE。只要插入元素不會使雙端隊列超出容量,每次插入後都將動態地建立連接節點。要想支持阻塞功能,隊列的容量必定是固定的,不然沒法在入隊的時候掛起線程。也就是capacity是final類型的。
這是一個使用LinkedBlockedQueue設計實現的簡單的生產者-消費者模式。
public class Producer implements Runnable { private volatile boolean isRunning = true; private BlockingQueue<String> queue; private static final int DEFAULT_SLEEP = 1000; private static AtomicInteger count = new AtomicInteger(); public Producer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { String data = null; Random r = new Random(); System.out.println("啓動生產者線程!"); try { while (isRunning) { Thread.sleep(r.nextInt(DEFAULT_SLEEP)); data = "data:" + count.incrementAndGet(); queue.put(data); System.out.println("將數據:" + data + "放入隊列..."); } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出生產者線程!"); } } public void stop() { isRunning = false; } } public class Consumer implements Runnable { private volatile boolean isRunning = true; private BlockingQueue<String> queue; private static final int DEFAULT_SLEEP = 1000; public Consumer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { System.out.println("啓動消費者線程!"); Random r = new Random(); try { while (isRunning) { String data = queue.take(); if (null != data) { System.out.println("正在消費:" + data); Thread.sleep(r.nextInt(DEFAULT_SLEEP)); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出消費者線程!"); } } public void stop() { isRunning = false; } } public class MainTest { public static void main(String[] args) throws InterruptedException { // 聲明一個容量爲10的緩存隊列 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); // 藉助Executors ExecutorService service = Executors.newCachedThreadPool(); // 啓動線程 service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); // 執行20s Thread.sleep(20 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(2 * 1000); consumer.stop(); // 退出Executor service.shutdown(); } }
若是不使用阻塞隊列,使用Object.wait()和Object.notify()非阻塞隊列實現生產者-消費者模式,生產者線程在緩衝區爲滿的時候,消費者在緩衝區爲空的時候,都應該暫停運行。而後用notify 和notifyAll通知等待中的線程從新開始執行。
Java併發編程實踐