Martin Fowler在本身網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平臺,它可以以很低的延遲產生大量交易。這個系統是創建在JVM平臺上,其核心是一個業務邏輯處理器,它可以在一個線程裏每秒處理6百萬訂單。業務邏輯處理器徹底是運行在內存中,使`用事件源驅動方式。業務邏輯處理器的核心是Disruptor。java
Disruptor它是一個開源的併發框架,並得到2011 Duke’s 程序框架創新獎,可以在無鎖的狀況下實現網絡的Queue併發操做。算法
Disruptor是一個高性能的異步處理框架,或者能夠認爲是最快的消息框架(輕量的JMS),也能夠認爲是一個觀察者模式的實現,或者事件監聽模式的實現。segmentfault
在使用以前,首先說明disruptor主要功能加以說明,你能夠理解爲他是一種高效的"生產者-消費者"模型。也就性能遠遠高於傳統的BlockingQueue容器。數組
在JDK的多線程與併發庫一文中, 提到了BlockingQueue實現了生產者-消費者模型
BlockingQueue是基於鎖實現的, 而鎖的效率一般較低. 有沒有使用CAS機制實現的生產者-消費者緩存
Disruptor使用觀察者模式, 主動將消息發送給消費者, 而不是等消費者從隊列中取; 在無鎖的狀況下, 實現queue(環形, RingBuffer)的併發操做, 性能遠高於BlockingQueue網絡
Disruptor經過如下設計來解決隊列速度慢的問題:數據結構
爲了不垃圾回收,採用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。多線程
數組長度2^n,經過位運算,加快定位的速度。下標採起遞增的形式。不用擔憂index溢出的問題。index是long類型,即便100萬QPS的處理速度,也須要30萬年才能用完。架構
每一個生產者或者消費者線程,會先申請能夠操做的元素在數組中的位置,申請到以後,直接在該位置寫入或者讀取數據。併發
<dependencies> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.2.1</version> </dependency> </dependencies>
//定義事件event 經過Disruptor 進行交換的數據類型。 public class LongEvent { private Long value; public Long getValue() { return value; } public void setValue(Long value) { this.value = value; } }
public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { return new LongEvent(); } }
public class LongEventHandler implements EventHandler<LongEvent> { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println("消費者:"+event.getValue()); } }
public class LongEventProducer { public final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer byteBuffer) { // 1.ringBuffer 事件隊列 下一個槽 long sequence = ringBuffer.next(); Long data = null; try { //2.取出空的事件隊列 LongEvent longEvent = ringBuffer.get(sequence); data = byteBuffer.getLong(0); //3.獲取事件隊列傳遞的數據 longEvent.setValue(data); try { Thread.sleep(10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } finally { System.out.println("生產這準備發送數據"); //4.發佈事件 ringBuffer.publish(sequence); } } }
public class DisruptorMain { public static void main(String[] args) { // 1.建立一個可緩存的線程 提供線程來出發Consumer 的事件處理 ExecutorService executor = Executors.newCachedThreadPool(); // 2.建立工廠 EventFactory<LongEvent> eventFactory = new LongEventFactory(); // 3.建立ringBuffer 大小 int ringBufferSize = 1024 * 1024; // ringBufferSize大小必定要是2的N次方 // 4.建立Disruptor Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); // 5.鏈接消費端方法 disruptor.handleEventsWith(new LongEventHandler()); // 6.啓動 disruptor.start(); // 7.建立RingBuffer容器 RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); // 8.建立生產者 LongEventProducer producer = new LongEventProducer(ringBuffer); // 9.指定緩衝區大小 ByteBuffer byteBuffer = ByteBuffer.allocate(8); for (int i = 1; i <= 100; i++) { byteBuffer.putLong(0, i); producer.onData(byteBuffer); } //10.關閉disruptor和executor disruptor.shutdown(); executor.shutdown(); } }
它是一個環(首尾相接的環),你能夠把它用作在不一樣上下文(線程)間傳遞數據的buffer。
基原本說,ringbuffer擁有一個序號,這個序號指向數組中下一個可用的元素。(校對注:以下圖右邊的圖片表示序號,這個序號指向數組的索引4的位置。)
隨着你不停地填充這個buffer(可能也會有相應的讀取),這個序號會一直增加,直到繞過這個環。
要找到數組中當前序號指向的元素,能夠經過mod操做:
以上面的ringbuffer爲例(java的mod語法):12 % 10 = 2。很簡單吧。 事實上,上圖中的ringbuffer只有10個槽徹底是個意外。若是槽的個數是2的N次方更有利於基於二進制
之因此ringbuffer採用這種數據結構,是由於它在可靠消息傳遞方面有很好的性能。這就夠了,不過它還有一些其餘的優勢。
首先,由於它是數組,因此要比鏈表快,並且有一個容易預測的訪問模式。(譯者注:數組內元素的內存地址的連續性存儲的)。這是對CPU緩存友好的—也就是說,在硬件級別,數組中的元素是會被預加載的,所以在ringbuffer當中,cpu無需時不時去主存加載數組中的下一個元素。(校對注:由於只要一個元素被加載到緩存行,其餘相鄰的幾個元素也會被加載進同一個緩存行)
其次,你能夠爲數組預先分配內存,使得數組對象一直存在(除非程序終止)。這就意味着不須要花大量的時間用於垃圾回收。此外,不像鏈表那樣,須要爲每個添加到其上面的對象創造節點對象—對應的,當刪除節點時,須要執行相應的內存清理操做。
RingBuffer是一個首尾相連的環形數組,所謂首尾相連,是指當RingBuffer上的指針越過數組是上界後,繼續從數組頭開始遍歷。所以,RingBuffer中至少有一個指針,來表示RingBuffer中的操做位置。另外,指針的自增操做須要作併發控制,Disruptor和本文的OptimizedQueue都使用CAS的樂觀併發控制來保證指針自增的原子性。
Disruptor中的RingBuffer上只有一個指針,表示當前RingBuffer上消息寫到了哪裏,此外,每一個消費者會維護一個sequence表示本身在RingBuffer上讀到哪裏,從這個角度講,Disruptor中的RingBuffer上實際有消費者數+1個指針。因爲咱們要實現的是一個單消息單消費的阻塞隊列,只要維護一個讀指針(對應消費者)和一個寫指針(對應生產者)便可,不管哪一個指針,每次讀寫操做後都自增一次,一旦越界,即從數組頭開始繼續讀寫
如其名,環形的緩衝區。曾經 RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責被簡化爲僅僅負責對經過 Disruptor 進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 能夠由用戶的自定義實現來徹底替代。
經過順序遞增的序號來編號管理經過其進行交換的數據(事件),對數據(事件)的處理過程老是沿着序號逐個遞增處理。一個 Sequence 用於跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一個 AtomicLong 也能夠用於標識進度,但定義 Sequence 來負責該問題還有另外一個目的,那就是防止不一樣的 Sequence 之間的CPU緩存僞共享(Flase Sharing)問題。
Sequencer 是 Disruptor 的真正核心。此接口有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞數據的併發算法。
用於保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。 Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。
定義 Consumer 如何進行等待下一個事件的策略。 (注:Disruptor 定義了多種不一樣的策略,針對不一樣的場景,提供了不同的性能表現)
在 Disruptor 的語義中,生產者和消費者之間進行交換的數據被稱爲事件(Event)。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義並指定。
EventProcessor 持有特定消費者(Consumer)的 Sequence,並提供用於調用事件處理實現的事件循環(Event Loop)。
Disruptor 定義的事件處理接口,由用戶實現,用於處理事件,是 Consumer 的真正實現。
即生產者,只是泛指調用 Disruptor 發佈事件的用戶代碼,Disruptor 沒有定義特定接口或類型。
我的博客 蝸牛