本次代碼測試基於相同的 容量、生產線程數、單個線程生產量; 僅有一個消費線程。java
修改各參數獲得的結果:併發
數據規模、併發線程數、 最主要的是容量小時:Disruptor沒有優點app
2019-08-29T07:42:35.235Z 線程數:64 單線程生產量: 2048 容量:32 數據總量:131072 2019-08-29T07:42:48.742Z EventProcessor wrapper 2019-08-29T07:42:48.743Z disruptor start success! 2019-08-29T07:42:51.113Z 處理的sequence:131071 count:131072 Disruptor 總耗時:2369 2019-08-29T07:42:36.200Z ArrayBlockingQueue 生產耗時:962 2019-08-29T07:42:36.200Z 處理count:131072 ArrayBlockingQueue 消費耗時:962 2019-08-29T07:42:36.201Z ArrayBlockingQueue 總耗時:963
2019-08-29T08:24:38.641Z 線程數:512 單線程生產量: 2048 容量:32 數據總量:1048576 2019-08-29T08:24:38.670Z EventProcessor wrapper 2019-08-29T08:24:38.670Z disruptor start success! 2019-08-29T08:25:08.590Z 處理的sequence:1048575 count:1048576 Disruptor 總耗時:29918 2019-08-29T08:25:54.753Z 處理count:1048576 ArrayBlockingQueue 消費耗時:9231 2019-08-29T08:25:54.753Z ArrayBlockingQueue 生產耗時:9230 2019-08-29T08:25:54.753Z ArrayBlockingQueue 總耗時:9231
增大容量: Disruptor的性能上升ide
2019-08-29T07:40:28.980Z 線程數:64 單線程生產量: 2048 容量:128 數據總量:131072 2019-08-29T07:40:29.008Z EventProcessor wrapper 2019-08-29T07:40:29.008Z disruptor start success! 2019-08-29T07:40:29.694Z 處理的sequence:131071 count:131072 Disruptor 總耗時:685 2019-08-29T07:47:42.436Z 處理count:131072 ArrayBlockingQueue 消費耗時:508 2019-08-29T07:47:42.436Z ArrayBlockingQueue 生產耗時:508 2019-08-29T07:47:42.436Z ArrayBlockingQueue 總耗時:508
2019-08-29T07:43:39.073Z 線程數:64 單線程生產量: 2048 容量:512 數據總量:131072 2019-08-29T07:43:39.101Z EventProcessor wrapper 2019-08-29T07:43:39.101Z disruptor start success! 2019-08-29T07:43:39.269Z 處理的sequence:131071 count:131072 Disruptor 總耗時:167 2019-08-29T07:43:53.722Z ArrayBlockingQueue 生產耗時:383 2019-08-29T07:43:53.722Z 處理count:131072 ArrayBlockingQueue 消費耗時:383 2019-08-29T07:43:53.722Z ArrayBlockingQueue 總耗時:383
2019-08-29T07:44:05.995Z 線程數:64 單線程生產量: 2048 容量:1024 數據總量:131072 2019-08-29T08:18:10.426Z EventProcessor wrapper 2019-08-29T08:18:10.426Z disruptor start success! 2019-08-29T08:18:10.524Z 處理的sequence:131071 count:131072 Disruptor 總耗時:97 2019-08-29T07:44:06.365Z ArrayBlockingQueue 生產耗時:367 2019-08-29T07:44:06.365Z 處理count:131072 ArrayBlockingQueue 消費耗時:367 2019-08-29T07:44:06.365Z ArrayBlockingQueue 總耗時:367
再增大各指標參數: Disruptor優點愈來愈明顯高併發
2019-08-29T07:50:59.911Z 線程數:64 單線程生產量: 65536 容量:1048576 數據總量:4194304 2019-08-29T07:51:28.075Z EventProcessor wrapper 2019-08-29T07:51:28.075Z disruptor start success! 2019-08-29T07:51:28.577Z 處理的sequence:4194303 count:4194304 Disruptor 總耗時:501 2019-08-29T07:51:11.549Z ArrayBlockingQueue 生產耗時:11633 2019-08-29T07:51:11.575Z 處理count:4194304 ArrayBlockingQueue 消費耗時:11659 2019-08-29T07:51:11.575Z ArrayBlockingQueue 總耗時:11659
2019-08-29T07:57:22.994Z 線程數:128 單線程生產量: 65536 容量:1048576 數據總量:8388608 2019-08-29T07:57:23.074Z EventProcessor wrapper 2019-08-29T07:57:23.074Z disruptor start success! 2019-08-29T07:57:24.036Z 處理的sequence:8388607 count:8388608 Disruptor 總耗時:961 2019-08-29T07:58:25.567Z ArrayBlockingQueue 生產耗時:47941 2019-08-29T07:58:25.646Z 處理count:8388608 ArrayBlockingQueue 消費耗時:48020 2019-08-29T07:58:25.647Z ArrayBlockingQueue 總耗時:48021
再大線程數, ArrayBlockingQueue 更耗時了,而Disruptor仍舊很快性能
2019-08-29T08:05:17.927Z 線程數:256 單線程生產量: 65536 容量:1048576 數據總量:16777216 2019-08-29T08:05:18.026Z EventProcessor wrapper 2019-08-29T08:05:18.027Z disruptor start success! 2019-08-29T08:05:20.060Z 處理的sequence:16777215 count:16777216 Disruptor 總耗時:2032
經測試發現: 測試
package com.lmax.disruptor.noob; import java.time.Instant; import java.time.format.DateTimeFormatter; /** * 擔憂影響, 分開執行測試 * * @author admin * */ public class CompareTest { public static int THREAD = 2 << 8; // 線程數量 public static int PER = 2 << 10; // 單個線程生產數量 public static int TOTAL_COUNT = THREAD * PER; // 數據總量 public static int SIZE =32; // 最大容量 public static void main(String[] args) { println("線程數:" + THREAD + " 單線程生產量: " + PER + " 容量:" + SIZE + " 數據總量:" + TOTAL_COUNT); new Thread(() -> ArrayBlockingQueueTest.execute()).start(); // new Thread(() -> DisruptorTest.execute()).start(); } public static void println(String msg) { System.out.println(DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + " " + msg); } }
package com.lmax.disruptor.noob; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; public class ArrayBlockingQueueTest { public static void execute() { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(CompareTest.SIZE); AtomicBoolean endP = new AtomicBoolean(false); AtomicBoolean endC = new AtomicBoolean(false); long startTime = System.currentTimeMillis(); AtomicLong count = new AtomicLong(0); for (int i = 0; i < CompareTest.THREAD; i++) { final int m = i; new Thread(() -> { for (int j = 0; j < CompareTest.PER; j++) { try { queue.put("i" + m + "j" + j); // 隊列不夠,等待生產 } catch (InterruptedException e) { e.printStackTrace(); } if (count.incrementAndGet() == CompareTest.TOTAL_COUNT) { CompareTest.println("ArrayBlockingQueue 生產耗時:" + (System.currentTimeMillis() - startTime)); endP.set(true); } } }).start(); } new Thread(() -> { AtomicLong consumerCount = new AtomicLong(0); while (true) { try { queue.take(); // 直到消費完全部信息 } catch (InterruptedException e) { e.printStackTrace(); } if (consumerCount.incrementAndGet() == CompareTest.TOTAL_COUNT) { break; } } CompareTest.println("處理count:" + consumerCount.get() + " ArrayBlockingQueue 消費耗時:" + (System.currentTimeMillis() - startTime)); endC.set(true); }).start(); while (!(endC.get() && endP.get())) {} CompareTest.println("ArrayBlockingQueue 總耗時:" + (System.currentTimeMillis() - startTime)); } }
package com.lmax.disruptor.noob; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; public class DisruptorTest { public static void execute() { Disruptor<DataEvent> disruptor = new Disruptor<DataEvent>(new DataEventFactory(), CompareTest.SIZE, new ThreadFactory() { @Override public Thread newThread(Runnable eventProcessor) { CompareTest.println("EventProcessor wrapper");// 對事件處理總線的封裝 Thread thread = new Thread(eventProcessor); thread.setName("EventProcessorWrapper"); return thread; } }); /** * 建立EventProcessors<Runnable>. * 子過程Disruptor.checkNotStarted()事件處理handler必須在啓動以前綁定. */ disruptor.handleEventsWith(new DataEventHandler()); disruptor.start(); CompareTest.println("disruptor start success!"); RingBuffer<DataEvent> ringBuffer = disruptor.getRingBuffer(); DataProducer producer = new DataProducer(ringBuffer); DataEventProducerWithTranslator translator = new DataEventProducerWithTranslator(ringBuffer); long start = System.currentTimeMillis(); for (int l = 0; l < CompareTest.THREAD; l++) { new Thread(() -> { for (int m = 0; m < CompareTest.PER; m++) { producer.onData(start); // translator.onData(start); } }).start(); } /** * 關閉 disruptor,方法會堵塞,直至全部的事件都獲得處理;並不會自動關閉外部指定的executor,須要主動關閉 */ // disruptor.shutdown(); // CompareTest.println("disruptor shutdown success!"); // executor.shutdown(); } }
事件this
package com.lmax.disruptor.noob; /** * 事件實例封裝 業務數據傳遞對象 * * @author admin * */ public class DataEvent { private long startTime; public long getStartTime() { return startTime; } public void setStartTime(long startTime) { this.startTime = startTime; } } --- package com.lmax.disruptor.noob; import com.lmax.disruptor.EventFactory; /* * 構建傳遞的數據封裝對象, 在初始化ringBuffer時,直接給entries[]每一個地址上初始化DataEvent */ public class DataEventFactory implements EventFactory { @Override public Object newInstance() { return new DataEvent(); } }
生產事件發佈atom
package com.lmax.disruptor.noob; import com.lmax.disruptor.RingBuffer; public class DataProducer { private final RingBuffer<DataEvent> ringBuffer; public DataProducer(RingBuffer<DataEvent> ringBuffer) { this.ringBuffer = ringBuffer; } /** * 當前仍是生產線程 * <p> * onData用來發布事件,每調用一次就發佈一次事件事件 它的參數會經過事件傳遞給消費者 * * @param data */ public void onData(long data) {// // 能夠把ringBuffer看作一個事件隊列,那麼next就是獲得下面一個事件槽, 若沒有空閒的時間槽則阻塞 long sequence = ringBuffer.next(); // CompareTest.println("生產置入sequence:" + sequence); try { // 用上面的索引取出一個空的事件用於填充 DataEvent event = ringBuffer.get(sequence);// for the sequence event.setStartTime(data); } finally { // 發佈事件 ringBuffer.publish(sequence); } } }
獲取下一個事件槽併發布事件要使用try/finally保證事件必定會被髮布, 因此最好直接使用 ringBuffer.publishEvent方式將數據交由Translator來處理填充DataEvent,最後finally發佈spa
package com.lmax.disruptor.noob; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; /** * 獲取下一個事件槽併發布事件(發佈事件的時候要使用try/finally保證事件必定會被髮布)。 * 若是咱們使用RingBuffer.next()獲取一個事件槽,那麼必定要發佈對應的事件。若是不能發佈事件,那麼就會引發Disruptor狀態的混亂 * 。尤爲是在多個事件生產者的狀況下會致使事件消費者失速,從而不得不重啓應用才能會恢復。 * * @author admin * */ public class DataEventProducerWithTranslator { private final RingBuffer<DataEvent> ringBuffer; // 一個translator能夠看作一個事件初始化器,publicEvent方法會調用它 // 填充Event private static final EventTranslatorOneArg<DataEvent, Long> TRANSLATOR = new EventTranslatorOneArg<DataEvent, Long>() { public void translateTo(DataEvent event, long sequence, Long startTime) { event.setStartTime(startTime); } }; public DataEventProducerWithTranslator(RingBuffer<DataEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(Long bb) { ringBuffer.publishEvent(TRANSLATOR, bb); // 當前仍是生產者線程 // CompareTest.println(Thread.currentThread().getName() + " pulishEvent end!"); } }
事件消費處理
package com.lmax.disruptor.noob; import java.util.concurrent.atomic.AtomicLong; import com.lmax.disruptor.EventHandler; /** * 對指定事件的處理過程 * */ public class DataEventHandler implements EventHandler<DataEvent> { public AtomicLong count = new AtomicLong(0); @Override public void onEvent(DataEvent event, long sequence, boolean endOfBatch) throws Exception { /** * 消費者線程由初始化Disruptor時指定的threadFactory建立的 */ if (count.incrementAndGet() == CompareTest.TOTAL_COUNT) { CompareTest.println("處理的sequence:" + sequence + " count:" + count.get() + " Disruptor 總耗時:" + (System.currentTimeMillis() - event.getStartTime())); } } }