BlockingQueue

  • 前言:html

  • 在新增的Concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全「傳輸」數據的問題。經過這些高效而且線程安全的隊列類,爲咱們快速搭建高質量的多線程程序帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的全部成員,包括他們各自的功能以及常見使用場景。java

    • 認識BlockingQueue
      阻塞隊列,顧名思義,首先它是一個隊列,而一個隊列在數據結構中所起的做用大體以下圖所示:

      從上圖咱們能夠很清楚看到,經過一個共享的隊列,可使得數據由隊列的一端輸入,從另一端輸出;
      經常使用的隊列主要有如下兩種:(固然經過不一樣的實現方式,還能夠延伸出不少不一樣類型的隊列,DelayQueue就是其中的一種)
        先進先出(FIFO):先插入的隊列的元素也最早出隊列,相似於排隊的功能。從某種程度上來講這種隊列也體現了一種公平性。
        後進先出(LIFO):後插入隊列的元素最早出隊列,這種隊列優先處理最近發生的事件。

           多線程環境中,經過隊列能夠很容易實現數據共享,好比經典的「生產者」和「消費者」模型中,經過隊列能夠很便利地實現二者之間的數據共享。假設咱們有若干生產者線程,另外又有若干個消費者線程。若是生產者線程須要把準備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就能夠很方便地解決他們之間的數據共享問題。但若是生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的狀況呢?理想狀況下,若是生產者產出數據的速度大於消費者消費的速度,而且當生產出來的數據累積到必定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。然而,在concurrent包發佈之前,在多線程環境下,咱們每一個程序員都必須去本身控制這些細節,尤爲還要兼顧效率和線程安全,而這會給咱們的程序帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給咱們帶來了強大的BlockingQueue。(在多線程領域:所謂阻塞,在某些狀況下會掛起線程(即阻塞),一旦條件知足,被掛起的線程又會自動被喚醒)
      下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景:
             如上圖所示:當隊列中沒有數據的狀況下,消費者端的全部線程都會被自動阻塞(掛起),直到有數據放入隊列。


         如上圖所示:當隊列中填滿數據的狀況下,生產者端的全部線程都會被自動阻塞(掛起),直到隊列中有空的位置,線程被自動喚醒。
          這也是咱們在多線程環境下,爲何須要BlockingQueue的緣由。做爲BlockingQueue的使用者,咱們不再須要關心何時須要阻塞線程,何時須要喚醒線程,由於這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓咱們一塊兒來見識下它的經常使用方法:
      BlockingQueue的核心方法
      放入數據:
        offer(anObject):表示若是可能的話,將anObject加到BlockingQueue裏,即若是BlockingQueue能夠容納,
          則返回true,不然返回false.(本方法不阻塞當前執行方法的線程)
        offer(E o, long timeout, TimeUnit unit),能夠設定等待的時間,若是在指定的時間內,還不能往隊列中
          加入BlockingQueue,則返回失敗。
        put(anObject):把anObject加到BlockingQueue裏,若是BlockQueue沒有空間,則調用此方法的線程被阻斷
          直到BlockingQueue裏面有空間再繼續.
      獲取數據:
        poll(time):取走BlockingQueue裏排在首位的對象,若不能當即取出,則能夠等time參數規定的時間,
          取不到時返回null;
        poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,若是在指定時間內,
          隊列一旦有數據可取,則當即返回隊列中的數據。不然知道時間超時尚未數據可取,返回失敗。
        take():取走BlockingQueue裏排在首位的對象,BlockingQueue爲空,阻斷進入等待狀態直到
          
      BlockingQueue有新的數據被加入;
        drainTo():一次性從BlockingQueue獲取全部可用的數據對象(還能夠指定獲取數據的個數),
          經過該方法,能夠提高獲取數據效率;不須要屢次分批加鎖或釋放鎖。程序員

    • 常見BlockingQueue
      在瞭解了BlockingQueue的基本功能後,讓咱們來看看BlockingQueue家庭大體有哪些成員?
      數組

    • BlockingQueue成員詳細介紹
      1. ArrayBlockingQueue

           基於數組的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是一個經常使用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。
        ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味着二者沒法真正並行運行,這點尤爲不一樣於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue徹底能夠採用分離鎖,從而實現生產者和消費者操做的徹底並行運行。Doug Lea之因此沒這樣去作,也許是由於ArrayBlockingQueue的數據寫入和獲取操做已經足夠輕巧,以致於引入獨立的鎖機制,除了給代碼帶來額外的複雜性外,其在性能上徹底佔不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不一樣之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的對象實例,然後者則會生成一個額外的Node對象。這在長時間內須要高效併發地處理大批量數據的系統中,其對於GC的影響仍是存在必定的區別。而在建立ArrayBlockingQueue時,咱們還能夠控制對象的內部鎖是否採用公平鎖,默認採用非公平鎖。

      2. LinkedBlockingQueue
           基於鏈表的阻塞隊列,同ArrayListBlockingQueue相似,其內部也維持着一個數據緩衝隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者當即返回;只有當隊列緩衝區達到最大值緩存容量時(LinkedBlockingQueue能夠經過構造函數指定該值),纔會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於一樣的原理。而LinkedBlockingQueue之因此可以高效的處理併發數據,還由於其對於生產者端和消費者端分別採用了獨立的鎖來控制數據同步,這也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能。
      做爲開發者,咱們須要注意的是,若是構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個相似無限大小的容量(Integer.MAX_VALUE),這樣的話,若是生產者的速度一旦大於消費者的速度,也許尚未等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。

      ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最經常使用的阻塞隊列,通常狀況下,在處理多線程間的生產者消費者問題,使用這兩個類足以。

      下面的代碼演示瞭如何使用BlockingQueue:
      緩存

      ?安全

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      import java.util.concurrent.BlockingQueue;      
      import java.util.concurrent.ExecutorService;      
      import java.util.concurrent.Executors;      
      import java.util.concurrent.LinkedBlockingQueue;      
      /**      
      * @author  jackyuj      
      */      
      public class BlockingQueueTest {      
          public static void main(String[] args) throws InterruptedException {      
              // 聲明一個容量爲10的緩存隊列      
              BlockingQueue<String> queue = new LinkedBlockingQueue<String>( 10 );      
              Producer producer1 = new Producer(queue);      
              Producer producer2 = new Producer(queue);      
              Producer producer3 = new Producer(queue);      
              Consumer consumer = new Consumer(queue);      
              // 藉助Executors      
              ExecutorService service = Executors.newCachedThreadPool();      
              // 啓動線程      
              service.execute(producer1);      
              service.execute(producer2);      
              service.execute(producer3);      
              service.execute(consumer);      
              // 執行10s      
              Thread.sleep( 10 * 1000 );      
              producer1.stop();      
              producer2.stop();      
              producer3.stop();      
              Thread.sleep( 2000 );      
              // 退出Executor      
              service.shutdown();      
          }      
      }      

      ?數據結構

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      import java.util.Random;      
      import java.util.concurrent.BlockingQueue;      
      import java.util.concurrent.TimeUnit;      
      /**      
      * 消費者線程      
      *      
      * @author  jackyuj      
      */      
      public class Consumer implements Runnable {      
          public Consumer(BlockingQueue<String> queue) {      
              this .queue = queue;      
          }      
          public void run() {      
              System.out.println( "啓動消費者線程!" );      
              Random r = new Random();      
              boolean isRunning = true ;      
              try {      
                  while (isRunning) {      
                      System.out.println( "正從隊列獲取數據..." );      
                      String data = queue.poll( 2 , TimeUnit.SECONDS);      
                      if ( null != data) {      
                          System.out.println( "拿到數據:" + data);      
                          System.out.println( "正在消費數據:" + data);      
                          Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));      
                      } else {      
                          // 超過2s還沒數據,認爲全部生產線程都已經退出,自動退出消費線程。      
                          isRunning = false ;      
                      }      
                  }      
              } catch (InterruptedException e) {      
                  e.printStackTrace();      
                  Thread.currentThread().interrupt();      
              } finally {      
                  System.out.println( "退出消費者線程!" );      
              }      
          }      
          private BlockingQueue<String> queue;      
          private static final int       DEFAULT_RANGE_FOR_SLEEP = 1000 ;      
      }      
      import java.util.Random;      
      import java.util.concurrent.BlockingQueue;      
      import java.util.concurrent.TimeUnit;      
      import java.util.concurrent.atomic.AtomicInteger;      
      /**      
      * 生產者線程      
      *      
      * @author  jackyuj      
      */      
      public class Producer implements Runnable {      
          public Producer(BlockingQueue queue) {      
              this .queue = queue;      
          }      
          public void run() {      
              String data = null ;      
              Random r = new Random();      
              System.out.println( "啓動生產者線程!" );      
              try {      
                  while (isRunning) {      
                      System.out.println( "正在生產數據..." );      
                      Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));      
                      data = "data:" + count.incrementAndGet();      
                      System.out.println( "將數據:" + data + "放入隊列..." );      
                      if (!queue.offer(data, 2 , TimeUnit.SECONDS)) {      
                          System.out.println( "放入數據失敗:" + data);      
                      }      
                  }      
              } catch (InterruptedException e) {      
                  e.printStackTrace();      
                  Thread.currentThread().interrupt();      
              } finally {      
                  System.out.println( "退出生產者線程!" );      
              }      
          }      
          public void stop() {      
              isRunning = false ;      
          }      
          private volatile boolean       isRunning               = true ;      
          private BlockingQueue queue;      
          private static AtomicInteger  count                   = new AtomicInteger();      
          private static final int       DEFAULT_RANGE_FOR_SLEEP = 1000 ;      
      }      
    • 3. DelayQueue
           DelayQueue中的元素只有當其指定的延遲時間到了,纔可以從隊列中獲取到該元素。DelayQueue是一個沒有大小限制的隊列,所以往隊列中插入數據的操做(生產者)永遠不會被阻塞,而只有獲取數據的操做(消費者)纔會被阻塞。
      使用場景:
        DelayQueue使用場景較少,但都至關巧妙,常見的例子好比使用一個DelayQueue來管理一個超時未響應的鏈接隊列。

      4. PriorityBlockingQueue
           基於優先級的阻塞隊列(優先級的判斷經過構造函數傳入的Compator對象來決定),但須要注意的是PriorityBlockingQueue並不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。所以使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,不然時間一長,會最終耗盡全部的可用堆內存空間。在實現PriorityBlockingQueue時,內部控制線程同步的鎖採用的是公平鎖。

      5. SynchronousQueue
           一種無緩衝的等待隊列,相似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿着產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,若是一方沒有找到合適的目標,那麼對不起,你們都在集市等待。相對於有緩衝的BlockingQueue來講,少了一箇中間經銷商的環節(緩衝區),若是有經銷商,生產者直接把產品批發給經銷商,而無需在乎經銷商最終會將這些產品賣給那些消費者,因爲經銷商能夠庫存一部分商品,所以相對於直接交易模式,整體來講採用中間經銷商的模式會吞吐量高一些(能夠批量買賣);但另外一方面,又由於經銷商的引入,使得產品從生產者到消費者中間增長了額外的交易環節,單個產品的及時響應性能可能會下降。
        聲明一個SynchronousQueue有兩種不一樣的方式,它們之間有着不太同樣的行爲。公平模式和非公平模式的區別:
        若是採用公平模式:SynchronousQueue會採用公平鎖,並配合一個FIFO隊列來阻塞多餘的生產者和消費者,從而體系總體的公平策略;
        但若是是非公平模式(SynchronousQueue默認):SynchronousQueue採用非公平鎖,同時配合一個LIFO隊列來管理多餘的生產者和消費者,然後一種模式,若是生產者和消費者的處理速度有差距,則很容易出現飢渴的狀況,便可能有某些生產者或者是消費者的數據永遠都得不處處理。多線程

    • 小結
        BlockingQueue不光實現了一個完整隊列所具備的基本功能,同時在多線程環境下,他還自動管理了多線間的自動等待於喚醒功能,從而使得程序員能夠忽略這些細節,關注更高級的功能。併發

相關文章
相關標籤/搜索