什麼是 Disruptor

已經不記得最先接觸到 Disruptor 是何時了,只記得發現它的時候它是以具備閃電般的速度被介紹的。因而在腦子裏, Disruptor 和「閃電」一詞關聯了起來,然而卻一直沒有時間去探究一下。java

      最近正在進行一項對性能有很高要求的產品項目的研究,天然想起了閃電般的 Disruptor ,這必有它的用武之地,因而進行了一番探查,將成果和體會記錄在案。git

1、什麼是 Disruptor github

從功能上來看,Disruptor 是實現了「隊列」的功能,並且是一個有界隊列。那麼它的應用場景天然就是「生產者-消費者」模型的應用場合了。算法

能夠拿 JDK 的 BlockingQueue 作一個簡單對比,以便更好地認識 Disruptor 是什麼。緩存

咱們知道 BlockingQueue 是一個 FIFO 隊列,生產者(Producer)往隊列裏發佈(publish)一項事件(或稱之爲「消息」也能夠)時,消費者(Consumer)能得到通知;若是沒有事件時,消費者被堵塞,直到生產者發佈了新的事件。安全

這些都是 Disruptor 能作到的,與之不一樣的是,Disruptor 能作更多:架構

  • 同一個「事件」能夠有多個消費者,消費者之間既能夠並行處理,也能夠相互依賴造成處理的前後次序(造成一個依賴圖);
  • 預分配用於存儲事件內容的內存空間;
  • 針對極高的性能目標而實現的極度優化和無鎖的設計;

以上的描述雖然簡單地指出了 Disruptor 是什麼,但對於它「能作什麼」還不是那麼直截了當。通常性地來講,當你須要在兩個獨立的處理過程(兩個線程)之間交換數據時,就可使用 Disruptor 。固然使用隊列(如上面提到的 BlockingQueue)也能夠,只不過 Disruptor 作得更好。併發

拿隊列來做比較的作法弱化了對 Disruptor 有多強大的認識,若是想要對此有更多的瞭解,能夠仔細看看 Disruptor 在其東家 LMAX 交易平臺(也是實現者) 是如何做爲核心架構來使用的,這方面就不作詳述了,問度娘或谷哥都能找到。異步

2、Disruptor 的核心概念ide

先從瞭解 Disruptor 的核心概念開始,來了解它是如何運做的。下面介紹的概念模型,既是領域對象,也是映射到代碼實現上的核心對象。

  • Ring Buffer
    如其名,環形的緩衝區。曾經 RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責被簡化爲僅僅負責對經過 Disruptor 進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 能夠由用戶的自定義實現來徹底替代。
  • Sequence  Disruptor
    經過順序遞增的序號來編號管理經過其進行交換的數據(事件),對數據(事件)的處理過程老是沿着序號逐個遞增處理。一個 Sequence 用於跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一個 AtomicLong 也能夠用於標識進度,但定義 Sequence 來負責該問題還有另外一個目的,那就是防止不一樣的 Sequence 之間的CPU緩存僞共享(Flase Sharing)問題。
    (注:這是 Disruptor 實現高性能的關鍵點之一,網上關於僞共享問題的介紹已經汗牛充棟,在此再也不贅述)。
  • Sequencer 
    Sequencer 是 Disruptor 的真正核心。此接口有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞數據的併發算法。
  • Sequence Barrier
    用於保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。 Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。
  • Wait Strategy
    定義 Consumer 如何進行等待下一個事件的策略。 (注:Disruptor 定義了多種不一樣的策略,針對不一樣的場景,提供了不同的性能表現)
  • Event
    在 Disruptor 的語義中,生產者和消費者之間進行交換的數據被稱爲事件(Event)。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義並指定。
  • EventProcessor
    EventProcessor 持有特定消費者(Consumer)的 Sequence,並提供用於調用事件處理實現的事件循環(Event Loop)。
  • EventHandler
    Disruptor 定義的事件處理接口,由用戶實現,用於處理事件,是 Consumer 的真正實現。
  • Producer
    即生產者,只是泛指調用 Disruptor 發佈事件的用戶代碼,Disruptor 沒有定義特定接口或類型。

3、如何使用 Disruptor 

Disruptor 的 API 十分簡單,主要有如下幾個步驟:

  1. 定義事件
    事件(Event)就是經過 Disruptor 進行交換的數據類型。

    複製代碼

    public class LongEvent
    {
        private long value;
    
        public void set(long value)
        {
            this.value = value;
        }
    }

    複製代碼

     

  2. 定義事件工廠
    事件工廠(Event Factory)定義瞭如何實例化前面第1步中定義的事件(Event),須要實現接口 com.lmax.disruptor.EventFactory<T>。
    Disruptor 經過 EventFactory 在 RingBuffer 中預建立 Event 的實例。
    一個 Event 實例實際上被用做一個「數據槽」,發佈者發佈前,先從 RingBuffer 得到一個 Event 的實例,而後往 Event 實例中填充數據,以後再發布到 RingBuffer 中,以後由 Consumer 得到該 Event 實例並從中讀取數據。

    複製代碼

    import com.lmax.disruptor.EventFactory;
    
    public class LongEventFactory implements EventFactory<LongEvent>
    {
        public LongEvent newInstance()
        {
            return new LongEvent();
        }
    }

    複製代碼

     

  3. 定義事件處理的具體實現
    經過實現接口 com.lmax.disruptor.EventHandler<T> 定義事件處理的具體實現。

    複製代碼

    import com.lmax.disruptor.EventHandler;
    
    public class LongEventHandler implements EventHandler<LongEvent>
    {
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
        {
            System.out.println("Event: " + event);
        }
    }

    複製代碼

     

  4. 定義用於事件處理的線程池
    Disruptor 經過 java.util.concurrent.ExecutorService 提供的線程來觸發 Consumer 的事件處理。例如:
    ExecutorService executor = Executors.newCachedThreadPool();

     

  5. 指定等待策略
    Disruptor 定義了 com.lmax.disruptor.WaitStrategy 接口用於抽象 Consumer 如何等待新事件,這是策略模式的應用。
    Disruptor 提供了多個 WaitStrategy 的實現,每種策略都具備不一樣性能和優缺點,根據實際運行環境的 CPU 的硬件特色選擇恰當的策略,並配合特定的 JVM 的配置參數,可以實現不一樣的性能提高。
    例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,其中,
    BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小而且在各類不一樣部署環境中能提供更加一致的性能表現;
    SleepingWaitStrategy 的性能表現跟 BlockingWaitStrategy 差很少,對 CPU 的消耗也相似,但其對生產者線程的影響最小,適合用於異步日誌相似的場景;
    YieldingWaitStrategy 的性能是最好的,適合用於低延遲的系統。在要求極高性能且事件處理線數小於 CPU 邏輯核心數的場景中,推薦使用此策略;例如,CPU開啓超線程的特性。
    WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
    WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
    WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();

     

  6. 啓動 Disruptor

    複製代碼

    EventFactory<LongEvent> eventFactory = new LongEventFactory();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必須是 2 的 N 次方;
            
    Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,
                    ringBufferSize, executor, ProducerType.SINGLE,
                    new YieldingWaitStrategy());
            
    EventHandler<LongEvent> eventHandler = new LongEventHandler();
    disruptor.handleEventsWith(eventHandler);
            
    disruptor.start();

    複製代碼

     

  7. 發佈事件
    Disruptor 的事件發佈過程是一個兩階段提交的過程:
      第一步:先從 RingBuffer 獲取下一個能夠寫入的事件的序號;
      第二步:獲取對應的事件對象,將數據寫入事件對象;
      第三部:將事件提交到 RingBuffer;
    事件只有在提交以後纔會通知 EventProcessor 進行處理;

    複製代碼

    // 發佈事件;
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    long sequence = ringBuffer.next();//請求下一個事件序號;
        
    try {
        LongEvent event = ringBuffer.get(sequence);//獲取該序號對應的事件對象;
        long data = getEventData();//獲取要經過事件傳遞的業務數據;
        event.set(data);
    } finally{
        ringBuffer.publish(sequence);//發佈事件;
    }

    複製代碼

     注意,最後的 ringBuffer.publish 方法必須包含在 finally 中以確保必須獲得調用;若是某個請求的 sequence 未被提交,將會堵塞後續的發佈操做或者其它的 producer。

    Disruptor 還提供另一種形式的調用來簡化以上操做,並確保 publish 老是獲得調用。

    複製代碼

    static class Translator implements EventTranslatorOneArg<LongEvent, Long>{
        @Override
        public void translateTo(LongEvent event, long sequence, Long data) {
            event.set(data);
        }    
    }
        
    public static Translator TRANSLATOR = new Translator();
        
    public static void publishEvent2(Disruptor<LongEvent> disruptor) {
        // 發佈事件;
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        long data = getEventData();//獲取要經過事件傳遞的業務數據;
        ringBuffer.publishEvent(TRANSLATOR, data);
    }

    複製代碼

    此外,Disruptor 要求 RingBuffer.publish 必須獲得調用的潛臺詞就是,若是發生異常也同樣要調用 publish ,那麼,很顯然這個時候須要調用者在事件處理的實現上來判斷事件攜帶的數據是不是正確的或者完整的,這是實現者應該要注意的事情。

  8. 關閉 Disruptor
    disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至全部的事件都獲得處理;
    executor.shutdown();//關閉 disruptor 使用的線程池;若是須要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;

     

4、性能對比測試

  爲了直觀地感覺 Disruptor 有多快,設計了一個性能對比測試:Producer 發佈 100 萬次事件,從發佈第一個事件開始計時,捕捉 Consumer 處理完全部事件的耗時。

  測試用例在 Producer 如何將事件通知到 Consumer 的實現方式上,設計了三種不一樣的實現:

  1. Producer 的事件發佈和 Consumer 的事件處理都在同一個線程,Producer 發佈事件後當即觸發 Consumer 的事件處理;
  2. Producer 的事件發佈和 Consumer 的事件處理在不一樣的線程,經過 ArrayBlockingQueue 傳遞給 Consumer 進行處理;
  3. Producer 的事件發佈和 Consumer 的事件處理在不一樣的線程,經過 Disruptor 傳遞給 Consumer 進行處理;

這次測試用例僅作了只有一個 Producer 和一個 Consumer 的情形,測試用例的代碼以下:

複製代碼

CounterTracer tracer = tracerFactory.newInstance(DATA_COUNT);//計數跟蹤到達指定的數值;
TestHandler handler = new TestHandler(tracer);//Consumer 的事件處理;
        
EventPublisher publisher = publisherFactory.newInstance(new PublisherCreationArgs(DATA_COUNT, handler));//經過工廠對象建立不一樣的 Producer 的實現;
publisher.start();
tracer.start();
        
//發佈事件;
for (int i = 0; i < DATA_COUNT; i++) {
    publisher.publish(i);
}
        
//等待事件處理完成;
tracer.waitForReached();
        
publisher.stop();
        
//輸出結果;
printResult(tracer);

複製代碼

 

事件處理的實現只是調用一個計數器(CounterTracer)加1,該計數器跟蹤從開始到達到總的事件次數時所耗的時間。

複製代碼

public class TestHandler {
    
    private CounterTracer tracer;
    
    public TestHandler(CounterTracer tracer) {
        this.tracer = tracer;
    }
    
    /**
     * 若是返回 true,則表示處理已經所有完成,再也不處理後續事件;
     * 
     * @param event
     * @return
     */
    public boolean process(TestEvent event){
        return tracer.count();
    }
}

複製代碼

 

針對單一Producer 和單一 Consumer 的測試場景,CounterTracer 的實現以下:

複製代碼

/**
 * 測試結果跟蹤器,計數器不是線程安全的,僅在單線程的 consumer 測試中使用;
 * 
 * @author haiq
 *
 */
public class SimpleTracer implements CounterTracer {

    private long startTicks;
    private long endTicks;
    private long count = 0;
    private boolean end = false;
    private final long expectedCount;
    private CountDownLatch latch = new CountDownLatch(1);

    public SimpleTracer(long expectedCount) {
        this.expectedCount = expectedCount;
    }

    @Override
    public void start() {
        startTicks = System.currentTimeMillis();
        end = false;
    }

    @Override
    public long getMilliTimeSpan() {
        return endTicks - startTicks;
    }

    @Override
    public boolean count() {
        if (end) {
            return end;
        }
        count++;
        end = count >= expectedCount;
        if (end) {
            endTicks = System.currentTimeMillis();
            latch.countDown();
        }
        return end;
    }

    @Override
    public void waitForReached() throws InterruptedException {
        latch.await();
    }
}

複製代碼

 

第一種 Producer 的實現:直接觸發事件處理;

複製代碼

public class DirectingPublisher implements EventPublisher {
    
    private TestHandler handler;    
    private TestEvent event = new TestEvent();
    
    public DirectingPublisher(TestHandler handler) {
        this.handler = handler;
    }

    @Override
    public void publish(int data) throws Exception {
        event.setValue(data);
        handler.process(event);
    }

    //省略其它代碼;    
}

複製代碼

 

第二種 Producer 的實現:經過 ArrayBlockinigQueue 實現;

複製代碼

public class BlockingQueuePublisher implements EventPublisher {
    
    private ArrayBlockingQueue<TestEvent> queue ;    
    private TestHandler handler;    
    public BlockingQueuePublisher(int maxEventSize, TestHandler handler) {
        this.queue = new ArrayBlockingQueue<TestEvent>(maxEventSize);
        this.handler = handler;
    }

    public void start(){
        Thread thrd = new Thread(new Runnable() {
            @Override
            public void run() {
                handle();
            }
        });
        thrd.start();
    }
    
    private void handle(){
        try {
            TestEvent evt ;
            while (true) {
                evt = queue.take();
                if (evt != null && handler.process(evt)) {
                    //完成後自動結束處理線程;
                    break;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void publish(int data) throws Exception {
        TestEvent evt = new TestEvent();
        evt.setValue(data);
        queue.put(evt);
    }

    //省略其它代碼;
}

複製代碼

 

第三種 Producer 的實現:經過 Disruptor 實現;

複製代碼

public class DisruptorPublisher implements EventPublisher {

    private class TestEventHandler implements EventHandler<TestEvent> {

        private TestHandler handler;

        public TestEventHandler(TestHandler handler) {
            this.handler = handler;
        }

        @Override
        public void onEvent(TestEvent event, long sequence, boolean endOfBatch)
                throws Exception {
            handler.process(event);
        }

    }
    
    private static final WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();

    private Disruptor<TestEvent> disruptor;
    private TestEventHandler handler;
    private RingBuffer<TestEvent> ringbuffer;    
    private ExecutorService executor;

    public DisruptorPublisher(int bufferSize, TestHandler handler) {
        this.handler = new TestEventHandler(handler);
        executor = Executors.newSingleThreadExecutor();
        disruptor = new Disruptor<TestEvent>(EVENT_FACTORY, bufferSize,
                executor, ProducerType.SINGLE,
                YIELDING_WAIT);
    }

    @SuppressWarnings("unchecked")
    public void start() {
        disruptor.handleEventsWith(handler);
        disruptor.start();
        ringbuffer = disruptor.getRingBuffer();
    }

    @Override
    public void publish(int data) throws Exception {
        long seq = ringbuffer.next();
        try {
            TestEvent evt = ringbuffer.get(seq);
            evt.setValue(data);
        } finally {
            ringbuffer.publish(seq);
        }
    }

    //省略其它代碼;
}

複製代碼

 

 

Producer 第一種實現並無線程間的交換,實際上就是直接調用計數器,所以以此種實現的測試結果做爲基準,對比其它的兩種實現的測試結果。

在個人CPU CORE i5 / 4G 內存 / Win7 64 位的筆記本上,數據量(DATA_COUNT)取值爲 1024 * 1024 時的測試結果以下:

複製代碼

【基準測試】
[1]--每秒吞吐量:--;(1048576/0ms)
[2]--每秒吞吐量:--;(1048576/0ms)
[3]--每秒吞吐量:--;(1048576/0ms)
[4]--每秒吞吐量:69905066;(1048576/15ms)
[5]--每秒吞吐量:--;(1048576/0ms)
【對比測試1: ArrayBlockingQueue 實現】
[1]--每秒吞吐量:4788018;(1048576/219ms)
[2]--每秒吞吐量:5165399;(1048576/203ms)
[3]--每秒吞吐量:4809981;(1048576/218ms)
[4]--每秒吞吐量:5165399;(1048576/203ms)
[5]--每秒吞吐量:5577531;(1048576/188ms)
【對比測試2: Disruptor實現】
[1]--每秒吞吐量:33825032;(1048576/31ms)
[2]--每秒吞吐量:65536000;(1048576/16ms)
[3]--每秒吞吐量:65536000;(1048576/16ms)
[4]--每秒吞吐量:69905066;(1048576/15ms)
[5]--每秒吞吐量:33825032;(1048576/31ms)

複製代碼

 

從測試結果看, Disruptor 的性能比 ArrayBlockingQueue 高出了幾乎一個數量級,操做耗時也只有平均20毫秒左右。

因爲篇幅有限,關於 Disruptor 實現高性能的原理,留待之後再作探討。

 

6、參考資料

  1. Diruptor 頁面:https://github.com/LMAX-Exchange/disruptor
相關文章
相關標籤/搜索