『併發包入坑指北』之阻塞隊列

前言

較長一段時間以來我都發現很多開發者對 jdk 中的 J.U.C(java.util.concurrent)也就是 Java 併發包的使用甚少,更別談對它的理解了;但這卻也是咱們進階的必備關卡。java

以前或多或少也分享過相關內容,但都不成體系;因而便想整理一套與併發包相關的系列文章。git

其中的內容主要包含如下幾個部分:github

  • 根據定義本身實現一個併發工具。
  • JDK 的標準實現。
  • 實踐案例。

基於這三點我相信你們對這部份內容不至於一問三不知。數據庫

既然開了一個新坑,就不想作的太差;因此我打算將這個列表下的大部分類都講到。api

因此本次重點討論 ArrayBlockingQueue數組

本身實現

在本身實現以前先搞清楚阻塞隊列的幾個特色:安全

  • 基本隊列特性:先進先出。
  • 寫入隊列空間不可用時會阻塞。
  • 獲取隊列數據時當隊列爲空時將阻塞。

實現隊列的方式多種,總的來講就是數組和鏈表;其實咱們只須要搞清楚其中一個便可,不一樣的特性主要表現爲數組和鏈表的區別。網絡

這裏的 ArrayBlockingQueue 看名字很明顯是由數組實現。多線程

咱們先根據它這三個特性嘗試本身實現試試。併發

初始化隊列

我這裏自定義了一個類:ArrayQueue,它的構造函數以下:

public ArrayQueue(int size) {
        items = new Object[size];
    }

很明顯這裏的 items 就是存放數據的數組;在初始化時須要根據大小建立數組。

寫入隊列

寫入隊列比較簡單,只須要依次把數據存放到這個數組中便可,以下圖:

但仍是有幾個須要注意的點:

  • 隊列滿的時候,寫入的線程須要被阻塞。
  • 寫入過隊列的數量大於隊列大小時須要從第一個下標開始寫。

先看第一個隊列滿的時候,寫入的線程須要被阻塞,先來考慮下如何才能使一個線程被阻塞,看起來的表象線程卡住啥事也作不了。

有幾種方案能夠實現這個效果:

  • Thread.sleep(timeout)線程休眠。
  • object.wait() 讓線程進入 waiting 狀態。

固然還有一些 join、LockSupport.part 等不在本次的討論範圍。

阻塞隊列還有一個很是重要的特性是:當隊列空間可用時(取出隊列),寫入線程須要被喚醒讓數據能夠寫入進去。

因此很明顯Thread.sleep(timeout)不合適,它在到達超時時間以後便會繼續運行;達不到空間可用時才喚醒繼續運行這個特色。

其實這樣的一個特色很容易讓咱們想到 Java 的等待通知機制來實現線程間通訊;更多線程見通訊的方案能夠參考這裏:深刻理解線程通訊

因此我這裏的作法是,一旦隊列滿時就將寫入線程調用 object.wait() 進入 waiting 狀態,直到空間可用時再進行喚醒。

/**
     * 隊列滿時的阻塞鎖
     */
    private Object full = new Object();

    /**
     * 隊列空時的阻塞鎖
     */
    private Object empty = new Object();

因此這裏聲明瞭兩個對象用於隊列滿、空狀況下的互相通知做用。

在寫入數據成功後須要使用 empty.notify(),這樣的目的是當獲取隊列爲空時,一旦寫入數據成功就能夠把消費隊列的線程喚醒。

這裏的 wait 和 notify 操做都須要對各自的對象使用 synchronized 方法塊,這是由於 wait 和 notify 都須要獲取到各自的鎖。

消費隊列

上文也提到了:當隊列爲空時,獲取隊列的線程須要被阻塞,直到隊列中有數據時才被喚醒。

代碼和寫入的很是相似,也很好理解;只是這裏的等待、喚醒剛好是相反的,經過下面這張圖能夠很好理解:

總的來講就是:

  • 寫入隊列滿時會阻塞直到獲取線程消費了隊列數據後喚醒寫入線程
  • 消費隊列空時會阻塞直到寫入線程寫入了隊列數據後喚醒消費線程

測試

先來一個基本的測試:單線程的寫入和消費。

3
123
1234
12345

經過結果來看沒什麼問題。


當寫入的數據超過隊列的大小時,就只能消費以後才能接着寫入。

2019-04-09 16:24:41.040 [Thread-0] INFO  c.c.concurrent.ArrayQueueTest - [Thread-0]123
2019-04-09 16:24:41.040 [main] INFO  c.c.concurrent.ArrayQueueTest - size=3
2019-04-09 16:24:41.047 [main] INFO  c.c.concurrent.ArrayQueueTest - 1234
2019-04-09 16:24:41.048 [main] INFO  c.c.concurrent.ArrayQueueTest - 12345
2019-04-09 16:24:41.048 [main] INFO  c.c.concurrent.ArrayQueueTest - 123456

從運行結果也能看出只有當消費數據後才能接着往隊列裏寫入數據。


而當沒有消費時,再往隊列裏寫數據則會致使寫入線程被阻塞。

併發測試

三個線程併發寫入300條數據,其中一個線程消費一條。

=====0
299

最終的隊列大小爲 299,可見線程也是安全的。

因爲不論是寫入仍是獲取方法裏的操做都須要獲取鎖才能操做,因此整個隊列是線程安全的。

ArrayBlockingQueue

下面來看看 JDK 標準的 ArrayBlockingQueue 的實現,有了上面的基礎會更好理解。

初始化隊列

看似要複雜些,但其實逐步拆分後也很好理解:

第一步其實和咱們本身寫的同樣,初始化一個隊列大小的數組。

第二步初始化了一個重入鎖,這裏其實就和咱們以前使用的 synchronized 做用一致的;

只是這裏在初始化重入鎖的時候默認是非公平鎖,固然也能夠指定爲 true 使用公平鎖;這樣就會按照隊列的順序進行寫入和消費。

更多關於 ReentrantLock 的使用和原理請參考這裏:ReentrantLock 實現原理

三四兩步則是建立了 notEmpty notFull 這兩個條件,他的做用於用法和以前使用的 object.wait/notify 相似。

這就是整個初始化的內容,其實和咱們本身實現的很是相似。

寫入隊列


其實會發現阻塞寫入的原理都是差很少的,只是這裏使用的是 Lock 來顯式獲取和釋放鎖。

同時其中的 notFull.await();notEmpty.signal(); 和咱們以前使用的 object.wait/notify 的用法和做用也是同樣的。

固然它仍是實現了超時阻塞的 API

也是比較簡單,使用了一個具備超時時間的等待方法。

消費隊列

再看消費隊列:


也是差很少的,一看就懂。

而其中的超時 API 也是使用了 notEmpty.awaitNanos(nanos) 來實現超時返回的,就不具體說了。

實際案例

說了這麼多,來看一個隊列的實際案例吧。

背景是這樣的:

有一個定時任務會按照必定的間隔時間從數據庫中讀取一批數據,須要對這些數據作校驗同時調用一個遠程接口。

簡單的作法就是由這個定時任務的線程去完成讀取數據、消息校驗、調用接口等整個全流程;但這樣會有一個問題:

假設調用外部接口出現了異常、網絡不穩致使耗時增長就會形成整個任務的效率下降,由於他都是串行會互相影響。

因此咱們改進了方案:

其實就是一個典型的生產者消費者模型:

  • 生產線程從數據庫中讀取消息丟到隊列裏。
  • 消費線程從隊列裏獲取數據作業務邏輯。

這樣兩個線程就能夠經過這個隊列來進行解耦,互相不影響,同時這個隊列也能起到緩衝的做用。

但在使用過程當中也有一些小細節值得注意。

由於這個外部接口是支持批量執行的,因此在消費線程取出數據後會在內存中作一個累加,一旦達到閾值或者是累計了一個時間段便將這批累計的數據處理掉。

但因爲開發者的大意,在消費的時候使用的是 queue.take() 這個阻塞的 API;正常運行沒啥問題。

可一旦原始的數據源,也就是 DB 中沒數據了,致使隊列裏的數據也被消費完後這個消費線程便會被阻塞。

這樣上一輪積累在內存中的數據便一直沒機會使用,直到數據源又有數據了,一旦中間間隔較長時即可能會致使嚴重的業務異常。

因此咱們最好是使用 queue.poll(timeout) 這樣帶超時時間的 api,除非業務上有明確的要求須要阻塞。

這個習慣一樣適用於其餘場景,好比調用 http、rpc 接口等都須要設置合理的超時時間。

總結

關於 ArrayBlockingQueue 的相關分享便到此結束,接着會繼續更新其餘併發容器及併發工具。

對本文有任何相關問題均可以留言討論。

本文涉及到的全部源碼:

https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/ArrayQueue.java

你的點贊與分享是對我最大的支持

相關文章
相關標籤/搜索