Disruptor源碼系列-Sequencer

上篇文章已經講過了 RingBuffer 了, RingBuffer 是消息的容器,可是 Disruptor 中最複雜的部分在於如何併發控制消息的增長和消費,而這部分由 Senquencer 來完成。java

這篇文章基於 Disruptor 官方提供的示例代碼。緩存

Sequencer 簡介

Sequencer 能夠被認爲是 Disruptor 的大腦,而 Sequence 則能夠認爲是神經元,Sequencer 會產生信號(Sequence 中的 value)來控制消費者和生產者。在一個 Disruptor 實例中,只會有一個 Sequencer 實例,在建立 RingBuffer 時建立。微信

// 多個生產者的狀況
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
    MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
    return new RingBuffer<E>(factory, sequencer);
}
// 單個生產者的狀況
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
    return new RingBuffer<E>(factory, sequencer);
}
複製代碼

Sequencer 接口有兩種實現,SingleProducerSequencerMultiProducerSequencer,分別來處理單個生產者和多個生產者的狀況。併發

在 Sequencer 中有一個 next() 方法,就是這個方法來產生 Sequence 中的 value。Sequence 本質上能夠認爲是一個 AtomicLong,消費者和生產者都會維護本身的 Sequence。框架

Sequence 中的 value 表示 RingBuffer 消息的編號,Disruptor 中控制邏輯都是圍繞這個編號來完成的。RingBuffer 的 sequence 從 0 開始增加。這裏須要注意的是在 Disruptor 中共享的並非 Sequence 對象,而是 sequence 中的 value。ide

生產者中 Sequence 的 value 表示當前消息已經生產到哪一個位置,消費者中 Sequence 的 value 表示消費者已經處理到哪一個位置。對於 Sequencer 和 Sequence 已經介紹清楚了,那麼 Sequencer 是怎麼運行的呢?工具

RingBuffer 是消息的容器,爲了讓消息可以被正常傳遞,RingBuffer 須要知足兩個要求,第一個是對於全部的消費者,在 RingBuffer 爲空時,就不能再從中取數據,對於生產者,新生產的內容不能把未消費的數據覆蓋掉。spa

Sequencer 的核心就是解決了這兩個問題,經過 GatingBarrier 兩個工具。設計

Gating 經過 RingBuffer.addGatingSequences() 方法來獲取,Barrier 經過 RingBuffer.newBarrier() 方法來獲取。code

上圖中 C 表明消費者,P 表明生產者。

須要說明的是,EventProcessor + EventHandler 纔是一個完整的消費者。EventProcessor 中會維護一個 Sequence 對象,記錄該消費者處理到哪條消息,每一個消費者維護本身的 Sequence 生產者的 Sequence 在 RingBuffer 維護

Gating 實現

Gating 的設計其實很簡單,其實就是將多個全部消費者的 Sequence 監控起來,而後在生產者向 RingBuffer 中寫入數據時,判斷是否有足夠的空間來存入新的消息。

全部消費者的 Sequence 經過以下的方法調用路徑,最後存入到 Sequencer.gatingSequences 變量中。

Disruptor.handleEventsWith() -> RingBuffer.addGatingSequences() -> Sequencer.addGatingSequences()

Sequencer.next() 中會對 gatingSequences 進行判斷,具體判斷的邏輯就是看當前這些被監控的 Sequence 中最小的 value 是否已經落後一圈了,落後一圈就表示新的消息沒有寫入的空間:

// MultiProducerSequencer.next() 方法
do
{
    current = cursor.get();
    next = current + n;

    long wrapPoint = next - bufferSize; // 獲取一圈以前的值
    long cachedGatingSequence = gatingSequenceCache.get(); // 獲取緩存的 gatingSequences 的最小值
    // 若是大於緩存的值,則進行進一步的判斷
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
    {
        // 獲取當前實際最小的 sequence
        long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
        // 若是比實際的最小 sequqnce 還大,說明已經沒有位置了,則繼續進行自旋(無限循環)
        if (wrapPoint > gatingSequence)
        {
            LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
            continue;
        }
        gatingSequenceCache.set(gatingSequence);
    }
    else if (cursor.compareAndSet(current, next)) // 若是比緩存的值小,說明目前還有空閒位置,能夠繼續存入消息
    {
        break;
    }
}
while (true); // 這是一個無限循環,直到有新的空間能夠存入消息
複製代碼

若是沒有足夠的空間,那麼 next() 方法就會被阻塞,新的消息沒法加入到 RingBuffer 中。

上面是 gating 的示意圖,c1 和 c2 處理的速度不同,c1 在 1 的位置上,而 c2 在 2 的位置上,生產者 P 已經沒法在向 RingBuffer 中添加新的消息,所以會被阻塞,直到 c1 將 消息處理完成以後才能繼續插入消息。

SequencerBarrier 實現

同時對於消費者來講,必須等到 RingBuffer 中有消息才能進行處理。 經過 SequenceBarrier 來進行管理, SequenceBarrier 實際生成的是 ProcessingSequenceBarrier 實例,按照以下的調用路徑來初始化:

RingBuffer.newBarrier() -> Sequencer.newBarrier() -> new ProcessingSequenceBarriser()

消費者從 RingBuffer 中獲取消息時,須要經過 SenquencerBarrier 來肯定是否有可用的消息, 使用 SequencerBarrier 的調用路徑以下:

BatchEventProcessor.processEvents() -> sequenceBarrier.waitFor()

BatchEventProcessor 是默認使用的消費者,上面咱們說到了 EventProcessor + EventHandler 纔是一個完整的消費者。用戶本身實現 EventHandler 來處理消息的邏輯。而實際從 RingBuffer 中獲取消息的邏輯則在 BatchEventProcessor 中實現,關鍵代碼以下:

// BatchEventProcessor.processEvents() 方法,刪除了部分代碼
while (true)
{
    try
    {
        // 獲取可用消息的最大值
        final long availableSequence = sequenceBarrier.waitFor(nextSequence);
        // 若是當前的位置小於可用的位置,說明有消息能夠處理,進行消息處理
        while (nextSequence <= availableSequence)
        {
            event = dataProvider.get(nextSequence);
            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); // 調用實際的 Handler 處理消息
            nextSequence++;
        }
        sequence.set(availableSequence); // 將本身的 sequence 設置處理完成的位置
    }
}
複製代碼

若是沒有獲取到可處理的 sequence, 那麼當前的處理消息的 handlers 也會被阻塞。

SequenceBarrier 除了能夠控制消費者從 RingBuffer 取數據以外,還能夠控制多個消費者執行的順序。若是要安排消費者執行的順序,用以下的代碼就能夠。

disruptor.handleEventsWith(new LongEventHandler()).then(new AnOtherLongEventHandler());
複製代碼

上面的代碼表示 AnotherLongEventHandler 須要等 LongEventHandler 處理完成以後,才能對消息進行處理。

消費者之間控制依賴關係其實就是控制 sequence 的大小,若是說 C2 消費者 依賴 C1,那就表示 C2 中 Sequence 的值必定小於等於 C1 的 Sequence。

其中 then 關係是經過 Disruptor.updateGatingSequencesForNextInChain() 方法來實現:

private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
    if (processorSequences.length > 0)
    {
        ringBuffer.addGatingSequences(processorSequences);
        for (final Sequence barrierSequence : barrierSequences)
        {
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }
}
複製代碼

其實 Disruptor 控制的祕密就是這些了,其實也不是很複雜,只是實現的方式很巧妙,再加上併發控制沒有使用鎖,才造就了一個如此高效的框架。

關注微信公衆號,聊點其餘的

相關文章
相關標籤/搜索