Java併發之阻塞隊列淺析

背景

由於在工做中常常會用到阻塞隊列,有的時候還要根據業務場景獲取重寫阻塞隊列中的方法,因此學習一下阻塞隊列的實現原理仍是頗有必要的。(PS:不深刻了解的話,很容易使用出錯,形成沒有技術深度的樣子)html

阻塞隊列是什麼?

要想了解阻塞隊列,先了解一下隊列是啥,簡單的說隊列就是一種先進先出的數據結構。(具體的內容去數據結構裏學習一下)因此阻塞隊列就是一種可阻塞的隊列。和普通的隊列的不一樣就體如今 」阻塞「兩個字上。阻塞是啥意思?java

  百度看一下程序員

在軟件工程裏阻塞通常指的是阻塞調用,即調用結果返回以前,當前線程會被掛起。函數只有在獲得結果以後纔會返回。編程

阻塞隊列其實就是普通的隊列根據須要將某些方法改成阻塞調用。因此阻塞隊裏和普通隊裏的不一樣主要體如今兩個方面數組

  • 當隊列是空的時,從隊列中獲取元素的操做將會被阻塞 。直到其餘的線程往空的隊列插入新的元素
  • 當隊列是滿時,往隊列裏添加元素的操做會被阻塞,直到其餘的線程使隊列從新變得空閒起來,如從隊列中移除一個或者多個元素,或者徹底清空隊列

爲何要使用阻塞隊列?

    那麼爲何要使用阻塞隊列?阻塞隊列又能完成什麼特殊的任務嗎?緩存

    阻塞隊列的經典使用 場景就是「生產者」和「消費者」模型,生產者生產數據,放入隊列,而後消費從隊列中獲取數據,這個在通常狀況下天然沒有問題,但若是生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的狀況呢?安全

        在出現消費者速度遠大於生產者速度,消費者在數據消費至必定程度的狀況下,暫停等待一下(阻塞消費者)來等待生產者,以保證生產者可以生產出新的數據;反之亦然。數據結構

   阻塞隊列在java中的一種典型使用場景是線程池,在線程池中,當提交的任務不能被當即獲得執行的時候,線程池就會將提交的任務放到一個阻塞的任務隊列中來(線程池的具體使用參見以前寫的一篇文章《java併發之線程池的淺析》)多線程

   然而,在阻塞隊列發佈之前,在多線程環境下,咱們每一個程序員都必須去本身控制這些細節,尤爲還要兼顧效率和線程安全,而這會給咱們的程序帶來不小的複雜度。在這裏要感謝一下concurrent包,減輕了咱們不少工做併發

阻塞隊列的成員有哪些

                                   

下面分別簡單介紹一下:

  • ArrayBlockingQueue:是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。構造時必須傳入的參數是數組大小此外還能夠指定是否公平性。【注:每個線程在獲取鎖的時候可能都會排隊等待,若是在等待時間上,先獲取鎖的線程的請求必定先被知足,那麼這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖】;在插入或刪除元素時不會產生或銷燬任何額外的對象實例

  • LinkedBlockingQueue:一個由鏈表結構組成的有界隊列,照先進先出的順序進行排序 ,未指定長度的話,默認 此隊列的長度爲Integer.MAX_VALUE。。【PS:若是生產者的速度遠遠大於消費者的速度,也許尚未等到隊列滿阻塞產生,系統內存就有可能已經被消耗殆盡了。】PriorityBlockingQueue: 一個支持線程優先級排序的無界隊列,默認天然序進行排序,也能夠自定義實現compareTo()方法來指定元素排序規則,不能保證同優先級元素的順序。
    • LinkedBlockingQueue之因此可以高效的處理併發數據,是由於take()方法和put(E param)方法使用了不一樣的可重入鎖,分別爲private final ReentrantLock putLock和private final ReentrantLock takeLock,這也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能
    • LinkedBlockingQueue在插入元素是會建立一個額外的Node對象,因此它這在長時間內須要高效併發地處理大批量數據的系統中,對於GC的仍是存在必定的影響。
  • DelayQueue: 一個實現PriorityBlockingQueue實現延遲獲取的無界隊列,在建立元素時,能夠指定多久才能從隊列中獲取當前元素。只有延時期滿後才能從隊列中獲取元素。(DelayQueue能夠運用在如下應用場景:1.緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。2.定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從好比TimerQueue就是使用DelayQueue實現的。)
  • SynchronousQueue: 一個不存儲元素的阻塞隊列,每個put操做必須等待take操做,不然不能添加元素。支持公平鎖和非公平鎖。SynchronousQueue的一個使用場景是在線程池裏。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據須要(新任務到來時)建立新的線程,若是有空閒線程則會重複使用,線程空閒了60秒後會被回收。
  • LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列,至關於其它隊列,LinkedTransferQueue隊列多了transfer和tryTransfer方法。
  • LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。隊列頭部和尾部均可以添加和移除元素,多線程併發時,能夠將鎖的競爭最多降到一半。

阻塞隊列的核心方法 

阻塞對隊列的核心方法主要是插入操做操做和取出操做,以下

  • Throws Exception 類型的插入和取出在不能當即被執行的時候就會拋出異常。
  • Special Value 類型的插入和取出在不能被當即執行的狀況下會返回一個特殊的值(true 或者 false 或者null)
  • Blocked 類型的插入和取出操做在不能被當即執行的時候會阻塞線程直到能夠操做的時候會被其餘線程喚醒
  • Timed out 類型的插入和取出操做在不能當即執行的時候會被阻塞必定的時候,若是在指定的時間內沒有被執行,那麼會返回一個特殊值

 插入操做

  • boolean offer(E e):將指定元素插入此隊列中(若是當即可行且不會違反容量限制),成功時返回 true,若是當前沒有可用的空間,則返回 false。(本方法不阻塞當前執行方法的線程)。       
  • boolean  offer(E o, long timeout, TimeUnit unit):能夠設定等待的時間,若是在設置的指定的時間內,還不能往隊列中加入BlockingQueue,則返回false。
  • void put(E paramE) throws InterruptedException:將指定元素插入到此隊列中裏,若是隊列沒有空間,則調用此方法的線程被阻斷直到隊列裏裏面有空間再繼續執行插入操做。
  • public boolean add(E e): 將指定元素插入此隊列中(若是當即可行且不會違反容量限制),成功時返回 true,若是當前沒有可用的空間,則拋出 IllegalStateException(其實就是調用了offer方法)
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
}

 獲取操做

  • poll():取走BlockingQueue裏排在首位的對象,,取不到時返回null;
  • poll(long timeout, TimeUnit unit):在指定時間內從BlockingQueue取出一個隊首的對象,隊列一旦有數據可取,則當即返回隊列中的數據。不然直到時間超時尚未數據可取,返回null。
  • take():取走BlockingQueue裏排在首位的對象,若BlockingQueue爲空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入; 
  • drainTo(Collection<? super E> c, int maxElements):一次性從BlockingQueue獲取全部可用的數據對象,將數據對象加入傳遞的集合中(還能夠經過maxElements指定獲取數據的個數),經過該方法,能夠提高獲取數據效率;不須要屢次分批加鎖或釋放鎖

阻塞隊列的實現原理

    前面介紹了非阻塞隊列和阻塞隊列中經常使用的方法,下面來探討阻塞隊列的實現原理,本文以比較經常使用的ArrayBlockingQueue爲例,其餘阻塞隊列實現原理根據特性會和ArrayBlockingQueue有一些差異,可是大致思路應該相似,有興趣的朋友可自行查看其餘阻塞隊列的實現源碼。

   首先看一下ArrayBlockingQueue的幾個關鍵成員變量

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;
    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
}

 從上邊能夠明顯的看出ArrayBlockingQueue用一個數組來存儲數據,takeIndex和putIndex分別表示隊首元素和隊尾元素的下標,count表示隊列中元素的個數。 lock是一個可重入鎖,notEmpty和notFull是等待條件。

 

 而後看它的一個關鍵方法的實現:put()

public void put(E e) throws InterruptedException {
  checkNotNull(e);
   final ReentrantLock lock = this.lock;
   lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
      lock.unlock();
    }
}
  1. 首選檢查元素是否爲空,爲空則拋出異常
  2. 接着實例化可重入鎖
  3. 而後localReentrantLock.lockInterruptibly();這裏特別強調一下 (lockInterruptibly()容許在等待時由其餘線程的Thread.interrupt()方法來中斷等待線程而直接返回,這時是不用獲取鎖的,而會拋出一個InterruptException。  而ReentrantLock.lock()方法則不容許Thread.interrupt()中斷,即便檢測到了Thread.interruptted同樣會繼續嘗試獲取鎖,失敗則繼續休眠。只是在最後獲取鎖成功以後在把當前線程置爲中斷狀態)
  4. 判斷當前元素個數是否等於數組的長度,若是相等,則調用notFull.await()進行等待,即當隊列滿的時候,將會等待
  5. 將元素插入到隊列中
  6. 解鎖(這裏必定要在finally中解鎖啊!!!)

enqueue(E x)將元素插入到數組啊item中

/**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

該方法內部經過putIndex索引直接將元素添加到數組items中

    這裏思考一個問題 爲何當putIndex索引大小等於數組長度時,須要將putIndex從新設置爲0?

   這是由於當隊列是先進先出的 因此獲取元素老是從隊列頭部獲取,而添加元素從中從隊列尾部獲取。因此當隊列索引(從0開始)與數組長度相等時,因此下次咱們就須要從數組頭部開始添加了;

最後當插入成功後,經過notEmpty喚醒正在等待取元素的線程

 

阻塞隊列中和put對應的就是take了

下邊是take方法的實現

 public E take() throws InterruptedException {
   final ReentrantLock lock = this.lock;
   lock.lockInterruptibly();
    try {
          while (count == 0)
           notEmpty.await();
            return dequeue();
     finally {
        lock.unlock();
     }
 }

take方法其實很簡單,隊列中有數據就刪除沒有就阻塞,注意這個阻塞是能夠中斷的,若是隊列沒有數據那麼就加入notEmpty條件隊列等待(有數據就直接取走,方法結束),若是有新的put線程添加了數據,那麼put操做將會喚醒take線程; 

能夠看到take的實現跟put方法實現很相似,只不過put方法等待的是notFull信號,而take方法等待的是notEmpty信號。(等的就是上文的put中的信號)當數組的數量爲空時,也就是無任何數據能夠被取出來的時候,notEmpty這個Condition就會進行阻塞,直到被notEmpty喚醒

dequeue的實現以下

private E dequeue() {
        final Object[] items = this.items;
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

take方法主要是從隊列頭部取元素,能夠看到takeIndex是取元素的時候的偏移值,而put中是putIndex控制添加元素的偏移量,因而可知,put和take操做的偏移量分別是由putIndex和takeIndex控制的。其實仔細觀察put和take的實現思路是有不少類似之處。

  • offer(E o, long timeout, TimeUnit unit)的實現方式其實和put的思想是差很少的區別是 offer在阻塞的時候調用的不是await()方法而是awaitNanos(long nanosTimeout) 帶超時響應的等待(PS:具體區別能夠參考我以前寫的關於鎖的博客《JAVA併發之鎖的使用淺析》)
  • poll(long timeout, TimeUnit unit)的實現也是這樣在take的基礎上加了超時響應。感興趣的朋友能夠自行去看一下

案例分析

模擬食堂的經歷,食堂窗口端出一道菜放在臺面,而後等待顧客消費。寫到代碼裏就是食堂窗口就是一個生產者線程,顧客就是消費者線程,檯面就是阻塞隊列。

public class TestBlockingQueue {
  /**
   * 生產和消費業務操做
   *
   *
   */
  protected class WorkDesk {

    BlockingQueue<String> desk = new LinkedBlockingQueue<String>(8);

    public void work() throws InterruptedException {
      Thread.sleep(1000);
      desk.put("端出一道菜");
    }

    public String eat() throws InterruptedException {
      Thread.sleep(4000);
      return desk.take();
    }

  }

  /**
   * 生產者類
   *
   *
   */
  class Producer implements Runnable {

    private String producerName;
    private WorkDesk workDesk;

    public Producer(String producerName, WorkDesk workDesk) {
      this.producerName = producerName;
      this.workDesk = workDesk;
    }

    @Override
    public void run() {
      try {
        for (;;) {

          workDesk.work();
          System.out.println(producerName + "端出一道菜" +",Data:"+System.currentTimeMillis());

        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * 消費者類
   *
   *
   */
  class Consumer implements Runnable {
    private String consumerName;
    private WorkDesk workDesk;

    public Consumer(String consumerName, WorkDesk workDesk) {
      this.consumerName = consumerName;
      this.workDesk = workDesk;
    }

    @Override
    public void run() {
      try {
        for (;;) {
          workDesk.eat();
          System.out.println(consumerName + "端走了一個菜"+",Data:"+System.currentTimeMillis());

        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  public static void main(String args[]) throws InterruptedException {
    TestBlockingQueue testQueue = new TestBlockingQueue();
    WorkDesk workDesk = testQueue.new WorkDesk();

    ExecutorService service = Executors.newFixedThreadPool(6);
    //四個生產者線程
    for (int i=1;i<=4;++i) {
      service.submit(testQueue.new Producer("食堂窗口-"+ i+"-", workDesk));
    }

    //兩個消費者線程
    Consumer consumer1 = testQueue.new Consumer("顧客-1-", workDesk);
    Consumer consumer2 = testQueue.new Consumer("顧客-2-", workDesk);

    service.submit(consumer1);
    service.submit(consumer2);
    service.shutdown();
  }

}

結果部分以下

  

     能夠看到當生產者產生的數據達到阻塞隊列的容量時,生成者線程會阻塞,等待消費者線程進行消費,上述案例中最大容量爲8個盤子,因此當食堂作好了8個菜後了8會等待顧客進行消費,消費後繼續生產。上述案例使用阻塞隊列,看起來代碼要簡單得多,不須要再單獨考慮同步和線程間通訊的問題。

 在併發編程中,通常推薦使用阻塞隊列,這樣實現能夠儘可能地避免程序出現意外的錯誤。

 阻塞隊列使用最經典的場景就是socket客戶端數據的讀取和解析,讀取數據的線程不斷將數據放入隊列,而後解析線程不斷從隊列取數據解析。還有其餘相似的場景,如線程池中就使用了阻塞隊列,其實只要符合生產者-消費者模型的均可以使用阻塞隊列。

 

參考資料:

《Java編程思想》

  https://www.cnblogs.com/dolphin0520/p/3932906.html

  https://www.cnblogs.com/superfj/p/7757876.html

相關文章
相關標籤/搜索