Java併發編程:阻塞隊列BlockingQueue

阻塞隊列BlockingQueue簡介

阻塞隊列BlockingQueue是JDK1.5併發新特性中的內容,阻塞隊列首先是一個隊列,一樣實現了Collection接口。阻塞隊列提供了可阻塞的put和take方法,以及支持定時的poll和offer方法。html

阻塞隊列跟普通隊列相比,首頁它是線程安全的,另外還提供了兩個附加操做:當隊列爲空時,從隊列中獲取元素的操做將被阻塞;當隊列填盡是,向隊列添加元素將被阻塞。這兩個附加操做分別由BlockingQueue提供的兩個take和put方法支持。若是隊列已經滿了,那麼put方法將被阻塞直到有空間可用;若是隊列爲空,那麼take方法將被阻塞直到有元素可用。隊列能夠是有界的也能夠是無界的,無界隊列永遠不會充滿,所以在無界隊列上面put方法也永遠不會被阻塞。java

BlockingQueue提供了4中類型的處理方法:算法

方法\處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用
  • 拋出異常: 當隊列滿時,再向隊列中插入元素,則會拋出IllegalStateException異常。當隊列空時,再向隊列中獲取元素,則會拋出NoSuchElementException異常。
  • 返回特殊值: 當隊列滿時,向隊列中添加元素,則返回false,不然返回true。當隊列爲空時,向隊列中獲取元素,則返回null,不然返回元素。
  • 一直阻塞: 當阻塞隊列滿時,若是生產者向隊列中插入元素,則隊列會一直阻塞當前線程,直到隊列可用或響應中斷退出。當阻塞隊列爲空時,若是消費者線程向阻塞隊列中獲取數據,則隊列會一直阻塞當前線程,直到隊列空閒或響應中斷退出。
  • 超時退出: 當隊列滿時,若是生產線程向隊列中添加元素,則隊列會阻塞生產線程一段時間,超過指定的時間則退出返回false。當隊列爲空時,消費線程從隊列中移除元素,則隊列會阻塞一段時間,若是超過指定時間退出返回null。

Java提供的7個阻塞隊列

隊列 有界性 數據結構
ArrayBlockingQueue 有界 加鎖 arraylist
LinkedBlockingQueue 可選有界 加鎖 單向linkedlist
PriorityBlockingQueue 無界 加鎖 Heap
DelayQueue 無界 加鎖 Heap
SynchronousQueue 有界 無鎖(JDK1.6) ~
LinkedTransferQueue 無界 無鎖 單向linkedlist
LinkedBlockingDeque 可選有界 加鎖 雙向linkedlist

在多線程環境中,經過隊列能夠很容易的實現數據共享。在基於隊列的生產者-消費者模型中,數據生產時,生產者就把數據放入隊列,當消費者準備使用數據時就從隊列中取出數據。生產者不須要知道消費者的標識或者數量,或者他們是惟一的生產者。一樣,消費者也不須要知道生產者來自何處。BlockingQqueue簡化了生產者-消費者的過程,它支持任意數量的生產者-消費者。一種最多見的生產者-消費者模式就是線程池與工做隊列的組合,在Executor執行框架中就體現了這種模式。編程

阻塞隊列BlockingQueue的成員介紹

ArrayBlockingQueue數組

ArrayBlockingQueue是一個基於數組的阻塞隊列實現,內部維護了一個定長數組,以便緩存數據。一旦建立了這樣的緩存區,就不能再增長其容量。試圖向已滿隊列中放入元素會致使放入操做受阻塞;試圖從空隊列中檢索元素將致使相似阻塞。ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。緩存

  • ArrayBlockingQueue(int capacity) 建立一個帶有給定的(固定)容量和默認訪問策略(非公平鎖)的 ArrayBlockingQueue。capacity是隊列容量。
  • ArrayBlockingQueue(int capacity, boolean fair) 建立一個具備給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue。fair訪問策略若是爲 true,則按照 FIFO 順序訪問插入或移除時受阻塞線程的隊列,若是爲 false,則訪問順序是不肯定的。fair是「可重入的獨佔鎖(ReentrantLock)」的類型。fair爲true,表示是公平鎖,fair爲false,表示是非公平鎖。
  • ArrayBlockingQueue(int capacity, boolean fair, Collectionc) 建立一個具備給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue,它最初包含給定 collection 的元素,並以 collection 迭代器的遍歷順序添加元素。

因爲ArrayBlockingQueue內部只維護一個ReentrantLock類型的lock鎖對象,因此在生成者-消費者模型中,並不能真正的實現並行,這一點不一樣於LinkedBlockingQueue,LinkedBlockingQueue內部維護了兩個鎖。事實上ArrayBlockingQueue徹底能夠採用分離鎖,從而實現生產者和消費者操做的徹底並行運行。Doug Lea之因此沒這樣去作,也許是由於ArrayBlockingQueue的數據寫入和獲取操做已經足夠輕巧,以致於引入獨立的鎖機制,除了給代碼帶來額外的複雜性外,其在性能上徹底佔不到任何便宜。安全

ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不一樣之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的對象實例,然後者則會生成一個額外的Node對象。這在長時間內須要高效併發地處理大批量數據的系統中,其對於GC的影響仍是存在必定的區別。而在建立ArrayBlockingQueue時,咱們還能夠控制對象的內部鎖是否採用公平鎖,默認採用非公平鎖。數據結構

LinkedBlockingQueue多線程

LinkedBlockingQueue是一個單向鏈表實現的阻塞隊列,支持真正的並行操做,由於內部使用ReentrantLock實現插入鎖(putLock)和取出鎖(takeLock),維護了兩個所對象。其內部也維持着一個數據緩衝隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者當即返回;只有當隊列緩衝區達到最大值緩存容量時(LinkedBlockingQueue能夠經過構造函數指定該值),纔會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於一樣的原理。而LinkedBlockingQueue之因此可以高效的處理併發數據,還由於其對於生產者端和消費者端分別採用了獨立的鎖來控制數據同步,這也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能。併發

在開發中新建LinkedBlockingQueue實例的時候,通常要指定其大小,若是沒有指定大小,大小默認是Integer.MAX_VALUE,這樣的話,若是生產者的速度一旦大於消費者的速度,也許尚未等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。在線程池框架Executors中newSingleThreadExecutor和newFixedThreadPool方法內部維護的都是LinkedBlockingQueue。

PriorityBlockingQueue

PriorityBlockingQueue是一個按照優先級排序的隊列,若是想要某個隊列不是按照FIFO的順序來處理元素,該隊列很是有用,內部維護一個堆的數據結構。PriorityBlockingQueue既能夠根據元素的天然順序進行排序,若是元素實現了Comparable接口,也能夠根據Comparator進行比較。該隊列看似有界隊列,實際上它會自動擴容,所以是無界隊列,所以在生產者-消費者模型中,生產者並不會真正的阻塞,而只會在沒有可消費的數據時,阻塞數據的消費者。所以使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,不然時間一長,會最終耗盡全部的可用堆內存空間。在實現PriorityBlockingQueue時,內部控制線程同步的鎖採用的是非公平鎖。

DelayQueue

DelayQueue是一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部 是延遲期滿後保存時間最長的 Delayed 元素。若是延遲都尚未期滿,則隊列沒有頭部,而且 poll 將返回null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於等於 0 的值時,將發生到期。即便沒法使用 take 或 poll移除未到期的元素,也不會將這些元素做爲正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此隊列不容許使用 null 元素。

DelayQueue使用場景較少,但都至關巧妙,常見的例子好比使用一個DelayQueue來管理一個超時未響應的鏈接隊列。

SynchronousQueue

SynchronousQueue是這樣一種阻塞隊列,其中每一個 put 必須等待一個take,反之亦然。同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有,它不會爲隊列中的元素維護存儲空間。與其它隊列不一樣的是,它維護一組線程,這些線程在等待着元素加入或者移除隊列。不能在同步隊列上進行peek,由於僅在試圖要取得元素時,該元素才存在;除非另外一個線程試圖移除某個元素,不然也不能(使用任何方法)添加元素;也不能迭代隊列,由於其中沒有元素可用於迭代。隊列的頭是嘗試添加到隊列中的首個已排隊線程元素; 若是沒有已排隊線程,則不添加元素而且頭爲null。SynchronousQueue相似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿着產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,若是一方沒有找到合適的目標,那麼對不起,你們都在集市等待。

SynchronousQueue的一個使用場景是在線程池裏。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據須要(新任務到來時)建立新的線程,若是有空閒線程則會重複使用,線程空閒了60秒後會被回收。

建立SynchronousQueue有兩種構造方法,一種時SynchronousQueue(),默認採用非公平的形式,從JDK1.6開始SynchronousQueue的實現採用了一種性能更好的無鎖算法。競爭機制支持公平和非公平兩種:非公平競爭模式使用的數據結構是後進先出棧(LIFO Stack);公平競爭模式則使用先進先出隊列(FIFO),性能上二者是至關的,通常狀況下,FIFO一般能夠支持更大的吞吐量,但LIFO能夠更大程度的保持線程的本地化。另一種SynchronousQueue(boolean fair),能夠本身指定訪問方式是否採用公平方式。

LinkedTransferQueue

LinkedTransferQueue是JDK1.7中新引入的隊列,該隊列的實現基於CAS無鎖機制,它也是一個基於鏈表實現的無界隊列。相比前面隊列它多transfer和tryTransfer方法。

LinkedBlockingDeque

LinkedBlockingDeque一個基於已連接節點的、任選範圍的阻塞雙端隊列。可選的容量範圍構造方法參數是一種防止過分膨脹的方式。若是未指定容量,那麼容量將等於 Integer.MAX_VALUE。只要插入元素不會使雙端隊列超出容量,每次插入後都將動態地建立連接節點。要想支持阻塞功能,隊列的容量必定是固定的,不然沒法在入隊的時候掛起線程。也就是capacity是final類型的。

阻塞隊列示例

這是一個使用LinkedBlockedQueue設計實現的簡單的生產者-消費者模式。

public class Producer implements Runnable {
 
 private volatile boolean isRunning = true;
 private BlockingQueue<String> queue;
 private static final int DEFAULT_SLEEP = 1000;
 private static AtomicInteger count = new AtomicInteger();
 
 public Producer(BlockingQueue<String> queue) {
 this.queue = queue;
 }
 
 @Override
 public void run() {
 String data = null;
 Random r = new Random();
 
 System.out.println("啓動生產者線程!");
 try {
 while (isRunning) {
 Thread.sleep(r.nextInt(DEFAULT_SLEEP));
 data = "data:" + count.incrementAndGet();
 queue.put(data);
 System.out.println("將數據:" + data + "放入隊列...");
 }
 } catch (InterruptedException e) {
 e.printStackTrace();
 Thread.currentThread().interrupt();
 } finally {
 System.out.println("退出生產者線程!");
 }
 }
 
 public void stop() {
 isRunning = false;
 }
 
}
public class Consumer implements Runnable {
 
 private volatile boolean isRunning = true;
 private BlockingQueue<String> queue;
 private static final int DEFAULT_SLEEP = 1000;
 
 public Consumer(BlockingQueue<String> queue) {
 this.queue = queue;
 }
 
 @Override
 public void run() {
 System.out.println("啓動消費者線程!");
 Random r = new Random();
 try {
 while (isRunning) {
 String data = queue.take();
 if (null != data) {
 System.out.println("正在消費:" + data);
 Thread.sleep(r.nextInt(DEFAULT_SLEEP));
 }
 }
 } catch (InterruptedException e) {
 e.printStackTrace();
 Thread.currentThread().interrupt();
 } finally {
 System.out.println("退出消費者線程!");
 }
 }
 
 public void stop() {
 isRunning = false;
 }
 
}
public class MainTest {
 
 public static void main(String[] args) throws InterruptedException {
 // 聲明一個容量爲10的緩存隊列
 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
 
 Producer producer1 = new Producer(queue);
 Producer producer2 = new Producer(queue);
 Producer producer3 = new Producer(queue);
 Consumer consumer = new Consumer(queue);
 
 // 藉助Executors
 ExecutorService service = Executors.newCachedThreadPool();
 // 啓動線程
 service.execute(producer1);
 service.execute(producer2);
 service.execute(producer3);
 service.execute(consumer);
 
 // 執行20s
 Thread.sleep(20 * 1000);
 producer1.stop();
 producer2.stop();
 producer3.stop();
 
 Thread.sleep(2 * 1000);
 consumer.stop();
 // 退出Executor
 service.shutdown();
 }
 
}

若是不使用阻塞隊列,使用Object.wait()和Object.notify()非阻塞隊列實現生產者-消費者模式,生產者線程在緩衝區爲滿的時候,消費者在緩衝區爲空的時候,都應該暫停運行。而後用notify 和notifyAll通知等待中的線程從新開始執行。

參考資料

Java併發編程實踐

Java中的阻塞隊列(BlockingQueue)

Java多線程-工具篇-BlockingQueue

聊聊併發(七)——Java中的阻塞隊列

阻塞隊列

JDK7以前已有的隊列實現

ArrayBlockingQueue

相關文章
相關標籤/搜索