1、Disruptor的簡介
Disruptor是由LAMX(歐洲頂級金融公司)設計和開源的大規模、高併發、低延遲的異步處理框架,也能夠說他
是最快的消息框架(JMS)。整個業務邏輯處理器徹底運行在內存中,其LMAX架構能夠達到一個線程裏每秒處理6百萬
流水,用1微秒的延遲能夠得到100K+吞吐量的爆炸性能。很是適合那種實時性高、延遲率低、業務流水量大的應用場
景,好比銀行的實時交易處理、讀寫操做分離、數據緩存等。
Disruptor是基於生產者-消費者模型,實現了"隊列「功能的無鎖高併發框架。他能夠作到一個生產者對應多個消
費者且消費者之間能夠並行的處理,也能夠造成前後順序的處理。Disruptor本質上解決的就是在兩個獨立的處理過
程之間交換數據。Disruptor框架的一些核心類有:
1.Disruptor:用於控制整個消費者-生產者模型的處理器
2.RingBuffer:用於存放數據
3.EventHandler:一個用於處理事件的接口(能夠當作生產者,也能夠當作消費者)。
4.EventFactory:事件工廠類。
5.WaitStrategy:用於實現事件處理等待RingBuffer遊標策略的接口。
6.SequeueBarrier:隊列屏障,用於處理訪問RingBuffer的序列。
7.用於運行disruptor的線程或者線程池。算法
2、Disruptor的入門
Disruptor的編寫通常能夠分爲如下幾步:
(1)定義事件;
(2)定義事件工廠;
(3)消費者–定義事件處理的具體實現;
(4)定義用於事件處理(消費者)的線程池;
(5)指定等待策略:
Disruptor 提供了多個WaitStrategy的實現,例如:BlockingWaitStrategy、SleepingWaitStrategy、
YieldingWaitStrategy等:
BlockingWaitStrategy是最低效的策略,但其對CPU的消耗最小而且在各類不一樣部署環境中能提供
更加一致的性能表現;
SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差很少,對CPU的消耗也相似,但其
對生產者線程的影響最小,適合用於異步日誌相似的場景;
YieldingWaitStrategy 的性能是最好的,適合用於低延遲的系統。在要求極高性能且事件處理線
數小於 CPU 邏輯核心數的場景中,推薦使用此策略;例如,CPU開啓超線程的特性。
WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
(6)生產(發佈)消息;
(7)關閉disruptor業務邏輯處理器;
Disruptor的一些核心概念有:
- Ring Buffer(環形緩衝區) :
曾經RingBuffer是Disruptor中的最主要的對象,但從3.0版本開始,其職責被簡化爲僅僅負責對經過
Disruptor進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 能夠由用戶的自定
義實現來徹底替代。
- Sequence Disruptor :
經過順序遞增的序號來編號管理。經過其進行交換的數據(事件),對數據(事件)的處理過程老是沿着序
號逐個遞增處理。一個Sequence用於跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一
個AtomicLong也能夠用於標識進度,但定義Sequence來負責該問題還有另外一個目的,那就是防止不一樣的 Sequence之間
的CPU緩存僞共享(Flase Sharing)問題。
- Sequencer :
Sequencer是Disruptor的真正核心。此接口有兩個實現類SingleProducerSequencer、MultiProducerSequencer
,它們定義在生產者和消費者之間快速、正確地傳遞數據的併發算法。
- Sequence Barrier
用於保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。
Sequence Barrier 還定義了決定Consumer是否還有可處理的事件的邏輯。
- Wait Strategy
定義 Consumer 如何進行等待下一個事件的策略。(注:Disruptor定義了多種不一樣的策略,針對不一樣的場
景,提供了不同的性能表現)
- Event
在Disruptor的語義中,生產者和消費者之間進行交換的數據被稱爲事件(Event)。它不是一個被Disruptor
定義的特定類型,而是由 Disruptor 的使用者定義並指定。
- EventProcessor
EventProcessor持有特定消費者的Sequence,並提供用於調用事件處理實現的事件循環(Event Loop)。
- EventHandler
Disruptor 定義的事件處理接口,由用戶實現,用於處理事件,是 Consumer 的真正實現。
- Producer
即生產者,只是泛指調用 Disruptor 發佈事件的用戶代碼,Disruptor 沒有定義特定接口或類型。
栗子:緩存
Event:服務器
/** * 事件(Event)就是經過 Disruptor 進行交換的數據類型。 * @author lcy * */ public class TransactionEvent { private long seq; private double amount; private long callNumber; public long getCallNumber() { return callNumber; } @Override public String toString() { return "TransactionEvent [seq=" + seq + ", amount=" + amount + ", callNumber=" + callNumber + "]"; } public void setCallNumber(long callNumber) { this.callNumber = callNumber; } public long getSeq() { return seq; } public void setSeq(long seq) { this.seq = seq; } public double getAmount() { return amount; } public void setAmount(double amount) { this.amount = amount; } }
Factory:架構
/** * Event Factory 定義瞭如何實例化前面第1步中定義的事件(Event) * Disruptor 經過 EventFactory 在 RingBuffer 中預建立 Event 的實例。 一個 Event 實例實際上被用做一個「數據槽」,發佈者發佈前,先從 RingBuffer 得到一個 Event 的實例, 而後往 Event 實例中填充數據,以後再發布到 RingBuffer中,以後由 Consumer 得到該 Event 實例並從中讀取數據。 * @author lcy * */ public class TransactionEventFactory implements EventFactory<TransactionEvent>{ @Override public TransactionEvent newInstance() { // TODO Auto-generated method stub return new TransactionEvent(); } }
Customer:併發
/** * 事件處理類-交易流水初始化 * @author lcy * */ public class AmountTrasfer implements EventTranslator<TransactionEvent>{ @Override public void translateTo(TransactionEvent arg0, long arg1) { arg0.setAmount(Math.random()*99); arg0.setCallNumber(17088888888L); arg0.setSeq(System.currentTimeMillis()); System.out.println("設置交易流水:"+arg0.getSeq()); } }
/** * 消費者–定義事件處理的具體實現 * 攔截交易流水 * @author lcy * */ public class TransHandler implements EventHandler<TransactionEvent>,WorkHandler<TransactionEvent>{ @Override public void onEvent(TransactionEvent transactionEvent) throws Exception { System.out.println("交易流水號爲:"+transactionEvent.getSeq()+"||交易金額爲:"+transactionEvent.getAmount()); } @Override public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception { // TODO Auto-generated method stub this.onEvent(arg0); } }
/** * 發送驗證短信 * @author lcy * */ public class SendMsgHandler implements EventHandler<TransactionEvent>{ @Override public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception { // TODO Auto-generated method stub System.out.println("向手機號:"+arg0.getCallNumber()+"發送驗證短信......"); } }
/** * 交易流水入庫操做 * @author lcy * */ public class InnerDBHandler implements EventHandler<TransactionEvent>,WorkHandler<TransactionEvent>{ @Override public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception { // TODO Auto-generated method stub this.onEvent(arg0); } @Override public void onEvent(TransactionEvent arg0) throws Exception { arg0.setSeq(arg0.getSeq()*10000); System.out.println("攔截入庫流水號------------ "+arg0.getSeq()); } }
Producer:框架
/** * 生產者、發佈事件 * @author lcy * */ public class TransactionEventProducer implements Runnable { // 線程同步輔助類 - 容許一個或多個線程一直等待 CountDownLatch cdl; Disruptor disruptor; public TransactionEventProducer(CountDownLatch cdl, Disruptor disruptor) { super(); this.cdl = cdl; this.disruptor = disruptor; } public TransactionEventProducer() { super(); // TODO Auto-generated constructor stub } @Override public void run() { AmountTrasfer th; try { //Event對象初始化類 th = new AmountTrasfer(); //發佈事件 disruptor.publishEvent(th); } finally { // 遞減鎖存器的計數 -若是計數到達零,則釋放全部等待的線程。 cdl.countDown(); } } // 定義環大小,2的倍數 private static final int BUFFER_SIZE = 1024; // 定義處理事件的線程或線程池 ExecutorService pool = Executors.newFixedThreadPool(7); /** * 批處理模式 * @throws Exception */ public void BatchDeal() throws Exception { //建立一個單生產者的ringBuffer final RingBuffer<TransactionEvent> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TransactionEvent>() { @Override public TransactionEvent newInstance() { return new TransactionEvent(); } //設置等待策略,YieldingWaitStrategy 的性能是最好的,適合用於低延遲的系統。 }, BUFFER_SIZE,new YieldingWaitStrategy()); //建立SequenceBarrier SequenceBarrier barrier = ringBuffer.newBarrier(); //建立消息處理器 BatchEventProcessor<TransactionEvent> eventProcessor = new BatchEventProcessor<TransactionEvent>(ringBuffer,barrier,new InnerDBHandler()); //構造反向依賴,eventProcessor之間沒有依賴關係則能夠將Sequence直接加入 ringBuffer.addGatingSequences(eventProcessor.getSequence()); //提交消息處理器 pool.submit(eventProcessor); //提交一個有返回值的任務用於執行,返回一個表示任務的未決結果的 Future。 Future<Void> submit = pool.submit(new Callable<Void>() { //計算結果,若是沒法計算結果則拋出異常 @Override public Void call() throws Exception { long seq; for (int i=0;i<7000;i++) { System.out.println("生產者:"+i); //環裏一個可用的區塊 seq=ringBuffer.next(); //爲環裏的對象賦值 ringBuffer.get(seq).setAmount(Math.random()*10); System.out.println("TransactionEvent: "+ringBuffer.get(seq).toString()); //發佈這個區塊的數據, ringBuffer.publish(seq); } return null; } }); //等待計算完成,而後獲取其結果。 submit.get(); Thread.sleep(1000); //關閉消息處理器 eventProcessor.halt(); //關閉線程池 pool.shutdown(); } /** * 工做池模式 * @throws Exception */ @SuppressWarnings("unchecked") public void poolDeal() throws Exception { RingBuffer<TransactionEvent> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TransactionEvent>() { @Override public TransactionEvent newInstance() { return new TransactionEvent(); } }, BUFFER_SIZE, new YieldingWaitStrategy()); SequenceBarrier barrier = ringBuffer.newBarrier(); //建立一個定長的線程池 ExecutorService pool2 = Executors.newFixedThreadPool(5); //交易流水入庫操做 WorkHandler<TransactionEvent> innerDBHandler = new InnerDBHandler(); ExceptionHandler arg2; WorkerPool<TransactionEvent> workerPool = new WorkerPool<TransactionEvent>(ringBuffer, barrier, new IgnoreExceptionHandler(), innerDBHandler); workerPool.start(pool2); long seq; for(int i =0;i<7;i++){ seq = ringBuffer.next(); ringBuffer.get(seq).setAmount(Math.random()*99); ringBuffer.publish(seq); } Thread.sleep(1000); workerPool.halt(); pool2.shutdown(); } /** * disruptor處理器用來組裝生產者和消費者 * @throws Exception */ @SuppressWarnings("unchecked") public void disruptorManage() throws Exception{ //建立用於處理事件的線程池 ExecutorService pool2 = Executors.newFixedThreadPool(7); //建立disruptor對象 /** * 用來指定數據生成者有一個仍是多個,有兩個可選值ProducerType.SINGLE和ProducerType.MULTI * BusySpinWaitStrategy是一種延遲最低,最耗CPU的策略。一般用於消費線程數小於CPU數的場景 */ Disruptor<TransactionEvent> disruptor2 = new Disruptor<TransactionEvent>(new EventFactory<TransactionEvent>() { @Override public TransactionEvent newInstance() { return new TransactionEvent(); } },BUFFER_SIZE,pool2,ProducerType.SINGLE,new BusySpinWaitStrategy()); //建立消費者組,先執行攔截交易流水和入庫操做 EventHandlerGroup<TransactionEvent> eventsWith = disruptor2.handleEventsWith(new InnerDBHandler(),new TransHandler()); //在進行風險交易的2次驗證操做 eventsWith.then(new SendMsgHandler()); //啓動disruptor disruptor2.start(); //在線程能經過 await()以前,必須調用 countDown() 的次數 CountDownLatch latch = new CountDownLatch(1); //將封裝好的TransactionEventProducer類提交 pool2.submit(new TransactionEventProducer(latch,disruptor2)); //使當前線程在鎖存器倒計數至零以前一直等待,以保證生產者任務徹底消費掉 latch.await(); //關閉disruptor業務邏輯處理器 disruptor2.shutdown(); //銷燬線程池 pool2.shutdown(); } }
Test:dom
/** * 測試類 * @author lcy * */ public class Test { public static void main(String[] args) throws Exception { TransactionEventProducer producer = new TransactionEventProducer(); for (int i = 0; i < 100; i++) producer.disruptorManage(); System.out.println("--------------------------------------------------"); } }
3、記一次生產上的BUG
前段時間升級的時候出現了這樣一個BUG,致使了近萬用戶的交易失敗。首先確認了咱們在生產上並無部署攔
截交易的規則,全部的交易流水都是放行的不會加入咱們的風險名單庫。那麼內存庫裏的幾萬灰名單是怎麼來的呢?
咱們在上線成功後需使用真實的用戶進行一波生產上的測試,而在測試的過程當中爲了配合測試那邊的需求,
需將特定的幾個測試帳號配置成加入灰名單並進行2次認證的攔截規則。測試經過後便將那幾條測試規則給撤回了。但
是咱們忽略了一個問題,由於Disruptor框架在初始化環的時候,只會new一次這個對象。這就致使了插入環裏「槽」的對
象始終都是第一次進入「灰名單」對象,等到環被塞滿後下條流水進來的時候就會使用「槽」裏的「灰名單」對象。即便這筆
交易不是風險交易也會加入到灰名單中,致使了大量的用戶交易失敗。
上線後的次日,咱們頭兒就意識到了這個問題,想經過重啓服務、清空環來暫時解決這個問題(服務器有負載均
衡),由於環被清空後,以前在環裏的「灰名單」對象也就不存在了,並且生產上沒有部署將用戶加入「灰名單」的規則,環
裏的對象就必定是「乾淨的」,這個問題也就獲得瞭解決。可是、但是、可可是、萬萬沒想到啊,當晚生產上仍是出現了問題。
灰名單裏的用戶數量已經逼近2萬了,大量用戶不能進行電子交易。
爲何項目已經被重啓了,環也被清空了,也沒有規則會產生新的灰名單,那2萬的灰名單用戶是從哪來的?過後
經過查看代碼發現,雖然環被清空了,可是在清空以前已經有部分用戶被存到了灰名單裏。這些用戶在某一時間再次
進行交易時,會從新將這條交易的狀態設置爲「灰名單」(其餘業務須要),這就致使了接待這條交易流水的「槽」會被重
新賦值爲「灰名單」的狀態,而後環裏的「灰名單」槽就會越滾越多。
Event在使用的時候必定不要忘記將關鍵的屬性進行初始化,這樣才能保證從環裏取出的對象是初始狀態的,不會被上次處理的數據所影響。異步