Work has been busy lately. In our projects I noticed that many people had each rolled their own data-processing mechanism, which feels messy and makes later maintenance by others harder. So I settled on one pattern to unify everyone's implementations: producer, buffer queue, consumer.
Below is a demonstration of basic Disruptor usage. First, add the dependency:
```xml
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>
```
Ring Buffer
The ring-shaped buffer. Before version 3.0 it was considered the central component of the Disruptor. Since 3.0, the ring buffer is only responsible for storing and updating the data (events) that move through the Disruptor. In some advanced use cases it can be completely replaced by a user's custom implementation.
Sequence
The Disruptor uses Sequences to track where a particular component is up to. Each consumer (EventProcessor) maintains a Sequence, as does the Disruptor itself. Most of the concurrent code relies on the movement of these sequence values, so a Sequence supports many of the features of an AtomicLong. In fact, the only real difference between the two is that a Sequence contains extra padding to prevent false sharing between it and other values.
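As a quick illustration of the "AtomicLong with padding" point, here is a minimal sketch using the public `Sequence` API (this is a demo I wrote for this article, not code from the Disruptor docs):

```java
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;

public class SequenceDemo {

    public static void main(String[] args) {
        // A Sequence behaves much like an AtomicLong, but is padded so it
        // sits on its own cache line, avoiding false sharing with neighbors.
        Sequence seq = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); // starts at -1
        System.out.println(seq.incrementAndGet()); // -1 -> 0
        System.out.println(seq.addAndGet(5));      // 0 -> 5
        System.out.println(seq.compareAndSet(5, 10)); // true, CAS just like AtomicLong
        System.out.println(seq.get());             // 10
    }
}
```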
Sequencer
The Sequencer is the real core of the Disruptor. The two implementations of this interface (single-producer and multi-producer) implement all of the concurrent algorithms for fast, correct handoff of data between producers and consumers.
Sequence Barrier
A sequence barrier is produced by the Sequencer and holds references to the Sequencer and to the Sequences of any dependent consumers. It contains the logic that determines whether any events are available for a consumer to process.
Wait Strategy
The wait strategy determines how a consumer waits for messages produced by the producer. The Disruptor carries messages inside events (Event).
Event
The unit of data passed from producer to consumer. There is no specific code representation of an event; it is entirely defined by the user.
EventProcessor
An EventProcessor holds the Sequence of a particular consumer (Consumer) and provides the event loop that invokes the user's event-handling implementation.
BatchEventProcessor
BatchEventProcessor contains an efficient implementation of the event loop and calls back into a user-supplied implementation of the EventHandler interface.
EventHandler
The event-handling interface defined by the Disruptor. It is implemented by the user to process events and is the actual implementation of a Consumer.
Producer
Producer is simply a generic term for the user code that calls the Disruptor to publish events; the Disruptor does not define a specific interface or type for it.
An event is the type of data exchanged through the Disruptor.
```java
package com.disruptor;

public class Data {

    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}
```
The event factory defines how to instantiate the event defined in the previous step. The Disruptor uses the EventFactory to pre-allocate the Event instances in the RingBuffer.
An Event instance acts as a data slot: before publishing, the publisher first obtains an Event instance from the RingBuffer, writes its data into that instance, and then publishes it to the RingBuffer; finally, the Consumer obtains the Event instance and reads the data from it.
```java
package com.disruptor;

import com.lmax.disruptor.EventFactory;

public class DataFactory implements EventFactory<Data> {

    @Override
    public Data newInstance() {
        return new Data();
    }
}
```
The producer claims the next slot in the RingBuffer, fills it, and publishes the sequence:

```java
package com.disruptor;

import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

public class Producer {

    private final RingBuffer<Data> ringBuffer;

    public Producer(RingBuffer<Data> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void pushData(ByteBuffer byteBuffer) {
        // Claim the next slot, then publish in a finally block so that a
        // failed write cannot leave the claimed sequence unpublished.
        long sequence = ringBuffer.next();
        try {
            Data event = ringBuffer.get(sequence);
            event.setValue(byteBuffer.getLong(0));
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}
```
The consumer implements WorkHandler and processes each event by adding 1 to its value:

```java
package com.disruptor;

import com.lmax.disruptor.WorkHandler;

import java.text.MessageFormat;

public class Consumer implements WorkHandler<Data> {

    @Override
    public void onEvent(Data data) throws Exception {
        long result = data.getValue() + 1;
        System.out.println(MessageFormat.format("Data process : {0} + 1 = {1}", data.getValue(), result));
    }
}
```
```java
package com.disruptor;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.nio.ByteBuffer;
import java.util.concurrent.ThreadFactory;

public class Main {

    private static final int NUMS = 10;
    private static final int SUM = 1000000;

    public static void main(String[] args) {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long start = System.currentTimeMillis();
        DataFactory factory = new DataFactory();
        int bufferSize = 1024;
        Disruptor<Data> disruptor = new Disruptor<>(factory, bufferSize, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        });
        // Pool of WorkHandlers: each event is handed to exactly one consumer.
        Consumer[] consumers = new Consumer[NUMS];
        for (int i = 0; i < NUMS; i++) {
            consumers[i] = new Consumer();
        }
        disruptor.handleEventsWithWorkerPool(consumers);
        disruptor.start();
        RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < SUM; i++) {
            bb.putLong(0, i);
            producer.pushData(bb);
            System.out.println("Success producer data : " + i);
        }
        long end = System.currentTimeMillis();
        disruptor.shutdown();
        System.out.println("Total time : " + (end - start));
    }
}
```
The tail of the output:

```
Data process : 999,987 + 1 = 999,988
Success producer data : 999995
Data process : 999,990 + 1 = 999,991
Data process : 999,989 + 1 = 999,990
Data process : 999,991 + 1 = 999,992
Data process : 999,992 + 1 = 999,993
Data process : 999,993 + 1 = 999,994
Data process : 999,995 + 1 = 999,996
Success producer data : 999996
Success producer data : 999997
Success producer data : 999998
Success producer data : 999999
Data process : 999,994 + 1 = 999,995
Data process : 999,996 + 1 = 999,997
Data process : 999,997 + 1 = 999,998
Data process : 999,998 + 1 = 999,999
Data process : 999,999 + 1 = 1,000,000
Total time : 14202
```
As the output shows, production and consumption proceed concurrently.
Events can also be published through an event translator. The translator below copies its argument into the event (the original was missing the `MessageFormat` import, added here):

```java
package com.mm.demo.disruptor.translator;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.mm.demo.disruptor.entity.Data;

import java.text.MessageFormat;

public class DataEventTranslator implements EventTranslatorOneArg<Data, Long> {

    @Override
    public void translateTo(Data event, long sequence, Long arg0) {
        System.out.println(MessageFormat.format("DataEventTranslator arg0 = {0}, seq = {1}", arg0, sequence));
        event.setValue(arg0);
    }
}
```
The first consumer adds 1 to the event's value each time:
```java
package com.mm.demo.disruptor.handler;

import com.lmax.disruptor.EventHandler;
import com.mm.demo.disruptor.entity.Data;

import java.text.MessageFormat;

public class D1DataEventHandler implements EventHandler<Data> {

    @Override
    public void onEvent(Data event, long sequence, boolean endOfBatch) throws Exception {
        long result = event.getValue() + 1;
        // Report the thread actually running the handler
        // (the original created a new Thread here by mistake).
        String name = Thread.currentThread().getName();
        System.out.println(MessageFormat.format("consumer " + name + ": {0} + 1 = {1}", event.getValue(), result));
    }
}
```
This consumer uses EventHandler; WorkHandler would also work. The difference is that EventHandlers are not pooled (every handler sees every event), while WorkHandlers are pooled (each event goes to exactly one handler in the pool).
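The distinction can be made concrete with a small self-contained sketch (names like `HandlerStyles` and `LongEvent` are my own for this demo; `LongEvent` stands in for the `Data` class above). With two broadcast EventHandlers, 4 published events are handled 8 times in total; with a pool of two WorkHandlers, the same 4 events are handled only 4 times:

```java
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.util.concurrent.atomic.AtomicLong;

public class HandlerStyles {

    // Minimal stand-in for the Data class above.
    static class LongEvent {
        long value;
    }

    // Broadcast style: every EventHandler receives every event.
    static long runBroadcast(int events) {
        AtomicLong handled = new AtomicLong();
        Disruptor<LongEvent> d = new Disruptor<>(LongEvent::new, 8, DaemonThreadFactory.INSTANCE);
        EventHandler<LongEvent> h1 = (e, seq, endOfBatch) -> handled.incrementAndGet();
        EventHandler<LongEvent> h2 = (e, seq, endOfBatch) -> handled.incrementAndGet();
        d.handleEventsWith(h1, h2);
        d.start();
        for (int i = 0; i < events; i++) {
            d.getRingBuffer().publishEvent((e, seq) -> e.value = seq);
        }
        d.shutdown(); // blocks until the backlog has been processed
        return handled.get();
    }

    // Worker-pool style: each event is handed to exactly one WorkHandler.
    static long runWorkerPool(int events) {
        AtomicLong handled = new AtomicLong();
        Disruptor<LongEvent> d = new Disruptor<>(LongEvent::new, 8, DaemonThreadFactory.INSTANCE);
        WorkHandler<LongEvent> w1 = e -> handled.incrementAndGet();
        WorkHandler<LongEvent> w2 = e -> handled.incrementAndGet();
        d.handleEventsWithWorkerPool(w1, w2);
        d.start();
        for (int i = 0; i < events; i++) {
            d.getRingBuffer().publishEvent((e, seq) -> e.value = seq);
        }
        d.shutdown();
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("broadcast:   " + runBroadcast(4));  // 4 events x 2 handlers = 8
        System.out.println("worker pool: " + runWorkerPool(4)); // 4 events, one handler each = 4
    }
}
```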
The second consumer adds 2 to the event's value:

```java
package com.mm.demo.disruptor.handler;

import com.lmax.disruptor.EventHandler;
import com.mm.demo.disruptor.entity.Data;

import java.text.MessageFormat;

public class D2DataEventHandler implements EventHandler<Data> {

    @Override
    public void onEvent(Data event, long sequence, boolean endOfBatch) throws Exception {
        long result = event.getValue() + 2;
        System.out.println(MessageFormat.format("consumer 2: {0} + 2 = {1}", event.getValue(), result));
    }
}
```
Serial: Consumer1 finishes processing an event before Consumer2 processes it.
```java
package com.mm.demo.disruptor.process;

import com.lmax.disruptor.dsl.Disruptor;
import com.mm.demo.disruptor.entity.Data;
import com.mm.demo.disruptor.handler.D1DataEventHandler;
import com.mm.demo.disruptor.handler.D2DataEventHandler;

/**
 * Serial processing: the handlers run one after the other.
 * @DateT: 2020-01-07
 */
public class Serial {

    public static void serial(Disruptor<Data> disruptor) {
        disruptor.handleEventsWith(new D1DataEventHandler()).then(new D2DataEventHandler());
        disruptor.start();
    }
}
```
Parallel: Consumer1 and Consumer2 process each event independently, at the same time.
```java
package com.mm.demo.disruptor.process;

import com.lmax.disruptor.dsl.Disruptor;
import com.mm.demo.disruptor.entity.Data;
import com.mm.demo.disruptor.handler.D1DataEventHandler;
import com.mm.demo.disruptor.handler.D2DataEventHandler;

/**
 * Parallel processing: both handlers see every event independently.
 * @DateT: 2020-01-07
 */
public class Parallel {

    public static void parallel(Disruptor<Data> dataDisruptor) {
        dataDisruptor.handleEventsWith(new D1DataEventHandler(), new D2DataEventHandler());
        dataDisruptor.start();
    }
}
```
```java
package com.mm.demo.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.mm.demo.disruptor.entity.Data;
import com.mm.demo.disruptor.process.Parallel;
import com.mm.demo.disruptor.process.Serial;
import com.mm.demo.disruptor.translator.DataEventTranslator;

import java.util.concurrent.Executors;

public class Main {

    private static final int BUFFER = 1024 * 1024;

    public static void main(String[] args) {
        DataFactory factory = new DataFactory();
        Disruptor<Data> disruptor = new Disruptor<>(factory, BUFFER, Executors.defaultThreadFactory(),
                ProducerType.MULTI, new BlockingWaitStrategy());
        Serial.serial(disruptor);
        // Parallel.parallel(disruptor);
        RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
        for (int i = 0; i < 2; i++) {
            ringBuffer.publishEvent(new DataEventTranslator(), (long) i);
        }
        disruptor.shutdown();
    }
}
```
The examples above only demonstrate serial and parallel arrangements, but other processing topologies can be built by combining them (this requires creating multiple EventHandler event processors).
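For example, the two arrangements compose into a "diamond": two handlers run in parallel, and a third runs only after both have seen each event. The sketch below is self-contained (`Diamond` and `LongEvent` are names I made up for this demo); the trace list makes the ordering observable:

```java
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class Diamond {

    static class LongEvent {
        long value;
    }

    public static List<String> run() {
        List<String> trace = new CopyOnWriteArrayList<>();
        Disruptor<LongEvent> d = new Disruptor<>(LongEvent::new, 8, DaemonThreadFactory.INSTANCE);
        EventHandler<LongEvent> a = (e, seq, end) -> trace.add("A");
        EventHandler<LongEvent> b = (e, seq, end) -> trace.add("B");
        EventHandler<LongEvent> c = (e, seq, end) -> trace.add("C");
        // Diamond graph: A and B consume each event in parallel,
        // C consumes it only after BOTH A and B have finished with it.
        d.handleEventsWith(a, b).then(c);
        d.start();
        d.getRingBuffer().publishEvent((e, seq) -> e.value = seq);
        d.shutdown(); // waits for the event to pass through the whole graph
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // A and B in either order, then C last
    }
}
```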
The Disruptor ships with several wait strategies:

- BlockingWaitStrategy: the slowest strategy, but the most frugal with CPU, and it gives the most consistent behavior across different deployment environments.
- SleepingWaitStrategy: performance and CPU usage similar to BlockingWaitStrategy, but with the smallest impact on the producer thread; well suited to asynchronous data processing.
- YieldingWaitStrategy: the best performing, suitable for low-latency scenarios. Recommended when extreme performance is required and the number of event-handler threads is smaller than the number of CPU cores.
- BusySpinWaitStrategy: low latency, but it occupies the most CPU.
- PhasedBackoffWaitStrategy: a combination of the strategies above; latency is higher, but it uses less CPU.
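The strategy is chosen once, in the Disruptor constructor, the same way `BlockingWaitStrategy` was passed in the Main class above. A minimal runnable sketch with `YieldingWaitStrategy` (the `WaitStrategyDemo`/`LongEvent` names are my own for this demo):

```java
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.util.concurrent.atomic.AtomicLong;

public class WaitStrategyDemo {

    static class LongEvent {
        long value;
    }

    public static long run() {
        AtomicLong sum = new AtomicLong();
        // The wait strategy is fixed at construction time.
        // YieldingWaitStrategy spins, then yields: low latency, busy CPU.
        Disruptor<LongEvent> d = new Disruptor<>(LongEvent::new, 1024,
                DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new YieldingWaitStrategy());
        d.handleEventsWith((e, seq, endOfBatch) -> sum.addAndGet(e.value));
        d.start();
        for (int i = 1; i <= 3; i++) {
            final long v = i;
            d.getRingBuffer().publishEvent((e, seq) -> e.value = v);
        }
        d.shutdown(); // drains the ring buffer before returning
        return sum.get(); // 1 + 2 + 3 = 6
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```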
This article draws on the Disruptor source code and parts of the documentation on GitHub.