已經不記得最先接觸到 Disruptor 是何時了,只記得發現它的時候它是以具備閃電般的速度被介紹的。因而在腦子裏, Disruptor 和「閃電」一詞關聯了起來,然而卻一直沒有時間去探究一下。java
最近正在進行一項對性能有很高要求的產品項目的研究,天然想起了閃電般的 Disruptor ,這必有它的用武之地,因而進行了一番探查,將成果和體會記錄在案。git
1、什麼是 Disruptor github
從功能上來看,Disruptor 是實現了「隊列」的功能,並且是一個有界隊列。那麼它的應用場景天然就是「生產者-消費者」模型的應用場合了。算法
能夠拿 JDK 的 BlockingQueue 作一個簡單對比,以便更好地認識 Disruptor 是什麼。緩存
咱們知道 BlockingQueue 是一個 FIFO 隊列,生產者(Producer)往隊列裏發佈(publish)一項事件(或稱之爲「消息」也能夠)時,消費者(Consumer)能得到通知;若是沒有事件時,消費者被堵塞,直到生產者發佈了新的事件。安全
這些都是 Disruptor 能作到的,與之不一樣的是,Disruptor 能作更多:架構
以上的描述雖然簡單地指出了 Disruptor 是什麼,但對於它「能作什麼」還不是那麼直截了當。通常性地來講,當你須要在兩個獨立的處理過程(兩個線程)之間交換數據時,就可使用 Disruptor 。固然使用隊列(如上面提到的 BlockingQueue)也能夠,只不過 Disruptor 作得更好。併發
拿隊列來做比較的作法弱化了對 Disruptor 有多強大的認識,若是想要對此有更多的瞭解,能夠仔細看看 Disruptor 在其東家 LMAX 交易平臺(也是實現者) 是如何做爲核心架構來使用的,這方面就不作詳述了,問度娘或谷哥都能找到。異步
2、Disruptor 的核心概念ide
先從瞭解 Disruptor 的核心概念開始,來了解它是如何運做的。下面介紹的概念模型,既是領域對象,也是映射到代碼實現上的核心對象。
3、如何使用 Disruptor
Disruptor 的 API 十分簡單,主要有如下幾個步驟:
public class LongEvent { private long value; public void set(long value) { this.value = value; } }
import com.lmax.disruptor.EventFactory; public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { return new LongEvent(); } }
import com.lmax.disruptor.EventHandler; public class LongEventHandler implements EventHandler<LongEvent> { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println("Event: " + event); } }
ExecutorService executor = Executors.newCachedThreadPool();
WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy(); WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy(); WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
EventFactory<LongEvent> eventFactory = new LongEventFactory(); ExecutorService executor = Executors.newSingleThreadExecutor(); int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必須是 2 的 N 次方; Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); EventHandler<LongEvent> eventHandler = new LongEventHandler(); disruptor.handleEventsWith(eventHandler); disruptor.start();
// 發佈事件; RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); long sequence = ringBuffer.next();//請求下一個事件序號; try { LongEvent event = ringBuffer.get(sequence);//獲取該序號對應的事件對象; long data = getEventData();//獲取要經過事件傳遞的業務數據; event.set(data); } finally{ ringBuffer.publish(sequence);//發佈事件; }
注意,最後的 ringBuffer.publish 方法必須包含在 finally 中以確保必須獲得調用;若是某個請求的 sequence 未被提交,將會堵塞後續的發佈操做或者其它的 producer。
Disruptor 還提供另一種形式的調用來簡化以上操做,並確保 publish 老是獲得調用。static class Translator implements EventTranslatorOneArg<LongEvent, Long>{ @Override public void translateTo(LongEvent event, long sequence, Long data) { event.set(data); } } public static Translator TRANSLATOR = new Translator(); public static void publishEvent2(Disruptor<LongEvent> disruptor) { // 發佈事件; RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); long data = getEventData();//獲取要經過事件傳遞的業務數據; ringBuffer.publishEvent(TRANSLATOR, data); }
此外,Disruptor 要求 RingBuffer.publish 必須獲得調用的潛臺詞就是,若是發生異常也同樣要調用 publish ,那麼,很顯然這個時候須要調用者在事件處理的實現上來判斷事件攜帶的數據是不是正確的或者完整的,這是實現者應該要注意的事情。
disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至全部的事件都獲得處理; executor.shutdown();//關閉 disruptor 使用的線程池;若是須要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;
4、性能對比測試
爲了直觀地感覺 Disruptor 有多快,設計了一個性能對比測試:Producer 發佈 100 萬次事件,從發佈第一個事件開始計時,捕捉 Consumer 處理完全部事件的耗時。
測試用例在 Producer 如何將事件通知到 Consumer 的實現方式上,設計了三種不一樣的實現:
這次測試用例僅作了只有一個 Producer 和一個 Consumer 的情形,測試用例的代碼以下:
CounterTracer tracer = tracerFactory.newInstance(DATA_COUNT);//計數跟蹤到達指定的數值; TestHandler handler = new TestHandler(tracer);//Consumer 的事件處理; EventPublisher publisher = publisherFactory.newInstance(new PublisherCreationArgs(DATA_COUNT, handler));//經過工廠對象建立不一樣的 Producer 的實現; publisher.start(); tracer.start(); //發佈事件; for (int i = 0; i < DATA_COUNT; i++) { publisher.publish(i); } //等待事件處理完成; tracer.waitForReached(); publisher.stop(); //輸出結果; printResult(tracer);
事件處理的實現只是調用一個計數器(CounterTracer)加1,該計數器跟蹤從開始到達到總的事件次數時所耗的時間。
public class TestHandler { private CounterTracer tracer; public TestHandler(CounterTracer tracer) { this.tracer = tracer; } /** * 若是返回 true,則表示處理已經所有完成,再也不處理後續事件; * * @param event * @return */ public boolean process(TestEvent event){ return tracer.count(); } }
針對單一Producer 和單一 Consumer 的測試場景,CounterTracer 的實現以下:
/** * 測試結果跟蹤器,計數器不是線程安全的,僅在單線程的 consumer 測試中使用; * * @author haiq * */ public class SimpleTracer implements CounterTracer { private long startTicks; private long endTicks; private long count = 0; private boolean end = false; private final long expectedCount; private CountDownLatch latch = new CountDownLatch(1); public SimpleTracer(long expectedCount) { this.expectedCount = expectedCount; } @Override public void start() { startTicks = System.currentTimeMillis(); end = false; } @Override public long getMilliTimeSpan() { return endTicks - startTicks; } @Override public boolean count() { if (end) { return end; } count++; end = count >= expectedCount; if (end) { endTicks = System.currentTimeMillis(); latch.countDown(); } return end; } @Override public void waitForReached() throws InterruptedException { latch.await(); } }
第一種 Producer 的實現:直接觸發事件處理;
public class DirectingPublisher implements EventPublisher { private TestHandler handler; private TestEvent event = new TestEvent(); public DirectingPublisher(TestHandler handler) { this.handler = handler; } @Override public void publish(int data) throws Exception { event.setValue(data); handler.process(event); } //省略其它代碼; }
第二種 Producer 的實現:經過 ArrayBlockinigQueue 實現;
public class BlockingQueuePublisher implements EventPublisher { private ArrayBlockingQueue<TestEvent> queue ; private TestHandler handler; public BlockingQueuePublisher(int maxEventSize, TestHandler handler) { this.queue = new ArrayBlockingQueue<TestEvent>(maxEventSize); this.handler = handler; } public void start(){ Thread thrd = new Thread(new Runnable() { @Override public void run() { handle(); } }); thrd.start(); } private void handle(){ try { TestEvent evt ; while (true) { evt = queue.take(); if (evt != null && handler.process(evt)) { //完成後自動結束處理線程; break; } } } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void publish(int data) throws Exception { TestEvent evt = new TestEvent(); evt.setValue(data); queue.put(evt); } //省略其它代碼; }
第三種 Producer 的實現:經過 Disruptor 實現;
public class DisruptorPublisher implements EventPublisher { private class TestEventHandler implements EventHandler<TestEvent> { private TestHandler handler; public TestEventHandler(TestHandler handler) { this.handler = handler; } @Override public void onEvent(TestEvent event, long sequence, boolean endOfBatch) throws Exception { handler.process(event); } } private static final WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy(); private Disruptor<TestEvent> disruptor; private TestEventHandler handler; private RingBuffer<TestEvent> ringbuffer; private ExecutorService executor; public DisruptorPublisher(int bufferSize, TestHandler handler) { this.handler = new TestEventHandler(handler); executor = Executors.newSingleThreadExecutor(); disruptor = new Disruptor<TestEvent>(EVENT_FACTORY, bufferSize, executor, ProducerType.SINGLE, YIELDING_WAIT); } @SuppressWarnings("unchecked") public void start() { disruptor.handleEventsWith(handler); disruptor.start(); ringbuffer = disruptor.getRingBuffer(); } @Override public void publish(int data) throws Exception { long seq = ringbuffer.next(); try { TestEvent evt = ringbuffer.get(seq); evt.setValue(data); } finally { ringbuffer.publish(seq); } } //省略其它代碼; }
Producer 第一種實現並無線程間的交換,實際上就是直接調用計數器,所以以此種實現的測試結果做爲基準,對比其它的兩種實現的測試結果。
在個人CPU CORE i5 / 4G 內存 / Win7 64 位的筆記本上,數據量(DATA_COUNT)取值爲 1024 * 1024 時的測試結果以下:
【基準測試】 [1]--每秒吞吐量:--;(1048576/0ms) [2]--每秒吞吐量:--;(1048576/0ms) [3]--每秒吞吐量:--;(1048576/0ms) [4]--每秒吞吐量:69905066;(1048576/15ms) [5]--每秒吞吐量:--;(1048576/0ms) 【對比測試1: ArrayBlockingQueue 實現】 [1]--每秒吞吐量:4788018;(1048576/219ms) [2]--每秒吞吐量:5165399;(1048576/203ms) [3]--每秒吞吐量:4809981;(1048576/218ms) [4]--每秒吞吐量:5165399;(1048576/203ms) [5]--每秒吞吐量:5577531;(1048576/188ms) 【對比測試2: Disruptor實現】 [1]--每秒吞吐量:33825032;(1048576/31ms) [2]--每秒吞吐量:65536000;(1048576/16ms) [3]--每秒吞吐量:65536000;(1048576/16ms) [4]--每秒吞吐量:69905066;(1048576/15ms) [5]--每秒吞吐量:33825032;(1048576/31ms)
從測試結果看, Disruptor 的性能比 ArrayBlockingQueue 高出了幾乎一個數量級,操做耗時也只有平均20毫秒左右。
因爲篇幅有限,關於 Disruptor 實現高性能的原理,留待之後再作探討。
6、參考資料