Disruptor Source Code Walkthrough

 

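The previous article, "Why is the high-performance queue Disruptor so fast?", introduced what Disruptor is and summarized why it is fast. This one goes straight into the source code.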

 

1. Core classes and interfaces

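Disruptor: the entry-point class; it wraps a RingBuffer.

RingBuffer: a circular queue backed by an array. Its memory is reused lap after lap, which reduces allocation, garbage collection, and resizing.

EventProcessor: the event processor; it extends Runnable. There are two implementations: BatchEventProcessor for single-threaded batch processing and WorkProcessor for multi-threaded processing.

Sequencer: the interface through which producers access sequences, and the parent interface of the producer side of RingBuffer. Its concrete implementations are SingleProducerSequencer and MultiProducerSequencer (both extend AbstractSequencer).

EventSequencer: an interface that declares no methods of its own; it is reserved for future extension.

SequenceBarrier: the consumer barrier, i.e. the controller that consumers use to access the buffer.

WaitStrategy: decides how a consumer waits when no event is available, depending on the implementation chosen.

SingleProducerSequencer: the publishing implementation for a single producer.

MultiProducerSequencer: the publishing implementation for multiple producers.

That is a brief introduction to the commonly used classes. It is fine if it does not all make sense yet; it will after reading the source.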
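
To make the RingBuffer description above more concrete, here is a minimal sketch of the two ideas it relies on: slots are allocated once up front, and an ever-increasing sequence is mapped onto a slot with a bit mask. MiniRing and its methods are hypothetical names for illustration, not Disruptor classes, and the sketch assumes the buffer size is a power of two.

```java
import java.util.function.Supplier;

/** Hypothetical mini ring for illustration; not the Disruptor RingBuffer class. */
final class MiniRing<E> {
    private final Object[] entries;
    private final int indexMask;

    MiniRing(int bufferSize, Supplier<E> factory) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        entries = new Object[bufferSize];
        indexMask = bufferSize - 1;
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get(); // every slot is allocated once and then reused
        }
    }

    /** Maps a non-negative sequence onto a slot; same result as sequence % bufferSize. */
    int indexOf(long sequence) {
        return ((int) sequence) & indexMask;
    }

    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) entries[indexOf(sequence)];
    }

    public static void main(String[] args) {
        MiniRing<StringBuilder> ring = new MiniRing<>(8, StringBuilder::new);
        System.out.println(ring.indexOf(9));            // 1
        System.out.println(ring.get(1) == ring.get(9)); // true: same slot object, one lap later
    }
}
```

Because sequence 9 lands on the same pre-allocated object as sequence 1, publishing only overwrites fields of an existing object instead of allocating a new one.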

The annotated source of the core classes has been uploaded to: https://github.com/Sonion/disroptor

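2. Producers

To analyze Disruptor, start with the producer side.

[Figure: class diagram of the producer-related classes, compiled by the author]

Single-producer publishing flow:

Publishing starts from the Disruptor's publishEvent method: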

//Disruptor.java
public <A> void publishEvent(final EventTranslatorOneArg<T, A> eventTranslator, final A arg)
{
  ringBuffer.publishEvent(eventTranslator, arg);
}

This delegates to RingBuffer.publishEvent, which does just two things: first it claims an available slot on the RingBuffer, then it publishes the data into that slot.

//RingBuffer.java   
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
{
  final long sequence = sequencer.next();
  translateAndPublish(translator, sequence, arg0);
}

Start with claiming an available slot on the RingBuffer. Here is SingleProducerSequencer.next() for the single-producer case.

 //SingleProducerSequencer.java   
    /**
     * @see Sequencer#next()
     */
    @Override
    public long next()
    {
        return next(1);
    }

    /**
     * @see Sequencer#next(int)
     */
    @Override
    public long next(int n)
    {
        if (n < 1 || n > bufferSize)
        {
            throw new IllegalArgumentException("n must be > 0 and < bufferSize");
        }
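        // The last sequence claimed by the previous call
        long nextValue = this.nextValue;
        // With n = 1, this is the sequence being claimed now
        long nextSequence = nextValue + n;
        // The point where wrapping could occur: the claimed sequence minus one full lap of the ring
        long wrapPoint = nextSequence - bufferSize;
        // Cached minimum gating sequence, i.e. the slowest consumer as last observed
        long cachedGatingSequence = this.cachedValue;
        // Sequences start at -1. If wrapPoint > cachedGatingSequence, the producer would lap the
        // slowest consumer from behind and overwrite events that have not been consumed yet.
        // There is no free slot in that case, so the producer enters the wait loop.
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence

            long minSequence;
            // The loop exits only after the consumers have moved forward.
            // Each iteration re-reads the minimum consumer sequence.
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                LockSupport.parkNanos(1L);
            }
            // The consumers have advanced: refresh the cached minimum sequence
            this.cachedValue = minSequence;
        }
        // Record the successfully claimed sequence in the instance field
        this.nextValue = nextSequence;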

        return nextSequence;
    }
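
The wrap check in next(int) is easier to follow with concrete numbers. The sketch below isolates only that arithmetic; WrapPointDemo and mustWait are hypothetical names, and the cached gating value from the real method is replaced by a plain parameter.

```java
/** Hypothetical helper that isolates the wrap check used in next(int). */
final class WrapPointDemo {
    /** True when claiming n more slots would overwrite a slot the slowest consumer has not processed yet. */
    static boolean mustWait(long nextValue, int n, int bufferSize, long minGatingSequence) {
        long nextSequence = nextValue + n;
        long wrapPoint = nextSequence - bufferSize;
        return wrapPoint > minGatingSequence;
    }

    public static void main(String[] args) {
        // bufferSize 8, sequences 0..7 already claimed (nextValue = 7)
        System.out.println(mustWait(7, 1, 8, -1));  // true: nothing consumed, sequence 8 would overwrite slot 0
        System.out.println(mustWait(7, 1, 8, 0));   // false: sequence 0 was consumed, slot 0 is free again
        System.out.println(mustWait(-1, 8, 8, -1)); // false: an empty ring can take one full lap
    }
}
```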

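next() returns the sequence that can be written. Back in RingBuffer.publishEvent, translateAndPublish then performs the publish. (The listing shows the zero-argument EventTranslator overload; the EventTranslatorOneArg overload called above has the same structure and additionally passes arg0 to translateTo.)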

//RingBuffer.java   
     private void translateAndPublish(EventTranslator<E> translator, long sequence)
    {
        try
        {
            translator.translateTo(get(sequence), sequence);
        }
        finally
        {
            sequencer.publish(sequence);
        }
    }

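translator.translateTo() is the user's implementation of the EventTranslator interface: it fills the event in the claimed slot. The sequence is then published in the finally block, so it is published even if the translator throws.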

//SingleProducerSequencer.java  
  /**
     * @see Sequencer#publish(long)
     */
    @Override
    public void publish(long sequence)
    {
        // Update the Sequencer's cursor
        cursor.set(sequence);
        // A new event has been published: notify waiting EventProcessors that they can consume
        waitStrategy.signalAllWhenBlocking();
    }

// BlockingWaitStrategy.java
    @Override
    public void signalAllWhenBlocking()
    {
        synchronized (mutex)
        {
            mutex.notifyAll();
        }
    }

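That completes the single-producer publishing flow. Again, it is simple: two steps, first claim an available slot on the RingBuffer, then publish the data into it.

Multi-producer publishing flow:

The first part is the same as the single-producer flow. MultiProducerSequencer also extends the abstract class AbstractSequencer, so again we look at next().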

//MultiProducerSequencer.java
    /**
     * @see Sequencer#next()
     */
    @Override
    public long next()
    {
        return next(1);
    }

    /**
     * @see Sequencer#next(int)
     */
    @Override
    public long next(int n)
    {
        if (n < 1 || n > bufferSize)
        {
            throw new IllegalArgumentException("n must be > 0 and < bufferSize");
        }

        long current;
        long next;

        do
        {
            // Current cursor value; -1 right after initialization
            current = cursor.get();
            next = current + n;

            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();

            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

                if (wrapPoint > gatingSequence)
                {
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }

                gatingSequenceCache.set(gatingSequence);
            }
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        while (true);

        return next;
    }
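
The key difference from the single-producer version is the compareAndSet retry loop on the cursor. The following is a minimal sketch of just that claim loop, using java.util.concurrent.atomic.AtomicLong in place of Disruptor's Sequence; CasClaimDemo is a hypothetical class, and the wrap check against gating sequences is deliberately left out.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of a CAS-based claim; no wrap check, unlike the real sequencer. */
final class CasClaimDemo {
    private final AtomicLong cursor = new AtomicLong(-1);

    /** Claims n sequences and returns the highest one; retries when another thread wins the CAS. */
    long next(int n) {
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + n;
        } while (!cursor.compareAndSet(current, next));
        return next;
    }

    long cursor() {
        return cursor.get();
    }

    /** Lets several threads claim concurrently and collects every claimed sequence. */
    static Set<Long> claimConcurrently(CasClaimDemo sequencer, int threads, int claimsPerThread) {
        Set<Long> claimed = ConcurrentHashMap.newKeySet();
        Thread[] producers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            producers[t] = new Thread(() -> {
                for (int i = 0; i < claimsPerThread; i++) {
                    claimed.add(sequencer.next(1));
                }
            });
            producers[t].start();
        }
        for (Thread producer : producers) {
            try {
                producer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return claimed;
    }

    public static void main(String[] args) {
        CasClaimDemo sequencer = new CasClaimDemo();
        Set<Long> claimed = claimConcurrently(sequencer, 4, 1000);
        System.out.println(claimed.size());     // 4000: no sequence was handed out twice
        System.out.println(sequencer.cursor()); // 3999
    }
}
```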

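The multi-producer flow is close to the single-producer one. In next(), the main difference is that the cursor is advanced with a CAS inside a retry loop, because several producers may claim at the same time. Publishing still goes through publish, which differs slightly: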

    /**
     * @see Sequencer#publish(long)
     */
    @Override
    public void publish(final long sequence)
    {
        // Multi-producer publishing marks the slot in the availableBuffer array
        setAvailable(sequence);

        waitStrategy.signalAllWhenBlocking();
    }

    /**
     * @see Sequencer#publish(long, long)
     */
    @Override
    public void publish(long lo, long hi)
    {
        for (long l = lo; l <= hi; l++)
        {
            setAvailable(l);
        }
        waitStrategy.signalAllWhenBlocking();
    }

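Compared with SingleProducerSequencer.publish, MultiProducerSequencer.publish does not set the cursor (next() has already advanced it with a CAS). Instead, it marks the corresponding entry of the internal availableBuffer array. availableBuffer records the state of each RingBuffer slot: sequence & (bufferSize - 1) gives the slot index, and sequence >>> indexShift gives the lap number the sequence belongs to, which is stored as the flag. Bitwise AND and unsigned right shift are used because they are cheaper than modulo and division.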

    private void setAvailable(final long sequence)
    {
        // calculateIndex uses a bitwise AND; calculateAvailabilityFlag uses a shift
        setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
    }

    private void setAvailableBufferValue(int index, int flag)
    {
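        // Updated through Unsafe, which works on raw memory, so the element index must be converted into its offset within the array
        long bufferAddress = (index * SCALE) + BASE;
        // availableBuffer is the int array that flags which slots are available; every entry starts at -1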
        UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
    }

     private int calculateAvailabilityFlag(final long sequence)
    {
        return (int) (sequence >>> indexShift);
    }

    private int calculateIndex(final long sequence)
    {
        return ((int) sequence) & indexMask;
    }

That completes the multi-producer publishing flow. If you have questions, let's discuss them in the comments.
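
As a rough sketch of how the availability flags behave, the class below reproduces the index and flag arithmetic with java.util.concurrent.atomic.AtomicIntegerArray standing in for the Unsafe-based array access. AvailabilityDemo and highestPublished are hypothetical names; the loop in highestPublished is an assumption about how a consumer could scan for the first unpublished sequence, in the spirit of the getHighestPublishedSequence call that appears in the barrier code later in this article.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

/** Hypothetical sketch of per-slot availability flags; bufferSize is assumed to be a power of 2. */
final class AvailabilityDemo {
    private final AtomicIntegerArray availableBuffer;
    private final int indexMask;
    private final int indexShift;

    AvailabilityDemo(int bufferSize) {
        availableBuffer = new AtomicIntegerArray(bufferSize);
        indexMask = bufferSize - 1;
        indexShift = Integer.numberOfTrailingZeros(bufferSize); // log2(bufferSize)
        for (int i = 0; i < bufferSize; i++) {
            availableBuffer.set(i, -1); // no lap has been published yet
        }
    }

    int calculateIndex(long sequence) {
        return ((int) sequence) & indexMask;
    }

    int calculateAvailabilityFlag(long sequence) {
        return (int) (sequence >>> indexShift); // lap number
    }

    void publish(long sequence) {
        // lazySet is an ordered write, used here in place of UNSAFE.putOrderedInt
        availableBuffer.lazySet(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
    }

    boolean isAvailable(long sequence) {
        return availableBuffer.get(calculateIndex(sequence)) == calculateAvailabilityFlag(sequence);
    }

    /** Highest sequence in [lowerBound, availableSequence] with no unpublished gap before it. */
    long highestPublished(long lowerBound, long availableSequence) {
        for (long s = lowerBound; s <= availableSequence; s++) {
            if (!isAvailable(s)) {
                return s - 1;
            }
        }
        return availableSequence;
    }

    public static void main(String[] args) {
        AvailabilityDemo demo = new AvailabilityDemo(8);
        demo.publish(0);
        demo.publish(1);
        demo.publish(3);                                 // sequence 2 was claimed but is not published yet
        System.out.println(demo.highestPublished(0, 3)); // 1
        demo.publish(2);
        System.out.println(demo.highestPublished(0, 3)); // 3
        System.out.println(demo.isAvailable(8));         // false: slot 0 still holds the flag of lap 0
    }
}
```

Storing the lap number rather than a boolean means a slot never has to be reset: a value left over from the previous lap simply does not match the flag expected for the current one.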

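3. Consumers

[Figure: diagram of the consumer-side classes]

EventProcessor is the core of consumer-side event handling. The EventProcessor interface extends Runnable and has two main implementations: BatchEventProcessor for single-threaded batch processing and WorkProcessor for multi-threaded processing.
When consumers are built with the Disruptor helper class, passing several EventHandlers to handleEventsWith creates one BatchEventProcessor per handler, each running on its own thread. This resembles the publish/subscribe model in JMS: the same event is consumed by every consumer, in parallel. It suits cases where one event triggers several kinds of work.
Passing several WorkHandlers to handleEventsWithWorkerPool creates one WorkProcessor per handler, each running on its own thread. This resembles the JMS point-to-point model: each event is consumed by exactly one consumer in the group. It suits scaling out the processing capacity of the consumers.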
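
The difference between the two modes comes down to who owns the sequence. The single-threaded simulation below is only a sketch of that idea: ConsumeModesDemo is a hypothetical class, no Disruptor API is used, and round-robin turns stand in for real thread scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical single-threaded simulation of broadcast versus worker-pool consumption. */
final class ConsumeModesDemo {
    /** Broadcast: each handler tracks its own sequence, so each one sees every event. */
    static List<List<Long>> broadcast(int handlers, long lastPublished) {
        List<List<Long>> seen = new ArrayList<>();
        for (int h = 0; h < handlers; h++) {
            List<Long> mine = new ArrayList<>();
            long sequence = -1; // per-handler sequence
            while (sequence < lastPublished) {
                mine.add(++sequence);
            }
            seen.add(mine);
        }
        return seen;
    }

    /** Worker pool: handlers share one sequence, so each event is claimed exactly once. */
    static List<List<Long>> workerPool(int handlers, long lastPublished) {
        List<List<Long>> seen = new ArrayList<>();
        for (int h = 0; h < handlers; h++) {
            seen.add(new ArrayList<>());
        }
        AtomicLong workSequence = new AtomicLong(-1); // shared by the whole group
        int turn = 0;                                 // round-robin stands in for thread scheduling
        while (workSequence.get() < lastPublished) {
            seen.get(turn++ % handlers).add(workSequence.incrementAndGet());
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(broadcast(2, 3));  // [[0, 1, 2, 3], [0, 1, 2, 3]]
        System.out.println(workerPool(2, 3)); // [[0, 2], [1, 3]]
    }
}
```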

BatchEventProcessor: single-threaded batch event processing (think of it as broadcast consumption, where every handler sees every event)

// EventHandlerGroup.java
public EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
{
    return handleEventsWith(handlers);
}
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
    return disruptor.createEventProcessors(sequences, handlers);
}

 

//Disruptor.java
    // barrierSequences are the sequences of the EventHandlerGroup being chained from, i.e. the previous group of handlers
    EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers)
    {
        checkNotStarted();
        // processorSequences: the sequences of this new group of event processors
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
        {
            final EventHandler<? super T> eventHandler = eventHandlers[i];

            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

            if (exceptionHandler != null)
            {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }

            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            processorSequences[i] = batchEventProcessor.getSequence();
        }
        // After adding the event processors, update the gating sequences; later links in the chain build on them.
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }

    // Gating: consumers later in the chain must not overtake the ones before them.
    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
    {
        if (processorSequences.length > 0)
        {
            // gatingSequences always holds the sequences of the consumers at the end of the chain
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences)
            {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            // The previous group of consumers is no longer the end of the chain
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }

 

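All of the logic for building a chain of BatchEventProcessor consumers is in createEventProcessors.

First, a word on ConsumerRepository. This class stores the relationships between consumers, for example looking up an EventProcessorInfo by EventHandler reference, or a ConsumerInfo by Sequence.

Because object references are used as keys, the data structure is IdentityHashMap:

  • IdentityHashMap compares keys with ==, whereas HashMap uses equals()
  • HashMap locates an entry with hashCode(); IdentityHashMap uses System.identityHashCode(object)
  • In theory, IdentityHashMap is slightly faster than HashMap
  • IdentityHashMap can hold keys that are equal according to equals(), as long as they are distinct objects, because keys are compared with ==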
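
The points above can be checked with a few lines of plain JDK code. IdentityMapDemo is a hypothetical class, and String keys are used only because it is easy to create two equal but distinct instances; in ConsumerRepository the keys are handler and sequence references.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

/** Hypothetical demo: two keys that are equal by equals() but are distinct objects. */
final class IdentityMapDemo {
    static int sizeAfterTwoEqualKeys(Map<String, String> map) {
        String k1 = new String("handler"); // new String forces two distinct objects
        String k2 = new String("handler");
        map.put(k1, "first");
        map.put(k2, "second");
        return map.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterTwoEqualKeys(new HashMap<>()));         // 1: k2 replaces k1
        System.out.println(sizeAfterTwoEqualKeys(new IdentityHashMap<>())); // 2: k1 != k2 by reference
    }
}
```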

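createEventProcessors takes two parameters. barrierSequences is the array of barrier sequences for the current consumer group: for the first group it is an empty array; otherwise it is the array of sequences of the previous consumer group. The other parameter, eventHandlers, is the array of EventHandlers that hold the event-consuming logic.
Disruptor creates one BatchEventProcessor for each EventHandler, and all consumers created in the same call share one SequenceBarrier.
Constructing a BatchEventProcessor takes three arguments: dataProvider is the data store, such as the RingBuffer; sequenceBarrier tracks the producer cursor and coordinates processing; eventHandler is the user-implemented event handler, i.e. the actual consumer.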

//BatchEventProcessor.java
    @Override
    public void run()
    {
        if (running.compareAndSet(IDLE, RUNNING))
        {
            sequenceBarrier.clearAlert();

            notifyStart();
            try
            {
                if (running.get() == RUNNING)
                {
                    processEvents();
                }
            }
            finally
            {
                notifyShutdown();
                running.set(IDLE);
            }
        }
        else
        {
            // This is a little bit of guess work.  The running state could of changed to HALTED by
            // this point.  However, Java does not have compareAndExchange which is the only way
            // to get it exactly correct.
            if (running.get() == RUNNING)
            {
                throw new IllegalStateException("Thread is already running");
            }
            else
            {
                earlyExit();
            }
        }
    }

    private void processEvents()
    {
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true)
        {
            try
            {
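                // Wait, using the configured wait strategy, until nextSequence is available.
                // The return value is the highest sequence currently available.
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null)
                {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }

                // Batch processing: handle every sequence from nextSequence up to availableSequence
                while (nextSequence <= availableSequence)
                {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }

                // After the eventHandler has finished the batch, update this consumer's sequence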
                sequence.set(availableSequence);
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (running.get() != RUNNING)
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }
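
The inner loop of processEvents is where the batching happens: one waitFor call may hand back several sequences, and the consumer's own sequence is updated once per batch rather than once per event. The sketch below isolates that loop; BatchLoopDemo, its Handler interface, and the String array standing in for the ring are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the batch loop; a String array stands in for the ring. */
final class BatchLoopDemo {
    interface Handler {
        void onEvent(String event, long sequence, boolean endOfBatch);
    }

    /** Processes nextSequence..availableSequence and returns the new consumer sequence. */
    static long processBatch(String[] ring, long nextSequence, long availableSequence, Handler handler) {
        while (nextSequence <= availableSequence) {
            String event = ring[(int) (nextSequence & (ring.length - 1))];
            handler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
        }
        return availableSequence; // one update for the whole batch
    }

    static List<String> run() {
        String[] ring = {"a", "b", "c", "d"};
        List<String> log = new ArrayList<>();
        long sequence = processBatch(ring, 0, 2, (e, s, end) -> log.add(e + "@" + s + (end ? " end" : "")));
        log.add("sequence=" + sequence);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a@0, b@1, c@2 end, sequence=2]
    }
}
```

The endOfBatch flag lets a handler defer expensive work, such as flushing, until the last event of the batch.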

Next, look at how ProcessingSequenceBarrier, the SequenceBarrier implementation, implements waitFor.

final class ProcessingSequenceBarrier implements SequenceBarrier
{
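    /**
     * The wait strategy used while waiting for events to consume
     */
    private final WaitStrategy waitStrategy;
    /**
     * The sequence of the consumer group this barrier depends on. For the first group this is
     * cursorSequence (the producer's publish cursor); otherwise the previous group's sequences
     * are wrapped in a FixedSequenceGroup
     */
    private final Sequence dependentSequence;
    /**
     * Set to true when halt is triggered
     */
    private volatile boolean alerted = false;
    /**
     * Reference to the cursor in AbstractSequencer; records the producer's latest position
     * (for MultiProducerSequencer, the latest claimed position)
     */
    private final Sequence cursorSequence;
    /**
     * MultiProducerSequencer or SingleProducerSequencer
     */
    private final Sequencer sequencer;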

    ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
    {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
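        // Number of sequences from the previous group; 0 for the first group
        if (0 == dependentSequences.length)
        {
            dependentSequence = cursorSequence;
        }
        // Otherwise wrap the previous group's sequences: the array is copied, the Sequence references stay the same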
        else
        {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }

    @Override
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
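        // Check whether the processor has been asked to stop
        checkAlert();
        // Get the highest available sequence. sequence is the requested one, usually the consumer's current sequence + 1;
        // cursorSequence tracks the producer's latest position
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }
        // Return the highest published sequence; each sequence in the range is checked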
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }
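
The choice of dependentSequence in the constructor decides how far a consumer may read. As a small sketch of that rule, the class below uses plain long values instead of Sequence objects; DependentBarrierDemo is a hypothetical class, and taking the minimum of the upstream sequences is an assumption about what a group of sequences reports, consistent with the Util.getMinimumSequence call seen earlier.

```java
/** Hypothetical sketch: how far a consumer may read, given the cursor and its upstream consumers. */
final class DependentBarrierDemo {
    static long minimum(long[] sequences) {
        long min = Long.MAX_VALUE;
        for (long s : sequences) {
            min = Math.min(min, s);
        }
        return min;
    }

    /** First group: bounded by the producer cursor. Later groups: bounded by the slowest upstream consumer. */
    static long available(long cursor, long[] upstreamSequences) {
        return upstreamSequences.length == 0 ? cursor : minimum(upstreamSequences);
    }

    public static void main(String[] args) {
        System.out.println(available(10, new long[0]));         // 10: first group follows the cursor
        System.out.println(available(10, new long[]{7, 4, 9})); // 4: must not pass the slowest upstream consumer
    }
}
```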

Now look at the wait strategy, WaitStrategy#waitFor.

//BlockingWaitStrategy.java
public final class BlockingWaitStrategy implements WaitStrategy
{
    private final Object mutex = new Object();

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
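        // The cursor is behind the requested sequence, i.e. no event is available yet
        if (cursorSequence.get() < sequence)
        {
            // Only the blocking wait strategy takes a lock; everything else uses CAS, which is one of the reasons for the speed mentioned in the previous article
            synchronized (mutex)
            {
                // Wait while the requested sequence is ahead of the producer cursor.
                // publish() wakes this thread through signalAllWhenBlocking(); the loop re-checks the condition after every wake-up.
                while (cursorSequence.get() < sequence)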
                {
                    barrier.checkAlert();
                    mutex.wait();
                }
            }
        }
        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
            ThreadHints.onSpinWait();
        }

        return availableSequence;
    }
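
The handshake between waitFor and signalAllWhenBlocking is the classic monitor pattern. Below is a minimal sketch of it with a volatile long in place of the Sequence cursor; BlockingWaitDemo is a hypothetical class, and alerts, timeouts, and dependent sequences are left out.

```java
/** Hypothetical sketch of the blocking wait/notify handshake; alerts and dependent sequences omitted. */
final class BlockingWaitDemo {
    private final Object mutex = new Object();
    private volatile long cursor = -1;

    /** Blocks until the cursor has reached the requested sequence, then returns the cursor. */
    long waitFor(long sequence) throws InterruptedException {
        if (cursor < sequence) {
            synchronized (mutex) {
                while (cursor < sequence) { // re-check after every wake-up
                    mutex.wait();
                }
            }
        }
        return cursor;
    }

    /** Producer side: move the cursor first, then wake every waiting consumer. */
    void publish(long sequence) {
        cursor = sequence;
        synchronized (mutex) {
            mutex.notifyAll();
        }
    }

    static long demo() {
        BlockingWaitDemo strategy = new BlockingWaitDemo();
        long[] result = {Long.MIN_VALUE};
        Thread consumer = new Thread(() -> {
            try {
                result[0] = strategy.waitFor(0);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        strategy.publish(0);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 0
    }
}
```

Because the consumer checks the cursor while holding the monitor and the producer notifies while holding the same monitor, a publish cannot slip in between the check and the wait.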

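WorkProcessor: multi-threaded event processing (think of it as cluster consumption, where each event goes to exactly one worker)

WorkProcessor works much like BatchEventProcessor, except that it adds workSequence, a sequence shared by all processors in the same group. Because several threads update workSequence, it is updated with CAS.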

//WorkProcessor.java
    @Override
    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true)
        {
            try
            {
                // if previous sequence was processed - fetch the next sequence and set
                // that we have successfully processed the previous sequence
                // typically, this will be true
                // this prevents the sequence getting too far forward if an exception
                // is thrown from the WorkHandler
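                // processedSequence records whether nextSequence has been handled (normally or through the exception handler). Only then may the next sequence be claimed.
                if (processedSequence)
                {
                    processedSequence = false;
                    do
                    {
                        // Several consumer threads in the group may race for the same sequence; CAS avoids a lock.
                        // The group shares one workSequence. Each WorkProcessor keeps claiming the next sequence and only consumes it if its CAS on workSequence succeeds.
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }
                // Process only when the cached available sequence has reached the sequence to handle
                if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    processedSequence = true;
                }
                // Otherwise refresh the cached available sequence. cachedAvailableSequence is local to this WorkProcessor instance, so different instances may hold different values
                else
                {
                    // As in the single-threaded case, this returns the highest available sequence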
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (!running.get())
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                // handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }

        notifyShutdown();

        running.set(false);
    }

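Summary:

BatchEventProcessor runs each handler on a single thread, with the handlers of a group working in parallel: every consumer in the same group receives the same events, and an event moves on to the next consumer group only after all consumers in the current group have processed it (similar in spirit to Phaser, CyclicBarrier, or CountDownLatch in JUC). WorkerPool manages multiple WorkProcessors to process events on multiple threads, and WorkProcessors in the same group never process the same event. Choosing a different WaitStrategy implementation controls how consumers wait when no event is available.

Use cases

BatchEventProcessor

  • Events must be processed in order
  • Processing a single event is very fast (each handler has only one thread)

WorkProcessor

  • No ordering requirement: sequences are claimed in order, but the order in which events finish depends on thread scheduling and cannot be guaranteed
  • Processing a single event is relatively slow (so the work is spread across several threads)

For a closer look at the core classes, see https://github.com/Sonion/disroptor, which contains analysis and comments. Write a demo, set a breakpoint, and step through the core classes and methods. The Disruptor source is fairly simple; the parts worth studying are the circular queue with cyclic writes that needs no GC reclamation, the cache-line optimizations, and the lock-free techniques.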

References:

1. A reference demo:

http://www.javashuo.com/article/p-acgvopbo-dv.html

2. Background reading:

https://blog.csdn.net/changong28/article/details/43637679

http://brokendreams.iteye.com/category/349033
