追蹤解析 Disruptor 源碼

零 前期準備

0 FBI WARNING

文章異常囉嗦且繞彎。java

1 版本

Disruptor 版本 : Disruptor 3.4.2git

IDE : idea 2018.3github

JDK 版本 : OpenJDK 11.0.1算法

2 Disruptor 簡介

高性能線程間消息隊列框架 Disruptor,是金融與遊戲領域的經常使用開發組件之一,也是 java 日誌框架和流處理框架底層的經常使用依賴。設計模式

3 Demo

Disruptor 的 github 主頁有很是詳細的 quick start demo,本文依照此 demo 作追蹤的模板(作了很小的改動)。數組

另外,對於官方提供的 jdk8 lambda 簡化版 demo 暫不作討論。緩存

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;

public class LongEventMain {

    //main 方法,啓動入口
    public static void main(String[] args) throws Exception {

        //在該框架中,全部的 task 的包裝類被稱爲 Event,EventFactory 則是 Event 的生產者
        LongEventFactory factory = new LongEventFactory();

        //RingBuffer 的大小,數字爲字節數
        //RingBuffer 是框架啓動器內部的緩存區,用來存儲 event 內的 task 數據
        int bufferSize = 1024;

        //建立一個 Disruptor 啓動器,其中 DaemonThreadFactory 是一個線程工廠的實現類
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

        
        //該框架本質上是 生產-消費 設計模式的應用。全部的消費者被冠名爲 handler
        //handleEventsWith(...) 方法會在啓動器中註冊 handler
        //此處的參數是不定數量的,能夠有多個消費者,每一個消費者均可以獲取 Event
        disruptor.handleEventsWith(new LongEventHandler("handler1"),new LongEventHandler("handler2"));

        //啓動器開始執行,並獲取其內部的緩存區
        RingBuffer<LongEvent> ringBuffer = disruptor.start();

        //建立一個生產者,負責往緩存區內寫入數據
        LongEventProducer producer = new LongEventProducer(ringBuffer);

        //官方 demo 中使用了 ByteBuffer 來方便操做,其實非必須
        ByteBuffer bb = ByteBuffer.allocate(8);

        for (long l = 0; true; l++) {
            //將變量 l 做爲一個 long 類型的數存入 ByteBuffer 中
            bb.putLong(0, l);
            //將 ByteBuffer 傳入生產者的相關方法中,該方法會負責將 ByteBuffer 中的數據寫入 RingBuffer
            producer.onData(bb);
            //線程休眠
            Thread.sleep(1000);
        }
    }
}

//Event 類,本質上是數據的封裝,是生產者和消費者之間進行數據傳遞的介質
class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    public long get() {
        return value;
    }
}

//Event 的生產工廠類,必須實現 Disruptor 自帶的 EventFactory 接口
class LongEventFactory implements EventFactory<LongEvent> {

    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

//消費者,必須實現 Disruptor 自帶的 EventHandler 接口
class LongEventHandler implements EventHandler<LongEvent> {

    private String handlerName;

    public LongEventHandler(String handlerName){
        this.handlerName = handlerName;
    }

    //此方法爲最終的消費 Event 的方法
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        System.out.println("Event " + handlerName + " : " + event.get());
    }
}

//生產者,主要負責往 RingBuffer 中寫入數據
//生產者類在框架中並不是必須,可是通常狀況下都會作必定程度的封裝
class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;

    //生產者的構造器負責獲取並存儲啓動器中的 RingBuffer
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb) {
        //sequence 是 RingBuffer 中的一個數據塊,相似於一個數據地址
        long sequence = ringBuffer.next();
        try {
            //用數據地址去獲取到一個 Event 事件類實例
            LongEvent event = ringBuffer.get(sequence);
            //在實例中存入 ByteBuffer 中的數據
            event.set(bb.getLong(0));
        } finally {
            //發佈該數據塊,此時消費者們均可以看到該數據塊了,能夠進行消費
            ringBuffer.publish(sequence);
        }
    }
}

一 DaemonThreadFactory

在開始正式追蹤代碼以前有必要先來理解 DaemonThreadFactory。這是 Disruptor 自身攜帶的線程工廠類:框架

public enum DaemonThreadFactory implements ThreadFactory{
    
    //線程工廠使用枚舉實現單例模式
    INSTANCE;

    @Override
    public Thread newThread(final Runnable r){
        Thread t = new Thread(r);
        //此處建立的線程是守護線程
        t.setDaemon(true);
        return t;
    }
}

二 Disruptor

本 part 主要追蹤 demo 中 Disruptor 相關的代碼:less

Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
disruptor.handleEventsWith(new LongEventHandler("handler1"),new LongEventHandler("handler2"));
RingBuffer<LongEvent> ringBuffer = disruptor.start();

1 disruptor 的建立

來看下方代碼:jvm

Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

追蹤 Disruptor 的構造器:

//step 1
//Disruptor.class
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory){
    //RingBuffer.createMultiProducer(...) 方法會返回一個 RingBuffer
    //BasicExecutor 是線程和線程工廠的封裝類
    this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}

//step 2
//Disruptor.class
private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor){
    //存入 RingBuffer 和 Executor
    this.ringBuffer = ringBuffer;
    this.executor = executor;
}

可是實際上 Disruptor 提供了不少的構造器,其中還有一個較高配置權限的:

//Disruptor.class
public Disruptor(final EventFactory<T> eventFactory,final int ringBufferSize,
                final ThreadFactory threadFactory,final ProducerType producerType,
                final WaitStrategy waitStrategy){
    //解釋傳入的參數:
    //eventFactory 是 Event 類的建立工廠
    //ringBufferSize 是 RingBuffer 的字節數大小
    //threadFactory 是線程工廠
    //ProducerType 是生產者的類型,分爲單生產者類型(single)和多生產者類型(multi),默認爲 multi
    //waitStrategy 是框架中的一個接口,表示等待策略,默認爲 BlockingWaitStrategy(阻塞等待),WaitStrategy 的可講內容較多,在後頭開一個單獨 part
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),new BasicExecutor(threadFactory));
}

先來看 ProducerType:

public enum ProducerType{
    SINGLE,
    MULTI
}

僅僅只是個標記,很少贅述。

1.1 BasicExecutor

BasicExecutor 是 Executor 的實現類,其內部維護着一個線程工廠和一個線程隊列,核心方法爲 execute(...):

//BasicExecutor.class
public void execute(Runnable command){
    
    //使用線程工廠建立一個線程,此處的 factory 即爲 DaemonThreadFactory
    final Thread thread = factory.newThread(command);
    //有效性驗證
    if (null == thread){
        throw new RuntimeException("Failed to create thread to run: " + command);
    }

    //開啓線程
    thread.start();
    //threads 是一個 ConcurrentLinkedQueue<Thread> 類型的變量,用來存儲線程
    threads.add(thread);
}

1.2 RingBuffer 的建立

再來追蹤一下 RingBuffer 的建立:

//RingBuffer.class
public static <E> RingBuffer<E> create(ProducerType producerType,EventFactory<E> factory,
                                        int bufferSize,WaitStrategy waitStrategy){
    //此處根據 ProducerType 進行分發操做
    switch (producerType){
        case SINGLE:
            //建立單消費者的 producer
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            //建立多消費者的 producer
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            //拋出錯誤
            throw new IllegalStateException(producerType.toString());
    }
}

本質上這兩種模式的 RingBuffer 的建立差距並不大,此處追蹤 createMultiProducer(...) 方法:

//step 1
//RingBuffer.class
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory,int bufferSize,WaitStrategy waitStrategy){
    //MultiProducerSequencer 是 RingBuffer 中用來在生產者和消費者之間傳遞數據的組件
    //sequencer 是 RingBuffer 中的核心組件,是區別 SINGLE 和 MULTI 的關鍵,後文會繼續理解
    MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
    //自身構造器
    return new RingBuffer<E>(factory, sequencer);
}

//step 2
//RingBuffer.class
RingBuffer(EventFactory<E> eventFactory,Sequencer sequencer){
    //調用父類 RingBufferFields 的構造方法
    super(eventFactory, sequencer);
}

//step 3
//RingBufferFields.class
RingBufferFields(EventFactory<E> eventFactory,Sequencer sequencer){
    //此處爲 MultiProducerSequencer
    this.sequencer = sequencer;
    //獲取使用者自定義的 bufferSize 並記錄下來
    this.bufferSize = sequencer.getBufferSize();

    //bufferSize 的有效性驗證
    if (bufferSize < 1){
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }

    if (Integer.bitCount(bufferSize) != 1){
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    //根據 bufferSize 肯定序列號最大值,由於從 0 開始因此要減一
    this.indexMask = bufferSize - 1;
    //entries 是一個 Object 數組,用於存放 Event
    //BUFFER_PAD 是對整個緩衝區的填充
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    //fill(...) 方法會從新設置 entries
    fill(eventFactory);
}

//step 4
//RingBuffer.class
private void fill(EventFactory<E> eventFactory){
    for (int i = 0; i < bufferSize; i++){
        //遍歷數組進行 Event 的填充
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}

2 消費者的註冊

來看下方代碼:

disruptor.handleEventsWith(new LongEventHandler("handler1"),new LongEventHandler("handler2"));

追蹤 handleEventsWith(...) 方法:

//step 1
//Disruptor.class
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){
    //Sequence 能夠看作是 long 型的封裝類
    //此處的第一個參數是前置關卡,在處理 handler 以前會進行處理的事件
    //handlers 即爲消費者
    return createEventProcessors(new Sequence[0], handlers);
}

//step 2
//Disruptor.class
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,final EventHandler<? super T>[] eventHandlers){
    
    //Disruptor 中有一個 AtomicBoolean 類型的變量 started
    //checkNotStarted() 會檢查該變量的值是否爲 true,若是是的話就證實已經啓動了,則拋出異常
    checkNotStarted();

    //processorSequences 是每一個消費者對應的執行器的序列號
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];

    //此處返回的 barrier 能夠看作是上文 MultiProducerSequencer 的封裝加強
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0,eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++){
        final EventHandler<? super T> eventHandler = eventHandlers[i];

        //batchEventProcessor 是存儲了消費者和生產者的執行器,實現了 Runnable 接口,內部會不斷循環去接收並處理事件
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

        //exceptionHandler 是用於處理錯誤的消費者組件
        if (exceptionHandler != null){
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        //consumerRepository 能夠看作是消費者的集合封裝
        //consumerRepository 會將傳入的三個參數包裝成 EventProcessorInfo 並儲存在集合和 map 裏
        consumerRepository.add(batchEventProcessor, eventHandler, barrier);

        //記錄下消費者對應的執行器的序列號
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    //處理一些前置事件,在本例中沒有前置事件存在
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

2.1 newBarrier

來追蹤一下 ringBuffer.newBarrier(...) 方法:

//step 1
//RingBuffer.class
public SequenceBarrier newBarrier(Sequence... sequencesToTrack){
    //在本例中,此處的 sequencesToTrack 是 Sequence[0]
    //此處的 sequencer 即爲 MultiProducerSequencer
    return sequencer.newBarrier(sequencesToTrack);
}

//step 2
//AbstractSequencer.class
public SequenceBarrier newBarrier(Sequence... sequencesToTrack){
    //此方法被定義在 MultiProducerSequencer 的父類 AbstractSequencer 中
    //cursor 是在 AbstractSequencer 中實例化的一個 Sequence 類型對象,是 MultiProducerSequencer 的序列號
    return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}

//step 3
//ProcessingSequenceBarrier.class
ProcessingSequenceBarrier(final Sequencer sequencer,final WaitStrategy waitStrategy,
                            final Sequence cursorSequence,final Sequence[] dependentSequences){
    //即爲 Disruptor 啓動器中的 MultiProducerSequencer
    this.sequencer = sequencer;
    //即爲 Disruptor 啓動器中的阻塞策略
    this.waitStrategy = waitStrategy;  
    //上述方法的 cursor
    this.cursorSequence = cursorSequence;
    if (0 == dependentSequences.length){
        //此處的 dependentSequences 是長度是 0,因此此處
        dependentSequence = cursorSequence;
    }else{
        dependentSequence = new FixedSequenceGroup(dependentSequences);
    }
}

須要注意的是,此處的 sequencer 已經被抽象成了 SingleProducerSequencer 和 MultiProducerSequencer 的共同實現接口 Sequencer。

因此對於 SingleProducerSequencer 來講,這個流程也是沒有區別的。

2.2 updateGatingSequencesForNextInChain

回到上述代碼:

//此處的 barrierSequences 是一個 Sequence[0] 數組,processorSequences 是全部消費者的序列號集合
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

追蹤該方法的實現:

//step 1
//Disruptor.class
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences){
    
    //processorSequences.length 大於 0 意味着消費者數量大於 0
    if (processorSequences.length > 0){
        ringBuffer.addGatingSequences(processorSequences);

        //barrierSequences 是前置事件的集合
        //因爲此處的 barrierSequences 是長度爲 0 的 Sequence 數組,即沒有前置事件,因此此處不會進入循環,忽略
        for (final Sequence barrierSequence : barrierSequences){
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        //unMarkEventProcessorsAsEndOfChain(...) 方法也是處理 barrierSequences 的,忽略
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }
}

addGatingSequences

追蹤 ringBuffer.addGatingSequences(...) 方法:

//step 1
//RingBuffer.class
public void addGatingSequences(Sequence... gatingSequences){
    //sequencer 爲 MultiProducerSequencer
    sequencer.addGatingSequences(gatingSequences);
}

//step 2
//AbstractSequencer.class
public final void addGatingSequences(Sequence... gatingSequences){
    //此處的 SEQUENCE_UPDATER 是一個 AtomicReferenceFieldUpdaterImpl 類型的變量,用於 CAS 操做 gatingSequences
    SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}

//step 3
//SequenceGroups.class
static <T> void addSequences(final T holder,final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
                            final Cursored cursor,final Sequence... sequencesToAdd){
    long cursorSequence;
    Sequence[] updatedSequences;
    Sequence[] currentSequences;

    do{
        //此處的 holder 即爲 MultiProducerSequencer,此處獲取其內部的 gatingSequences 變量
        currentSequences = updater.get(holder);
        //此處爲 copyOf(...) 方法爲 java.util.Arrays.copyOf(...) 方法,用於將 currentSequences 複製一份
        updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
        //此處的 cursor 即爲 MultiProducerSequencer,getCursor() 方法會獲取其的序列號
        cursorSequence = cursor.getCursor();

        int index = currentSequences.length;
        //此處的 sequencesToAdd 是以前消費者的序列號集合,更新 sequencesToAdd 中的每一個序列號封裝
        //將 MultiProducerSequencer 的序列號註冊進去,並填充到新集合的後面一半中
        for (Sequence sequence : sequencesToAdd){
            sequence.set(cursorSequence);
            updatedSequences[index++] = sequence;
        }
    }while (!updater.compareAndSet(holder, currentSequences, updatedSequences));
    //此處的 while 會死循環 CAS 操做直到更新成功

    //在此獲取 MultiProducerSequencer 的序列號,更新到 sequencesToAdd 的每一個序列號封裝類中
    cursorSequence = cursor.getCursor();
    for (Sequence sequence : sequencesToAdd){
        sequence.set(cursorSequence);
    }
}

3 Disruptor 的啓動

來看下方代碼:

disruptor.start();

追蹤 start(...) 方法:

//Disruptor.class
public RingBuffer<T> start(){
    //確認該 Disruptor 沒有啓動
    checkOnlyStartedOnce();
    //此處的 consumerInfo 是 EventProcessorInfo 類型的變量
    for (final ConsumerInfo consumerInfo : consumerRepository){
        consumerInfo.start(executor);
    }

    return ringBuffer;
}

先來看 checkOnlyStartedOnce() 方法:

//Disruptor.class
private void checkOnlyStartedOnce(){
    //若是在調用該 CAS 方法以前已經爲 true 了,會拋出錯誤
    //其實就是確保在調用該方法以前還處於未開啓的狀態
    if (!started.compareAndSet(false, true)){
        throw new IllegalStateException("Disruptor.start() must only be called once.");
    }
}

再來追蹤 EventProcessorInfo 的 start(...) 方法:

//EventProcessorInfo.class
public void start(final Executor executor){
    //此處的 executor 即爲 BasicExecutor
    executor.execute(eventprocessor);
}

因此本質上 Disruptor 的啓動就是開啓 BasicExecutor,藉此啓動線程。

3.1 BatchEventProcessor

上述代碼中啓動線程的時候會傳入 eventprocessor 對象做爲 task 去啓動消費者。eventprocessor 對象本質上是上文中提到過的 BatchEventProcessor。

BatchEventProcessor 可以被傳入 execute(...) 方法,證實其實現了 Runnable 接口:

//step 1
//BatchEventProcessor.class
@Override
public void run(){
    //running 是一個定義在 BatchEventProcessor 中的 AtomicInteger 類型的變量
    //CAS 操做,先判斷 running 的值是否等於 IDLE,若是是的話就修改爲 RUNNING,並返回 true
    //IDLE = 1,RUNNING = 2,皆爲 int 類型的常量
    if (running.compareAndSet(IDLE, RUNNING)){
        
        //此處修改 sequenceBarrier 中 alert 變量的狀態值,清除掉中斷狀態
        sequenceBarrier.clearAlert();

        //若是傳入的消費者實現了 LifecycleAware 接口,就會在 notifyStart() 方法中去執行相關方法
        //LifecycleAware 中定義了 onStart() 和 onShutdown() 方法,會分別在消費者真正執行以前和關閉以前執行一次
        //執行 LifecycleAware 的 onStart() 方法
        notifyStart();
        try{
            //若是 running 是 RUNNING 狀態,就會進入死循環
            if (running.get() == RUNNING){
                //核心方法
                processEvents();
            }
        }finally{
            //執行 LifecycleAware 的 onShutdown() 方法
            notifyShutdown();
            //切換 running 的狀態值
            running.set(IDLE);
        }
    }else{
        if (running.get() == RUNNING){
            throw new IllegalStateException("Thread is already running");
        }else{
            earlyExit();
        }
    }
}

//step 2
//BatchEventProcessor.class
private void processEvents(){
    T event = null;
    //此處的 sequence 記錄着當前消費者已經處理過的事件的編號,初始化的時候爲 -1,因此 nextSequence 初始爲 0,每次加一
    //nextSequence 是當前消費者下一項準備處理的事件的編號
    long nextSequence = sequence.get() + 1L;

    //死循環
    while (true){
        try{
            //當沒有事件發生的時候,消費者所在的線程會在此等待,具體的實現依照使用者設置的等待策略的不一樣而不一樣
            //本例中使用的是 BlockingWaitStrategy,因此會在此阻塞直到出現了事件
            //返回的 availableSequence 是最新的事件的編號,在任務量較小的狀況下和 nextSequence 數值相同,在任務量較大的狀況下小於 nextSequence
            //等待策略留在後頭展開
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);

            if (batchStartAware != null){
                batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
            }

            //nextSequence 大於 availableSequence 的狀況理論上不會出現
            while (nextSequence <= availableSequence){
                //dataProvider 就是以前初始化的 RingBuffer,RingBuffer 在此處會去獲取當前編號的 Event
                event = dataProvider.get(nextSequence);
                //onEvent(...) 是 EventHandler 接口定義的方法,是消費者消費 Event 的最重要方法,方法體由使用者進行定義
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                //編號自增
                nextSequence++;
            }

            //在消費完當前的全部事件以後,記錄下事件編號
            sequence.set(availableSequence);
        }catch (final TimeoutException e){
            //若是消費者實現了 TimeoutHandler 接口,就能夠在這裏處理超時問題
            notifyTimeout(sequence.get());
        }catch (final AlertException ex){
            //running 的狀態值非 RUNNING,就會退出死循環
            if (running.get() != RUNNING){
                break;
            }
        }catch (final Throwable ex){
            //若是當前的消費者實現了 ExceptionHandler 接口的話,能夠在此處進行錯誤處理
            exceptionHandler.handleEventException(ex, nextSequence, event);
            sequence.set(nextSequence);
            nextSequence++;
        }
    }
}

3.2 WaitStrategy

回到上述代碼的如下這句:

final long availableSequence = sequenceBarrier.waitFor(nextSequence);

追蹤一下 waitFor(...) 方法:

//ProcessingSequenceBarrier.class
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException{
    
    //若是變量 alert 爲 true 的話會拋出錯誤
    checkAlert();

    //調用等待策略的相關方法
    //返回最新的事件的編號
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

    //若是當前可用的最新事件編號小於傳入的 sequence,就直接返回可用編號便可
    if (availableSequence < sequence){
        return availableSequence;
    }

    //getHighestPublishedSequence(...) 方法會判斷最大的可用的事件編號
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

等待策略的全部實現類都實現了 WaitStrategy 接口:

public interface WaitStrategy{
    //休眠方法
    long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException;
    //喚醒方法
    void signalAllWhenBlocking();
}

Disruptor 自帶的策略中,經常使用的有如下幾種:

阻塞策略  BlockingWaitStrategy:默認策略,沒有獲取到任務的狀況下線程會進入等待狀態。cpu 消耗少,可是延遲高。
阻塞限時策略  TimeoutBlockingWaitStrategy:相對於BlockingWaitStrategy來講,設置了等待時間,超事後拋異常。
自旋策略  BusySpinWaitStrategy:線程一直自旋等待。cpu 佔用高,延遲低.
Yield 策略 YieldingWaitStrategy:嘗試自旋 100 次,而後調用 Thread.yield() 讓出 cpu。cpu 佔用高,延遲低。
分段策略  SleepingWaitStrategy:嘗試自旋 100 此,而後調用 Thread.yield() 100 次,若是通過這兩百次的操做還未獲取到任務,就會嘗試階段性掛起自身線程。此種方式是對 cpu 佔用和延遲的一種平衡,性能不太穩定。

還有幾種譬如 PhasedBackoffWaitStrategy 和 LiteBlockingWaitStrategy 等,很少介紹。

詳細看一下 BlockingWaitStrategy 的實現:

public final class BlockingWaitStrategy implements WaitStrategy{
    //重入鎖
    private final Lock lock = new ReentrantLock();
    //Condition 用來控制線程的休眠和喚醒
    private final Condition processorNotifyCondition = lock.newCondition();

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, 
                        Sequence dependentSequence, SequenceBarrier barrier)
                        throws AlertException, InterruptedException{
        
        long availableSequence;
        if (cursorSequence.get() < sequence){
            //上鎖
            lock.lock();
            try{
                while (cursorSequence.get() < sequence){
                    //檢查線程是否中斷了,若是已經中斷了就會拋出異常
                    barrier.checkAlert();
                    //休眠線程
                    processorNotifyCondition.await();
                }
            }finally{
                //解鎖
                lock.unlock();
            }
        }

        //生產者進度小於消費者的消費進度,此循環進行等待
        //正常狀況下都會在上方阻塞,不會進入該循環
        while ((availableSequence = dependentSequence.get()) < sequence){
            barrier.checkAlert();
            ThreadHints.onSpinWait();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking(){
        lock.lock();
        try{
            //用 Condition 喚醒所有的線程
            processorNotifyCondition.signalAll();
        }finally{
            lock.unlock();
        }
    }

    //toString() 方法,忽略
    @Override
    public String toString(){
        return "BlockingWaitStrategy{" +
            "processorNotifyCondition=" + processorNotifyCondition +
            '}';
    }
}

3.3 DataProvider

回到上述代碼的如下這句:

event = dataProvider.get(nextSequence);

dataProvider 是一個 DataProvider 類型的變量。DataProvider 本質上是一個 Disruptor 內的接口:

public interface DataProvider<T>{
    T get(long sequence);
}

其存在惟一實現類 RingBuffer。因此 get(...) 方法也在 RingBuffer 中:

//step 1
//RingBuffer.class
@Override
public E get(long sequence){
    //elementAt(...) 方法定義在 RingBuffer 的抽象父類 RingBufferFields 中
    return elementAt(sequence);
}

//step 2
//RingBufferFields.class
protected final E elementAt(long sequence){
    //調用 UNSAFE 的相關方法,經過地址去直接獲取
    //entries 在上文代碼中申請了一系列地址連續的內存
    //REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT) 是一個很巧妙的算法,結果永遠只會在申請下來的內存中循環
    return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}

由此可知,Disruptor 中的全部的事件都非存儲在虛擬機中,而是儲存在虛擬機外,由 Unsafe 類直接調用。

Unsafe 具備 "調用內存對象很快,可是申請內存塊很慢" 的特性,因此也就能夠解釋爲何在初始化的時候要一次性將儲存 Event 的數組進行逐個初始化了(代碼在上述 1.2 小節的 step 4 中)。

有一個注意點,entries 上的元素其實是在 jvm 管轄範圍內的,並不必定須要使用 Unsafe 去調用,這裏只是爲了更高的性能。

三 Event 的產生

在開頭的 demo 中,能夠看到 LongEventProducer 中有一個核心方法:

//LongEventProducer.class
public void onData(ByteBuffer bb) {
    //sequence 是 RingBuffer 中的一個數據塊,相似於一個數據地址
    long sequence = ringBuffer.next();
    try {
        //用數據地址去獲取到一個 Event 事件類實例
        LongEvent event = ringBuffer.get(sequence);
        //在實例中存入 ByteBuffer 中的數據
        event.set(bb.getLong(0));
    } finally {
        //發佈該數據塊,此時消費者們均可以看到該數據塊了,能夠進行消費
        ringBuffer.publish(sequence);
    }
}

這個方法內經過調用 ringBuffer.next() 方法獲取數組內對象的地址,而後經過 ringBuffer.get(...) 方法獲取對象。

在 finally 代碼塊中調用 ringBuffer.publish(...) 方法去發佈該信息。

1 next

回到上述代碼的如下這句:

long sequence = ringBuffer.next();

追蹤 next() 方法:

//step 1
//RingBuffer.class
@Override
public long next(){
    //調用 RingBuffer 內的 MultiProducerSequencer 的相關方法
    return sequencer.next();
}

//step 2
//MultiProducerSequencer.class
@Override
public long next(){
    //調用自身的相關方法
    return next(1);
}

//step 3
//MultiProducerSequencer.class
@Override
public long next(int n){
    //參數有效性驗證,此處 n = 1
    if (n < 1){
        throw new IllegalArgumentException("n must be > 0");
    }

    long current;
    long next;

    //死循環
    do{
        //current 是當前最新的事件編號
        current = cursor.get();
        //此處爲 current + 1,用做下一個事件的編號
        next = current + n;

        //wrapPoint 是事件編號和數組大小的差
        long wrapPoint = next - bufferSize;

        //gatingSequenceCache 的設計很巧妙,它是一個 Sequence 類型的變量,能夠看作是一個 long 整數
        //gatingSequenceCache 的存在乎義是每隔一段時間去檢查一次消費者的處理進度
        //gatingSequenceCache 在每次檢查進度的時候都會更新成 "當前處理最慢的消費者已經處理完成的事件編號"
        //處理邏輯在下方 if 判斷中
        long cachedGatingSequence = gatingSequenceCache.get();

        //cachedGatingSequence > current 的狀況就不會發生,由於 cachedGatingSequence 是消費者處理進度,current 是目前的事件總編號,因此最多相等
        //在消費者算力充足的狀況下,cachedGatingSequence 會和 current 相等
        //wrapPoint > cachedGatingSequence 的狀況,在極端狀況下多是由於生產者的速度太快了,已經遠超過最慢的那個消費者,超過了 "一圈"(即 bufferSize 的大小)

        //此處能夠這麼理解,因爲 RingBuffer 內數組的大小是有限的,若是事件生產的多了,就會覆蓋掉最開始的幾個事件
        //可是若是消費者的進度沒有跟上,來不及消費就被覆蓋了,就形成了 bug,此處即爲抑制策略
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current){

            //getMinimumSequence(...) 方法會獲取當前處理事件最慢的那個消費者的處理位置
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

            //wrapPoint - gatingSequence = next - bufferSize - gatingSequence >0
            //即 next > bufferSize + gatingSequence,落後了 "一圈"
            if (wrapPoint > gatingSequence){
                //線程掛起 1 納秒,而後跳過本次循環進行下一次循環
                //此處會陷入死循環,阻塞掉生產者,去等待消費者的進度
                LockSupport.parkNanos(1);
                continue;
            }

            //跳出上述循環以後在這裏更新 gatingSequenceCache 的值
            gatingSequenceCache.set(gatingSequence);
        }else if (cursor.compareAndSet(current, next)){
            //若是消費者的進度正常,那麼會在此用 CAS 操做更新 cursor 的值,而且跳出 while 循環
            break;
        }
    }while (true);
    
    //返回
    return next;
}

在線程池(好比筆者比較瞭解的 ThreadPoolExecutor)的實現中,對於 task 過多,溢出等待隊列的狀況,通常會有一種策略去應對。在 ThreadPoolExecutor 中,默認的策略爲拋出錯誤,直接終止程序。

在 Disruptor 中,其實 RingBuffer 就相似一個等待隊列,溢出策略則是暫停 task 的產生,等待線程池去執行。

【此處僅爲類比,不能簡單的把 Disruptor 想成是一個線程池】

2 publish

ringBuffer.publish(...) 是事件發佈的核心方法:

//step 1
//RingBuffer.class
@Override
public void publish(long sequence){
    sequencer.publish(sequence);
}

//step 2
//MultiProducerSequencer.class
@Override
public void publish(final long sequence){
    //此處更新數據
    setAvailable(sequence);
    //此處調用等待策略的 signalAllWhenBlocking() 方法喚醒全部等待的線程
    //具體實現依照 waitStrategy 的不一樣而不一樣
    waitStrategy.signalAllWhenBlocking();
}

//step 3
//MultiProducerSequencer.class
private void setAvailable(final long sequence){
    //calculateAvailabilityFlag(sequence) 能夠簡單理解爲是計算出的圈數,即 (sequence / bufferSize)
    //calculateIndex(sequence) 會計算出新的 sequence 對應組中的哪個位置,即 (sequence % bufferSize)
    setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}

//step 4
//MultiProducerSequencer.class
private void setAvailableBufferValue(int index, int flag){
    //SCALE 是本機 Object[] 引用的大小,通常爲 4
    long bufferAddress = (index * SCALE) + BASE;
    //使用 Unsafe 更新元素
    //availableBuffer 是一個 int 數組,大小爲 bufferSize,即和 entries 相同
    //Unsafe.putOrderedInt(...) 會將 availableBuffer 的指定位置(bufferAddress)的元素修改爲 flag
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}

四 一點嘮叨

· Disruptor 的封裝很薄(比起 Netty、Spring 之類的重量級框架),調用鏈路都相對較短

· Disruptor 的環裝緩存區(RingBuffer)的不少概念還有待理解

· 對於筆者這樣的數學苦手來講看底層算法代碼略頭疼

· 僅爲我的的學習筆記,可能存在錯誤或者表述不清的地方,有緣補充

相關文章
相關標籤/搜索