當咱們有多個消息的生產者線程,一個消費者線程時,他們之間如何進行高併發、線程安全的協調?git
很簡單,用一個隊列。github
當咱們有多個消息的生產者線程,多個消費者線程,而且每一條消息須要被全部的消費者都消費一次(這就不是通常隊列,只消費一次的語義了),該怎麼作?數組
這時仍然須要一個隊列。可是:安全
1. 每一個消費者須要本身維護一個指針,知道本身消費了隊列中多少數據。這樣同一條消息,能夠被多我的獨立消費。併發
2. 隊列須要一個全局指針,指向最後一條被全部生產者加入的消息。消費者在消費數據時,不能消費到這個全局指針以後的位置——由於這個全局指針,已是表明隊列中最後一條能夠被消費的消息了。app
3. 須要協調全部消費者,在消費完全部隊列中的消息後,阻塞等待。框架
4. 若是消費者之間有依賴關係,即對同一條消息的消費順序,在業務上有固定的要求,那麼還須要處理誰先消費,誰後消費同一條消息的問題。高併發
總而言之,若是有多個生產者,多個消費者,而且同一條消息要給到全部的消費者都去處理一下,須要作到以上4點。這是不容易的。性能
LMAX Disruptor,正是這種場景下,知足以上4點要求的單機跨線程消息傳遞、分發的開源、高性能實現。ui
這裏有一篇英文的Disruptor介紹好文:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction
1. RingBuffer
應用須要傳遞的消息在Disrutpor中稱爲Event(事件)。
RingBuffer是Event的數組,實現了阻塞隊列的語義:
若是RingBuffer滿了,則生產者會阻塞等待。
若是RingBuffer空了,則消費者會阻塞等待。
2. Sequence
在上文中,我提到「每一個消費者須要本身維護一個指針」。這裏的指針就是一個單調遞增加整數(及其基於CAS的加法、獲取操做),稱爲Sequence。
除了每一個消費者須要維護一個指針外,RingBuffer自身也要維護一個全局指針(如上一節第2點所提到的),記錄最後一條能夠被消費的消息。這個全局指針就在下圖紅框中。
生產者往RingBuffer中發送一條消息(RingBuffer.publish())時:
1. 生產者的私有sequence會+1
2. 檢查生產者的私有sequence與RingBuffer中Event個數的關係。若是發現Event數組滿了(下圖紅框中的判斷),則阻塞(下圖綠框中的等待)。
3. RingBuffer會在Event數組中(sequencer+1) % BUFFER_SIZE的地方,放入Event。這裏的取模操做,就體現了Event數組用到最後,則回到頭部繼續放,所謂「Ring「 Buffer的輪循複用語義。
消費者從RingBuffer循環隊列中獲取一條消息時:
1. 從消費者私有Sequence,能夠知道它本身消費到了RingBuffer隊列中的哪一條消息。
2. 從RingBuffer的全局指針Sequence,能夠知道RingBuffer中最後一條沒有被消費的消息在什麼位置。
3. N = (RuingBuffer的全局指針Sequence - 消費者私有Sequence),就是當前消費者,還能夠消費多少Event。
4. 若是以上差值N爲0,說明當前消費者已經消費過RingBuffer中的全部消息了。那麼當前消費者會阻塞。等待生產者加入更多的消息:
以上代碼中,紅框中的availableSequence就是RingBuffer的全局指針Sequence。綠框中的sequence是當前消費者的私有sequence。
若是這個判斷爲true,說明RingBuffer中最新一條能夠被消費的Event,已經被當前消費者消費過了。那麼就會調用apployWaitMethod()阻塞,等待生產者產生更多的Event。
5. 若是RingBuffer中,還有能夠被當前消費者消費的Event,即N > 0,
那麼消費者,會一口氣獲取全部能夠被消費的N個Event。即下圖中的while循環,直到N個Event都被消費才退出。這種一口氣消費盡可能多的Event,是高性能的體現。
從RingBuffer中每獲取一個Event,都會回調綠框中的eventHandler——這是應用註冊的Event處理方法,執行應用的Event消費業務邏輯。
最後,上圖中的sequence.set(availableSequence),會把當前消費者的私有Sequence更新到RingBuffer的全局Sequence。表示RingBuffer中全部的Event都已經消費掉了。
無鎖就沒有鎖競爭。當生產者、消費者線程數很高時,意義重大。因此,
往大里說,每一個消費者維護本身的Sequence,基本沒有跨線程共享的狀態。
往小裏說,Sequence的加法是CAS實現的。
JVM運行時,一怕建立大對象,二怕建立不少小對象。這都會致使JVM堆碎片化、對象元數據存儲的額外開銷大。這是高性能Java應用的噩夢。
爲了解決第二點「不少小對象」,主流開源框架都會本身維護、複用對象池。LMAX Disruptor也不例外。
生產者不是建立新的Event對象,放入到RingBuffer中。而是從RingBuffer中取出一個已有的Event對象,更新它所指向的業務數據,來表明一個邏輯上的新Event。
因此LMAX Disruptor的生產者API,用起來有些麻煩——分爲三步,一是下圖綠框中取出一個已有的、已經被全部人消費過的Event對象,二是下圖紅框中更新這個Event對象所指向的業務數據,三是下圖藍框中標記這個Event對象爲邏輯上的新Event。
https://github.com/LMAX-Exchange/disruptor/wiki/Introduction 這篇文章對Disruptor基本概念已經介紹得很清楚了。
可是,我以爲,入門介紹結合源碼去咀嚼,纔會比較sexy,朋友們會深刻理解。其實也不難,關鍵是找出源碼中的核心部分。
篇幅所限,本文對於Disruptor的高級功能沒有解釋,好比處理多個消費者之間的依賴關係。有機會補充。