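This article is translated from the Getting Started page of the Disruptor wiki on GitHub.
The Disruptor jar is available from the Maven repository (mvnrepository) and can be integrated into your project's dependency management.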
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>
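public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    // Added so that printing the event shows its value rather than an object hash.
    @Override
    public String toString() {
        return "LongEvent{value=" + value + "}";
    }
}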
import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        System.out.println("Event: " + event);
    }
}
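With version 3.0, the Disruptor gained a richer Lambda-style API that helps developers build producers by encapsulating the complex logic inside the RingBuffer. From 3.0 onward, the preferred way to publish messages is therefore the Event Publisher/Event Translator part of the API: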
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class LongEventProducerWithTranslator {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
        new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
            public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
                event.set(bb.getLong(0));
            }
        };

    public void onData(ByteBuffer bb) {
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}
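Another advantage of this approach is that the translator code can be pulled out into a separate class and unit tested on its own, without other dependencies. The Disruptor provides a number of different interfaces (EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, etc.) that can be implemented to provide translators. The reason for having several is that the arguments to the translation method are passed through the call on the Ring Buffer to the translator, which allows translators to be written as static classes or non-capturing lambdas.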
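To make the translator idea concrete, here is a minimal single-threaded sketch in plain JDK Java. It is not the Disruptor's implementation: `ToyRingBuffer`, `Translator`, and `Slot` are hypothetical names invented for this illustration, and all concurrency control is omitted. It only shows how a publish method can wrap the claim, fill, and publish steps while handing the caller's argument through to a translator that captures nothing.

```java
import java.util.function.Supplier;

final class ToyRingBuffer<E> {
    /** Hypothetical stand-in for a one-argument translator: fills a pre-allocated event. */
    interface Translator<E, A> {
        void translateTo(E event, long sequence, A arg);
    }

    private final Object[] slots;
    private final int mask;     // size - 1; only valid because size is a power of two
    private long cursor = -1;   // last published sequence

    ToyRingBuffer(Supplier<E> factory, int size) {
        if (size < 1 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        slots = new Object[size];
        mask = size - 1;
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get();   // pre-allocate every event once
        }
    }

    @SuppressWarnings("unchecked")
    E get(long sequence) {
        // A power-of-two size lets a bit mask replace the modulo operation.
        return (E) slots[(int) (sequence & mask)];
    }

    /** Claims the next slot, lets the translator fill it, then publishes it. */
    <A> long publishEvent(Translator<E, A> translator, A arg) {
        long sequence = cursor + 1;                            // claim
        translator.translateTo(get(sequence), sequence, arg);  // fill
        cursor = sequence;                                     // publish
        return sequence;
    }
}

final class ToyRingBufferDemo {
    static final class Slot {
        long value;
    }

    /** Publishes one value through a non-capturing lambda and reads it back. */
    static long publishAndRead(long input) {
        ToyRingBuffer<Slot> ring = new ToyRingBuffer<>(Slot::new, 8);
        long[] holder = {input};
        long seq = ring.publishEvent((slot, s, arg) -> slot.value = arg[0], holder);
        return ring.get(seq).value;
    }

    public static void main(String[] args) {
        System.out.println(publishAndRead(42L));
    }
}
```

Because the lambda receives `holder` as a parameter rather than closing over it, the same translator could be stored in a static field and reused for every publish.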
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb) {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
                                                        // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        } finally {
            // Always publish the claimed sequence, even if filling the event failed
            ringBuffer.publish(sequence);
        }
    }
}
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;

public class LongEventMain {
    public static void main(String[] args) throws Exception {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor =
            new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            producer.onData(bb);
            Thread.sleep(1000);
        }
    }
}
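One of the influences on the design of the Disruptor's API was Java 8, which uses functional interfaces as the target types for Java lambdas. Most of the interfaces in the Disruptor API are defined as functional interfaces so that lambdas can be used in their place. The LongEventMain above can be simplified with lambdas: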
public class LongEventMain {
    public static void main(String[] args) throws Exception {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor =
            new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(
            (event, sequence, endOfBatch) -> System.out.println("Event: " + event));

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            // The lambda only refers to its own parameters; bb is passed as an argument.
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
    bb.putLong(0, l);
    // Here the lambda refers to bb from the enclosing scope instead of a parameter.
    ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
    Thread.sleep(1000);
}
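Note that this version uses a capturing lambda, meaning that an object may need to be instantiated to hold the ByteBuffer bb variable when the lambda is passed to the publishEvent() call. This may create additional garbage, so where low GC pressure is a strict requirement, passing the argument through the call is preferred.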
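The difference between the two lambda styles can be observed with a small JDK-only experiment. This is a sketch, not a benchmark: `LambdaCaptureDemo` and its methods are hypothetical names, and whether a lambda expression yields a fresh object on each evaluation is left unspecified by the Java Language Specification. On HotSpot-based JVMs one would typically expect the capturing form to produce a new object per iteration and the non-capturing form to reuse a single instance, but treat that as an observation about common implementations rather than a guarantee.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;

final class LambdaCaptureDemo {
    /** Counts distinct lambda objects seen when the lambda captures a local variable. */
    static int distinctCapturing(int iterations) {
        Set<Object> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        long[] box = {0L};
        for (int i = 0; i < iterations; i++) {
            LongSupplier capturing = () -> box[0];   // closes over 'box'
            seen.add(capturing);
        }
        return seen.size();
    }

    /** Counts distinct lambda objects seen when all state arrives as a parameter. */
    static int distinctNonCapturing(int iterations) {
        Set<Object> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (int i = 0; i < iterations; i++) {
            ToLongFunction<long[]> nonCapturing = arg -> arg[0];   // captures nothing
            seen.add(nonCapturing);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("capturing instances:     " + distinctCapturing(1_000));
        System.out.println("non-capturing instances: " + distinctNonCapturing(1_000));
    }
}
```

The sets compare by identity, so each count is the number of separate lambda objects the loop produced.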
public class LongEventMain {
    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) {
        System.out.println(event);
    }

    public static void translate(LongEvent event, long sequence, ByteBuffer buffer) {
        event.set(buffer.getLong(0));
    }

    public static void main(String[] args) throws Exception {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor =
            new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(LongEventMain::handleEvent);

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            ringBuffer.publishEvent(LongEventMain::translate, bb);
            Thread.sleep(1000);
        }
    }
}
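One of the best ways to improve the performance of a concurrent system is to follow the Single Writer Principle, and this applies to the Disruptor as well. If your scenario only ever has a single thread producing events, you can take advantage of that to gain additional performance: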
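import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;

public class LongEventMain {
    public static void main(String[] args) throws Exception {
        //.....
        // Construct the Disruptor with a SingleProducerSequencer.
        // Argument order: factory, buffer size, thread factory, producer type, wait strategy.
        Disruptor<LongEvent> disruptor = new Disruptor<>(
            factory, bufferSize, DaemonThreadFactory.INSTANCE,
            ProducerType.SINGLE, new BlockingWaitStrategy());
        //.....
    }
}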
To give an indication of how much of a performance gain this technique can deliver, here are the results of the OneToOne performance test, run on an i7 Sandy Bridge MacBook Air:
Multiple Producer:
Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec
Single Producer:
Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec
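The two sets of runs are easier to compare once they are averaged. The following sketch simply copies the figures listed above into arrays and computes the mean throughput for each configuration plus their ratio; `ThroughputSummary` is a hypothetical helper written for this article and not part of the Disruptor.

```java
import java.util.stream.LongStream;

final class ThroughputSummary {
    // ops/sec figures copied from the runs listed above
    static final long[] MULTI = {
        26_553_372L, 28_727_377L, 29_806_259L, 29_717_682L,
        28_818_443L, 29_103_608L, 29_239_766L
    };
    static final long[] SINGLE = {
        89_365_504L, 77_579_519L, 78_678_206L, 80_840_743L,
        81_037_277L, 81_168_831L, 81_699_346L
    };

    /** Arithmetic mean of the given runs, or NaN for an empty array. */
    static double mean(long[] runs) {
        return LongStream.of(runs).average().orElse(Double.NaN);
    }

    /** How many times faster the single-producer mean is than the multi-producer mean. */
    static double speedup() {
        return mean(SINGLE) / mean(MULTI);
    }

    public static void main(String[] args) {
        System.out.println("multi-producer mean ops/sec:  " + Math.round(mean(MULTI)));
        System.out.println("single-producer mean ops/sec: " + Math.round(mean(SINGLE)));
        System.out.println("speedup: " + speedup());
    }
}
```

For these particular runs the single-producer configuration averages roughly 2.8 times the throughput of the multi-producer one. The figures come from one machine and one test, so the ratio is indicative only.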
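YieldingWaitStrategy is a wait strategy for low-latency systems. It trades CPU resources for lower latency. YieldingWaitStrategy busy-spins while waiting for the sequence to increment to the appropriate value. Internally, Thread.yield() is called inside the loop, allowing other queued threads to run. This strategy is strongly recommended when very high performance is required and the number of event handler threads is less than the number of logical CPU cores, for example when hyper-threading is enabled.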
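The spin-then-yield behaviour described above can be sketched with plain JDK classes. This is a simplified illustration and not the library's source: `YieldingWaitSketch`, `waitFor`, and the `SPIN_TRIES` budget of 100 are assumptions chosen for the example, and the real strategy also has to deal with dependent sequences and shutdown alerts, which are left out here.

```java
import java.util.concurrent.atomic.AtomicLong;

final class YieldingWaitSketch {
    // Assumed number of pure spins before the loop starts yielding.
    private static final int SPIN_TRIES = 100;

    /** Spins, then yields, until the cursor has reached at least the wanted sequence. */
    static long waitFor(long wanted, AtomicLong cursor) {
        int counter = SPIN_TRIES;
        long available;
        while ((available = cursor.get()) < wanted) {
            if (counter > 0) {
                counter--;        // busy spin: burn CPU to keep latency low
            } else {
                Thread.yield();   // hint that other runnable threads may use this core
            }
        }
        return available;
    }

    /** Starts a producer that advances the cursor to 5 and waits for it from this thread. */
    static long demo() {
        AtomicLong cursor = new AtomicLong(-1);
        Thread producer = new Thread(() -> {
            for (long s = 0; s <= 5; s++) {
                cursor.set(s);
            }
        });
        producer.start();
        long seen = waitFor(5, cursor);
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("consumer observed sequence " + demo());
    }
}
```

The waiting thread never blocks or parks, which is why this style of waiting keeps a core busy and suits deployments with spare logical cores.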
class ObjectEvent<T> {
    T val;

    void clear() {
        val = null;
    }
}

public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>> {
    public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch) {
        // Failing to call clear here will result in the
        // object associated with the event to live until
        // it is overwritten once the ring buffer has wrapped
        // around to the beginning.
        event.clear();
    }
}

public static void main(String[] args) {
    // bufferSize and ProcessingEventHandler are assumed to be defined elsewhere.
    Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
        () -> new ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);

    disruptor
        .handleEventsWith(new ProcessingEventHandler())
        .then(new ClearingEventHandler<String>());
}