Disruptor是一個高性能的異步處理框架,或者能夠認爲是線程間通訊的高效低延時的內存消息組件,它最大特色是高性能,其LMAX架構能夠得到每秒6百萬訂單,用1微秒的延遲得到吞吐量爲100K+。
它是如何實現高性能的呢?它因爲JDK內置的隊列有什麼區別呢?java
咱們知道,Java內置了幾種內存消息隊列,以下所示:算法
隊列 | 加鎖方式 | 是否有界 | 數據結構 |
---|---|---|---|
ArrayBlockingQueue | 加鎖 | 有界 | ArrayList |
LinkedBlockingQueue | 加鎖 | 無界 | LinkedList |
ConcurrentLinkedQueue | CAS | 無界 | LinkedList |
LinkedTransferQueue | CAS | 無界 | LinkedList |
咱們知道CAS算法比經過加鎖實現同步性能高不少,而上表能夠看出基於CAS實現的隊列都是無界的,而有界隊列是經過同步實現的。在系統穩定性要求比較高的場景下,爲了防止生產者速度過快,若是採用無界隊列會最終致使內存溢出,只能選擇有界隊列。而有界隊列只有ArrayBlockingQueue
,該隊列是經過加鎖實現的,在請求鎖和釋放鎖時對性能開銷很大,這時候基於有界隊列的高性能的Disruptor就應運而生。編程
Disruptor實現高性能主要體現了去掉了鎖,採用CAS算法,同時內部經過環形隊列實現有界隊列。數組
當前業界開源組件使用Disruptor的包括Log4j二、Apache Storm等,它能夠用來做爲高性能的有界內存隊列,基於生產者消費者模式,實現一個/多個生產者對應多個消費者。它也能夠認爲是觀察者模式的一種實現,或者發佈訂閱模式。緩存
同時,Disruptor還容許開發者使用多線程技術去建立基於任務的工做流。Disruptor能用來並行建立任務,同時保證多個處理過程的有序性,而且它是沒有鎖的。安全
使用Disruptor,主要用於對性能要求高、延遲低的場景,它經過「榨乾」機器的性能來換取處理的高性能。若是你的項目有對性能要求高,對延遲要求低的需求,而且須要一個無鎖的有界隊列,來實現生產者/消費者模式,那麼Disruptor是你的不二選擇。數據結構
要學會基於Disruptor進行編程,咱們先了解下大概流程示意圖,其中綠色部分是表示咱們須要編寫和實現的類。多線程
import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.ExceptionHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import java.util.concurrent.ThreadFactory; /** * @author: chuanyi.huang **/ public class DisruptorTest { /* * 消息事件類 */ public static class MessageEvent { /** * 原始消息 */ private String message; public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } } /* * 消息事件工廠類 */ public static class MessageEventFactory implements EventFactory<MessageEvent> { public MessageEvent newInstance() { return new MessageEvent(); } } /* * 消息轉換類, 負責將消息轉換成事件 */ public static class MessageEventTranslator implements EventTranslatorOneArg<MessageEvent, String> { public void translateTo(MessageEvent messageEvent, long l, String s) { messageEvent.setMessage(s); } } /* * 消費者線程工廠類 */ public static class MessageThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { return new Thread(r, "Simple Discruptor Test Thread"); } } /* * 消息事件處理類 */ public static class MessageEventHandler implements EventHandler<MessageEvent> { public void onEvent(MessageEvent messageEvent, long l, boolean b) throws Exception { System.out.println(messageEvent.getMessage()); } } /* * 異常處理類 */ public static class MessageExceptionHandler implements ExceptionHandler<MessageEvent> { public void handleEventException(Throwable throwable, long l, MessageEvent messageEvent) { throwable.printStackTrace(); } public void handleOnStartException(Throwable throwable) { throwable.printStackTrace(); } public void handleOnShutdownException(Throwable throwable) { throwable.printStackTrace(); } } /* * 消息生產者類 */ public static class MessageEventProducer { private RingBuffer<MessageEvent> ringBuffer; public MessageEventProducer(RingBuffer<MessageEvent> ringBuffer) { this.ringBuffer = ringBuffer; } /** * 將接收到的消息輸出到ringBuffer */ public void onData(String message) { EventTranslatorOneArg<MessageEvent, String> translator = new MessageEventTranslator(); ringBuffer.publishEvent(translator, message); } } public static void main(String[] args) { String message = "Hello Disruptor"; int ringBufferSize = 1024; Disruptor<MessageEvent> disruptor = new Disruptor<MessageEvent>(new MessageEventFactory(), ringBufferSize, new MessageThreadFactory(), ProducerType.SINGLE, new BlockingWaitStrategy()); disruptor.handleEventsWith(new MessageEventHandler()); disruptor.setDefaultExceptionHandler(new MessageExceptionHandler()); RingBuffer<MessageEvent> start = disruptor.start(); MessageEventProducer messageEventProducer = new MessageEventProducer(start); messageEventProducer.onData(message); } }