Java多線程-BlockingQueue-ArrayBlockingQueue-LinkedBlockingQueue

前言:html

  BlockingQueue很好的解決了多線程中,如何高效安全「傳輸」數據的問題。經過這些高效而且線程安全的隊列類,爲咱們快速搭建高質量的多線程程序帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的兩個重要成員,包括他們各自的功能以及常見使用場景。

認識BlockingQueuejava

  阻塞隊列,顧名思義,首先它是一個隊列,而一個隊列在數據結構中所起的做用大體以下圖所示:程序員

  從圖中咱們能夠很清楚看到,經過一個共享的隊列,可使得數據由隊列的一端輸入,從另一端輸出;經常使用的隊列主要有如下兩種:(固然經過不一樣的實現方式,還能夠延伸出不少不一樣類型的隊列,DelayQueue就是其中的一種)數組

  先進先出(FIFO):先插入的隊列的元素也最早出隊列,相似於排隊的功能。從某種程度上來講這種隊列也體現了一種公平性。緩存

  後進先出(LIFO):後插入隊列的元素最早出隊列,這種隊列優先處理最近發生的事件。

  多線程環境中,經過隊列能夠很容易實現數據共享,好比經典的「生產者」和「消費者」模型中,經過隊列能夠很便利地實現二者之間的數據共享。假設咱們有若干生產者線程,另外又有若干個消費者線程。若是生產者線程須要把準備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就能夠很方便地解決他們之間的數據共享問題。但若是生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的狀況呢?理想狀況下,若是生產者產出數據的速度大於消費者消費的速度,而且當生產出來的數據累積到必定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。安全

  然而,在concurrent包發佈之前,在多線程環境下,咱們每一個程序員都必須去本身控制這些細節,尤爲還要兼顧效率和線程安全,而這會給咱們的程序帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給咱們帶來了強大的BlockingQueue。(在多線程領域:所謂阻塞,在某些狀況下會掛起線程,一旦條件知足,被掛起的線程又會自動被喚醒)數據結構

  下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景:多線程

如圖所示:當隊列中沒有數據的狀況下,消費者端的全部線程都會被自動阻塞(掛起),直到有數據放入隊列。併發

如圖所示:當隊列中填滿數據的狀況下,生產者端的全部線程都會被自動阻塞(掛起),直到隊列中有空的位置,線程被自動喚醒。dom

  這也是咱們在多線程環境下,爲何須要BlockingQueue的緣由。做爲BlockingQueue的使用者,咱們不再須要關心何時須要阻塞線程,何時須要喚醒線程,由於這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓咱們一塊兒來見識下它的經常使用方法:

BlockingQueue的核心方法:

放入數據:

  offer(anObject):表示若是可能的話,將anObject加到BlockingQueue裏,即若是BlockingQueue能夠容納,則返回true,不然返回false.(本方法不阻塞當前執行方法的線程)

  offer(E o, long timeout, TimeUnit unit),能夠設定等待的時間,若是在指定的時間內,還不能往隊列中加入BlockingQueue,則返回失敗。

  put(anObject):把anObject加到BlockingQueue裏,若是BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue裏面有空間再繼續.

獲取數據:

  poll(time):取走BlockingQueue裏排在首位的對象,若不能當即取出,則能夠等time參數規定的時間,取不到時返回null;

  poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,若是在指定時間內,隊列一旦有數據可取,則當即返回隊列中的數據。不然知道時間超時尚未數據可取,返回失敗。

  take():取走BlockingQueue裏排在首位的對象,若BlockingQueue爲空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入;

  drainTo():一次性從BlockingQueue獲取全部可用的數據對象(還能夠指定獲取數據的個數),經過該方法,能夠提高獲取數據效率;不須要屢次分批加鎖或釋放鎖。

常見BlockingQueue

在瞭解了BlockingQueue的基本功能後,讓咱們來看看BlockingQueue家庭大體有哪些成員?

1. ArrayBlockingQueue

  基於數組的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是一個經常使用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。

  ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味着二者沒法真正並行運行,這點尤爲不一樣於LinkedBlockingQueue;

  按照實現原理來分析,ArrayBlockingQueue徹底能夠採用分離鎖,從而實現生產者和消費者操做的徹底並行運行。之因此沒這樣去作,猜想是由於ArrayBlockingQueue的數據寫入和獲取操做已經足夠輕巧,以致於引入獨立的鎖機制,除了給代碼帶來額外的複雜性外,其在性能上徹底佔不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不一樣之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的對象實例,然後者則會生成一個額外的Node對象。這在長時間內須要高效併發地處理大批量數據的系統中,其對於GC的影響仍是存在必定的區別。而在建立ArrayBlockingQueue時,咱們還能夠控制對象的內部鎖是否採用公平鎖,默認採用非公平鎖。

2. LinkedBlockingQueue

  基於鏈表的阻塞隊列,同ArrayListBlockingQueue相似,其內部也維持着一個數據緩衝隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者當即返回;只有當隊列緩衝區達到最大值緩存容量時(LinkedBlockingQueue能夠經過構造函數指定該值),纔會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於一樣的原理。而LinkedBlockingQueue之因此可以高效的處理併發數據,還由於其對於生產者端和消費者端分別採用了獨立的鎖來控制數據同步,這也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能。

  做爲開發者,咱們須要注意的是,若是構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個相似無限大小的容量(Integer.MAX_VALUE),這樣的話,若是生產者的速度一旦大於消費者的速度,也許尚未等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。

  ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最經常使用的阻塞隊列,通常狀況下,在處理多線程間的生產者消費者問題,使用這兩個類足以。

BlockingQueueTest.java

複製代碼
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; public class BlockingQueueTest { public static void main(String[] args) throws InterruptedException { // 聲明一個容量爲10的緩存隊列 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); // 藉助Executors ExecutorService service = Executors.newCachedThreadPool(); // 啓動線程  service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); // 執行10s Thread.sleep(10 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(2000); // 退出Executor  service.shutdown(); } }
複製代碼

  上面程序使用了Executors線程池的用法,詳細內容能夠參考:http://www.cnblogs.com/liqiu/p/3649360.html

Cosumer.java

複製代碼
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class Consumer implements Runnable { public Consumer(BlockingQueue<String> queue) { this.queue = queue; } public void run() { System.out.println("啓動消費者線程!"); Random r = new Random(); boolean isRunning = true; try { while (isRunning) { System.out.println("正從隊列獲取數據..."); String data = queue.poll(2, TimeUnit.SECONDS); if (null != data) { System.out.println("拿到數據:" + data); System.out.println("正在消費數據:" + data); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); } else { // 超過2s還沒數據,認爲全部生產線程都已經退出,自動退出消費線程。 isRunning = false; } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出消費者線程!"); } } private BlockingQueue<String> queue; private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; }
複製代碼

Producer.java

複製代碼
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable { public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { String data = null; Random r = new Random(); System.out.println("啓動生產者線程!"); try { while (isRunning) { System.out.println("正在生產數據..."); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); data = "data:" + count.incrementAndGet(); System.out.println("將數據:" + data + "放入隊列..."); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { System.out.println("放入數據失敗:" + data); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出生產者線程!"); } } public void stop() { isRunning = false; } private volatile boolean isRunning = true; private BlockingQueue queue; private static AtomicInteger count = new AtomicInteger(); private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; }
複製代碼

 執行結果:

複製代碼
。。。。。。。。。。。。。。。
正從隊列獲取數據...
拿到數據:data:36 正在消費數據:data:36 正從隊列獲取數據... 拿到數據:data:37 正在消費數據:data:37 正從隊列獲取數據... 拿到數據:data:38 正在消費數據:data:38 。。。。。。。。。。。。原文:http://www.cnblogs.com/liqiu/p/3630281.html
相關文章
相關標籤/搜索