單機最快MQ—Disruptor

單機最快MQ—Disruptor

今天來說講我所知道的單機最快的MQ,它叫Disruptor數組

先來介紹一下Disruptor,從翻譯上來看,Disruptor—分裂、瓦解,Disruptor是國外某個金融、股票交易所開發的,2011年得到Duke獎,爲成爲單機最快的MQ,性能及高,無鎖CAS,單機支持高併發併發

怎麼樣,心動了沒?來來來,讓我來帶你們學習一下今天的主角—Disruptoride

你們能夠把Disruptor當作是內存裏的高效的隊列高併發

Disruptor簡介

  • 無鎖(CAS)、高併發,使用環形Buffer,直接覆蓋(不用清除)舊數據,下降GC頻繁,實現了基於事件的生產者消費者模型(觀察者模式)性能

    • 爲何說它是觀察者模式呢?由於消費者時刻關注着隊列裏有沒有消息,一旦有新消息產生,消費者線程就會馬上把它消費

環形隊列(RingBuffer)

  1. RingBuffer有一個序號sequence,指向下一個可用元素,採用數組實現,沒有首尾指針學習

    • Disruptor要求你對他設置長度的時候,設置成2的n次冪,這樣有利於二進制的運算

    image
    首先,它是基於數組實現的,遍歷起來要比鏈表要快
    其次不用維護首尾指針,固然他也沒有首尾指針,之須要維護一個sequence便可this

  2. **當全部位置都放滿了,再放下一個時,就會把0號位置覆蓋掉

這時就會有小夥伴着急了,怎麼能覆蓋掉呢,那我數據不就丟失了嗎?**spa

那確定是不會就讓他這麼輕易滴把這數據覆蓋掉滴,當須要覆蓋數據時,會執行一個策略,Disruptor給提供多種策略,說說比較經常使用的線程

  • BlockingWaitStrategy策略,常見且默認的等待策略,當這個隊列裏滿了,不執行覆蓋,而是在外面阻塞等待
  • SleepingWaitStrategy策略,看字面意思,用睡眠來等待,等待中循環調用LockSupport.parkNanos(1)來睡眠
  • YieldingWaitStrategy策略,循環等待sequence增長到合適的值,循環中調用Thread.yieId(),容許其餘準備好的線程執行

Disruptor開發步驟

  1. 定義Event—隊列中須要處理的元素
  2. 定義Event工廠,用於填充隊列
  3. 定義EventHandler(消費者),處理容器中的元素
//定義Event消息(事件)類
public class LongEvent{

    private long value;
    private String name;

    @Override
    public String toString() {
        return "LongEvent{" +
                "value=" + value +
                ", name='" + name + '\'' +
                '}';
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public long getValue() {
        return value;
    }
    public void setValue(long value) {
        this.value = value;
    }
}
//定義消息(事件)工廠
public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}
//定義消息(事件)的消費方式
public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
        System.out.println(longEvent.getName()+"-----"+longEvent.getValue());
    }
}
//消息(事件)生產者
public class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    public void onData(long val, String name) {
        long sequence = ringBuffer.next();
        try {
            LongEvent event = ringBuffer.get(sequence);
            event.setValue(val);
            event.setName(name);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}
public static void main(String[] args) {
        //new一個消息(事件)工廠
        LongEventFactory factory = new LongEventFactory();
        //設置環形Buffer的SIZE
        int size = 1024;
        //new Disruptor,參數是消息(事件)工廠,Buffer的Size,線程工廠
        Disruptor<LongEvent> longEventDisruptor = new Disruptor<LongEvent>(factory, size, Executors.defaultThreadFactory());
        //設置如何消費生產者產出的消息(事件)
        longEventDisruptor.handleEventsWith(new LongEventHandler());
        //啓動--環形Buffer建立成功,全部的位置均已建立好Event對象
        longEventDisruptor.start();
        //獲取Disruptor的環形Buffer
        RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer();
        //new 消息(事件)生產者
        LongEventProducer producer = new LongEventProducer(ringBuffer);
        //循環調用-往裏添加消息
        for(long l = 0; l<100; l++) {
            //TODO   調用producer的生產消息(事件)的方法
            producer.onData(l,"MingLog-"+l);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //將消息(事件)發佈出去
        longEventDisruptor.shutdown();
    }

回過頭來看看,爲何Disruptor這麼快呢?

  1. 底層是數組,循環起來要比鏈表快
  2. 沒有首尾指針,免去了維護兩個指針的時間
  3. start()方法被調用,Disruptor被初始化,全部可用空間上的Event所有被初始化(提早建立好,每次進來在原對象上進行修改,不用從新new,不用建立新的對象,也就能夠下降GC的頻率),由於是一開始就把全部的Event初始化好的,因此next獲取下一個可用的Event時就不須要再去判斷該Event是否被初始化,減小了一步判斷
  4. Disruptor的Size是2的n次冪,方便進行二進制位運算,來肯定消息應該放在那個可用區域

好了,Disruptor講解到這裏就結束了,你們有什麼想要學習的均可以私信或評論告訴我哦\~ 我會盡全力知足你們滴,我學,你也學,咳咳\~廣告看多了翻譯

點贊、關注來一波好嗎,秋梨膏~

相關文章
相關標籤/搜索