Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題(在性能測試中發現居然與I/O操做處於一樣的數量級)。基於Disruptor開發的系統單線程能支撐每秒600萬訂單,2010年在QCon演講後,得到了業界關注。2011年,企業應用軟件專家Martin Fowler專門撰寫長文介紹。同年它還得到了Oracle官方的Duke大獎。html
介紹Disruptor以前,咱們先來看一看經常使用的線程安全的內置隊列有什麼問題。Java的內置隊列以下表所示。java
隊列 | 有界性 | 鎖 | 數據結構 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加鎖 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加鎖 | linkedlist |
ConcurrentLinkedQueue | unbounded | 無鎖 | linkedlist |
LinkedTransferQueue | unbounded | 無鎖 | linkedlist |
PriorityBlockingQueue | unbounded | 加鎖 | heap |
DelayQueue | unbounded | 加鎖 | heap |
隊列的底層通常分紅三種:數組、鏈表和堆。其中,堆通常狀況下是爲了實現帶有優先級特性的隊列,暫且不考慮。數組
咱們就從數組和鏈表兩種數據結構來看,基於數組線程安全的隊列,比較典型的是ArrayBlockingQueue,它主要經過加鎖的方式來保證線程安全;基於鏈表的線程安全隊列分紅LinkedBlockingQueue和ConcurrentLinkedQueue兩大類,前者也經過鎖的方式來實現線程安全,然後者以及上面表格中的LinkedTransferQueue都是經過原子變量compare and swap(如下簡稱「CAS」)這種不加鎖的方式來實現的。緩存
經過不加鎖的方式實現的隊列都是無界的(沒法保證隊列的長度在肯定的範圍內);而加鎖的方式,能夠實現有界隊列。在穩定性要求特別高的系統中,爲了防止生產者速度過快,致使內存溢出,只能選擇有界隊列;同時,爲了減小Java的垃圾回收對系統性能的影響,會盡可能選擇array/heap格式的數據結構。這樣篩選下來,符合條件的隊列就只有ArrayBlockingQueue。安全
Disruptor論文中講述了一個實驗:服務器
Method | Time (ms) |
---|---|
Single thread | 300 |
Single thread with CAS | 5,700 |
Single thread with lock | 10,000 |
Single thread with volatile write | 4,700 |
Two threads with CAS | 30,000 |
Two threads with lock | 224,000 |
CAS操做比單線程無鎖慢了1個數量級;有鎖且多線程併發的狀況下,速度比單線程無鎖慢3個數量級。可見無鎖速度最快。數據結構
單線程狀況下,不加鎖的性能 > CAS操做的性能 > 加鎖的性能。多線程
在多線程狀況下,爲了保證線程安全,必須使用CAS或鎖,這種狀況下,CAS的性能超過鎖的性能,前者大約是後者的8倍。併發
綜上可知,加鎖的性能是最差的。分佈式
Disruptor經過如下設計來解決隊列速度慢的問題:
爲了不垃圾回收,採用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。
數組長度2^n,經過位運算,加快定位的速度。下標採起遞增的形式。不用擔憂index溢出的問題。index是long類型,即便100萬QPS的處理速度,也須要30萬年才能用完。
每一個生產者或者消費者線程,會先申請能夠操做的元素在數組中的位置,申請到以後,直接在該位置寫入或者讀取數據。
生產者單線程寫數據的流程比較簡單:
圖5 單個生產者生產過程示意圖
多個生產者的狀況下,會遇到「如何防止多個線程重複寫同一個元素」的問題。Disruptor的解決方法是,每一個線程獲取不一樣的一段數組空間進行操做。這個經過CAS很容易達到。只須要在分配元素的時候,經過CAS判斷一下這段空間是否已經分配出去便可。
可是會遇到一個新問題:如何防止讀取的時候,讀到還未寫的元素。Disruptor在多個生產者的狀況下,引入了一個與Ring Buffer大小相同的buffer:available Buffer。當某個位置寫入成功的時候,便把availble Buffer相應的位置置位,標記爲寫入成功。讀取的時候,會遍歷available Buffer,來判斷元素是否已經就緒。
下面分讀數據和寫數據兩種狀況介紹。
生產者多線程寫入的狀況會複雜不少:
以下圖所示,讀線程讀到下標爲2的元素,三個線程Writer1/Writer2/Writer3正在向RingBuffer相應位置寫數據,寫線程被分配到的最大元素下標是11。
讀線程申請讀取到下標從3到11的元素,判斷writer cursor>=11。而後開始讀取availableBuffer,從3開始,日後讀取,發現下標爲7的元素沒有生產成功,因而WaitFor(11)返回6。
而後,消費者讀取下標從3到6共計4個元素。
圖6 多個生產者狀況下,消費者消費過程示意圖
多個生產者寫入的時候:
以下圖所示,Writer1和Writer2兩個線程寫入數組,都申請可寫的數組空間。Writer1被分配了下標3到下表5的空間,Writer2被分配了下標6到下標9的空間。
Writer1寫入下標3位置的元素,同時把available Buffer相應位置置位,標記已經寫入成功,日後移一位,開始寫下標4位置的元素。Writer2一樣的方式。最終都寫入完成。
圖7 多個生產者狀況下,生產者生產過程示意圖
下面忽略數組的環形結構,介紹一下如何實現無鎖設計。整個過程經過原子變量CAS,保證操做的線程安全。
防止不一樣生產者對同一段空間寫入的代碼,以下所示:
public long tryNext(int n) throws InsufficientCapacityException { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; do { current = cursor.get(); next = current + n; if (!hasAvailableCapacity(gatingSequences, n, current)) { throw InsufficientCapacityException.INSTANCE; } } while (!cursor.compareAndSet(current, next)); return next; }
經過do/while循環的條件cursor.compareAndSet(current, next),來判斷每次申請的空間是否已經被其餘生產者佔據。假如已經被佔據,該函數會返回失敗,While循環從新執行,申請寫入空間。
消費者的流程與生產者很是相似,這兒就很少描述了。
Disruptor經過精巧的無鎖設計實現了在高併發情形下的高性能。
在美團點評內部,不少高併發場景借鑑了Disruptor的設計,減小競爭的強度。其設計思想能夠擴展到分佈式場景,經過無鎖設計,來提高服務性能。
/** * @description disruptor代碼樣例。每10ms向disruptor中插入一個元素,消費者讀取數據,並打印到終端 */ import com.lmax.disruptor.*; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import java.util.concurrent.ThreadFactory; public class DisruptorMain { public static void main(String[] args) throws Exception { // 隊列中的元素 class Element { private int value; public int get(){ return value; } public void set(int value){ this.value= value; } } // 生產者的線程工廠 ThreadFactory threadFactory = new ThreadFactory(){ @Override public Thread newThread(Runnable r) { return new Thread(r, "simpleThread"); } }; // RingBuffer生產工廠,初始化RingBuffer的時候使用 EventFactory<Element> factory = new EventFactory<Element>() { @Override public Element newInstance() { return new Element(); } }; // 處理Event的handler EventHandler<Element> handler = new EventHandler<Element>(){ @Override public void onEvent(Element element, long sequence, boolean endOfBatch) { System.out.println("Element: " + element.get()); } }; // 阻塞策略 BlockingWaitStrategy strategy = new BlockingWaitStrategy(); // 指定RingBuffer的大小 int bufferSize = 16; // 建立disruptor,採用單生產者模式 Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy); // 設置EventHandler disruptor.handleEventsWith(handler); // 啓動disruptor的線程 disruptor.start(); RingBuffer<Element> ringBuffer = disruptor.getRingBuffer(); for (int l = 0; true; l++) { // 獲取下一個可用位置的下標 long sequence = ringBuffer.next(); try { // 返回可用位置的元素 Element event = ringBuffer.get(sequence); // 設置該位置元素的值 event.set(l); } finally { ringBuffer.publish(sequence); } Thread.sleep(10); } } }
切記:必定要在設置值的地方加上
try{
}finally{
},
不然若是數據發佈不成功,最後數據會逐漸填滿ringbuffer,最後後面來的數據根本沒有辦法調用可用空間,致使方法阻塞,佔用CPU和內存,沒法釋放資源,最後致使服務器死機
注意,最後的 ringBuffer.publish 方法必須包含在 finally 中以確保必須獲得調用;若是某個請求的 sequence 未被提交,將會堵塞後續的發佈操做或者其它的 producer。
Disruptor 還提供另一種形式的調用來簡化以上操做,並確保 publish 老是獲得調用。
public class LongEvent { private long value; public void set(long value) { this.value = value; } } static class Translator implements EventTranslatorOneArg<LongEvent, Long>{ @Override publicvoid translateTo(LongEvent event, long sequence, Long data) { event.set(data); } } public static Translator TRANSLATOR = new Translator(); public staticvoid publishEvent2(Disruptor<LongEvent> disruptor) { // 發佈事件; RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); long data = getEventDataxxxx();//獲取要經過事件傳遞的業務數據; ringBuffer.publishEvent(TRANSLATOR,data); }
多個生產者
在構建Disruptor實例的時候,須要指定生產者是單生產者(ProducerType.SINGLE)仍是多生產者(ProducerType.MULTI)
多個消費者
(類型1) 多個消費者每一個消費者都有機會消費相同數據,使用handleEventsWith方法
class ComsumerHandler implements EventHandler<ResultTxt>{ private int no; public ComsumerHandler(int no) { this.no=no; } @Override public void onEvent(ResultTxt resultTxt, long sequence, boolean endOfBatch) throws Exception { System.out.println(no+" data commming......"+resultTxt.getBarCode()); } } //設置多個消費者 disruptor.handleEventsWith(new ComsumerHandler(1),new ComsumerHandler(2));
(類型2) 多個消費者,每一個消費者消費不一樣數據。也就是說每一個消費者競爭數據,競爭到消費,其餘消費者沒有機會。使用handleEventsWithWorkerPool方法
class ComsumerHandler implements WorkHandler<ResultTxt>{ private int no; public ComsumerHandler(int no) { this.no=no; } @Override public void onEvent(ResultTxt event) throws Exception { System.out.println(no+" data commming......"+event.getBarCode()); } } //多個消費者,每一個消費者競爭消費不一樣數據 disruptor.handleEventsWithWorkerPool(new ComsumerHandler(1),new ComsumerHandler(2));
暫時只有休眠1ns。
LockSupport.parkNanos(1);
名稱 | 措施 | 適用場景 |
---|---|---|
BlockingWaitStrategy | 加鎖 | CPU資源緊缺,吞吐量和延遲並不重要的場景 |
BusySpinWaitStrategy | 自旋 | 經過不斷重試,減小切換線程致使的系統調用,而下降延遲。推薦在線程綁定到固定的CPU的場景下使用 |
PhasedBackoffWaitStrategy | 自旋 + yield + 自定義策略 | CPU資源緊缺,吞吐量和延遲並不重要的場景 |
SleepingWaitStrategy | 自旋 + yield + sleep | 性能和CPU資源之間有很好的折中。延遲不均勻 |
TimeoutBlockingWaitStrategy | 加鎖,有超時限制 | CPU資源緊缺,吞吐量和延遲並不重要的場景 |
YieldingWaitStrategy | 自旋 + yield + 自旋 | 性能和CPU資源之間有很好的折中。延遲比較均勻 |
更多詳細信息請參考: http://tech.meituan.com/disruptor.html