LMAX Disruptor—多生產者多消費者中,消息複製分發的高性能實現

解決的問題

當咱們有多個消息的生產者線程,一個消費者線程時,他們之間如何進行高併發、線程安全的協調?git

很簡單,用一個隊列。github

 

當咱們有多個消息的生產者線程,多個消費者線程,而且每一條消息須要被全部的消費者都消費一次(這就不是通常隊列,只消費一次的語義了),該怎麼作?數組

這時仍然須要一個隊列。可是:安全

1. 每一個消費者須要本身維護一個指針,知道本身消費了隊列中多少數據。這樣同一條消息,能夠被多我的獨立消費。併發

2. 隊列須要一個全局指針,指向最後一條被全部生產者加入的消息。消費者在消費數據時,不能消費到這個全局指針以後的位置——由於這個全局指針,已是表明隊列中最後一條能夠被消費的消息了。app

3. 須要協調全部消費者,在消費完全部隊列中的消息後,阻塞等待。框架

4. 若是消費者之間有依賴關係,即對同一條消息的消費順序,在業務上有固定的要求,那麼還須要處理誰先消費,誰後消費同一條消息的問題。高併發

 

總而言之,若是有多個生產者,多個消費者,而且同一條消息要給到全部的消費者都去處理一下,須要作到以上4點。這是不容易的。性能

LMAX Disruptor,正是這種場景下,知足以上4點要求的單機跨線程消息傳遞、分發的開源、高性能實現。ui

這裏有一篇英文的Disruptor介紹好文:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction

 

關鍵概念

1. RingBuffer

應用須要傳遞的消息在Disrutpor中稱爲Event(事件)。

RingBuffer是Event的數組,實現了阻塞隊列的語義:

若是RingBuffer滿了,則生產者會阻塞等待。

若是RingBuffer空了,則消費者會阻塞等待。

 

2. Sequence

在上文中,我提到「每一個消費者須要本身維護一個指針」。這裏的指針就是一個單調遞增加整數(及其基於CAS的加法、獲取操做),稱爲Sequence。

除了每一個消費者須要維護一個指針外,RingBuffer自身也要維護一個全局指針(如上一節第2點所提到的),記錄最後一條能夠被消費的消息。這個全局指針就在下圖紅框中。

生產場景實現

生產者往RingBuffer中發送一條消息(RingBuffer.publish())時:

1. 生產者的私有sequence會+1

2. 檢查生產者的私有sequence與RingBuffer中Event個數的關係。若是發現Event數組滿了(下圖紅框中的判斷),則阻塞(下圖綠框中的等待)。

 

3. RingBuffer會在Event數組中(sequencer+1) % BUFFER_SIZE的地方,放入Event。這裏的取模操做,就體現了Event數組用到最後,則回到頭部繼續放,所謂「Ring「 Buffer的輪循複用語義。

 

消費場景實現

 消費者從RingBuffer循環隊列中獲取一條消息時:

1. 從消費者私有Sequence,能夠知道它本身消費到了RingBuffer隊列中的哪一條消息。

2. 從RingBuffer的全局指針Sequence,能夠知道RingBuffer中最後一條沒有被消費的消息在什麼位置。

3. N = (RuingBuffer的全局指針Sequence - 消費者私有Sequence),就是當前消費者,還能夠消費多少Event。

4. 若是以上差值N爲0,說明當前消費者已經消費過RingBuffer中的全部消息了。那麼當前消費者會阻塞。等待生產者加入更多的消息:

 

 以上代碼中,紅框中的availableSequence就是RingBuffer的全局指針Sequence。綠框中的sequence是當前消費者的私有sequence。

若是這個判斷爲true,說明RingBuffer中最新一條能夠被消費的Event,已經被當前消費者消費過了。那麼就會調用apployWaitMethod()阻塞,等待生產者產生更多的Event。

 5. 若是RingBuffer中,還有能夠被當前消費者消費的Event,即N > 0,

     那麼消費者,會一口氣獲取全部能夠被消費的N個Event。即下圖中的while循環,直到N個Event都被消費才退出。這種一口氣消費盡可能多的Event,是高性能的體現。

     從RingBuffer中每獲取一個Event,都會回調綠框中的eventHandler——這是應用註冊的Event處理方法,執行應用的Event消費業務邏輯。

  

   最後,上圖中的sequence.set(availableSequence),會把當前消費者的私有Sequence更新到RingBuffer的全局Sequence。表示RingBuffer中全部的Event都已經消費掉了。

 

 高性能的實現細節

無鎖

無鎖就沒有鎖競爭。當生產者、消費者線程數很高時,意義重大。因此,

往大里說,每一個消費者維護本身的Sequence,基本沒有跨線程共享的狀態。

往小裏說,Sequence的加法是CAS實現的。

  • 當生產者須要判斷RingBuffer是否已滿時,用CAS比較原先RingBuffer的Event個數,和假定放入新Event後Event的個數。
  • 若是CAS返回false,說明在判斷期間,別的生產者加入了新Event;或者別的消費者拿走了Event。那麼當前判斷無效,須要從新判斷。這就是常見的 do { ... } while (false == CAS(oldVal, newVal))。——都是套路:)

 

對象的複用

JVM運行時,一怕建立大對象,二怕建立不少小對象。這都會致使JVM堆碎片化、對象元數據存儲的額外開銷大。這是高性能Java應用的噩夢。

爲了解決第二點「不少小對象」,主流開源框架都會本身維護、複用對象池。LMAX Disruptor也不例外。

生產者不是建立新的Event對象,放入到RingBuffer中。而是從RingBuffer中取出一個已有的Event對象,更新它所指向的業務數據,來表明一個邏輯上的新Event。

因此LMAX Disruptor的生產者API,用起來有些麻煩——分爲三步,一是下圖綠框中取出一個已有的、已經被全部人消費過的Event對象,二是下圖紅框中更新這個Event對象所指向的業務數據,三是下圖藍框中標記這個Event對象爲邏輯上的新Event。

 

總結

https://github.com/LMAX-Exchange/disruptor/wiki/Introduction 這篇文章對Disruptor基本概念已經介紹得很清楚了。

可是,我以爲,入門介紹結合源碼去咀嚼,纔會比較sexy,朋友們會深刻理解。其實也不難,關鍵是找出源碼中的核心部分。

篇幅所限,本文對於Disruptor的高級功能沒有解釋,好比處理多個消費者之間的依賴關係。有機會補充。

相關文章
相關標籤/搜索