Disruptor源碼分析

本文將介紹Disruptor的工做機制,並分析Disruptor的主要源碼html

基於的版本是3.3.7(發佈於2017.09.28)git

水平有限,若有謬誤請留言指正github

 

0. 什麼是Disruptor?

Disruptor是一個開源的併發框架,提供了相似於Java中有界隊列的功能,主要用於生產消費者場景。編程

與Java原生併發隊列不一樣的是,Disruptor高度優化,在單機上能夠輕鬆跑到千萬級別的tps與ns級別的延時數組

 

1. Disruptor的關鍵想法

a. 使用環形隊列做爲底層存儲(存儲空間連續,能夠充分利用cache)緩存

b. 環形隊列中存儲的對象都是預先創建好的,減小了頻繁建立/釋放對象帶來的開銷安全

c. 生產者使用兩階段提交的方式來發布事件(第一階段是先在環形隊列中預佔一個空位,第二階段是向這個空位中寫入數據,競爭只發生在第一階段),並使用CAS操做來解決衝突,而不是使用昂貴的Lock併發

d. 用cache line padding(緩衝區填充)的思想來解決false sharing(僞共享)的問題框架

e. 使用了Java底層的Unsafe操做ide

 

2. Disruptor的核心組件

RingBuffer

環形緩衝區,本質是一個定長Object數組(後續稱裏面的格子爲slot),爲了不僞共享,在這個數組的兩端額外填充了若干空位(這也致使訪問RingBuffer數據的方式比較崎嶇,具體請自行參見源碼)

Sequence

相似於AtomicLong,用於標記事件id

全部生產者共用一個Sequence,用於不衝突的將事件放到RingBuffer上

每一個消費者本身維護一個Sequence,用於標記本身當前正在處理的事件的id

Sequencer

生產者訪問RingBuffer時的控制器,主要實現有兩種:SingleProducerSequencer與MultiProducerSequencer,分別用於單生產者和多生產者的場景

SequenceBarrier

只有一個實現類爲ProcessingSequenceBarrier,用於協調生產者與消費者(若是某個slot中的事件尚未被全部消費者消費完畢,那麼這個slot是不能被複用的,須要等待)

WaitStrategy

消費者等待下一個可用事件的策略,Disruptor自帶了多種WaitStrategy的實現,能夠根據場景自行選擇。

 

3. 生產者發佈事件到RingBuffer

示例代碼以下:

        long sequence = ringBuffer.next();  // 第一階段,獲取RingBuffer上下一個可用的slot的序列號,這裏可能會有爭用
        try {
            Event event = ringBuffer.get(sequence); // 根據序列號直接去RingBuffer上獲取對應的slot上存儲的事件
            event.setData(data);  // 寫入數據
        } finally {
            ringBuffer.publish(sequence); // 第二階段,將這個事件正式發佈到RingBuffer中
        }

須要重點關注的是next()publish()方法

next()方法

RingBuffer的next方法直接調用關聯的Sequencer的next方法,Sequencer的實現又分爲SingleProducerSequencerMultiProducerSequencer這兩種

先從相對簡單的單生產者SingleProducerSequencer看起:

SingleProducerSequencer.next()
    @Override
    public long next()
    {
        return next(1);
    }

    @Override
    public long next(int n)
    {
        if (n < 1)//參數檢驗
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long nextValue = this.nextValue;//上一次返回的seq

        long nextSequence = nextValue + n;//此次應該返回的序列值,這個序列值還未被產生,對應的slot裏的元素的seq須要減去RingBuffer的大小
        long wrapPoint = nextSequence - bufferSize;//這個序列值對應的slot上正在存儲的元素的seq,這個slot可能已經被消費了,也可能沒有
        long cachedGatingSequence = this.cachedValue;//獲取消費者未消費的元素的seq最小值,這個值不是實時的

        //wrapPoint > cachedGatingSequence,檢查將要被放入元素的slot是否已經沒有消費者佔用了
        //cachedGatingSequence > nextValue,用於來應對seq發生溢出的狀況
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence,更新RingBuffer的遊標,用到了Unsafe方法

            long minSequence;
            //Util.getMinimumSequence能夠得到全部消費者未消費事件的seq最小值,在比這個值更小的slot裏發佈元素是安全的
            //若是這個判斷成立,說明生產者正在試圖將元素放到消費者未消費完畢的slot裏,這個操做是不安全的,生產者須要在這裏被阻塞
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                waitStrategy.signalAllWhenBlocking();//激活全部的消費者(避免有的消費者睡死過去了)
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? 自旋等待
            }

            this.cachedValue = minSequence;//更新cachedValue
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }

邏輯比較難懂,關鍵之處以下:

a. 返回的seq對應的slot必須已經被全部消費者消費完畢

b. Util.getMinimumSequence會遍歷全部消費者使用的Sequence,並獲取其最小值,這是一個比較昂貴的操做,因此將其緩存在本地的cachedValue變量中

c. 若是seq對應的slot還沒被消費者消費完畢,說明生產速度快於消費速度,生產者須要原地自旋等待,同時向消費者發送信號,避免消費者睡死過去的狀況

再來看多生產者版本:

    @Override
    public long next(int n)
    {
        if (n < 1)//參數檢驗
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long current;
        long next;

        do
        {
            current = cursor.get();//獲取最新返回的seq
            next = current + n;//本次返回的seq

            long wrapPoint = next - bufferSize;//本次返回的seq對應的slot裏的元素的seq
            long cachedGatingSequence = gatingSequenceCache.get();//有多個生產者,gatingSequenceCache其實是SingleProducerSequencer裏的cachedValue的Atomic版本

            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)//檢查將要被放入元素的slot是否已經沒有消費者佔用了
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);//得到全部消費者未消費事件的seq最小值

                if (wrapPoint > gatingSequence)//slot仍被消費者佔用,生產者自旋等待
                {
                    waitStrategy.signalAllWhenBlocking();
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }

                gatingSequenceCache.set(gatingSequence);//更新gatingSequenceCache的值
            }
            else if (cursor.compareAndSet(current, next))//用CAS操做更新cursor的值,若是失敗了說明有其餘生產者在爭用,進入下一輪循環
            {
                break;
            }
        }
        while (true);

        return next;
    }

與單生產者版本相似,可是當前序列值以及緩存住的安全序列值都使用了原子變量,以解決線程安全問題。能夠說是很是精巧了。

至於publish方法

SingleProducerSequencer.publish
    @Override
    public void publish(long sequence)
    {
        cursor.set(sequence);//更新RingBuffer的遊標
        waitStrategy.signalAllWhenBlocking();//給消費者發送signal信號,具體怎麼作與waitStrategy的實現有關
    }

MultiProducerSequencer.publish
    @Override
    public void publish(final long sequence)
    {
        setAvailable(sequence);//這裏用了一個額外的availableBuffer數組,來標記RingBuffer的某個slot是否已經被publish成功,後面生產者消費的時候會用到
        waitStrategy.signalAllWhenBlocking();//給消費者發送signal信號,具體怎麼作與waitStrategy的實現有關
    }

單生產者/多生產者之間微妙的區別:

單生產者publish一個seq,那麼這個seq以前全部的seq都被publish了

多生產者publish一個seq,那麼只有這一個seq被publish

 

4. 消費者從RingBuffer獲取數據

消費者是經過調用Disruptor的handlerEventsWith方法被添加到系統中的,其調用鏈以下:

Disruptor.handleEventsWith()
    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
    {
        return createEventProcessors(new Sequence[0], handlers);//添加事件處理器,
    }

Disruptor.createEventProcessors()
    EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,//默認狀況下,barrierSequences是不存在的
        final EventHandler<? super T>[] eventHandlers)
    {
        checkNotStarted();

        final Sequence[] processorSequences = new Sequence[eventHandlers.length];//爲每一個消費者建立一個Sequence
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);//建立一個與當前RingBuffer的生產者有關的Barrier

        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)//遍歷消費者
        {
            final EventHandler<? super T> eventHandler = eventHandlers[i];

            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);//爲每一個消費者建立一個BatchEventProcessor

            if (exceptionHandler != null)
            {
                batchEventProcessor.setExceptionHandler(exceptionHandler);//異常處理器
            }

            consumerRepository.add(batchEventProcessor, eventHandler, barrier);//將EventHandler與BatchEventProcessor關聯起來
            processorSequences[i] = batchEventProcessor.getSequence();
        }

        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);//將消費者的sequence添加爲RingBuffer的gating sequence

        return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
    }

 

而後調用Disruptor的start方法啓動系統,其調用鏈以下:

Disruptor.start()
    public RingBuffer<T> start()
    {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository)//遍歷註冊的全部消費者
        {
            consumerInfo.start(executor);
        }

        return ringBuffer;
    }


EventProcessorInfo.start()
    @Override
    public void start(final Executor executor)
    {
        executor.execute(eventprocessor);//這裏是將以前在createEventProcessors中爲消費者註冊的BatchEventProcessor放到線程池裏運行起來了
    }


BatchEventProcessor.run()
    @Override
    public void run()
    {
        if (!running.compareAndSet(false, true))//避免重複運行
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        T event = null;
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)//在死循環中處理事件
            {
                try
                {
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);//從RingBuffer中獲取一批能夠處理的事件的seq,策略由以前設置的waitStrategy決定,返回的seq可能會大於nextSequence(批量,提升效率)
                    if (batchStartAware != null)
                    {
                        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                    }

                    while (nextSequence <= availableSequence)//在循環中消耗返回的availableSequence
                    {
                        event = dataProvider.get(nextSequence);//從RingBuffer中讀取數據
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);//調用用戶傳入的EventHandler中的onEvent方法來處理事件
                        nextSequence++;
                    }

                    sequence.set(availableSequence);//這一批seq消耗完了,更新當前消費者關聯的sequence,讓生產者能夠知道,這裏調用的是相對廉價的putOrderedLong方法,由於不須要很高的實時性
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            notifyShutdown();
            running.set(false);
        }
    }


ProcessingSequenceBarrier.waitFor()
    public long waitFor()(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();

        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);//調用消費者的waitStrategy來等待sequence變得可用

        if (availableSequence < sequence)
        {
            return availableSequence;
        }

        return sequencer.getHighestPublishedSequence(sequence, availableSequence);//從RingBuffer中找到最大的已經被publish事件的slot,尋找策略根據單生產者/多生產者有不一樣
    }


SingleProducerSequencer.getHighestPublishedSequence()//單生產者的狀況,簡單
    public long getHighestPublishedSequence(long lowerBound, long availableSequence)
    {
        return availableSequence;
    }

MultiProducerSequencer.getHighestPublishedSequence()//多生產者
    public long getHighestPublishedSequence(long lowerBound, long availableSequence)
    {
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++)//從lowerBound開始遍歷,在RingBuffer中找到最大的已經被publish事件的slot
        {
            if (!isAvailable(sequence))
            {
                return sequence - 1;
            }
        }

        return availableSequence;
    }

    public boolean isAvailable(long sequence)
    {
        int index = calculateIndex(sequence);//在RingBuffer中定位
        int flag = calculateAvailabilityFlag(sequence);//後面的3行是在availableBuffer數組中尋找對應位置的元素是否被標記爲available。在MultiProducerSequencer的publish方法中會作這一操做
        long bufferAddress = (index * SCALE) + BASE;
        return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
    }

這一塊的代碼也比較難,作個小結:

a. 每一個消費者都與一個BatchEventProcessor關聯,BatchEventProcessor初始化的時候會建立一個sequence,這個sequence記錄的是消費者已經處理的事件的seq

b. Disruptor初始化的時候會建立一個SequenceBarrier,這個barrier與生產者有關

c. 消費者會無限的調用barrier的waitFor方法,以嘗試獲取最新publish的事件,一旦waitFor方法返回了可用的seq,就在循環中調用消費者的onEvent方法將這些事件消耗掉

d. 在waitFor方法中,會根據設定的WaitStrategy來在RingBuffer上查找最新publish的事件的seq

 

5. WaitStrategy

WaitStrategy在消費速度快於生產速度,消費者等待生產者publish新的事件時會被用到。

若是生產速度快於消費速度,生產者等待消費者時,用的是LockSupport.parkNanos(1),相似於自旋等待。

WaitStrategy很是重要,不一樣的WaitStrategy會直接影響到響應事件和CPU佔用,值得專門開一節來分析

WaitStrategy是一個接口,其中只含有兩個方法:

    //sequence:消費者等待這個sequence關聯的事件產生
    //cursor:RingBuffer上,生產者關聯的Sequence
    //dependentSequence:默認狀況下與cursor相同
    //barrier:與生產者關聯的barrier
    long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException;

    void signalAllWhenBlocking();//若是有消費者阻塞等待,將其喚醒

 

WaitStrategy有多種實現,我這裏只分析兩種最有表明性的:BlockingWaitStrategy與BusySpinWaitStrategy

BlockingWaitWaitStrategy的實現以下:

    private final Lock lock = new ReentrantLock();//鎖與關聯的條件
    private final Condition processorNotifyCondition = lock.newCondition();

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        if (cursorSequence.get() < sequence)//若是生產者尚未生產出足夠的事件,那麼在鎖上等待
        {
            lock.lock();//嘗試佔有鎖
            try
            {
                while (cursorSequence.get() < sequence)
                {
                    barrier.checkAlert();
                    processorNotifyCondition.await();//等待
                }
            }
            finally
            {
                lock.unlock();//釋放鎖
            }
        }

        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
        lock.lock();
        try
        {
            processorNotifyCondition.signalAll();//喚醒全部在鎖上等待的生產者
        }
        finally
        {
            lock.unlock();
        }
    }

代碼很簡單,用JDK自帶的ReentrantLock與Condition來完成消費者的等待控制,只要消費者拿不到可用的事件,就調用Condition.await方法等待

好處:CPU佔用少

壞處:在生產的事件足夠後,消費者沒法在第一時間醒來,須要生產者調用signalAll才行,因爲此時消費者線程可能已經被OS切走了,這會帶來必定的延時

BusySpinWaitStrategy的實現以下:

    @Override
    public long waitFor(
        final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;

        while ((availableSequence = dependentSequence.get()) < sequence)//只要生產者沒有生產出足夠的事件,消費者就一直自旋等待
        {
            barrier.checkAlert();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()//消費者根本不被阻塞,因此無需喚醒
    {
    }

這個等待策略就更簡單了,甚至能夠說是喪心病狂,全部消費者都在無限自旋等待(Busy Spin),直到生產者生產了足夠的事件爲止。

好處:延時極低(消費者線程一直是熱的),生產出來的事件會第一時間被消費掉

壞處:有多少個消費者,就會吃滿多少個Core

小結一下:

WaitStrategy其實是延時與CPU資源佔用的權衡

若是你追求最低的延時(ns級別),那就必須保證消費者一直是是熱的,不能被系統調度走,所以你須要BusySpinWaitStrategy

若是你不須要那麼低的延時,那麼基於鎖的BlockingWaitStrategy可能更加適合你

 

 6. Disruptor與ArrayBlockingQueue的比較

ArrayBlockingQueue的源碼分析能夠參見這篇博客,其核心想法以下:

a. 內置一個ReentrantLock與兩個Condition

b. 任何對Queue的讀寫操做均用ReentrantLock加鎖 -> 實現了線程安全的語義

c. 在隊列空/滿的狀況下若是繼續取出/插入元素,則利用Condition將工做線程阻塞,在符合條件的時候再將被阻塞的線程喚醒 -> 實現了阻塞隊列的語義

很明顯,這種實如今高併發的狀況下存在必定的問題:

a. 在任一時刻,只能有一個讀/寫線程在工做,其餘的線程都被ReentrantLock所阻塞

b. takeIndex與putIndex這兩個被頻繁訪問的域在內存上距離很近,容易引發僞共享問題

Disruptor則很好的解決了這些問題,具體請參加本文第一節

 

7. 總結

Disruptor是一個設計很是精巧的框架,爲了追求極致性能,作了不少底層優化,值得學習參考

 

參考資料

併發編程網上關於Disruptor的系列文章

高性能隊列——Disruptor

相關文章
相關標籤/搜索