Disruptor學習之路(一)

Disruptor是什麼?

    Disruptor是一個高性能的異步處理框架,或者能夠認爲是線程間通訊的高效低延時的內存消息組件,它最大特色是高性能,其LMAX架構能夠得到每秒6百萬訂單,用1微秒的延遲得到吞吐量爲100K+。
    它是如何實現高性能的呢?它因爲JDK內置的隊列有什麼區別呢?java

JDK內置內存隊列?

咱們知道,Java內置了幾種內存消息隊列,以下所示:算法

隊列 加鎖方式 是否有界 數據結構
ArrayBlockingQueue 加鎖 有界 ArrayList
LinkedBlockingQueue 加鎖 無界 LinkedList
ConcurrentLinkedQueue CAS 無界 LinkedList
LinkedTransferQueue CAS 無界 LinkedList

    咱們知道CAS算法比經過加鎖實現同步性能高不少,而上表能夠看出基於CAS實現的隊列都是無界的,而有界隊列是經過同步實現的。在系統穩定性要求比較高的場景下,爲了防止生產者速度過快,若是採用無界隊列會最終致使內存溢出,只能選擇有界隊列。而有界隊列只有ArrayBlockingQueue,該隊列是經過加鎖實現的,在請求鎖和釋放鎖時對性能開銷很大,這時候基於有界隊列的高性能的Disruptor就應運而生。編程

Disruptor如何實現高性能?

Disruptor實現高性能主要體現了去掉了鎖,採用CAS算法,同時內部經過環形隊列實現有界隊列。數組

  • 環形數據結構
    爲了不垃圾回收,採用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。
  • 元素位置定位
    數組長度2^n,經過位運算,加快定位的速度。下標採起遞增的形式。不用擔憂index溢出的問題。index是long類型,即便100萬QPS的處理速度,也須要30萬年才能用完。
  • 無鎖設計
    每一個生產者或者消費者線程,會先申請能夠操做的元素在數組中的位置,申請到以後,直接在該位置寫入或者讀取數據。整個過程經過原子變量CAS,保證操做的線程安全。

Disruptor能夠用來作什麼?

    當前業界開源組件使用Disruptor的包括Log4j二、Apache Storm等,它能夠用來做爲高性能的有界內存隊列,基於生產者消費者模式,實現一個/多個生產者對應多個消費者。它也能夠認爲是觀察者模式的一種實現,或者發佈訂閱模式。緩存

同時,Disruptor還容許開發者使用多線程技術去建立基於任務的工做流。Disruptor能用來並行建立任務,同時保證多個處理過程的有序性,而且它是沒有鎖的。安全

爲何要使用Disruptor?

    使用Disruptor,主要用於對性能要求高、延遲低的場景,它經過「榨乾」機器的性能來換取處理的高性能。若是你的項目有對性能要求高,對延遲要求低的需求,而且須要一個無鎖的有界隊列,來實現生產者/消費者模式,那麼Disruptor是你的不二選擇。數據結構

怎麼用Disruptor?

    要學會基於Disruptor進行編程,咱們先了解下大概流程示意圖,其中綠色部分是表示咱們須要編寫和實現的類。多線程

 

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ThreadFactory;

/**
 * @author: chuanyi.huang
 **/
public class DisruptorTest {
    /*
     * 消息事件類
     */
    public static class MessageEvent {
        /**
         * 原始消息
         */
        private String message;

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }
    }

    /*
     * 消息事件工廠類
     */
    public static class MessageEventFactory implements EventFactory<MessageEvent> {

        public MessageEvent newInstance() {
            return new MessageEvent();
        }
    }

    /*
     * 消息轉換類, 負責將消息轉換成事件
     */
    public static class MessageEventTranslator implements EventTranslatorOneArg<MessageEvent, String> {
        public void translateTo(MessageEvent messageEvent, long l, String s) {
            messageEvent.setMessage(s);
        }
    }

    /*
     * 消費者線程工廠類
     */
    public static class MessageThreadFactory implements ThreadFactory {

        public Thread newThread(Runnable r) {
            return new Thread(r, "Simple Discruptor Test Thread");
        }
    }

    /*
     * 消息事件處理類
     */
    public static class MessageEventHandler implements EventHandler<MessageEvent> {

        public void onEvent(MessageEvent messageEvent, long l, boolean b) throws Exception {
            System.out.println(messageEvent.getMessage());
        }
    }

    /*
     * 異常處理類
     */
    public static class MessageExceptionHandler implements ExceptionHandler<MessageEvent> {

        public void handleEventException(Throwable throwable, long l, MessageEvent messageEvent) {
            throwable.printStackTrace();
        }

        public void handleOnStartException(Throwable throwable) {
            throwable.printStackTrace();
        }

        public void handleOnShutdownException(Throwable throwable) {
            throwable.printStackTrace();
        }
    }

    /*
     * 消息生產者類
     */
    public static class MessageEventProducer {
        private RingBuffer<MessageEvent> ringBuffer;

        public MessageEventProducer(RingBuffer<MessageEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }

        /**
         * 將接收到的消息輸出到ringBuffer
         */
        public void onData(String message) {
            EventTranslatorOneArg<MessageEvent, String> translator =
                    new MessageEventTranslator();
            ringBuffer.publishEvent(translator, message);
        }
    }

    public static void main(String[] args) {
        String message = "Hello Disruptor";
        int ringBufferSize = 1024;
        Disruptor<MessageEvent> disruptor =
                new Disruptor<MessageEvent>(new MessageEventFactory(), ringBufferSize, new MessageThreadFactory(), ProducerType.SINGLE, new BlockingWaitStrategy());
        disruptor.handleEventsWith(new MessageEventHandler());
        disruptor.setDefaultExceptionHandler(new MessageExceptionHandler());
        RingBuffer<MessageEvent> start = disruptor.start();
        MessageEventProducer messageEventProducer = new MessageEventProducer(start);
        messageEventProducer.onData(message);
    }
}
相關文章
相關標籤/搜索