上篇文章已經講過了 RingBuffer 了, RingBuffer 是消息的容器,可是 Disruptor 中最複雜的部分在於如何併發控制消息的增長和消費,而這部分由 Senquencer 來完成。java
這篇文章基於 Disruptor 官方提供的示例代碼。緩存
Sequencer 能夠被認爲是 Disruptor 的大腦,而 Sequence 則能夠認爲是神經元,Sequencer 會產生信號(Sequence 中的 value)來控制消費者和生產者。在一個 Disruptor 實例中,只會有一個 Sequencer 實例,在建立 RingBuffer 時建立。微信
// 多個生產者的狀況
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
// 單個生產者的狀況
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
複製代碼
Sequencer 接口有兩種實現,SingleProducerSequencer
和 MultiProducerSequencer
,分別來處理單個生產者和多個生產者的狀況。併發
在 Sequencer 中有一個 next()
方法,就是這個方法來產生 Sequence 中的 value。Sequence 本質上能夠認爲是一個 AtomicLong,消費者和生產者都會維護本身的 Sequence。框架
Sequence 中的 value 表示 RingBuffer 消息的編號,Disruptor 中控制邏輯都是圍繞這個編號來完成的。RingBuffer 的 sequence 從 0 開始增加。這裏須要注意的是在 Disruptor 中共享的並非 Sequence 對象,而是 sequence 中的 value。ide
生產者中 Sequence 的 value 表示當前消息已經生產到哪一個位置,消費者中 Sequence 的 value 表示消費者已經處理到哪一個位置。對於 Sequencer 和 Sequence 已經介紹清楚了,那麼 Sequencer 是怎麼運行的呢?工具
RingBuffer 是消息的容器,爲了讓消息可以被正常傳遞,RingBuffer 須要知足兩個要求,第一個是對於全部的消費者,在 RingBuffer 爲空時,就不能再從中取數據,對於生產者,新生產的內容不能把未消費的數據覆蓋掉。spa
Sequencer 的核心就是解決了這兩個問題,經過 Gating
和 Barrier
兩個工具。設計
Gating 經過 RingBuffer.addGatingSequences()
方法來獲取,Barrier 經過 RingBuffer.newBarrier()
方法來獲取。code
上圖中 C 表明消費者,P 表明生產者。
須要說明的是,EventProcessor + EventHandler 纔是一個完整的消費者。EventProcessor 中會維護一個 Sequence 對象,記錄該消費者處理到哪條消息,每一個消費者維護本身的 Sequence 生產者的 Sequence 在 RingBuffer 維護
Gating 的設計其實很簡單,其實就是將多個全部消費者的 Sequence 監控起來,而後在生產者向 RingBuffer 中寫入數據時,判斷是否有足夠的空間來存入新的消息。
全部消費者的 Sequence 經過以下的方法調用路徑,最後存入到 Sequencer.gatingSequences 變量中。
Disruptor.handleEventsWith() -> RingBuffer.addGatingSequences() -> Sequencer.addGatingSequences()
Sequencer.next() 中會對 gatingSequences 進行判斷,具體判斷的邏輯就是看當前這些被監控的 Sequence 中最小的 value 是否已經落後一圈了,落後一圈就表示新的消息沒有寫入的空間:
// MultiProducerSequencer.next() 方法
do
{
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize; // 獲取一圈以前的值
long cachedGatingSequence = gatingSequenceCache.get(); // 獲取緩存的 gatingSequences 的最小值
// 若是大於緩存的值,則進行進一步的判斷
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
// 獲取當前實際最小的 sequence
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
// 若是比實際的最小 sequqnce 還大,說明已經沒有位置了,則繼續進行自旋(無限循環)
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next)) // 若是比緩存的值小,說明目前還有空閒位置,能夠繼續存入消息
{
break;
}
}
while (true); // 這是一個無限循環,直到有新的空間能夠存入消息
複製代碼
若是沒有足夠的空間,那麼 next() 方法就會被阻塞,新的消息沒法加入到 RingBuffer 中。
上面是 gating 的示意圖,c1 和 c2 處理的速度不同,c1 在 1 的位置上,而 c2 在 2 的位置上,生產者 P 已經沒法在向 RingBuffer 中添加新的消息,所以會被阻塞,直到 c1 將 消息處理完成以後才能繼續插入消息。
同時對於消費者來講,必須等到 RingBuffer 中有消息才能進行處理。 經過 SequenceBarrier 來進行管理, SequenceBarrier 實際生成的是 ProcessingSequenceBarrier
實例,按照以下的調用路徑來初始化:
RingBuffer.newBarrier() -> Sequencer.newBarrier() -> new ProcessingSequenceBarriser()
消費者從 RingBuffer 中獲取消息時,須要經過 SenquencerBarrier 來肯定是否有可用的消息, 使用 SequencerBarrier 的調用路徑以下:
BatchEventProcessor.processEvents() -> sequenceBarrier.waitFor()
BatchEventProcessor 是默認使用的消費者,上面咱們說到了 EventProcessor + EventHandler 纔是一個完整的消費者。用戶本身實現 EventHandler 來處理消息的邏輯。而實際從 RingBuffer 中獲取消息的邏輯則在 BatchEventProcessor 中實現,關鍵代碼以下:
// BatchEventProcessor.processEvents() 方法,刪除了部分代碼
while (true)
{
try
{
// 獲取可用消息的最大值
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
// 若是當前的位置小於可用的位置,說明有消息能夠處理,進行消息處理
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); // 調用實際的 Handler 處理消息
nextSequence++;
}
sequence.set(availableSequence); // 將本身的 sequence 設置處理完成的位置
}
}
複製代碼
若是沒有獲取到可處理的 sequence, 那麼當前的處理消息的 handlers 也會被阻塞。
SequenceBarrier 除了能夠控制消費者從 RingBuffer 取數據以外,還能夠控制多個消費者執行的順序。若是要安排消費者執行的順序,用以下的代碼就能夠。
disruptor.handleEventsWith(new LongEventHandler()).then(new AnOtherLongEventHandler());
複製代碼
上面的代碼表示 AnotherLongEventHandler
須要等 LongEventHandler
處理完成以後,才能對消息進行處理。
消費者之間控制依賴關係其實就是控制 sequence 的大小,若是說 C2 消費者 依賴 C1,那就表示 C2 中 Sequence 的值必定小於等於 C1 的 Sequence。
其中 then 關係是經過 Disruptor.updateGatingSequencesForNextInChain() 方法來實現:
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
if (processorSequences.length > 0)
{
ringBuffer.addGatingSequences(processorSequences);
for (final Sequence barrierSequence : barrierSequences)
{
ringBuffer.removeGatingSequence(barrierSequence);
}
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
}
複製代碼
其實 Disruptor 控制的祕密就是這些了,其實也不是很複雜,只是實現的方式很巧妙,再加上併發控制沒有使用鎖,才造就了一個如此高效的框架。
關注微信公衆號,聊點其餘的