今天用一個訂單問題來加深對 Disruptor 的理解。當系統中有訂單產生時,系統首先會記錄訂單信息。同時也會發送消息到其餘系統處理相關業務,最後纔是訂單的處理。java
代碼包含如下內容:git
1) 事件對象 Eventgithub
2)三個消費者 Handler數據庫
3)一個生產者 Producerdom
4)執行 Main 方法ide
(1) Event測試
public class Trade { private String id;//ID private String name; private double price;//金額 private AtomicInteger count = new AtomicInteger(0); // 省略getter/setter }
(2) Handler 類this
一個負責存儲訂單信息,一個負責發送 kafka 信息到其餘系統中,最後一個負責處理訂單信息。線程
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; /** * 第一個 Handler1,存儲到數據庫中 */ public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { long threadId = Thread.currentThread().getId(); // 獲取當前線程id String id = event.getId(); // 獲取訂單號 System.out.println(String.format("%s:Thread Id %s 訂單信息保存 %s 到數據庫中 ....", this.getClass().getSimpleName(), threadId, id)); } }
import com.lmax.disruptor.EventHandler; /** * 第二個 Handler2,訂單信息發送到其它系統中 */ public class Handler2 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { long threadId = Thread.currentThread().getId(); // 獲取當前線程id String id = event.getId(); // 獲取訂單號 System.out.println(String.format("%s:Thread Id %s 訂單信息 %s 發送到 karaf 系統中 ....", this.getClass().getSimpleName(), threadId, id)); } }
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; /** * 第三個 Handler2,處理訂單信息 */ public class Handler3 implements EventHandler<Trade>, WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { onEvent(event); } @Override public void onEvent(Trade event) throws Exception { long threadId = Thread.currentThread().getId(); // 獲取當前線程id String id = event.getId(); // 獲取訂單號 System.out.println(String.format("%s:Thread Id %s 訂單信息 %s 處理中 ....", this.getClass().getSimpleName(), threadId, id)); } }
(3) Producer 類code
import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.dsl.Disruptor; import java.util.UUID; import java.util.concurrent.CountDownLatch; public class TradePublisher implements Runnable { Disruptor<Trade> disruptor; private CountDownLatch latch; private static int LOOP = 1; // 模擬百萬次交易的發生 public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) { this.disruptor=disruptor; this.latch=latch; } @Override public void run() { TradeEventTranslator tradeTransloator = new TradeEventTranslator(); for(int i = 0; i < LOOP; i++) { disruptor.publishEvent(tradeTransloator); } latch.countDown(); } } class TradeEventTranslator implements EventTranslator<Trade>{ @Override public void translateTo(Trade event, long sequence) { event.setId(UUID.randomUUID().toString()); } }
(4) 執行的 Main 方法
package com.github.binarylei.disruptor.demo3; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType; public class Main { public static void main(String[] args) throws InterruptedException { long beginTime=System.currentTimeMillis(); int bufferSize=1024; ExecutorService executor=Executors.newFixedThreadPool(8); Disruptor<Trade> disruptor = new Disruptor<>(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(); } }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); //菱形操做 //使用disruptor建立消費者組C1,C2 EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2()); //聲明在C1,C2完事以後執行JMS消息發送操做 也就是流程走到C3 handlerGroup.then(new Handler3()); disruptor.start();//啓動 CountDownLatch latch=new CountDownLatch(1); //生產者準備 executor.submit(new TradePublisher(latch, disruptor)); latch.await();//等待生產者完事. disruptor.shutdown(); executor.shutdown(); System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime)); } }
測試結果以下:
Handler1:Thread Id 10 訂單信息保存 a097c77d-08f1-430a-8342-2143963f268f 到數據庫中 .... Handler2:Thread Id 11 訂單信息 a097c77d-08f1-430a-8342-2143963f268f 發送到 karaf 系統中 .... Handler3:Thread Id 13 訂單信息 a097c77d-08f1-430a-8342-2143963f268f 處理中 .... 總耗時:1631
能夠看到 Handler3 在 Handler1 和 Handler2 執行完成後才執行。
雖然 disruptor 模式使用起來很簡單,可是創建多個消費者以及它們之間的依賴關係須要的樣板代碼太多了。爲了能快速又簡單適用於99%的場景,我爲 Disruptor 模式準備了一個簡單的領域特定語言(DSL),定義了消費順序。更多Disruptor場景使用
在講解 Disruptor DSL 以前先看一下多個消費者不重複消費的問題。
默認一個消費者一個線程,若是想要實現 C3 多個消費者共同不重複消費數據,可使用 handlerGroup.thenHandleEventsWithWorkerPool(customers)
//使用disruptor建立消費者組C1, C2 EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2()); // 多個消費者不重複消費 Handler3[] customers = new Handler3[]{new Handler3(), new Handler3(), new Handler3()}; handlerGroup.thenHandleEventsWithWorkerPool(customers);
在這種狀況下,只要生產者(P1)將元素放到ring buffer上,消費者C1和C2就能夠並行處理這些元素。可是消費者C3必須一直等到C1和C2處理完以後,才能夠處理。在現實世界中的對應的案例就像:在處理實際的業務邏輯(C3)以前,須要校驗數據(C1),以及將數據寫入磁盤(C2)。
//1. 使用disruptor建立消費者組C1,C2 EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2()); //2. 聲明在C1,C2完事以後執行JMS消息發送操做 也就是流程走到C3 handlerGroup.then(new Handler3());
disruptor.handleEventsWith(new Handler1()). handleEventsWith(new Handler2()). handleEventsWith(new Handler3());
咱們甚至能夠在一個更復雜的六邊形模式中構建一個並行消費者鏈:
Handler1 h1 = new Handler1(); Handler2 h2 = new Handler2(); Handler3 h3 = new Handler3(); Handler4 h4 = new Handler4(); Handler5 h5 = new Handler5(); disruptor.handleEventsWith(h1, h2); disruptor.after(h1).handleEventsWith(h4); disruptor.after(h2).handleEventsWith(h5); disruptor.after(h4, h5).handleEventsWith(h3);
天天用心記錄一點點。內容也許不重要,但習慣很重要!