https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
1 Event, producer, and consumer
    package disruptor.start;

    /**
     * https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
     *
     * 1
     *
     * A simple example that passes a single long value from a producer to a consumer,
     * where the consumer simply prints out the value.
     * Firstly we will define the Event that will carry the data.
     */
    public class LongEvent {
        private long value;

        public void set(long value) {
            this.value = value;
        }

        // Added so that the handlers print the value instead of the default Object identity string.
        @Override
        public String toString() {
            return "LongEvent{value=" + value + "}";
        }
    }

    package disruptor.start;

    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.RingBuffer;

    /**
     * 2
     *
     * In order to allow the Disruptor to preallocate these events for us,
     * we need an EventFactory that will perform the construction.
     *
     * Called by the {@link RingBuffer} to pre-populate all the events to fill the RingBuffer.
     */
    public class LongEventFactory implements EventFactory<LongEvent> {
        @Override
        public LongEvent newInstance() {
            return new LongEvent();
        }
    }

    package disruptor.start;

    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.RingBuffer;

    /**
     * 3
     *
     * Once we have the event defined we need to
     * create a consumer that will handle these events.
     *
     * EventHandler - Callback interface to be implemented for processing events
     * as they become available in the {@link RingBuffer}
     */
    public class LongEventHandler implements EventHandler<LongEvent> {
        /**
         * Called when a publisher has published an event to the {@link RingBuffer}
         *
         * @param event published to the {@link RingBuffer}
         * @param sequence of the event being processed
         * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
         * @throws Exception if the EventHandler would like the exception handled further up the chain.
         */
        @Override
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            System.out.println(Thread.currentThread().getName() + " Event: " + event);
        }
    }

    package disruptor.start;

    import java.nio.ByteBuffer;

    import com.lmax.disruptor.EventProcessor;
    import com.lmax.disruptor.RingBuffer;

    /**
     * 4
     *
     * We need a source for these events. For the sake of an example I am going to assume that
     * the data is coming from some sort of I/O device, e.g. network or file, in the form of a ByteBuffer.
     */
    public class LongEventProducer {
        /**
         * Ring based store of reusable entries containing the data representing
         * an event being exchanged between event producer and {@link EventProcessor}s.
         */
        private final RingBuffer<LongEvent> ringBuffer;

        public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }

        /**
         * What becomes immediately obvious is that event publication becomes more involved than using a simple queue.
         * This is due to the desire for event preallocation.
         * It requires (at the lowest level) a 2-phase approach to message publication,
         * i.e. claim the slot in the ring buffer, then publish the available data.
         * It is also necessary to wrap publication in a try/finally block.
         * If we claim a slot in the Ring Buffer (calling RingBuffer.next()) then we must publish this sequence.
         * Failing to do so can result in corruption of the state of the Disruptor.
         * Specifically, in the multi-producer case this will result in the consumers stalling
         * and being unable to recover without a restart.
         *
         * @param bb buffer whose first 8 bytes hold the long value to publish
         */
        public void onData(ByteBuffer bb) {
            long sequence = ringBuffer.next(); // Grab the next sequence
            try {
                LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor for the sequence
                event.set(bb.getLong(0)); // Fill with data
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }
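The two-phase claim and publish pattern used by onData() can be sketched without the library. The following is only an illustration under stated assumptions: `ToyRing`, `Slot`, and `offer` are hypothetical names, the ring supports a single producer thread, and it performs no back-pressure check. It is not how the Disruptor itself is implemented; it only shows why slots are preallocated, why a power-of-two size allows a bit mask for indexing, and why publication sits in a finally block.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Toy single-producer ring: illustrative only, not the Disruptor's implementation. */
class ToyRing {
    /** Hypothetical mutable event, allocated once per slot and reused. */
    static final class Slot {
        long value;
    }

    private final Slot[] slots;
    private final int mask;                                   // bufferSize - 1, valid for powers of two
    private long claimed = -1;                                // last claimed sequence, producer thread only
    private final AtomicLong published = new AtomicLong(-1);  // last sequence visible to readers

    ToyRing(int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        slots = new Slot[bufferSize];
        for (int i = 0; i < bufferSize; i++) {
            slots[i] = new Slot();                            // preallocate every event up front
        }
        mask = bufferSize - 1;
    }

    /** Phase 1: claim the next sequence (this sketch does not check for wrap-around). */
    long next() {
        return ++claimed;
    }

    Slot get(long sequence) {
        return slots[(int) (sequence & mask)];
    }

    /** Phase 2: make the claimed sequence visible to readers. */
    void publish(long sequence) {
        published.set(sequence);
    }

    long lastPublished() {
        return published.get();
    }

    /** Same shape as onData(): claim, fill, and always publish. */
    void offer(long value) {
        long sequence = next();
        try {
            get(sequence).value = value;
        } finally {
            publish(sequence);                                // runs even if filling the slot throws
        }
    }

    public static void main(String[] args) {
        ToyRing ring = new ToyRing(8);
        ring.offer(42L);
        System.out.println(ring.lastPublished() + " -> " + ring.get(0).value);
    }
}
```

If a claimed sequence were never published, readers waiting on that sequence would wait forever, which is the stall the comment in onData() warns about.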
2 Advanced usage
    package disruptor.start.v3;

    import java.nio.ByteBuffer;

    import com.lmax.disruptor.EventTranslatorOneArg;
    import com.lmax.disruptor.RingBuffer;

    import disruptor.start.LongEvent;

    /**
     * With version 3.0 of the Disruptor a richer Lambda-style API was added to help developers
     * by encapsulating this complexity within the Ring Buffer,
     * so post-3.0 the preferred approach for publishing messages is via the
     * Event Publisher/Event Translator portion of the API. E.g.
     * Date: 2017-01-10
     *
     * @author duyu3
     */
    public class LongEventProducerWithTranslator {
        private final RingBuffer<LongEvent> ringBuffer;

        public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }

        // Implementations translate another data representation into events claimed from the RingBuffer.
        private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
                new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
                    @Override
                    public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
                        event.set(bb.getLong(0));
                    }
                };

        public void onData(ByteBuffer bb) {
            ringBuffer.publishEvent(TRANSLATOR, bb);
        }
    }
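One way to picture what a translator-based publishEvent() call encapsulates is a method that performs the claim, the translation, and the publish itself, so that callers cannot forget the final step. The sketch below is an assumption-laden illustration: `MiniRing`, `Event`, and `Translator` are hypothetical types written for this example (single producer, no back-pressure), not the library's classes.

```java
import java.nio.ByteBuffer;

/** Illustrative only: a toy single-producer ring, not the Disruptor's RingBuffer. */
class MiniRing {
    /** Hypothetical preallocated event. */
    static final class Event {
        long value;
    }

    /** Hypothetical stand-in for a one-argument translator. */
    interface Translator<A> {
        void translateTo(Event event, long sequence, A arg);
    }

    private final Event[] events;
    private final int mask;               // assumes the size is a power of two
    private long claimed = -1;            // single producer thread assumed
    private volatile long published = -1;

    MiniRing(int powerOfTwoSize) {
        events = new Event[powerOfTwoSize];
        for (int i = 0; i < events.length; i++) {
            events[i] = new Event();
        }
        mask = powerOfTwoSize - 1;
    }

    /** Claim, translate, and publish in one call. */
    <A> void publishEvent(Translator<A> translator, A arg) {
        long sequence = ++claimed;
        try {
            translator.translateTo(events[(int) (sequence & mask)], sequence, arg);
        } finally {
            published = sequence;         // publish even if the translator throws
        }
    }

    long lastPublished() {
        return published;
    }

    long valueAt(long sequence) {
        return events[(int) (sequence & mask)].value;
    }

    public static void main(String[] args) {
        MiniRing ring = new MiniRing(8);
        ByteBuffer bb = ByteBuffer.allocate(8);
        bb.putLong(0, 7L);
        ring.publishEvent((event, sequence, buffer) -> event.value = buffer.getLong(0), bb);
        System.out.println(ring.valueAt(ring.lastPublished()));
    }
}
```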
3 main
    package disruptor.start.main;

    import java.nio.ByteBuffer;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;

    import disruptor.start.LongEvent;
    import disruptor.start.LongEventFactory;
    import disruptor.start.LongEventHandler;
    import disruptor.start.LongEventProducer;

    public class LongEventMain {
        public static void main(String[] args) throws Exception {
            // Executor that will be used to construct new threads for consumers
            Executor executor = Executors.newCachedThreadPool();

            // The factory for the event
            LongEventFactory factory = new LongEventFactory();

            // Specify the size of the ring buffer, must be power of 2.
            int bufferSize = 1024;

            // Construct the Disruptor
            Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);

            // Connect the handler
            disruptor.handleEventsWith(new LongEventHandler());

            // Start the Disruptor, starts all threads running
            disruptor.start();

            // Get the ring buffer from the Disruptor to be used for publishing.
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

            LongEventProducer producer = new LongEventProducer(ringBuffer);

            ByteBuffer bb = ByteBuffer.allocate(8);
            for (long l = 0; true; l++) {
                bb.putLong(0, l);
                producer.onData(bb);
                Thread.sleep(1000);
            }
        }
    }

    package disruptor.start.main;

    import java.nio.ByteBuffer;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;

    import disruptor.start.LongEvent;

    /**
     * Note how a number of the classes (e.g. handler, translator) are no longer required.
     * Also note how the lambda used for publishEvent() only refers to the parameters that are passed in.
     */
    public class LongEventMainWithJava8 {
        public static void main(String[] args) throws Exception {
            // Executor that will be used to construct new threads for consumers
            Executor executor = Executors.newCachedThreadPool();

            // Specify the size of the ring buffer, must be power of 2.
            int bufferSize = 1024;

            // Construct the Disruptor
            Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);

            // Connect the handler
            disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));

            // Start the Disruptor, starts all threads running
            disruptor.start();

            // Get the ring buffer from the Disruptor to be used for publishing.
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

            ByteBuffer bb = ByteBuffer.allocate(8);
            for (long l = 0; true; l++) {
                bb.putLong(0, l);

                // 1. Preferred: the lambda only uses the arguments handed to it.
                ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);

                // 2.
                /**
                 * This would create a capturing lambda,
                 * meaning that it would need to instantiate an object to hold the ByteBuffer bb variable
                 * as it passes the lambda through to the publishEvent() call.
                 * This will create additional (unnecessary) garbage,
                 * so the call that passes the argument through to the lambda should be preferred
                 * if low GC pressure is a requirement.
                 */
                ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));

                Thread.sleep(1000);
            }
        }
    }

    package disruptor.start.main;

    import java.nio.ByteBuffer;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;

    import disruptor.start.LongEvent;

    public class LongEventMainGood {
        public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) {
            System.out.println(event);
        }

        public static void translate(LongEvent event, long sequence, ByteBuffer buffer) {
            event.set(buffer.getLong(0));
        }

        public static void main(String[] args) throws Exception {
            // Executor that will be used to construct new threads for consumers
            Executor executor = Executors.newCachedThreadPool();

            // Specify the size of the ring buffer, must be power of 2.
            int bufferSize = 1024;

            // Construct the Disruptor
            Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);

            // Connect the handler
            disruptor.handleEventsWith(LongEventMainGood::handleEvent);

            // Start the Disruptor, starts all threads running
            disruptor.start();

            // Get the ring buffer from the Disruptor to be used for publishing.
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

            ByteBuffer bb = ByteBuffer.allocate(8);
            for (long l = 0; true; l++) {
                bb.putLong(0, l);
                ringBuffer.publishEvent(LongEventMainGood::translate, bb);
                Thread.sleep(1000);
            }
        }
    }
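The difference between the two lambda styles passed to publishEvent() can be isolated in plain Java. This is a minimal sketch with hypothetical types (`Event`, `OneArg`, `NoArg`) that mirror the shape of the two overloads; whether a capturing lambda really allocates on each evaluation depends on the JVM, so treat the comments as a description of typical behaviour rather than a guarantee.

```java
import java.nio.ByteBuffer;

/** Illustrative only: contrasts a non-capturing and a capturing lambda. */
class LambdaStyles {
    /** Hypothetical event type. */
    static final class Event {
        long value;
    }

    /** Hypothetical translator shapes: with and without a pass-through argument. */
    interface OneArg<A> {
        void translateTo(Event event, long sequence, A arg);
    }

    interface NoArg {
        void translateTo(Event event, long sequence);
    }

    // Non-capturing: refers only to its own parameters, so one shared instance is enough.
    static final OneArg<ByteBuffer> FROM_ARG =
            (event, sequence, buffer) -> event.value = buffer.getLong(0);

    // Capturing: closes over 'bb', so the runtime typically needs an object to hold it.
    static NoArg capturing(ByteBuffer bb) {
        return (event, sequence) -> event.value = bb.getLong(0);
    }

    public static void main(String[] args) {
        ByteBuffer bb = ByteBuffer.allocate(8);
        bb.putLong(0, 5L);

        Event a = new Event();
        Event b = new Event();
        FROM_ARG.translateTo(a, 0L, bb);      // buffer travels as an argument
        capturing(bb).translateTo(b, 0L);     // buffer travels inside the lambda object

        System.out.println(a.value + " " + b.value);
    }
}
```

Both calls fill the event with the same value; the only difference is where the ByteBuffer reference lives, which is what matters when GC pressure is a concern.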
Basic tuning
Using the above approach will work functionally in the widest set of deployment scenarios. However, if you are able to make certain assumptions about the hardware and software environment that the Disruptor will run in, then you can take advantage of a number of tuning options to improve performance. There are two main options for tuning: single vs. multiple producers, and alternative wait strategies.
One of the best ways to improve performance in concurrent systems is to adhere to the Single Writer Principle, and this applies to the Disruptor. If you are in the situation where there will only ever be a single thread producing events into the Disruptor, then you can take advantage of this to gain additional performance.
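    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.dsl.ProducerType;

    public class LongEventMain {
        public static void main(String[] args) throws Exception {
            //.....
            // Construct the Disruptor with a SingleProducerSequencer.
            // Argument order follows the Disruptor 3.x constructor that takes an Executor:
            // (eventFactory, ringBufferSize, executor, producerType, waitStrategy).
            Disruptor<LongEvent> disruptor = new Disruptor<>(
                    factory,
                    bufferSize,
                    executor,
                    ProducerType.SINGLE, // Single producer
                    new BlockingWaitStrategy());
            //.....
        }
    }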
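Why a single writer is cheaper can be sketched with two counters. This is an illustration under assumptions, not the library's sequencer code: `MultiWriter` and `SingleWriter` are hypothetical classes, and the single-writer variant is only correct if exactly one thread ever calls next().

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative only: contrasts a contended and an uncontended sequence counter. */
class SequenceCounters {
    /** Safe with any number of writer threads: each call is an atomic read-modify-write. */
    static final class MultiWriter {
        private final AtomicLong value = new AtomicLong(-1);

        long next() {
            return value.incrementAndGet();
        }

        long get() {
            return value.get();
        }
    }

    /** Correct only when exactly one thread ever calls next(). */
    static final class SingleWriter {
        private final AtomicLong value = new AtomicLong(-1);
        private long local = -1;          // touched by the writer thread only

        long next() {
            long n = ++local;             // plain increment, no atomic read-modify-write
            value.lazySet(n);             // ordered store so that readers eventually observe it
            return n;
        }

        long get() {
            return value.get();
        }
    }

    public static void main(String[] args) {
        MultiWriter multi = new MultiWriter();
        SingleWriter single = new SingleWriter();
        for (int i = 0; i < 1000; i++) {
            multi.next();
            single.next();
        }
        System.out.println(multi.get() + " " + single.get());
    }
}
```

With one writer there is nothing to arbitrate, so the atomic compare-and-swap style operation can be replaced by a plain increment followed by an ordered store.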
To give an indication of how much of a performance advantage can be achieved through this technique, we can change the producer type in the OneToOne performance test. The tests were run on an i7 Sandy Bridge MacBook Air.
Multiple Producer

    Run 0, Disruptor=26,553,372 ops/sec
    Run 1, Disruptor=28,727,377 ops/sec
    Run 2, Disruptor=29,806,259 ops/sec
    Run 3, Disruptor=29,717,682 ops/sec
    Run 4, Disruptor=28,818,443 ops/sec
    Run 5, Disruptor=29,103,608 ops/sec
    Run 6, Disruptor=29,239,766 ops/sec

Single Producer

    Run 0, Disruptor=89,365,504 ops/sec
    Run 1, Disruptor=77,579,519 ops/sec
    Run 2, Disruptor=78,678,206 ops/sec
    Run 3, Disruptor=80,840,743 ops/sec
    Run 4, Disruptor=81,037,277 ops/sec
    Run 5, Disruptor=81,168,831 ops/sec
    Run 6, Disruptor=81,699,346 ops/sec
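To put a number on the difference between the two sets of runs above, the means can be compared directly. The snippet below is simple arithmetic over the published figures; the class and field names are made up for this example, and it assumes the second, faster series is the single-producer configuration.

```java
/** Illustrative only: compares the mean throughput of the two benchmark series. */
class Speedup {
    // First series of runs, ops/sec.
    static final long[] FIRST = {
        26_553_372L, 28_727_377L, 29_806_259L, 29_717_682L,
        28_818_443L, 29_103_608L, 29_239_766L
    };

    // Second series of runs, ops/sec (assumed to be the single-producer configuration).
    static final long[] SECOND = {
        89_365_504L, 77_579_519L, 78_678_206L, 80_840_743L,
        81_037_277L, 81_168_831L, 81_699_346L
    };

    static double mean(long[] runs) {
        long sum = 0;
        for (long r : runs) {
            sum += r;
        }
        return (double) sum / runs.length;
    }

    public static void main(String[] args) {
        double ratio = mean(SECOND) / mean(FIRST);
        System.out.printf("mean first:  %.0f ops/sec%n", mean(FIRST));
        System.out.printf("mean second: %.0f ops/sec%n", mean(SECOND));
        System.out.printf("ratio:       %.2f%n", ratio);
    }
}
```

On these figures the second series is a little over 2.8 times faster on average, for this one machine and this one test.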
The default wait strategy used by the Disruptor is the BlockingWaitStrategy. Internally the BlockingWaitStrategy uses a typical lock and condition variable to handle thread wake-up. The BlockingWaitStrategy is the slowest of the available wait strategies, but it is the most conservative with respect to CPU usage and will give the most consistent behaviour across the widest variety of deployment options. However, knowledge of the deployed system can again allow for additional performance.
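The lock and condition variable approach described here can be sketched with the JDK's own primitives. This is a minimal illustration, not the BlockingWaitStrategy source: `BlockingWaitSketch` is a hypothetical class, and it uses awaitUninterruptibly() purely to keep the example free of checked exceptions.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative only: a consumer blocks on a condition until the producer advances the cursor. */
class BlockingWaitSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition advanced = lock.newCondition();
    private long cursor = -1;                 // guarded by lock

    /** Consumer side: block until the cursor reaches the wanted sequence. */
    long waitFor(long sequence) {
        lock.lock();
        try {
            while (cursor < sequence) {       // loop guards against spurious wake-ups
                advanced.awaitUninterruptibly();  // releases the lock while parked
            }
            return cursor;
        } finally {
            lock.unlock();
        }
    }

    /** Producer side: advance the cursor, then pay the cost of signalling the waiters. */
    void publish(long sequence) {
        lock.lock();
        try {
            cursor = sequence;
            advanced.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BlockingWaitSketch sketch = new BlockingWaitSketch();
        new Thread(() -> sketch.publish(0)).start();
        System.out.println("available up to " + sketch.waitFor(0));
    }
}
```

The waiting thread uses no CPU while parked, but every publish has to take the lock and signal, which is where the extra latency comes from.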
Like the BlockingWaitStrategy, the SleepingWaitStrategy attempts to be conservative with CPU usage. It does so by using a simple busy-wait loop with a call to LockSupport.parkNanos(1) in the middle of the loop. On a typical Linux system this will pause the thread for around 60µs. This has the benefit that the producing thread does not need to take any action other than incrementing the appropriate counter, and it avoids the cost of signalling a condition variable. However, the mean latency of moving the event between the producer and consumer threads will be higher. It works best in situations where low latency is not required, but a low impact on the producing thread is desired. A common use case is asynchronous logging.
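A busy-wait loop that backs off to LockSupport.parkNanos(1) might look roughly like the following. It is a sketch under assumptions: the class name, the retry budget of 200, and the three phases (spin, yield, park) are chosen for illustration and are not taken from the text above.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/** Illustrative only: spin, then yield, then park briefly until the cursor catches up. */
class SleepingWaitSketch {
    private static final int RETRIES = 200;   // assumed retry budget, chosen for illustration

    static long waitFor(long sequence, AtomicLong cursor) {
        int counter = RETRIES;
        long available;
        while ((available = cursor.get()) < sequence) {
            if (counter > 100) {
                counter--;                    // phase 1: plain busy spin
            } else if (counter > 0) {
                counter--;
                Thread.yield();               // phase 2: let other runnable threads in
            } else {
                LockSupport.parkNanos(1L);    // phase 3: short park, real pause is OS dependent
            }
        }
        return available;
    }

    public static void main(String[] args) {
        AtomicLong cursor = new AtomicLong(-1);
        // The producer only updates the counter; it never signals the waiting thread.
        new Thread(() -> cursor.set(0)).start();
        System.out.println("available up to " + waitFor(0, cursor));
    }
}
```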
The YieldingWaitStrategy is one of two wait strategies that can be used in low-latency systems, where there is the option to burn CPU cycles with the goal of improving latency. The YieldingWaitStrategy will busy spin waiting for the sequence to increment to the appropriate value. Inside the body of the loop Thread.yield() will be called, allowing other queued threads to run. This is the recommended wait strategy when you need very high performance and the number of Event Handler threads is less than the total number of logical cores, e.g. you have hyper-threading enabled.
The BusySpinWaitStrategy is the highest performing Wait Strategy, but puts the highest constraints on the deployment environment. This wait strategy should only be used if the number of Event Handler threads is smaller than the number of physical cores on the box. E.g. hyper-threading should be disabled.
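The two low-latency strategies differ only in what happens inside the spin loop. The sketch below contrasts them using plain JDK calls; the class name, method names, and the spin budget of 100 are assumptions made for this example rather than details of the library.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative only: a yielding wait loop next to a pure busy-spin wait loop. */
class SpinWaitSketch {
    private static final int SPIN_TRIES = 100;    // assumed spin budget before yielding

    /** Spins briefly, then calls Thread.yield() on every further iteration. */
    static long yieldingWaitFor(long sequence, AtomicLong cursor) {
        int counter = SPIN_TRIES;
        long available;
        while ((available = cursor.get()) < sequence) {
            if (counter == 0) {
                Thread.yield();                   // give other queued threads a chance to run
            } else {
                counter--;
            }
        }
        return available;
    }

    /** Never gives up the core: lowest latency, but it occupies a core for the whole wait. */
    static long busySpinWaitFor(long sequence, AtomicLong cursor) {
        long available;
        while ((available = cursor.get()) < sequence) {
            // intentionally empty: keep re-reading the cursor
        }
        return available;
    }

    public static void main(String[] args) {
        AtomicLong cursor = new AtomicLong(-1);
        new Thread(() -> cursor.set(1)).start();
        System.out.println("yielding:  " + yieldingWaitFor(0, cursor));
        System.out.println("busy spin: " + busySpinWaitFor(1, cursor));
    }
}
```

Because the busy-spin variant never yields, a waiting handler competes with whatever else is scheduled on the same core, which is why the thread count has to stay below the number of physical cores.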