Disruptor的使用

..................2015年的第一天...................html

本文代碼託管在 https://github.com/hupengcool/disruptor-starterjava

Intruduction

關於吹牛逼的話就不說了。。。Disruptor是Java實現的用於線程間通訊的消息組件。其核心是一個Lock-free的Ringbuffer,Disruptor使用CAS而不是Lock。與大部分併發隊列使用的Lock相比,CAS顯然要快不少。CAS是CPU級別的指令,更加輕量,不須要像Lock同樣須要OS的支持,因此每次調用不須要kernel entry,也不須要context switch。固然,使用CAS的代價是Disruptor實現的複雜程度也相對提升了。git

Component

Sequence

Sequence是Disruptor最核心的組件,上面已經提到過了。生產者對RingBuffer的互斥訪問,生產者與消費者之間的協調以及消費者之間的協調,都是經過Sequence實現。幾乎每個重要的組件都包含Sequence。那麼Sequence是什麼呢?首先Sequence是一個遞增的序號,說白了就是計數器;其次,因爲須要在線程間共享,因此Sequence是引用傳遞,而且是線程安全的;再次,Sequence支持CAS操做;最後,爲了提升效率,Sequence經過padding來避免僞共享。github

RingBuffer

RingBuffer是存儲消息的地方,經過一個名爲cursor的Sequence對象指示隊列的頭,協調多個生產者向RingBuffer中添加消息,並用於在消費者端判斷RingBuffer是否爲空。巧妙的是,表示隊列尾的Sequence並無在RingBuffer中,而是由消費者維護。這樣的好處是多個消費者處理消息的方式更加靈活,能夠在一個RingBuffer上實現消息的單播,多播,流水線以及它們的組合。其缺點是在生產者端判斷RingBuffer是否已盡是須要跟蹤更多的信息,爲此,在RingBuffer中維護了一個名爲gatingSequences的Sequence數組來跟蹤相關Seqence。api

SequenceBarrier

SequenceBarrier用來在消費者之間以及消費者和RingBuffer之間創建依賴關係。在Disruptor中,依賴關係實際上指的是Sequence的大小關係,消費者A依賴於消費者B指的是消費者A的Sequence必定要小於等於消費者B的Sequence,這種大小關係決定了處理某個消息的前後順序。由於全部消費者都依賴於RingBuffer,因此消費者的Sequence必定小於等於RingBuffer中名爲cursor的Sequence,即消息必定是先被生產者放到Ringbuffer中,而後才能被消費者處理。數組

SequenceBarrier在初始化的時候會收集須要依賴的組件的Sequence,RingBuffer的cursor會被自動的加入其中。須要依賴其餘消費者和/或RingBuffer的消費者在消費下一個消息時,會先等待在SequenceBarrier上,直到全部被依賴的消費者和RingBuffer的Sequence大於等於這個消費者的Sequence。當被依賴的消費者或RingBuffer的Sequence有變化時,會通知SequenceBarrier喚醒等待在它上面的消費者。安全

WaitStrategy

當消費者等待在SequenceBarrier上時,有許多可選的等待策略,不一樣的等待策略在延遲和CPU資源的佔用上有所不一樣,能夠視應用場景選擇:併發

BusySpinWaitStrategy : 自旋等待,相似Linux Kernel使用的自旋鎖。低延遲但同時對CPU資源的佔用也多。框架

BlockingWaitStrategy : 使用鎖和條件變量。CPU資源的佔用少,延遲大。ide

SleepingWaitStrategy : 在屢次循環嘗試不成功後,選擇讓出CPU,等待下次調度,屢次調度後仍不成功,嘗試前睡眠一個納秒級別的時間再嘗試。這種策略平衡了延遲和CPU資源佔用,但延遲不均勻。

YieldingWaitStrategy : 在屢次循環嘗試不成功後,選擇讓出CPU,等待下次調。平衡了延遲和CPU資源佔用,但延遲也比較均勻。

PhasedBackoffWaitStrategy : 上面多種策略的綜合,CPU資源的佔用少,延遲大。

BatchEvenProcessor

在Disruptor中,消費者是以EventProcessor的形式存在的。其中一類消費者是BatchEvenProcessor。每一個BatchEvenProcessor有一個Sequence,來記錄本身消費RingBuffer中消息的狀況。因此,一個消息必然會被每個BatchEvenProcessor消費。

WorkProcessor

另外一類消費者是WorkProcessor。每一個WorkProcessor也有一個Sequence,多個WorkProcessor還共享一個Sequence用於互斥的訪問RingBuffer。一個消息被一個WorkProcessor消費,就不會被共享一個Sequence的其餘WorkProcessor消費。這個被WorkProcessor共享的Sequence至關於尾指針。

WorkerPool

共享同一個Sequence的WorkProcessor可由一個WorkerPool管理,這時,共享的Sequence也由WorkerPool建立。

Use Cases

下面以Disruptor 3.3.0版本爲例介紹Disruptor的初級使用,本文並無用那些比較原始的API,若是想知道上面寫的一些api如何使用,能夠參考 https://github.com/LMAX-Exchange/disruptor/tree/master/src/perftest/java/com/lmax/disruptor 爲了簡化使用,框架提供Disruptor類來簡化使用,下面主要是使用這個類來演示。
首先定義一個Event:

/**
 * Created by hupeng on 2015/1/1.
 */
public class MyEvent {

    private long value;

    public void setValue(long value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "MyEvent{" +
                "value=" + value +
                '}';
    }
}

而後提供一個EventFactory,RingBuffer經過這factory來初始化在Event。

import com.lmax.disruptor.EventFactory;

/**
 * Created by hupeng on 2015/1/1.
 */
public class MyEventFactory implements EventFactory<MyEvent> {
    @Override
    public MyEvent newInstance() {
        return new MyEvent();
    }
}

而後寫一個Producer類,也就是消息的生產者。

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

/**
 * Created by hupeng on 2015/1/1.
 */
public class MyEventProducer {

    private RingBuffer<MyEvent> ringBuffer;

    public MyEventProducer(RingBuffer<MyEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg TRANSLATOR = new EventTranslatorOneArg<MyEvent, Long>() {

        @Override
        public void translateTo(MyEvent event, long sequence, Long value) {
            event.setValue(value);
        }
    };
    
    public void onData(final Long value) {
        ringBuffer.publishEvent(TRANSLATOR,value);
    }
}

而後寫一個EventHandler。這個就是咱們定義怎麼處理消息的地方。

import com.lmax.disruptor.EventHandler;

/**
 * Created by hupeng on 2015/1/1.
 */
public class MyEventHandler implements EventHandler<MyEvent> {
    @Override
    public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(event);
    }
}

主程序:

import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import disruptor.starter.support.*;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class MyEventMain {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        int bufferSize = 1024;

        Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(new MyEventFactory(),
                bufferSize, executorService, ProducerType.SINGLE, new YieldingWaitStrategy());
        disruptor.handleExceptionsWith(new IgnoreExceptionHandler());

        disruptor.handleEventsWith(new MyEventHandler(),new MyEventHandler());
//        disruptor.handleEventsWith(new MyEventHandler()).then(new MyEventHandler());  //Pipeline
        RingBuffer<MyEvent> ringBuffer = disruptor.start();

        MyEventProducer producer = new MyEventProducer(ringBuffer);
        for (long i = 0; i < 10; i++) {
            producer.onData(i);
            Thread.sleep(1000);// wait for task execute....
        }

        disruptor.shutdown();

        ExecutorsUtils.shutdownAndAwaitTermination(executorService, 60, TimeUnit.SECONDS);
    }
}

在這個例子中輸出

MyEvent{value=0}
MyEvent{value=0}
MyEvent{value=1}
MyEvent{value=1}
MyEvent{value=2}
MyEvent{value=2}
MyEvent{value=3}
MyEvent{value=3}
MyEvent{value=4}
MyEvent{value=4}
MyEvent{value=5}
MyEvent{value=5}
MyEvent{value=6}
MyEvent{value=6}
MyEvent{value=7}
MyEvent{value=7}
MyEvent{value=8}
MyEvent{value=8}
MyEvent{value=9}
MyEvent{value=9}

能夠看出每一個MyEventHandler(implements EventHandler)都會處理同一條消息。另外咱們還可使用相似:

disruptor.handleEventsWith(new MyEventHandler()).then(new MyEventHandler())

這樣的方法來定義依賴關係,好比先執行哪一個handler再執行哪一個handler。其餘好比and()詳情見api
若是咱們想定義多個handler,可是同時只有一個handler處理某一條消息。能夠實現WorkHandler來定義handler:

import com.lmax.disruptor.WorkHandler;

/**
 * Created by hupeng on 2015/1/1.
 */
public class MyEventWorkHandler implements WorkHandler<MyEvent> {

    private String workerName;

    public MyEventWorkHandler(String workerName) {
        this.workerName = workerName;
    }

    @Override
    public void onEvent(MyEvent event) throws Exception {
        System.out.println(workerName + " handle event:" + event);
    }
}

這時候咱們改一下咱們的主程序:

public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        int bufferSize = 1024;

        Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(new MyEventFactory(),
                bufferSize, executorService, ProducerType.SINGLE, new YieldingWaitStrategy());
        disruptor.handleExceptionsWith(new IgnoreExceptionHandler());
        disruptor.handleEventsWithWorkerPool(new MyEventWorkHandler("worker-1"),new MyEventWorkHandler("worker-2"));
        RingBuffer<MyEvent> ringBuffer = disruptor.start();

        MyEventProducer producer = new MyEventProducer(ringBuffer);
        for (long i = 0; i < 10; i++) {
            producer.onData(i);
            Thread.sleep(1000);// wait for task execute....
        }

        disruptor.shutdown();

        ExecutorsUtils.shutdownAndAwaitTermination(executorService, 60, TimeUnit.SECONDS);

    }

這時候咱們能夠看到輸出是這樣的:

worker-1 handle event:MyEvent{value=0}
worker-2 handle event:MyEvent{value=1}
worker-1 handle event:MyEvent{value=2}
worker-2 handle event:MyEvent{value=3}
worker-1 handle event:MyEvent{value=4}
worker-2 handle event:MyEvent{value=5}
worker-1 handle event:MyEvent{value=6}
worker-2 handle event:MyEvent{value=7}
worker-1 handle event:MyEvent{value=8}
worker-2 handle event:MyEvent{value=9}

一條消息只被一個handler處理。

這裏的ExecutorsUtils就是寫的一個關閉ExecutorService的方法

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorsUtils {

    public static  void shutdownAndAwaitTermination(ExecutorService pool,int timeout,TimeUnit unit) {
        pool.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!pool.awaitTermination(timeout/2, unit)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(timeout/2, unit))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            pool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }
}

概念部分來自http://ziyue1987.github.io/pages/2013/09/22/disruptor-use-manual.html ,若是想對這個框架有更一步瞭解,能夠點進去看看,能夠參考源代碼。

相關文章
相關標籤/搜索