建立步驟:
一、建立一個工廠Event類:OrderEvent.java
package com.bfxy.disruptor.quickstart; public class OrderEvent { //訂單的價格 private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
OrderEventFactory.java
package com.bfxy.disruptor.quickstart; import com.lmax.disruptor.EventFactory; public class OrderEventFactory implements EventFactory<OrderEvent>{ public OrderEvent newInstance() { return new OrderEvent(); //這個方法就是爲了返回空的數據對象(Event) } }
二、需要有一個監聽事件類:OrderEventHandler.java
package com.bfxy.disruptor.quickstart; import com.lmax.disruptor.EventHandler; public class OrderEventHandler implements EventHandler<OrderEvent>{ public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception { Thread.sleep(Integer.MAX_VALUE); System.err.println("消費者: " + event.getValue()); } }
三、實例化Disruptor實例:Main.java
package com.bfxy.disruptor.quickstart; import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; public class Main { public static void main(String[] args) { // 參數準備工做 OrderEventFactory orderEventFactory = new OrderEventFactory(); int ringBufferSize = 4; ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); /** * 1 eventFactory: 消息(event)工廠對象 * 2 ringBufferSize: 容器的長度 * 3 executor: 線程池(建議使用自定義線程池) RejectedExecutionHandler * 4 ProducerType: 單生產者 仍是 多生產者 * 5 waitStrategy: 等待策略 */ //1. 實例化disruptor對象 Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory, ringBufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy()); //2. 添加消費者的監聽 (構建disruptor 與 消費者的一個關聯關係) disruptor.handleEventsWith(new OrderEventHandler()); //3. 啓動disruptor disruptor.start(); //4. 獲取實際存儲數據的容器: RingBuffer RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer(); OrderEventProducer producer = new OrderEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for(long i = 0 ; i < 5; i ++){ bb.putLong(0, i); producer.sendData(bb); } disruptor.shutdown(); executor.shutdown(); } }
四、編寫生產者組件:OrderEventProducer.java
package com.bfxy.disruptor.quickstart; import java.nio.ByteBuffer; import com.lmax.disruptor.RingBuffer; public class OrderEventProducer { private RingBuffer<OrderEvent> ringBuffer; public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void sendData(ByteBuffer data) { //1 在生產者發送消息的時候, 首先 須要從咱們的ringBuffer裏面 獲取一個可用的序號 long sequence = ringBuffer.next(); //0 try { //2 根據這個序號, 找到具體的 "OrderEvent" 元素 注意:此時獲取的OrderEvent對象是一個沒有被賦值的"空對象" OrderEvent event = ringBuffer.get(sequence); //3 進行實際的賦值處理 event.setValue(data.getLong(0)); } finally { //4 提交發布操做 ringBuffer.publish(sequence); } } }