高性能隊列disruptor淺析

最近翻看技術文章,發現一個叫作disruptor的高性能內存隊列佔據了頭版頭條,好奇其高性能的祕密,因而對該框架進行了一些簡單的研究。disruptor並不是新出,而是從2013年到如今經歷了3個大版本的迭代。本文有以下幾個部分:html

  1. disruptor如何使用?
  2. disruptor高性能的祕密?

如何使用

核心概念:git

  1. EventFactory:用於生成一個事件提供消費者消費
  2. EventHandler:消費者處理器
  3. 事件和生產者:無內部接口,能夠本身封裝實體對象和RingBuffer
  4. RingBuffer:核心數據結構,能夠理解爲環形緩衝,disruptor高性能祕密
  5. WaitStrategy:消費者阻塞時候的等待策略:SleepingWaitStrategy,BlockingWaitStrategy,BusySpinWaitStrategy,YieldingWaitStrategy
  6. Disruptor:disruptor框架入口,進行相關配置

事件和工廠

public static class LogEvent {
    String content;
    public void setContent(String content) {
        this.content = content;
    }
}
public static class LogEventFactory implements EventFactory<LogEvent> {
    @Override
    public LogEvent newInstance() {
        return new LogEvent();
    }
}

消費者

public static class LogEventHandler implements EventHandler<LogEvent> {
    @Override
    public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(Thread.currentThread().getName() + ",seq=" + sequence + ",event=" + event.content);
    }
}

生產者

public static class LogEventProducer {
    private final RingBuffer<LogEvent> ringBuffer;
    public LogEventProducer(RingBuffer<LogEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    public void onData(String content) {
        long seq = ringBuffer.next();
        try {
            LogEvent logEvent = ringBuffer.get(seq);
            logEvent.setContent(content);
        } finally {
            ringBuffer.publish(seq);
        }
    }
}
public static void main(String[] args) {
    LogEventFactory eventFactory = new LogEventFactory();
		
    // 緩衝區大小
    int bufferSize = 16;
    // ProducerType有single和multi之分,分別對應不一樣的RingBuffer實現,性能不一樣
    // WaitStrategy對應消費者阻塞時的處理策略
    Disruptor<LogEvent> disruptor = new Disruptor<>(eventFactory, bufferSize,
        (ThreadFactory)Thread::new,
        ProducerType.MULTI,
        new BlockingWaitStrategy());
			
    //掛載消費者,能夠有多個消費者,並能夠經過Group自由連接組合關係
    disruptor.handleEventsWith(new LogEventHandler());
    disruptor.start();
		
    RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
    LogEventProducer producer = new LogEventProducer(ringBuffer);
    for (int i = 0; i < 20; i++) {
        producer.onData("this is a log event i=" + i);
    }
}

高性能的祕密

以下是高性能的祕密,我我的理解和簡述:github

lock-free

ArrayBlockingQueue使用lock實現併發控制,不管get或者put,都是將final ReentrantLock lock;進行上鎖,毋庸置疑當c/p大量併發時會有性能損失。緩存

相比,disruptor經過’分離競爭點’解決大量cp併發問題。即當:數據結構

1p(product)-1c(consume):無特殊
1p(product)-多c(consume):每一個消費者使用獨立的seq,即分離競爭點。disruptor不會清理數據,循環覆蓋。
多p(product)-多c(consume):多p之間採用cas操做。內部實現AtomicLong
多p(product)-1c(consume):與3同樣
這種分離競爭點技術思想,也用於LongAdder以及ConcurrentHashMap併發

僞共享和緩存行填充

L1cache是計算機體系結構中重要組成部分,咱們都知道L1Cache的做用:緩存memory熱點數據,以加快cpu訪問。在多核時代,因爲不一樣cpu的緩存中擁有相同內存區域的不一樣副本,所以,L1Cache之間的同步是有其成本的,這須要使用內存控制器以及MESI協議。緩存以行做爲最小加載單元(64byte=8個long),如上圖,當隊列首位指針(disruptor使用seq),被加載到相同的緩存行的時候,c1處理生產,c2處理消費,而首位指針實際上是獨立的但卻進行了共享(僞共享),致使L1Cache的緩存行刷新。框架

解決辦法很明顯,h,t分別放到不一樣的緩存行上。查看源碼時有以下代碼:ide

性能優越性

給一個官方disruptor與ArrayBlockingQueue的性能測試結果:性能

參考連接

http://ifeve.com/disruptor/
http://www.cnblogs.com/cyfonly/p/5800758.html
https://github.com/LMAX-Exchange/disruptor測試

相關文章
相關標籤/搜索