併發編程之Disruptor併發框架

1、什麼是Disruptor

Martin Fowler在本身網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平臺,它可以以很低的延遲產生大量交易。這個系統是創建在JVM平臺上,其核心是一個業務邏輯處理器,它可以在一個線程裏每秒處理6百萬訂單。業務邏輯處理器徹底是運行在內存中,使`用事件源驅動方式。業務邏輯處理器的核心是Disruptor。java

Disruptor它是一個開源的併發框架,並得到2011 Duke’s 程序框架創新獎,可以在無鎖的狀況下實現網絡的Queue併發操做。算法

Disruptor是一個高性能的異步處理框架,或者能夠認爲是最快的消息框架(輕量的JMS),也能夠認爲是一個觀察者模式的實現,或者事件監聽模式的實現。segmentfault

在使用以前,首先說明disruptor主要功能加以說明,你能夠理解爲他是一種高效的"生產者-消費者"模型。也就性能遠遠高於傳統的BlockingQueue容器。數組

在JDK的多線程與併發庫一文中, 提到了BlockingQueue實現了生產者-消費者模型
BlockingQueue是基於鎖實現的, 而鎖的效率一般較低. 有沒有使用CAS機制實現的生產者-消費者緩存

Disruptor使用觀察者模式, 主動將消息發送給消費者, 而不是等消費者從隊列中取; 在無鎖的狀況下, 實現queue(環形, RingBuffer)的併發操做, 性能遠高於BlockingQueue網絡

2、Disruptor的設計方案

Disruptor經過如下設計來解決隊列速度慢的問題:數據結構

  • 環形數組結構:

爲了不垃圾回收,採用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。多線程

  • 元素位置定位:

數組長度2^n,經過位運算,加快定位的速度。下標採起遞增的形式。不用擔憂index溢出的問題。index是long類型,即便100萬QPS的處理速度,也須要30萬年才能用完。架構

  • 無鎖設計:

每一個生產者或者消費者線程,會先申請能夠操做的元素在數組中的位置,申請到以後,直接在該位置寫入或者讀取數據。併發

3、Disruptor實現生產與消費

一、Pom Maven依賴信息

<dependencies>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>

二、首先聲明一個Event來包含須要傳遞的數據:

//定義事件event  經過Disruptor 進行交換的數據類型。
public class LongEvent {

    private Long value;

    public Long getValue() {
        return value;
    }

    public void setValue(Long value) {
        this.value = value;
    }

}

三、須要讓Disruptor爲咱們建立事件,咱們同時還聲明瞭一個EventFactory來實例化Event對象。

public class LongEventFactory implements EventFactory<LongEvent> {

    public LongEvent newInstance() {

        return new LongEvent();
    }

}

四、事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中存儲的數據打印到終端:

public class LongEventHandler implements EventHandler<LongEvent>  {

    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
         System.out.println("消費者:"+event.getValue());
    }

}

五、定義生產者發送事件

public class LongEventProducer {

    public final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // 1.ringBuffer 事件隊列 下一個槽
        long sequence = ringBuffer.next();
        Long data = null;
        try {
            //2.取出空的事件隊列
            LongEvent longEvent = ringBuffer.get(sequence);
            data = byteBuffer.getLong(0);
            //3.獲取事件隊列傳遞的數據
            longEvent.setValue(data);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        } finally {
            System.out.println("生產這準備發送數據");
            //4.發佈事件
            ringBuffer.publish(sequence);

        }
    }

}

六、main函數執行調用

public class DisruptorMain {

    public static void main(String[] args) {
        // 1.建立一個可緩存的線程 提供線程來出發Consumer 的事件處理
        ExecutorService executor = Executors.newCachedThreadPool();
        // 2.建立工廠
        EventFactory<LongEvent> eventFactory = new LongEventFactory();
        // 3.建立ringBuffer 大小
        int ringBufferSize = 1024 * 1024; // ringBufferSize大小必定要是2的N次方
        // 4.建立Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor,
                ProducerType.SINGLE, new YieldingWaitStrategy());
        // 5.鏈接消費端方法
        disruptor.handleEventsWith(new LongEventHandler());
        // 6.啓動
        disruptor.start();
        // 7.建立RingBuffer容器
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        // 8.建立生產者
        LongEventProducer producer = new LongEventProducer(ringBuffer);
        // 9.指定緩衝區大小
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (int i = 1; i <= 100; i++) {
            byteBuffer.putLong(0, i);
            producer.onData(byteBuffer);
        }
        //10.關閉disruptor和executor
        disruptor.shutdown();
        executor.shutdown();
    }

}

4、什麼是ringbuffer

它是一個環(首尾相接的環),你能夠把它用作在不一樣上下文(線程)間傳遞數據的buffer。

file

基原本說,ringbuffer擁有一個序號,這個序號指向數組中下一個可用的元素。(校對注:以下圖右邊的圖片表示序號,這個序號指向數組的索引4的位置。)

file

隨着你不停地填充這個buffer(可能也會有相應的讀取),這個序號會一直增加,直到繞過這個環。

file

要找到數組中當前序號指向的元素,能夠經過mod操做:
以上面的ringbuffer爲例(java的mod語法):12 % 10 = 2。很簡單吧。 事實上,上圖中的ringbuffer只有10個槽徹底是個意外。若是槽的個數是2的N次方更有利於基於二進制

優勢

之因此ringbuffer採用這種數據結構,是由於它在可靠消息傳遞方面有很好的性能。這就夠了,不過它還有一些其餘的優勢。

首先,由於它是數組,因此要比鏈表快,並且有一個容易預測的訪問模式。(譯者注:數組內元素的內存地址的連續性存儲的)。這是對CPU緩存友好的—也就是說,在硬件級別,數組中的元素是會被預加載的,所以在ringbuffer當中,cpu無需時不時去主存加載數組中的下一個元素。(校對注:由於只要一個元素被加載到緩存行,其餘相鄰的幾個元素也會被加載進同一個緩存行)

其次,你能夠爲數組預先分配內存,使得數組對象一直存在(除非程序終止)。這就意味着不須要花大量的時間用於垃圾回收。此外,不像鏈表那樣,須要爲每個添加到其上面的對象創造節點對象—對應的,當刪除節點時,須要執行相應的內存清理操做。

RingBuffer底層實現

RingBuffer是一個首尾相連的環形數組,所謂首尾相連,是指當RingBuffer上的指針越過數組是上界後,繼續從數組頭開始遍歷。所以,RingBuffer中至少有一個指針,來表示RingBuffer中的操做位置。另外,指針的自增操做須要作併發控制,Disruptor和本文的OptimizedQueue都使用CAS的樂觀併發控制來保證指針自增的原子性。

Disruptor中的RingBuffer上只有一個指針,表示當前RingBuffer上消息寫到了哪裏,此外,每一個消費者會維護一個sequence表示本身在RingBuffer上讀到哪裏,從這個角度講,Disruptor中的RingBuffer上實際有消費者數+1個指針。因爲咱們要實現的是一個單消息單消費的阻塞隊列,只要維護一個讀指針(對應消費者)和一個寫指針(對應生產者)便可,不管哪一個指針,每次讀寫操做後都自增一次,一旦越界,即從數組頭開始繼續讀寫

5、Disruptor的核心概念

RingBuffer

如其名,環形的緩衝區。曾經 RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責被簡化爲僅僅負責對經過 Disruptor 進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 能夠由用戶的自定義實現來徹底替代。

SequenceDisruptor

經過順序遞增的序號來編號管理經過其進行交換的數據(事件),對數據(事件)的處理過程老是沿着序號逐個遞增處理。一個 Sequence 用於跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一個 AtomicLong 也能夠用於標識進度,但定義 Sequence 來負責該問題還有另外一個目的,那就是防止不一樣的 Sequence 之間的CPU緩存僞共享(Flase Sharing)問題。

Sequencer

Sequencer 是 Disruptor 的真正核心。此接口有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞數據的併發算法。

Sequence Barrier

用於保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。 Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。

Wait Strategy

定義 Consumer 如何進行等待下一個事件的策略。 (注:Disruptor 定義了多種不一樣的策略,針對不一樣的場景,提供了不同的性能表現)

Event

在 Disruptor 的語義中,生產者和消費者之間進行交換的數據被稱爲事件(Event)。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義並指定。

EventProcessor

EventProcessor 持有特定消費者(Consumer)的 Sequence,並提供用於調用事件處理實現的事件循環(Event Loop)。

EventHandler

Disruptor 定義的事件處理接口,由用戶實現,用於處理事件,是 Consumer 的真正實現。

Producer

即生產者,只是泛指調用 Disruptor 發佈事件的用戶代碼,Disruptor 沒有定義特定接口或類型。

各概念的做用

  • RingBuffer——Disruptor底層數據結構實現,核心類,是線程間交換數據的中轉地;
  • Sequencer——序號管理器,負責消費者/生產者各自序號、序號柵欄的管理和協調;
  • Sequence——序號,聲明一個序號,用於跟蹤ringbuffer中任務的變化和消費者的消費狀況;
  • SequenceBarrier——序號柵欄,管理和協調生產者的遊標序號和各個消費者的序號,確保生產者不會覆蓋消費者將來得及處理的消息,確保存在依賴的消費者之間可以按照正確的順序處理;
  • EventProcessor——事件處理器,監聽RingBuffer的事件,並消費可用事件,從RingBuffer讀取的事件會交由實際的生產者實現類來消費;它會一直偵聽下一個可用的序號,直到該序號對應的事件已經準備好。
  • EventHandler——業務處理器,是實際消費者的接口,完成具體的業務邏輯實現,第三方實現該接口;表明着消費者。
  • Producer——生產者接口,第三方線程充當該角色,producer向RingBuffer寫入事件。
我的博客 蝸牛
相關文章
相關標籤/搜索