..................2015年的第一天...................html
本文代碼託管在 https://github.com/hupengcool/disruptor-starterjava
關於吹牛逼的話就不說了。。。Disruptor是Java實現的用於線程間通訊的消息組件。其核心是一個Lock-free的Ringbuffer,Disruptor使用CAS而不是Lock。與大部分併發隊列使用的Lock相比,CAS顯然要快不少。CAS是CPU級別的指令,更加輕量,不須要像Lock同樣須要OS的支持,因此每次調用不須要kernel entry,也不須要context switch。固然,使用CAS的代價是Disruptor實現的複雜程度也相對提升了。git
Sequence是Disruptor最核心的組件,上面已經提到過了。生產者對RingBuffer的互斥訪問,生產者與消費者之間的協調以及消費者之間的協調,都是經過Sequence實現。幾乎每個重要的組件都包含Sequence。那麼Sequence是什麼呢?首先Sequence是一個遞增的序號,說白了就是計數器;其次,因爲須要在線程間共享,因此Sequence是引用傳遞,而且是線程安全的;再次,Sequence支持CAS操做;最後,爲了提升效率,Sequence經過padding來避免僞共享。github
RingBuffer是存儲消息的地方,經過一個名爲cursor的Sequence對象指示隊列的頭,協調多個生產者向RingBuffer中添加消息,並用於在消費者端判斷RingBuffer是否爲空。巧妙的是,表示隊列尾的Sequence並無在RingBuffer中,而是由消費者維護。這樣的好處是多個消費者處理消息的方式更加靈活,能夠在一個RingBuffer上實現消息的單播,多播,流水線以及它們的組合。其缺點是在生產者端判斷RingBuffer是否已盡是須要跟蹤更多的信息,爲此,在RingBuffer中維護了一個名爲gatingSequences的Sequence數組來跟蹤相關Seqence。api
SequenceBarrier用來在消費者之間以及消費者和RingBuffer之間創建依賴關係。在Disruptor中,依賴關係實際上指的是Sequence的大小關係,消費者A依賴於消費者B指的是消費者A的Sequence必定要小於等於消費者B的Sequence,這種大小關係決定了處理某個消息的前後順序。由於全部消費者都依賴於RingBuffer,因此消費者的Sequence必定小於等於RingBuffer中名爲cursor的Sequence,即消息必定是先被生產者放到Ringbuffer中,而後才能被消費者處理。數組
SequenceBarrier在初始化的時候會收集須要依賴的組件的Sequence,RingBuffer的cursor會被自動的加入其中。須要依賴其餘消費者和/或RingBuffer的消費者在消費下一個消息時,會先等待在SequenceBarrier上,直到全部被依賴的消費者和RingBuffer的Sequence大於等於這個消費者的Sequence。當被依賴的消費者或RingBuffer的Sequence有變化時,會通知SequenceBarrier喚醒等待在它上面的消費者。安全
當消費者等待在SequenceBarrier上時,有許多可選的等待策略,不一樣的等待策略在延遲和CPU資源的佔用上有所不一樣,能夠視應用場景選擇:併發
BusySpinWaitStrategy : 自旋等待,相似Linux Kernel使用的自旋鎖。低延遲但同時對CPU資源的佔用也多。框架
BlockingWaitStrategy : 使用鎖和條件變量。CPU資源的佔用少,延遲大。ide
SleepingWaitStrategy : 在屢次循環嘗試不成功後,選擇讓出CPU,等待下次調度,屢次調度後仍不成功,嘗試前睡眠一個納秒級別的時間再嘗試。這種策略平衡了延遲和CPU資源佔用,但延遲不均勻。
YieldingWaitStrategy : 在屢次循環嘗試不成功後,選擇讓出CPU,等待下次調。平衡了延遲和CPU資源佔用,但延遲也比較均勻。
PhasedBackoffWaitStrategy : 上面多種策略的綜合,CPU資源的佔用少,延遲大。
在Disruptor中,消費者是以EventProcessor的形式存在的。其中一類消費者是BatchEvenProcessor。每一個BatchEvenProcessor有一個Sequence,來記錄本身消費RingBuffer中消息的狀況。因此,一個消息必然會被每個BatchEvenProcessor消費。
另外一類消費者是WorkProcessor。每一個WorkProcessor也有一個Sequence,多個WorkProcessor還共享一個Sequence用於互斥的訪問RingBuffer。一個消息被一個WorkProcessor消費,就不會被共享一個Sequence的其餘WorkProcessor消費。這個被WorkProcessor共享的Sequence至關於尾指針。
共享同一個Sequence的WorkProcessor可由一個WorkerPool管理,這時,共享的Sequence也由WorkerPool建立。
下面以Disruptor 3.3.0版本爲例介紹Disruptor的初級使用,本文並無用那些比較原始的API,若是想知道上面寫的一些api如何使用,能夠參考 https://github.com/LMAX-Exchange/disruptor/tree/master/src/perftest/java/com/lmax/disruptor 爲了簡化使用,框架提供Disruptor類來簡化使用,下面主要是使用這個類來演示。
首先定義一個Event:
/** * Created by hupeng on 2015/1/1. */ public class MyEvent { private long value; public void setValue(long value) { this.value = value; } @Override public String toString() { return "MyEvent{" + "value=" + value + '}'; } }
而後提供一個EventFactory,RingBuffer經過這factory來初始化在Event。
import com.lmax.disruptor.EventFactory; /** * Created by hupeng on 2015/1/1. */ public class MyEventFactory implements EventFactory<MyEvent> { @Override public MyEvent newInstance() { return new MyEvent(); } }
而後寫一個Producer類,也就是消息的生產者。
import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; /** * Created by hupeng on 2015/1/1. */ public class MyEventProducer { private RingBuffer<MyEvent> ringBuffer; public MyEventProducer(RingBuffer<MyEvent> ringBuffer) { this.ringBuffer = ringBuffer; } private static final EventTranslatorOneArg TRANSLATOR = new EventTranslatorOneArg<MyEvent, Long>() { @Override public void translateTo(MyEvent event, long sequence, Long value) { event.setValue(value); } }; public void onData(final Long value) { ringBuffer.publishEvent(TRANSLATOR,value); } }
而後寫一個EventHandler。這個就是咱們定義怎麼處理消息的地方。
import com.lmax.disruptor.EventHandler; /** * Created by hupeng on 2015/1/1. */ public class MyEventHandler implements EventHandler<MyEvent> { @Override public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println(event); } }
主程序:
import com.lmax.disruptor.IgnoreExceptionHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import disruptor.starter.support.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class MyEventMain { public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); int bufferSize = 1024; Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(new MyEventFactory(), bufferSize, executorService, ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleExceptionsWith(new IgnoreExceptionHandler()); disruptor.handleEventsWith(new MyEventHandler(),new MyEventHandler()); // disruptor.handleEventsWith(new MyEventHandler()).then(new MyEventHandler()); //Pipeline RingBuffer<MyEvent> ringBuffer = disruptor.start(); MyEventProducer producer = new MyEventProducer(ringBuffer); for (long i = 0; i < 10; i++) { producer.onData(i); Thread.sleep(1000);// wait for task execute.... } disruptor.shutdown(); ExecutorsUtils.shutdownAndAwaitTermination(executorService, 60, TimeUnit.SECONDS); } }
在這個例子中輸出
MyEvent{value=0} MyEvent{value=0} MyEvent{value=1} MyEvent{value=1} MyEvent{value=2} MyEvent{value=2} MyEvent{value=3} MyEvent{value=3} MyEvent{value=4} MyEvent{value=4} MyEvent{value=5} MyEvent{value=5} MyEvent{value=6} MyEvent{value=6} MyEvent{value=7} MyEvent{value=7} MyEvent{value=8} MyEvent{value=8} MyEvent{value=9} MyEvent{value=9}
能夠看出每一個MyEventHandler(implements EventHandler)都會處理同一條消息。另外咱們還可使用相似:
disruptor.handleEventsWith(new MyEventHandler()).then(new MyEventHandler())
這樣的方法來定義依賴關係,好比先執行哪一個handler再執行哪一個handler。其餘好比and()詳情見api
若是咱們想定義多個handler,可是同時只有一個handler處理某一條消息。能夠實現WorkHandler來定義handler:
import com.lmax.disruptor.WorkHandler; /** * Created by hupeng on 2015/1/1. */ public class MyEventWorkHandler implements WorkHandler<MyEvent> { private String workerName; public MyEventWorkHandler(String workerName) { this.workerName = workerName; } @Override public void onEvent(MyEvent event) throws Exception { System.out.println(workerName + " handle event:" + event); } }
這時候咱們改一下咱們的主程序:
public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); int bufferSize = 1024; Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(new MyEventFactory(), bufferSize, executorService, ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleExceptionsWith(new IgnoreExceptionHandler()); disruptor.handleEventsWithWorkerPool(new MyEventWorkHandler("worker-1"),new MyEventWorkHandler("worker-2")); RingBuffer<MyEvent> ringBuffer = disruptor.start(); MyEventProducer producer = new MyEventProducer(ringBuffer); for (long i = 0; i < 10; i++) { producer.onData(i); Thread.sleep(1000);// wait for task execute.... } disruptor.shutdown(); ExecutorsUtils.shutdownAndAwaitTermination(executorService, 60, TimeUnit.SECONDS); }
這時候咱們能夠看到輸出是這樣的:
worker-1 handle event:MyEvent{value=0} worker-2 handle event:MyEvent{value=1} worker-1 handle event:MyEvent{value=2} worker-2 handle event:MyEvent{value=3} worker-1 handle event:MyEvent{value=4} worker-2 handle event:MyEvent{value=5} worker-1 handle event:MyEvent{value=6} worker-2 handle event:MyEvent{value=7} worker-1 handle event:MyEvent{value=8} worker-2 handle event:MyEvent{value=9}
一條消息只被一個handler處理。
這裏的ExecutorsUtils就是寫的一個關閉ExecutorService的方法
import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; public class ExecutorsUtils { public static void shutdownAndAwaitTermination(ExecutorService pool,int timeout,TimeUnit unit) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(timeout/2, unit)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(timeout/2, unit)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } } }
概念部分來自http://ziyue1987.github.io/pages/2013/09/22/disruptor-use-manual.html ,若是想對這個框架有更一步瞭解,能夠點進去看看,能夠參考源代碼。