下面以一個簡單的例子來看看Disruptor的用法:生產者發送一個long型的消息,消費者接收消息並打印出來。java
首先,咱們定義一個Event:git
public class LongEvent { private long value; public void set(long value) { this.value = value; } }
爲了使Disruptor對這些Event提早分配,咱們須要建立一個EventFactory:github
import com.lmax.disruptor.EventFactory; public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { return new LongEvent(); } }
事件已經定義好了,咱們須要建立一個消費者來處理這些消息。咱們須要消費者在終端打印接收到的消息的值:網絡
import com.lmax.disruptor.EventHandler; public class LongEventHandler implements EventHandler<LongEvent> { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println("Event: " + event); } }
咱們須要建立一個事件源,咱們假設數據來是來自一些I/O設備,好比網絡或文件。app
import com.lmax.disruptor.RingBuffer; public class LongEventProducer { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer bb) { long sequence = ringBuffer.next(); // Grab the next sequence try { LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor // for the sequence event.set(bb.getLong(0)); // Fill with data } finally { ringBuffer.publish(sequence); } } }
顯而易見的是,事件發佈比使用簡單隊列更爲複雜,這是事件預分配的緣故,若是2個生產者發佈消息,即在RingBuffer中聲明插槽發佈可用數據,並且須要在try/finally塊中發佈。若是咱們在RingBuffer中申請了一個插槽(RingBuffer.next()),那麼咱們必須發佈這個Sequence,若是沒有發佈或者發佈失敗,那麼Disruptor的將會failed,具體點來說,在多生產者的狀況下,這將致使消費者失速,並且除了重啓沒有其餘辦法能夠解決了。異步
使用version3.0的Translator性能
Disruptor的version3.0給開發者提供了Lambda表達式風格的API,將RingBuffer的複雜性封裝起來。因此,對於3.0之後的首選方法是經過API中發佈事件的Translator部分來發布事件,例如:單元測試
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.EventTranslatorOneArg; public class LongEventProducerWithTranslator { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { public void translateTo(LongEvent event, long sequence, ByteBuffer bb) { event.set(bb.getLong(0)); } }; public void onData(ByteBuffer bb) { ringBuffer.publishEvent(TRANSLATOR, bb); } }
這種方法的另外一個優勢是能夠將Translator代碼拖到單獨的類中,並方便對其進行單元測試。Disruptor提供了不少不一樣的接口(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg等等)能夠提供Translator。緣由是能夠表示爲靜態類或者非捕獲的lambda做爲參數經過Translator傳遞給RingBuffer。測試
最後一步是將全部的東西串聯起來,能夠手動的鏈接全部的組件,可是可能會有點複雜,所以能夠經過DSL來簡化構建,一些複雜的選項不是經過DSL提供的,可是能夠適用於大多數狀況。優化
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class LongEventMain { public static void main(String[] args) throws Exception { // Executor that will be used to construct new threads for consumers Executor executor = Executors.newCachedThreadPool(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024; // Construct the Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor); // Connect the handler disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event)); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb); Thread.sleep(1000); } } }
注意再也不須要一些類(例如處理程序、翻譯程序),還須要注意lambda用於publishEvent()
指的是傳入的參數,若是咱們把代碼寫成:
ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0))); Thread.sleep(1000); }
這將建立一個可捕獲的lambda,這意味須要經過publishEvent()
實例化一個對象以保存ByteBuffer bb
,這將建立額外的垃圾,爲了下降GC壓力,則將調用傳遞給lambda的調用應該是首選。
方法的引用能夠用lambda來代替,fashion的寫法:
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class LongEventMain { public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println(event); } public static void translate(LongEvent event, long sequence, ByteBuffer buffer) { event.set(buffer.getLong(0)); } public static void main(String[] args) throws Exception { // Executor that will be used to construct new threads for consumers Executor executor = Executors.newCachedThreadPool(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024; // Construct the Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor); // Connect the handler disruptor.handleEventsWith(LongEventMain::handleEvent); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); ringBuffer.publishEvent(LongEventMain::translate, bb); Thread.sleep(1000); } } }
使用上述方法在最經常使用的場景中已經夠用了,可是若是你但願追求極致,還可以針對須要運行的硬件和軟件,利用一些優化選項來提升性能。調優主要有兩種方法:單或多生產者和可選等待策略。
一個在concurrent系統提升性能的最好方式是單個Write的原則,這一樣也適用於Disruptor,若是你在這種只有一個單線程的生產者發送Event的的Disruptor中,那麼你能利用這個來得到額外的性能。
public class LongEventMain { public static void main(String[] args) throws Exception { //..... // Construct the Disruptor with a SingleProducerSequencer Disruptor<LongEvent> disruptor = new Disruptor( factory, bufferSize, ProducerType.SINGLE, new BlockingWaitStrategy(), executor); //..... } }
能有多少性能優點能夠經過 OneToOne performance test測試,Tests運行在i7 Sandy Bridge MacBook Air。
多生產者:
Run 0, Disruptor=26,553,372 ops/sec Run 1, Disruptor=28,727,377 ops/sec Run 2, Disruptor=29,806,259 ops/sec Run 3, Disruptor=29,717,682 ops/sec Run 4, Disruptor=28,818,443 ops/sec Run 5, Disruptor=29,103,608 ops/sec Run 6, Disruptor=29,239,766 ops/sec
單生產者:
Run 0, Disruptor=89,365,504 ops/sec Run 1, Disruptor=77,579,519 ops/sec Run 2, Disruptor=78,678,206 ops/sec Run 3, Disruptor=80,840,743 ops/sec Run 4, Disruptor=81,037,277 ops/sec Run 5, Disruptor=81,168,831 ops/sec Run 6, Disruptor=81,699,346 ops/sec
默認的Disruptor使用的等待策略是BlockingWaitStrategy(阻塞等待策略),阻塞等待策略內部使用的是典型的鎖和Condition條件變量來處理線程喚醒,這是最慢的等待策略了,可是在CPU使用率上最保守並且能給予確定的一致性行爲。
休眠等待策略(SleepingWaitStrategy)
和BlockingWaitStrategy同樣,爲了保證CPU的使用率,不是經過一個簡單的忙等待循環,而是使用一個叫LockSupport.parknanos(1)
在循環中,在典型的Linux系統中將暫停60µs,這樣顯然是有優點的,生產者線程不須要增長計數器,也不須要信號條件。可是在生產者和消費者之間移動事件的平均延遲事件會更高。休眠等待策略在不須要低延遲的狀況下效果最好,可是對生成線程的影響是很小的,一個常見的用例是異步日誌記錄。
退出等待策略(YieldingWaitStrategy)
退出等待策略是兩個等待策略中能夠被用到低延遲的策略,消耗CPU來提升實時性。YieldingWaitStrategy會忙等待Sequence增長爲適當的值。在循環體中Thread.yield()
將會容許其餘線程運行,當須要很是高的性能和事件處理線程的的數量小於邏輯核心的總數時,這是推薦的等待策略,啓用了超線程。
自旋等待策略(BusySpinWaitStrategy)
自旋等待策略是常見的等待策略,可是對部署環境也有很高的要求。自旋等待策略應該只能被用在處理線程數小於實際核數的時候。另外,超線程應該被關閉。
當經過Disruptor傳遞數據的時候,對象的存活壽命可能比預期的要長,爲了不這種狀況發生,可能須要在處理完事件之後清除它。若是隻有一個單事件處理程序,那麼在一個處理程序中清除data就夠了。若是有一個事件處理鏈,那麼須要在鏈結束的地方利用特殊的處理程序來清除對象。
class ObjectEvent<T> { T val; void clear() { val = null; } } public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>> { public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch) { // Failing to call clear here will result in the // object associated with the event to live until // it is overwritten once the ring buffer has wrapped // around to the beginning. event.clear(); } } public static void main(String[] args) { Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>( () -> ObjectEvent<String>(), bufferSize, executor); disruptor .handleEventsWith(new ProcessingEventHandler()) .then(new ClearingObjectHandler()); }
參考資料: