Disruptor是一個開源框架,研發的初衷是爲了解決高併發下隊列鎖的問題,最先由LMAX提出並使用,可以在無鎖的狀況下實現隊列的併發操做,並號稱可以在一個線程裏每秒處理6百萬筆訂單
html
官網:http://lmax-exchange.github.io/disruptor/java
目前,包括Apache Storm、Camel、Log4j2在內的不少知名項目都應用了Disruptor以獲取高性能git
爲何會產生Disruptor框架
「目前Java內置隊列保證線程安全的方式:」github
ArrayBlockingQueue:基於數組形式的隊列,經過加鎖的方式,來保證多線程狀況下數據的安全;web
LinkedBlockingQueue:基於鏈表形式的隊列,也經過加鎖的方式,來保證多線程狀況下數據的安全;算法
ConcurrentLinkedQueue:基於鏈表形式的隊列,經過CAS的方式編程
咱們知道,在編程過程當中,加鎖一般會嚴重地影響性能,因此儘可能用無鎖方式,就產生了Disruptor這種無鎖高併發框架數組
基本概念
參考地址:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction#core-concepts緩存
RingBuffer——Disruptor底層數據結構實現,核心類,是線程間交換數據的中轉地;安全
Sequencer——序號管理器,生產同步的實現者,負責消費者/生產者各自序號、序號柵欄的管理和協調,Sequencer有單生產者,多生產者兩種不一樣的模式,裏面實現了各類同步的算法;
Sequence——序號,聲明一個序號,用於跟蹤ringbuffer中任務的變化和消費者的消費狀況,disruptor裏面大部分的併發代碼都是經過對Sequence的值同步修改實現的,而非鎖,這是disruptor高性能的一個主要緣由;
SequenceBarrier——序號柵欄,管理和協調生產者的遊標序號和各個消費者的序號,確保生產者不會覆蓋消費者將來得及處理的消息,確保存在依賴的消費者之間可以按照正確的順序處理
EventProcessor——事件處理器,監聽RingBuffer的事件,並消費可用事件,從RingBuffer讀取的事件會交由實際的生產者實現類來消費;它會一直偵聽下一個可用的序號,直到該序號對應的事件已經準備好。
EventHandler——業務處理器,是實際消費者的接口,完成具體的業務邏輯實現,第三方實現該接口;表明着消費者。
Producer——生產者接口,第三方線程充當該角色,producer向RingBuffer寫入事件。
Wait Strategy:Wait Strategy決定了一個消費者怎麼等待生產者將事件(Event)放入Disruptor中。

等待策略
源碼地址:https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/WaitStrategy.java
「BlockingWaitStrategy」
Disruptor的默認策略是BlockingWaitStrategy。在BlockingWaitStrategy內部是使用鎖和condition來控制線程的喚醒。BlockingWaitStrategy是最低效的策略,但其對CPU的消耗最小而且在各類不一樣部署環境中能提供更加一致的性能表現。
「SleepingWaitStrategy」
SleepingWaitStrategy 的性能表現跟 BlockingWaitStrategy 差很少,對 CPU 的消耗也相似,但其對生產者線程的影響最小,經過使用LockSupport.parkNanos(1)
來實現循環等待。
「YieldingWaitStrategy」
YieldingWaitStrategy是可使用在低延遲系統的策略之一。YieldingWaitStrategy將自旋以等待序列增長到適當的值。在循環體內,將調用Thread.yield()
以容許其餘排隊的線程運行。在要求極高性能且事件處理線數小於 CPU 邏輯核心數的場景中,推薦使用此策略;例如,CPU開啓超線程的特性。
「BusySpinWaitStrategy」
性能最好,適合用於低延遲的系統。在要求極高性能且事件處理線程數小於CPU邏輯核心數的場景中,推薦使用此策略;例如,CPU開啓超線程的特性。
「PhasedBackoffWaitStrategy」
自旋 + yield + 自定義策略,CPU資源緊缺,吞吐量和延遲並不重要的場景。
使用舉例
參考地址:https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
//定義事件event 經過Disruptor 進行交換的數據類型。
public class LongEvent {
private Long value;
public Long getValue() {
return value;
}
public void setValue(Long value) {
this.value = value;
}
}
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}
//定義事件消費者
public class LongEventHandler implements EventHandler<LongEvent> {
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("消費者:"+event.getValue());
}
}
//定義生產者
public class LongEventProducer {
public final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer byteBuffer) {
// 1.ringBuffer 事件隊列 下一個槽
long sequence = ringBuffer.next();
Long data = null;
try {
//2.取出空的事件隊列
LongEvent longEvent = ringBuffer.get(sequence);
data = byteBuffer.getLong(0);
//3.獲取事件隊列傳遞的數據
longEvent.setValue(data);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} finally {
System.out.println("生產這準備發送數據");
//4.發佈事件
ringBuffer.publish(sequence);
}
}
}
public class DisruptorMain {
public static void main(String[] args) {
// 1.建立一個可緩存的線程 提供線程來出發Consumer 的事件處理
ExecutorService executor = Executors.newCachedThreadPool();
// 2.建立工廠
EventFactory<LongEvent> eventFactory = new LongEventFactory();
// 3.建立ringBuffer 大小
int ringBufferSize = 1024 * 1024; // ringBufferSize大小必定要是2的N次方
// 4.建立Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor,
ProducerType.SINGLE, new YieldingWaitStrategy());
// 5.鏈接消費端方法
disruptor.handleEventsWith(new LongEventHandler());
// 6.啓動
disruptor.start();
// 7.建立RingBuffer容器
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 8.建立生產者
LongEventProducer producer = new LongEventProducer(ringBuffer);
// 9.指定緩衝區大小
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (int i = 1; i <= 100; i++) {
byteBuffer.putLong(0, i);
producer.onData(byteBuffer);
}
//10.關閉disruptor和executor
disruptor.shutdown();
executor.shutdown();
}
}
核心設計原理
Disruptor經過如下設計來解決隊列速度慢的問題:
「環形數組結構:」
爲了不垃圾回收,採用數組而非鏈表。同時,數組對處理器的緩存機制更加友好
❝緣由:CPU緩存是由不少個緩存行組成的。每一個緩存行一般是64字節,而且它有效地引用主內存中的一起地址。一個Java的long類型變量是8字節,所以在一個緩存行中能夠存8個long類型的變量。CPU每次從主存中拉取數據時,會把相鄰的數據也存入同一個緩存行。在訪問一個long數組的時候,若是數組中的一個值被加載到緩存中,它會自動加載另外7個。所以你能很是快的遍歷這個數組。
❞
「元素位置定位:」
數組長度2^n
,經過位運算,加快定位的速度。下標採起遞增的形式。不用擔憂index溢出的問題。index是long類型,即便100萬QPS的處理速度,也須要30萬年才能用完。
「無鎖設計:」
每一個生產者或者消費者線程,會先申請能夠操做的元素在數組中的位置,申請到以後,直接在該位置寫入或者讀取數據,整個過程經過原子變量CAS,保證操做的線程安全
數據結構
框架使用RingBuffer來做爲隊列的數據結構,RingBuffer就是一個可自定義大小的環形數組。
除數組外還有一個序列號(sequence),用以指向下一個可用的元素,供生產者與消費者使用。
原理圖以下所示:
Sequence
mark:Disruptor經過順序遞增的序號來編號管理經過其進行交換的數據(事件),對數據(事件)的處理過程老是沿着序號逐個遞增處理。
「數組+序列號設計的優點是什麼呢?」
回顧一下HashMap,在知道索引(index)下標的狀況下,存與取數組上的元素時間複雜度只有O(1),而這個index咱們能夠經過序列號與數組的長度取模來計算得出,index=sequence % table.length
。固然也能夠用位運算來計算效率更高,此時table.length必須是2的冪次方。
寫數據流程
單線程寫數據的流程:
-
申請寫入m個元素; -
如果有m個元素能夠入,則返回最大的序列號。這兒主要判斷是否會覆蓋未讀的元素; -
如果返回的正確,則生產者開始寫入元素。
使用場景
通過測試,Disruptor的的延時和吞吐量都比ArrayBlockingQueue優秀不少,因此,當你在使用ArrayBlockingQueue出現性能瓶頸的時候,你就能夠考慮採用Disruptor的代替。
參考:https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results
固然,Disruptor性能高並非必然的,因此,是否使用還得通過測試。
Disruptor的最經常使用的場景就是「生產者-消費者」場景,對場景的就是「一個生產者、多個消費者」的場景,而且要求順序處理。
舉個例子,咱們從MySQL的BigLog文件中順序讀取數據,而後寫入到ElasticSearch(搜索引擎)中。在這種場景下,BigLog要求一個文件一個生產者,那個是一個生產者。而寫入到ElasticSearch,則嚴格要求順序,不然會出現問題,因此一般意義上的多消費者線程沒法解決該問題,若是經過加鎖,則性能大打折扣
參考:
https://tech.meituan.com/2016/11/18/disruptor.html
https://github.com/LMAX-Exchange/disruptor/wiki
掃描二維碼
獲取更多精彩
月伴飛魚

本文分享自微信公衆號 - 月伴飛魚(gh_c4183eee9eb9)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。