disruptor入門


1、Disruptor的簡介

  Disruptor是由LAMX(歐洲頂級金融公司)設計和開源的大規模、高併發、低延遲的異步處理框架,也能夠說他
是最快的消息框架(JMS)。整個業務邏輯處理器徹底運行在內存中,其LMAX架構能夠達到一個線程裏每秒處理6百萬
流水,用1微秒的延遲能夠得到100K+吞吐量的爆炸性能。很是適合那種實時性高、延遲率低、業務流水量大的應用場
景,好比銀行的實時交易處理、讀寫操做分離、數據緩存等。
  Disruptor是基於生產者-消費者模型,實現了"隊列「功能的無鎖高併發框架。他能夠作到一個生產者對應多個消
費者且消費者之間能夠並行的處理,也能夠造成前後順序的處理。Disruptor本質上解決的就是在兩個獨立的處理過
程之間交換數據。Disruptor框架的一些核心類有:
 1.Disruptor:用於控制整個消費者-生產者模型的處理器
   2.RingBuffer:用於存放數據
   3.EventHandler:一個用於處理事件的接口(能夠當作生產者,也能夠當作消費者)。
   4.EventFactory:事件工廠類。
   5.WaitStrategy:用於實現事件處理等待RingBuffer遊標策略的接口。
   6.SequeueBarrier:隊列屏障,用於處理訪問RingBuffer的序列。
   7.用於運行disruptor的線程或者線程池。算法

2、Disruptor的入門

  Disruptor的編寫通常能夠分爲如下幾步:
    (1)定義事件;
    (2)定義事件工廠;
    (3)消費者–定義事件處理的具體實現;
    (4)定義用於事件處理(消費者)的線程池;
    (5)指定等待策略:
      Disruptor 提供了多個WaitStrategy的實現,例如:BlockingWaitStrategy、SleepingWaitStrategy、
YieldingWaitStrategy等:
      BlockingWaitStrategy是最低效的策略,但其對CPU的消耗最小而且在各類不一樣部署環境中能提供
更加一致的性能表現;
      SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差很少,對CPU的消耗也相似,但其
對生產者線程的影響最小,適合用於異步日誌相似的場景;
      YieldingWaitStrategy 的性能是最好的,適合用於低延遲的系統。在要求極高性能且事件處理線
數小於 CPU 邏輯核心數的場景中,推薦使用此策略;例如,CPU開啓超線程的特性。
      WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
      WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
      WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
    (6)生產(發佈)消息;
    (7)關閉disruptor業務邏輯處理器;
  Disruptor的一些核心概念有:
    - Ring Buffer(環形緩衝區) :
    曾經RingBuffer是Disruptor中的最主要的對象,但從3.0版本開始,其職責被簡化爲僅僅負責對經過
Disruptor進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 能夠由用戶的自定
義實現來徹底替代。
    - Sequence Disruptor :
    經過順序遞增的序號來編號管理。經過其進行交換的數據(事件),對數據(事件)的處理過程老是沿着序
號逐個遞增處理。一個Sequence用於跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一
個AtomicLong也能夠用於標識進度,但定義Sequence來負責該問題還有另外一個目的,那就是防止不一樣的 Sequence之間
的CPU緩存僞共享(Flase Sharing)問題。
    - Sequencer :
    Sequencer是Disruptor的真正核心。此接口有兩個實現類SingleProducerSequencer、MultiProducerSequencer
,它們定義在生產者和消費者之間快速、正確地傳遞數據的併發算法。
    - Sequence Barrier
    用於保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。
Sequence Barrier 還定義了決定Consumer是否還有可處理的事件的邏輯。
    - Wait Strategy
    定義 Consumer 如何進行等待下一個事件的策略。(注:Disruptor定義了多種不一樣的策略,針對不一樣的場
景,提供了不同的性能表現)
    - Event
  在Disruptor的語義中,生產者和消費者之間進行交換的數據被稱爲事件(Event)。它不是一個被Disruptor
定義的特定類型,而是由 Disruptor 的使用者定義並指定。
    - EventProcessor
    EventProcessor持有特定消費者的Sequence,並提供用於調用事件處理實現的事件循環(Event Loop)。
    - EventHandler
    Disruptor 定義的事件處理接口,由用戶實現,用於處理事件,是 Consumer 的真正實現。
    - Producer
    即生產者,只是泛指調用 Disruptor 發佈事件的用戶代碼,Disruptor 沒有定義特定接口或類型。
  栗子:緩存

  Event:服務器

/** * 事件(Event)就是經過 Disruptor 進行交換的數據類型。 * @author lcy * */
public class TransactionEvent { private long seq; private double amount; private long callNumber; public long getCallNumber() { return callNumber; } @Override public String toString() { return "TransactionEvent [seq=" + seq + ", amount=" + amount + ", callNumber=" + callNumber + "]"; } public void setCallNumber(long callNumber) { this.callNumber = callNumber; } public long getSeq() { return seq; } public void setSeq(long seq) { this.seq = seq; } public double getAmount() { return amount; } public void setAmount(double amount) { this.amount = amount; } }

  Factory:架構

/** * Event Factory 定義瞭如何實例化前面第1步中定義的事件(Event) * Disruptor 經過 EventFactory 在 RingBuffer 中預建立 Event 的實例。 一個 Event 實例實際上被用做一個「數據槽」,發佈者發佈前,先從 RingBuffer 得到一個 Event 的實例, 而後往 Event 實例中填充數據,以後再發布到 RingBuffer中,以後由 Consumer 得到該 Event 實例並從中讀取數據。 * @author lcy * */
public class TransactionEventFactory implements EventFactory<TransactionEvent>{ @Override public TransactionEvent newInstance() { // TODO Auto-generated method stub
        return new TransactionEvent(); } }

  Customer:併發

/** * 事件處理類-交易流水初始化 * @author lcy * */
public class AmountTrasfer implements EventTranslator<TransactionEvent>{ @Override public void translateTo(TransactionEvent arg0, long arg1) { arg0.setAmount(Math.random()*99); arg0.setCallNumber(17088888888L); arg0.setSeq(System.currentTimeMillis()); System.out.println("設置交易流水:"+arg0.getSeq()); } }
/** * 消費者–定義事件處理的具體實現 * 攔截交易流水 * @author lcy * */
public class TransHandler implements EventHandler<TransactionEvent>,WorkHandler<TransactionEvent>{ @Override public void onEvent(TransactionEvent transactionEvent) throws Exception { System.out.println("交易流水號爲:"+transactionEvent.getSeq()+"||交易金額爲:"+transactionEvent.getAmount()); } @Override public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception { // TODO Auto-generated method stub
        this.onEvent(arg0); } }
/** * 發送驗證短信 * @author lcy * */
public class SendMsgHandler implements EventHandler<TransactionEvent>{ @Override public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception { // TODO Auto-generated method stub
         System.out.println("向手機號:"+arg0.getCallNumber()+"發送驗證短信......"); } }
/** * 交易流水入庫操做 * @author lcy * */
public class InnerDBHandler implements EventHandler<TransactionEvent>,WorkHandler<TransactionEvent>{ @Override public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception { // TODO Auto-generated method stub
        this.onEvent(arg0); } @Override public void onEvent(TransactionEvent arg0) throws Exception { arg0.setSeq(arg0.getSeq()*10000); System.out.println("攔截入庫流水號------------  "+arg0.getSeq()); } }

  Producer:框架

/** * 生產者、發佈事件 * @author lcy * */
public class TransactionEventProducer implements Runnable { // 線程同步輔助類 - 容許一個或多個線程一直等待
 CountDownLatch cdl; Disruptor disruptor; public TransactionEventProducer(CountDownLatch cdl, Disruptor disruptor) { super(); this.cdl = cdl; this.disruptor = disruptor; } public TransactionEventProducer() { super(); // TODO Auto-generated constructor stub
 } @Override public void run() { AmountTrasfer th; try { //Event對象初始化類
            th = new AmountTrasfer(); //發佈事件
 disruptor.publishEvent(th); } finally { // 遞減鎖存器的計數 -若是計數到達零,則釋放全部等待的線程。
 cdl.countDown(); } } // 定義環大小,2的倍數
    private static final int BUFFER_SIZE = 1024; // 定義處理事件的線程或線程池
    ExecutorService pool = Executors.newFixedThreadPool(7); /** * 批處理模式 * @throws Exception */
    public void BatchDeal() throws Exception { //建立一個單生產者的ringBuffer
        final RingBuffer<TransactionEvent> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TransactionEvent>() { @Override public TransactionEvent newInstance() { return new TransactionEvent(); } //設置等待策略,YieldingWaitStrategy 的性能是最好的,適合用於低延遲的系統。
        }, BUFFER_SIZE,new YieldingWaitStrategy()); //建立SequenceBarrier
        SequenceBarrier barrier = ringBuffer.newBarrier(); //建立消息處理器
        BatchEventProcessor<TransactionEvent> eventProcessor = new BatchEventProcessor<TransactionEvent>(ringBuffer,barrier,new InnerDBHandler()); //構造反向依賴,eventProcessor之間沒有依賴關係則能夠將Sequence直接加入
 ringBuffer.addGatingSequences(eventProcessor.getSequence()); //提交消息處理器
 pool.submit(eventProcessor); //提交一個有返回值的任務用於執行,返回一個表示任務的未決結果的 Future。
        Future<Void> submit = pool.submit(new Callable<Void>() { //計算結果,若是沒法計算結果則拋出異常
 @Override public Void call() throws Exception { long seq; for (int i=0;i<7000;i++) { System.out.println("生產者:"+i); //環裏一個可用的區塊
                    seq=ringBuffer.next(); //爲環裏的對象賦值
                    ringBuffer.get(seq).setAmount(Math.random()*10); System.out.println("TransactionEvent:   "+ringBuffer.get(seq).toString()); //發佈這個區塊的數據,
 ringBuffer.publish(seq); } return null; } }); //等待計算完成,而後獲取其結果。
 submit.get(); Thread.sleep(1000); //關閉消息處理器
 eventProcessor.halt(); //關閉線程池
 pool.shutdown(); } /** * 工做池模式 * @throws Exception */ @SuppressWarnings("unchecked") public void poolDeal() throws Exception { RingBuffer<TransactionEvent> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TransactionEvent>() { @Override public TransactionEvent newInstance() { return new TransactionEvent(); } }, BUFFER_SIZE, new YieldingWaitStrategy()); SequenceBarrier barrier = ringBuffer.newBarrier(); //建立一個定長的線程池
        ExecutorService pool2 = Executors.newFixedThreadPool(5); //交易流水入庫操做
        WorkHandler<TransactionEvent> innerDBHandler = new InnerDBHandler(); ExceptionHandler arg2; WorkerPool<TransactionEvent> workerPool = new WorkerPool<TransactionEvent>(ringBuffer, barrier, new IgnoreExceptionHandler(), innerDBHandler); workerPool.start(pool2); long seq; for(int i =0;i<7;i++){ seq = ringBuffer.next(); ringBuffer.get(seq).setAmount(Math.random()*99); ringBuffer.publish(seq); } Thread.sleep(1000); workerPool.halt(); pool2.shutdown(); } /** * disruptor處理器用來組裝生產者和消費者 * @throws Exception */ @SuppressWarnings("unchecked") public void disruptorManage() throws Exception{ //建立用於處理事件的線程池
        ExecutorService pool2 = Executors.newFixedThreadPool(7); //建立disruptor對象
        /** * 用來指定數據生成者有一個仍是多個,有兩個可選值ProducerType.SINGLE和ProducerType.MULTI * BusySpinWaitStrategy是一種延遲最低,最耗CPU的策略。一般用於消費線程數小於CPU數的場景 */ Disruptor<TransactionEvent> disruptor2 = new Disruptor<TransactionEvent>(new EventFactory<TransactionEvent>() { @Override public TransactionEvent newInstance() { return new TransactionEvent(); } },BUFFER_SIZE,pool2,ProducerType.SINGLE,new BusySpinWaitStrategy()); //建立消費者組,先執行攔截交易流水和入庫操做
        EventHandlerGroup<TransactionEvent> eventsWith = disruptor2.handleEventsWith(new InnerDBHandler(),new TransHandler()); //在進行風險交易的2次驗證操做
        eventsWith.then(new SendMsgHandler()); //啓動disruptor
 disruptor2.start(); //在線程能經過 await()以前,必須調用 countDown() 的次數
        CountDownLatch latch = new CountDownLatch(1); //將封裝好的TransactionEventProducer類提交
        pool2.submit(new TransactionEventProducer(latch,disruptor2)); //使當前線程在鎖存器倒計數至零以前一直等待,以保證生產者任務徹底消費掉
 latch.await(); //關閉disruptor業務邏輯處理器
 disruptor2.shutdown(); //銷燬線程池
 pool2.shutdown(); } }

  Test:dom

/** * 測試類 * @author lcy * */
public class Test { public static void main(String[] args) throws Exception { TransactionEventProducer producer = new TransactionEventProducer(); for (int i = 0; i < 100; i++) producer.disruptorManage(); System.out.println("--------------------------------------------------"); } }

3、記一次生產上的BUG

  前段時間升級的時候出現了這樣一個BUG,致使了近萬用戶的交易失敗。首先確認了咱們在生產上並無部署攔
截交易的規則,全部的交易流水都是放行的不會加入咱們的風險名單庫。那麼內存庫裏的幾萬灰名單是怎麼來的呢?
  咱們在上線成功後需使用真實的用戶進行一波生產上的測試,而在測試的過程當中爲了配合測試那邊的需求,
需將特定的幾個測試帳號配置成加入灰名單並進行2次認證的攔截規則。測試經過後便將那幾條測試規則給撤回了。但
是咱們忽略了一個問題,由於Disruptor框架在初始化環的時候,只會new一次這個對象。這就致使了插入環裏「槽」的對
象始終都是第一次進入「灰名單」對象,等到環被塞滿後下條流水進來的時候就會使用「槽」裏的「灰名單」對象。即便這筆
交易不是風險交易也會加入到灰名單中,致使了大量的用戶交易失敗。
  上線後的次日,咱們頭兒就意識到了這個問題,想經過重啓服務、清空環來暫時解決這個問題(服務器有負載均
衡),由於環被清空後,以前在環裏的「灰名單」對象也就不存在了,並且生產上沒有部署將用戶加入「灰名單」的規則,環
裏的對象就必定是「乾淨的」,這個問題也就獲得瞭解決。可是、但是、可可是、萬萬沒想到啊,當晚生產上仍是出現了問題。
灰名單裏的用戶數量已經逼近2萬了,大量用戶不能進行電子交易。
  爲何項目已經被重啓了,環也被清空了,也沒有規則會產生新的灰名單,那2萬的灰名單用戶是從哪來的?過後
經過查看代碼發現,雖然環被清空了,可是在清空以前已經有部分用戶被存到了灰名單裏。這些用戶在某一時間再次
進行交易時,會從新將這條交易的狀態設置爲「灰名單」(其餘業務須要),這就致使了接待這條交易流水的「槽」會被重
新賦值爲「灰名單」的狀態,而後環裏的「灰名單」槽就會越滾越多。
  Event在使用的時候必定不要忘記將關鍵的屬性進行初始化,這樣才能保證從環裏取出的對象是初始狀態的,不會被上次處理的數據所影響。異步

相關文章
相關標籤/搜索