高性能低延遲且簡單的併發框架-DISRUPTOR

     Disruptor是一個用於在線程間通訊的高效低延時的消息組件,它像個加強的隊列,使用柵欄(barrier)+序號(Sequencing)機制協調生產者與消費者,從而避免使用鎖和CAS,同時還組合使用預分配內存機制、緩存行機制(cache line)、批處理效應(batch effect)來達到高吞吐量和低時延的目標。java

相關背景概述

  • 併發的複雜性

         代碼併發執行主要考慮兩個方面:互斥和變化的可見性。互斥主要用來管理對某些資源的競爭訪問;變化的可見性主要用來控制何時這些變化對其餘線程可見。併發環境中最耗時的操做其實就是併發寫操做。多線程對同一個資源近與須要複雜昂貴的協調,一般會經過某種鎖來實現資源協調:git

        1.鎖的代價github

          鎖提供了互斥,並可以確保變化可以以一個肯定的順序讓其它的線程可見鎖會涉及到操做系統的上下文切換,操做系統會掛起全部在等待這把鎖的線程,直到鎖持有者釋放該鎖數組

       2.CAS的代價緩存

          CAS依賴於處理器的支持,CAS相對於鎖是很是高效的,CAS並非免費的,通常是採用CPU空轉的方式數據結構

 

    Disruptor的設計初衷就是爲了解決鎖代價和CAS代價過大等問題,以求最優化內存分配,使用緩存友好的方式來最佳使用硬件資源,Disruptor地核心機制在於:以RingBuffer的形式預分配有界的數據結構,單個或多個生產者能夠抽RingBuffer中寫入數據,單個或多個消費者能夠從RingBuffer讀取數據。多線程

 核心對象

  • RingBuffer  一個環形的數據結構,對象初始化時,會使用事件Event進行填充,Buffer的大小必須是2的冪次方,方便移位操做。
  • Event  無指定具體接口,用戶本身實現,能夠綁定本身的業務數據。
  • EventFactory  生產事件Event的工廠,由用戶本身實現(extends com.lmax.disruptor.EventFactory
  • EvnetTranslator  事件發佈回調接口,由用戶本身實現(extends com.lmax.disruptor.EventTranslator),調用translateTo方法負責將業務參數設置到事件中。
  • Sequencer 序列產生器,也是協調生產者和消費者及實現高併發的核心,有MultiProducerSequencer和SingleProducerSequencer兩個實現類。
  • SequenceBarrier  擁有RingBuffer的發佈事件Sequence引用和消費者依賴的Sequence引用。決定消費者消費可消費的Sequence。
  • EventHandler  事件的處理者,由用戶本身實現,消費者處理消息。
  • EventProcesssor  事件的處理器,單獨在一個線程中運行。持有特定消費者的Sequence,並提供用於調用事件處理實現的事件循環
  • WorkHandler   事件的處理者,由用戶本身實現。(我的感受和EventHandler沒多大區別)
  • WorkProcessor  事件的處理器,單獨在一個線程中運行。
  • WorkerPool  一組WorkProcessor的處理。
  • WaitStrategy  在消費者比生產者快時,消費者處理器的等待策略。

       BlockingWaitStrategy:經過線程阻塞的方式,等待生產者喚醒併發

       BusySpinWaitStrategy:線程一直自旋等待,比較耗CPU。dom

       LiteBlockingWaitStrategy:經過線程阻塞的方式,等待生產者喚醒,比BlockingWaitStrategy要輕,某些狀況下能夠減小阻塞的次數。高併發

       PhasedBackoffWaitStrategy:根據指定的時間段參數和指定的等待策略決定採用哪一種等待策略。

       SleepingWaitStrategy:可經過參數設置,使線程經過Thread.yield()主動放棄執行,經過線程調度器從新調度;或一直自旋等待。

       TimeoutBlockingWaitStrategy:經過參數設置阻塞時間,若是超時則拋出異常。

       YieldingWaitStrategy: 經過Thread.yield()主動放棄執行,經過線程調度器從新調度。

 

  實現原理及分析

  • RingBuffer的實現:封裝了一個對象數組,RingBuffer實例化時,用Event填充。生產者和消費者經過對序列(long的原子操做)取模計算獲取對象數組中的Event.
public E get(long sequence)
{
    return elementAt(sequence);
}
protected final E elementAt(long sequence)
{
    return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
  • 單個生產者的實現:保存有全部消費者當前消費的前一個序列值,在取下一個要發佈的序列時,檢查要發佈的序列是否覆蓋全部消費者正在處理的最小序列。若是未覆蓋,則獲取可發佈的遊標值,若是覆蓋(說明緩存已經滿了),則自旋等待,直到能夠發佈。發佈事件時則先發布,後指定當前遊標爲發佈的序列值。
  • 多個生產者的實現:保存有全部消費者當前消費的前一個序列值,並維護一個和RingBuffer同樣大小的數組,在取下一個要發佈的序列時,檢查要發佈的序列是否覆蓋全部消費者正在處理的最小序列。若是未覆蓋,則先發布,後指定當前遊標爲發佈的序列值,若是未覆蓋,則獲取可發佈的遊標值,若是覆蓋(說明緩存已經滿了),則自旋等待,直到能夠發佈。一個生產者獲取可發佈的序列後,當即更新當前遊標。發佈事件時生產者每發佈一個序列,則記錄到數組指定位置。
  • 消費者的實現:消費者保持一個本身的序列,每次累加後nextSequence,去獲取可訪問的最大序列。對於一個生產者,就是nextSequence到RingBuffer當前遊標的序列。對於多個生產者,就是nextSequence到RingBuffer當前遊標之間,最大的連續的序列集。

DEMO實例

  • POM
  • <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.4.2</version>
    </dependency>
         
  • 建立一個Event對象
  • package com.scgaopan.disruptor.pakingdemo;
    
    /**
     * Author:scgaopan
     * Date:18/10/25
     * Description:  汽車信息
     */
    public class MyInParkingDataEvent {
    
        private  String carLicense;//車牌號
    
        public String getCarLicense() {
            return carLicense;
        }
    
        public void setCarLicense(String carLicense) {
            this.carLicense = carLicense;
        }
    }
  • 建立監聽類,消費者的具體實現,這裏建立了3個消費者
  • package com.scgaopan.disruptor.pakingdemo.handler;
    
    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.WorkHandler;
    import com.scgaopan.disruptor.pakingdemo.MyInParkingDataEvent;
    
    /**
     * Author:scgaopan
     * Date:18/10/25
     * Description:第一個消費者,負責保存進場汽車的信息
     */
    public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent>, WorkHandler<MyInParkingDataEvent> {
    
    
        public void onEvent(MyInParkingDataEvent event, long sequence, boolean endOfBatch) throws Exception {
            System.out.println("this is Eventhandler executed.........data="+event.getCarLicense());
           this.onEvent(event);
    
        }
    
        public void onEvent(MyInParkingDataEvent event) throws Exception {
    
            System.out.println("this is WorkHandler executed.........data="+event.getCarLicense());
            Thread.sleep(3000);
            System.out.println("  成功保存車牌信息到database中,currentThread="+Thread.currentThread().getId()+",data="+event.getCarLicense());
    
    
    
        }
    }
    package com.scgaopan.disruptor.pakingdemo.handler;
    
    import com.lmax.disruptor.EventHandler;
    import com.scgaopan.disruptor.pakingdemo.MyInParkingDataEvent;
    
    /**
     * Author:scgaopan
     * Date:18/10/25
     * Description:
     */
    public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent> {
    
        public void onEvent(MyInParkingDataEvent event, long sequence, boolean endOfBatch) throws Exception {
    
    
            Thread.sleep(2000);
            System.out.println("經過kafka成功,當前線程="+Thread.currentThread().getId()+" data="+event.getCarLicense());
        }
    }
    package com.scgaopan.disruptor.pakingdemo.handler;
    
    import com.lmax.disruptor.EventHandler;
    import com.scgaopan.disruptor.pakingdemo.MyInParkingDataEvent;
    
    /**
     * Author:scgaopan
     * Date:18/10/25
     * Description:
     */
    public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent> {
    
        public void onEvent(MyInParkingDataEvent event, long sequence, boolean endOfBatch) throws Exception {
            //Thread.sleep(1000);
            System.out.println("發送消息成功,當前線程="+Thread.currentThread().getId()+" data="+event.getCarLicense());
        }
    }
  • EventTranslator的實現,事件發佈回調接口,負責將業務參數設置到事件中
  • package com.scgaopan.disruptor.pakingdemo.producer;
    
    import com.lmax.disruptor.EventTranslator;
    import com.scgaopan.disruptor.pakingdemo.MyInParkingDataEvent;
    
    /**
     * Author:scgaopan
     * Date:18/10/25
     * Description:
     */
    public class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {
    
    
        public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
            this.generateData(myInParkingDataEvent);
        }
    
        private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
            myInParkingDataEvent.setCarLicense("車牌號: 鄂A-" + (int) (Math.random() * 100000)); // 隨機生成一個車牌號
           // System.out.println("Thread Id " + Thread.currentThread().getId() + " 寫完一個event");
            return myInParkingDataEvent;
    
        }
    }
  • 事件的發佈類
  • package com.scgaopan.disruptor.pakingdemo.producer;
    
    import com.lmax.disruptor.dsl.Disruptor;
    import com.scgaopan.disruptor.pakingdemo.DateUtil;
    import com.scgaopan.disruptor.pakingdemo.MyInParkingDataEvent;
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     * Author:scgaopan
     * Date:18/10/25
     * Description:
     */
    public class MyInParkingDataEventPublisher implements Runnable {
    
        private CountDownLatch countDownLatch; // 用於監聽初始化操做,等初始化執行完畢後,通知主線程繼續工做
        private Disruptor<MyInParkingDataEvent> disruptor;
        private static final Integer NUM = 1; // 1,10,100,1000
    
        public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
                                             Disruptor<MyInParkingDataEvent> disruptor) {
            this.countDownLatch = countDownLatch;
            this.disruptor = disruptor;
        }
    
        public void run() {
            MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
            try {
                for (int i = 0; i < NUM; i++) {
                    disruptor.publishEvent(eventTranslator);
                    Thread.sleep(1000); // 假設一秒鐘進一輛車
                    System.out.println(DateUtil.getCurrentTime()+ "有一輛車進入...............");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown(); // 執行完畢後通知 await()方法
                System.out.println(NUM + "輛車已經所有進入進入停車場!");
            }
    
        }
    }
  • main類
  • package com.scgaopan.disruptor.pakingdemo;
    
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.YieldingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.EventHandlerGroup;
    import com.lmax.disruptor.dsl.ProducerType;
    import com.scgaopan.disruptor.pakingdemo.handler.MyParkingDataInDbHandler;
    import com.scgaopan.disruptor.pakingdemo.handler.MyParkingDataSmsHandler;
    import com.scgaopan.disruptor.pakingdemo.handler.MyParkingDataToKafkaHandler;
    import com.scgaopan.disruptor.pakingdemo.producer.MyInParkingDataEventPublisher;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Author:scgaopan
     * Date:18/10/25
     * Description:
     */
    public class MyInParkingDataEventMain {
    
        public static void main(String[] args) {
            long beginTime=System.currentTimeMillis();
            int bufferSize = 2048; // 2的N次方
            try {
                // 建立消費者線程池,負責處理Disruptor的四個消費者
                ExecutorService executor = Executors.newFixedThreadPool(4);
    
                // 初始化一個 Disruptor
                Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
    
                    public MyInParkingDataEvent newInstance() {
                        return new MyInParkingDataEvent(); // Event 初始化工廠
                    }
                }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
    
                // 使用disruptor建立消費者組 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
                EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
                        new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());
    
                // 當上面兩個消費者處理(這裏是處理結束,而不是通知)結束後再消耗 smsHandler
                MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
                handlerGroup.then(myParkingDataSmsHandler);
                // 啓動Disruptor
                disruptor.start();
    
                CountDownLatch countDownLatch = new CountDownLatch(1); // 一個生產者線程準備好了就能夠通知主線程繼續工做了
                // 生產者生成數據
                executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor));
                countDownLatch.await(); // 等待生產者結束
    
                disruptor.shutdown();
                executor.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime));
        }
    }
  • 代碼地址:https://gitee.com/scgaopan/diruptor-demo.git

Disruptor源碼地址: https://github.com/LMAX-Exchange/disruptor

相關文章
相關標籤/搜索