在disruptor框架調用start方法以前,須要將消息的消費者指定給disruptor框架。java
對於獨立消費的消費者,應當實現EventHandler接口。對於不重複消費的消費者,應當實現WorkHandler接口。
從代碼層面而言, 有不一樣的具體實現來支持不一樣的模式app
此處的測試代碼的對接口WorkHandler 進行了改造。
框架
package com.lmax.disruptor.noob; import java.time.Instant; import java.time.format.DateTimeFormatter; public class CompareTest { public static int THREAD = 2; // 線程數量 public static int PER = 1; // 單個線程生產數量 public static int TOTAL_COUNT = THREAD * PER; // 數據總量 public static int SIZE = 4; // 最大容量 public static void main(String[] args) { println("線程數:" + THREAD + " 單線程生產量: " + PER + " 容量:" + SIZE + " 數據總量:" + TOTAL_COUNT); DisruptorTest.execute(); } public static void println(String msg) { System.out.println(DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + "[" + Thread.currentThread().getName() + "] " + msg); } } --------- import java.util.concurrent.ThreadFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; public class DisruptorTest { public static void execute() { Disruptor<DataEvent> disruptor = new Disruptor<DataEvent>(new DataEventFactory(), CompareTest.SIZE, new ThreadFactory() { AtomicInteger count = new AtomicInteger(0); @Override public Thread newThread(Runnable eventProcessor) { CompareTest.println("EventProcessor wrapper");// 對事件處理總線的封裝 Thread thread = new Thread(eventProcessor); thread.setName("EventProcessor" + count.incrementAndGet()); return thread; } }); /** * 建立EventProcessors<Runnable>. * 子過程Disruptor.checkNotStarted()事件處理handler必須在啓動以前綁定. */ disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"),new DataEventHandler("dataEventHandler2")); // disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler1"),new DataWorkHandler("dataWorkHandler2")); disruptor.start(); CompareTest.println("disruptor start success!"); RingBuffer<DataEvent> ringBuffer = disruptor.getRingBuffer(); DataProducer producer = new DataProducer(ringBuffer); DataEventProducerWithTranslator translator = new DataEventProducerWithTranslator(ringBuffer); long start = System.currentTimeMillis(); for (int l = 0; l < CompareTest.THREAD; l++) { new Thread(() -> { for (int m = 0; m < CompareTest.PER; m++) { producer.onData(start); // translator.onData(start); 推薦用這種方式作。 } }).start(); } } } ---------- import java.util.concurrent.atomic.AtomicLong; import com.lmax.disruptor.EventHandler; public class DataEventHandler implements EventHandler<DataEvent> { public AtomicLong count = new AtomicLong(0); public String name = null; public DataEventHandler(String name) { this.name = name; } @Override public void onEvent(DataEvent event, long sequence, boolean endOfBatch) throws Exception { Thread.sleep(name.contentEquals("dataEventHandler1") ? 1000 : 100); CompareTest.println("handlerName: " + name + " 處理的sequence:" + sequence + " count:" + count.incrementAndGet() + " Disruptor 總耗時:" + (System.currentTimeMillis() - event.getStartTime())); } } ---------- import java.util.concurrent.atomic.AtomicLong; import com.lmax.disruptor.WorkHandler; public class DataWorkHandler implements WorkHandler<DataEvent> { public AtomicLong count = new AtomicLong(0); public String name = null; public DataWorkHandler(String name) { this.name = name; } @Override public void onEvent(DataEvent event, long sequence) throws Exception { Thread.sleep(name.contentEquals("dataWorkHandler2") ? 100 :1000); CompareTest.println("handlerName: " + name + " 處理的sequence:" + sequence + " count:" + count.incrementAndGet() + " Disruptor 總耗時:" + (System.currentTimeMillis() - event.getStartTime())); } }
disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"), new DataEventHandler("dataEventHandler2"));
disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1")).then(new DataEventHandler("dataEventHandler2")).then(new DataEventHandler("dataEventHandler3"));
disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler1"), new DataWorkHandler("dataWorkHandler2"));
組合方式ide
disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler3"),new DataWorkHandler("dataWorkHandler4")).then(new DataEventHandler("dataEventHandler1"), new DataEventHandler("dataEventHandler2"));
disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"), new DataEventHandler("dataEventHandler2")).thenHandleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler3"),new DataWorkHandler("dataWorkHandler4"));