Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題(在性能測試中發現居然與I/O操做處於一樣的數量級)。基於Disruptor開發的系統單線程能支撐每秒600萬訂單,2010年在QCon演講後,得到了業界關注。2011年,企業應用軟件專家Martin Fowler專門撰寫長文介紹。同年它還得到了Oracle官方的Duke大獎。html
目前,包括Apache Storm、Camel、Log4j 2在內的不少知名項目都應用了Disruptor以獲取高性能。在美團點評技術團隊它也有很多應用,有的項目架構借鑑了它的設計機制。本文從實戰角度剖析了Disruptor的實現原理。java
須要特別指出的是,這裏所說的隊列是系統內部的內存隊列,而不是Kafka這樣的分佈式隊列。另外,本文所描述的Disruptor特性限於3.3.4。git
介紹Disruptor以前,咱們先來看一看經常使用的線程安全的內置隊列有什麼問題。Java的內置隊列以下表所示。github
隊列 | 有界性 | 鎖 | 數據結構 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加鎖 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加鎖 | linkedlist |
ConcurrentLinkedQueue | unbounded | 無鎖 | linkedlist |
LinkedTransferQueue | unbounded | 無鎖 | linkedlist |
PriorityBlockingQueue | unbounded | 加鎖 | heap |
DelayQueue | unbounded | 加鎖 | heap |
隊列的底層通常分紅三種:數組、鏈表和堆。其中,堆通常狀況下是爲了實現帶有優先級特性的隊列,暫且不考慮。apache
咱們就從數組和鏈表兩種數據結構來看,基於數組線程安全的隊列,比較典型的是ArrayBlockingQueue,它主要經過加鎖的方式來保證線程安全;基於鏈表的線程安全隊列分紅LinkedBlockingQueue和ConcurrentLinkedQueue兩大類,前者也經過鎖的方式來實現線程安全,然後者以及上面表格中的LinkedTransferQueue都是經過原子變量compare and swap(如下簡稱「CAS」)這種不加鎖的方式來實現的。編程
經過不加鎖的方式實現的隊列都是無界的(沒法保證隊列的長度在肯定的範圍內);而加鎖的方式,能夠實現有界隊列。在穩定性要求特別高的系統中,爲了防止生產者速度過快,致使內存溢出,只能選擇有界隊列;同時,爲了減小Java的垃圾回收對系統性能的影響,會盡可能選擇array/heap格式的數據結構。這樣篩選下來,符合條件的隊列就只有ArrayBlockingQueue。數組
ArrayBlockingQueue在實際使用過程當中,會由於加鎖和僞共享等出現嚴重的性能問題,咱們下面來分析一下。promise
現實編程過程當中,加鎖一般會嚴重地影響性能。線程會由於競爭不到鎖而被掛起,等鎖被釋放的時候,線程又會被恢復,這個過程當中存在着很大的開銷,而且一般會有較長時間的中斷,由於當一個線程正在等待鎖時,它不能作任何其餘事情。若是一個線程在持有鎖的狀況下被延遲執行,例如發生了缺頁錯誤、調度延遲或者其它相似狀況,那麼全部須要這個鎖的線程都沒法執行下去。若是被阻塞線程的優先級較高,而持有鎖的線程優先級較低,就會發生優先級反轉。緩存
Disruptor論文中講述了一個實驗:安全
Method | Time (ms) |
---|---|
Single thread | 300 |
Single thread with CAS | 5,700 |
Single thread with lock | 10,000 |
Single thread with volatile write | 4,700 |
Two threads with CAS | 30,000 |
Two threads with lock | 224,000 |
CAS操做比單線程無鎖慢了1個數量級;有鎖且多線程併發的狀況下,速度比單線程無鎖慢3個數量級。可見無鎖速度最快。
單線程狀況下,不加鎖的性能 > CAS操做的性能 > 加鎖的性能。
在多線程狀況下,爲了保證線程安全,必須使用CAS或鎖,這種狀況下,CAS的性能超過鎖的性能,前者大約是後者的8倍。
綜上可知,加鎖的性能是最差的。
保證線程安全通常分紅兩種方式:鎖和原子變量。
採起加鎖的方式,默認線程會衝突,訪問數據時,先加上鎖再訪問,訪問以後再解鎖。經過鎖界定一個臨界區,同時只有一個線程進入。如上圖所示,Thread2訪問Entry的時候,加了鎖,Thread1就不能再執行訪問Entry的代碼,從而保證線程安全。
下面是ArrayBlockingQueue經過加鎖的方式實現的offer方法,保證線程安全。
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } }
原子變量可以保證原子性的操做,意思是某個任務在執行過程當中,要麼所有成功,要麼所有失敗回滾,恢復到執行以前的初態,不存在初態和成功之間的中間狀態。例如CAS操做,要麼比較並交換成功,要麼比較並交換失敗。由CPU保證原子性。
經過原子變量能夠實現線程安全。執行某個任務的時候,先假定不會有衝突,若不發生衝突,則直接執行成功;當發生衝突的時候,則執行失敗,回滾再從新操做,直到不發生衝突。
如圖所示,Thread1和Thread2都要把Entry加1。若不加鎖,也不使用CAS,有可能Thread1取到了myValue=1,Thread2也取到了myValue=1,而後相加,Entry中的value值爲2。這與預期不相符,咱們預期的是Entry的值通過兩次相加後等於3。
CAS會先把Entry如今的value跟線程當初讀出的值相比較,若相同,則賦值;若不相同,則賦值執行失敗。通常會經過while/for循環來從新執行,直到賦值成功。
代碼示例是AtomicInteger的getAndAdd方法。CAS是CPU的一個指令,由CPU保證原子性。
/** * Atomically adds the given value to the current value. * * @param delta the value to add * @return the previous value */ public final int getAndAdd(int delta) { for (;;) { int current = get(); int next = current + delta; if (compareAndSet(current, next)) return current; } } /** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * * @param expect the expected value * @param update the new value * @return true if successful. False return indicates that * the actual value was not equal to the expected value. */ public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }
在高度競爭的狀況下,鎖的性能將超過原子變量的性能,可是更真實的競爭狀況下,原子變量的性能將超過鎖的性能。同時原子變量不會有死鎖等活躍性問題。
下圖是計算的基本結構。L一、L二、L3分別表示一級緩存、二級緩存、三級緩存,越靠近CPU的緩存,速度越快,容量也越小。因此L1緩存很小但很快,而且緊靠着在使用它的CPU內核;L2大一些,也慢一些,而且仍然只能被一個單獨的CPU核使用;L3更大、更慢,而且被單個插槽上的全部CPU核共享;最後是主存,由所有插槽上的全部CPU核共享。
當CPU執行運算的時候,它先去L1查找所需的數據、再去L二、而後是L3,若是最後這些緩存中都沒有,所需的數據就要去主內存拿。走得越遠,運算耗費的時間就越長。因此若是你在作一些很頻繁的事,你要儘可能確保數據在L1緩存中。
另外,線程之間共享一份數據的時候,須要一個線程把數據寫回主存,而另外一個線程訪問主存中相應的數據。
下面是從CPU訪問不一樣層級數據的時間概念:
從CPU到 | 大約須要的CPU週期 | 大約須要的時間 |
---|---|---|
主存 | 約60-80ns | |
QPI 總線傳輸(between sockets, not drawn) | 約20ns | |
L3 cache | 約40-45 cycles | 約15ns |
L2 cache | 約10 cycles | 約3ns |
L1 cache | 約3-4 cycles | 約1ns |
寄存器 | 1 cycle |
可見CPU讀取主存中的數據會比從L1中讀取慢了近2個數量級。
Cache是由不少個cache line組成的。每一個cache line一般是64字節,而且它有效地引用主內存中的一起地址。一個Java的long類型變量是8字節,所以在一個緩存行中能夠存8個long類型的變量。
CPU每次從主存中拉取數據時,會把相鄰的數據也存入同一個cache line。
在訪問一個long數組的時候,若是數組中的一個值被加載到緩存中,它會自動加載另外7個。所以你能很是快的遍歷這個數組。事實上,你能夠很是快速的遍歷在連續內存塊中分配的任意數據結構。
下面的例子是測試利用cache line的特性和不利用cache line的特性的效果對比。
package com.meituan.FalseSharing; /** * @author gongming * @description * @date 16/6/4 */ public class CacheLineEffect { //考慮通常緩存行大小是64字節,一個 long 類型佔8字節 static long[][] arr; public static void main(String[] args) { arr = new long[1024 * 1024][]; for (int i = 0; i < 1024 * 1024; i++) { arr[i] = new long[8]; for (int j = 0; j < 8; j++) { arr[i][j] = 0L; } } long sum = 0L; long marked = System.currentTimeMillis(); for (int i = 0; i < 1024 * 1024; i+=1) { for(int j =0; j< 8;j++){ sum = arr[i][j]; } } System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms"); marked = System.currentTimeMillis(); for (int i = 0; i < 8; i+=1) { for(int j =0; j< 1024 * 1024;j++){ sum = arr[j][i]; } } System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms"); } }
在2G Hz、2核、8G內存的運行環境中測試,速度差一倍。
結果:
Loop times:30ms
Loop times:65ms
ArrayBlockingQueue有三個成員變量:
這三個變量很容易放到一個緩存行中,可是之間修改沒有太多的關聯。因此每次修改,都會使以前緩存的數據失效,從而不能徹底達到共享的效果。
如上圖所示,當生產者線程put一個元素到ArrayBlockingQueue時,putIndex會修改,從而致使消費者線程的緩存中的緩存行無效,須要從主存中從新讀取。
這種沒法充分使用緩存行特性的現象,稱爲僞共享。
對於僞共享,通常的解決方案是,增大數組元素的間隔使得由不一樣線程存取的元素位於不一樣的緩存行上,以空間換時間。
package com.meituan.FalseSharing; public class FalseSharing implements Runnable{ public final static long ITERATIONS = 500L * 1000L * 100L; private int arrayIndex = 0; private static ValuePadding[] longs; public FalseSharing(final int arrayIndex) { this.arrayIndex = arrayIndex; } public static void main(final String[] args) throws Exception { for(int i=1;i<10;i++){ System.gc(); final long start = System.currentTimeMillis(); runTest(i); System.out.println("Thread num "+i+" duration = " + (System.currentTimeMillis() - start)); } } private static void runTest(int NUM_THREADS) throws InterruptedException { Thread[] threads = new Thread[NUM_THREADS]; longs = new ValuePadding[NUM_THREADS]; for (int i = 0; i < longs.length; i++) { longs[i] = new ValuePadding(); } for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(new FalseSharing(i)); } for (Thread t : threads) { t.start(); } for (Thread t : threads) { t.join(); } } public void run() { long i = ITERATIONS + 1; while (0 != --i) { longs[arrayIndex].value = 0L; } } public final static class ValuePadding { protected long p1, p2, p3, p4, p5, p6, p7; protected volatile long value = 0L; protected long p9, p10, p11, p12, p13, p14; protected long p15; } public final static class ValueNoPadding { // protected long p1, p2, p3, p4, p5, p6, p7; protected volatile long value = 0L; // protected long p9, p10, p11, p12, p13, p14, p15; } }
在2G Hz,2核,8G內存, jdk 1.7.0_45 的運行環境下,使用了共享機制比沒有使用共享機制,速度快了4倍左右。
結果:
Thread num 1 duration = 447
Thread num 2 duration = 463
Thread num 3 duration = 454
Thread num 4 duration = 464
Thread num 5 duration = 561
Thread num 6 duration = 606
Thread num 7 duration = 684
Thread num 8 duration = 870
Thread num 9 duration = 823
把代碼中ValuePadding都替換爲ValueNoPadding後的結果:
Thread num 1 duration = 446
Thread num 2 duration = 2549
Thread num 3 duration = 2898
Thread num 4 duration = 3931
Thread num 5 duration = 4716
Thread num 6 duration = 5424
Thread num 7 duration = 4868
Thread num 8 duration = 4595
Thread num 9 duration = 4540
備註:在jdk1.8中,有專門的註解@Contended來避免僞共享,更優雅地解決問題。
Disruptor經過如下設計來解決隊列速度慢的問題:
爲了不垃圾回收,採用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。
數組長度2^n,經過位運算,加快定位的速度。下標採起遞增的形式。不用擔憂index溢出的問題。index是long類型,即便100萬QPS的處理速度,也須要30萬年才能用完。
每一個生產者或者消費者線程,會先申請能夠操做的元素在數組中的位置,申請到以後,直接在該位置寫入或者讀取數據。
下面忽略數組的環形結構,介紹一下如何實現無鎖設計。整個過程經過原子變量CAS,保證操做的線程安全。
生產者單線程寫數據的流程比較簡單:
多個生產者的狀況下,會遇到「如何防止多個線程重複寫同一個元素」的問題。Disruptor的解決方法是,每一個線程獲取不一樣的一段數組空間進行操做。這個經過CAS很容易達到。只須要在分配元素的時候,經過CAS判斷一下這段空間是否已經分配出去便可。
可是會遇到一個新問題:如何防止讀取的時候,讀到還未寫的元素。Disruptor在多個生產者的狀況下,引入了一個與Ring Buffer大小相同的buffer:available Buffer。當某個位置寫入成功的時候,便把availble Buffer相應的位置置位,標記爲寫入成功。讀取的時候,會遍歷available Buffer,來判斷元素是否已經就緒。
下面分讀數據和寫數據兩種狀況介紹。
生產者多線程寫入的狀況會複雜不少:
以下圖所示,讀線程讀到下標爲2的元素,三個線程Writer1/Writer2/Writer3正在向RingBuffer相應位置寫數據,寫線程被分配到的最大元素下標是11。
讀線程申請讀取到下標從3到11的元素,判斷writer cursor>=11。而後開始讀取availableBuffer,從3開始,日後讀取,發現下標爲7的元素沒有生產成功,因而WaitFor(11)返回6。
而後,消費者讀取下標從3到6共計4個元素。
多個生產者寫入的時候:
以下圖所示,Writer1和Writer2兩個線程寫入數組,都申請可寫的數組空間。Writer1被分配了下標3到下表5的空間,Writer2被分配了下標6到下標9的空間。
Writer1寫入下標3位置的元素,同時把available Buffer相應位置置位,標記已經寫入成功,日後移一位,開始寫下標4位置的元素。Writer2一樣的方式。最終都寫入完成。
防止不一樣生產者對同一段空間寫入的代碼,以下所示:
public long tryNext(int n) throws InsufficientCapacityException { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; do { current = cursor.get(); next = current + n; if (!hasAvailableCapacity(gatingSequences, n, current)) { throw InsufficientCapacityException.INSTANCE; } } while (!cursor.compareAndSet(current, next)); return next; }
經過do/while循環的條件cursor.compareAndSet(current, next),來判斷每次申請的空間是否已經被其餘生產者佔據。假如已經被佔據,該函數會返回失敗,While循環從新執行,申請寫入空間。
消費者的流程與生產者很是相似,這兒就很少描述了。
Disruptor經過精巧的無鎖設計實現了在高併發情形下的高性能。
在美團點評內部,不少高併發場景借鑑了Disruptor的設計,減小競爭的強度。其設計思想能夠擴展到分佈式場景,經過無鎖設計,來提高服務性能。
使用Disruptor比使用ArrayBlockingQueue略微複雜,爲方便讀者上手,增長代碼樣例。
代碼實現的功能:每10ms向disruptor中插入一個元素,消費者讀取數據,並打印到終端。詳細邏輯請細讀代碼。
如下代碼基於3.3.4版本的Disruptor包。
package com.meituan.Disruptor; /** * @description disruptor代碼樣例。每10ms向disruptor中插入一個元素,消費者讀取數據,並打印到終端 */ import com.lmax.disruptor.*; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import java.util.concurrent.ThreadFactory; public class DisruptorMain { public static void main(String[] args) throws Exception { // 隊列中的元素 class Element { private int value; public int get(){ return value; } public void set(int value){ this.value= value; } } // 生產者的線程工廠 ThreadFactory threadFactory = new ThreadFactory(){ @Override public Thread newThread(Runnable r) { return new Thread(r, "simpleThread"); } }; // RingBuffer生產工廠,初始化RingBuffer的時候使用 EventFactory<Element> factory = new EventFactory<Element>() { @Override public Element newInstance() { return new Element(); } }; // 處理Event的handler EventHandler<Element> handler = new EventHandler<Element>(){ @Override public void onEvent(Element element, long sequence, boolean endOfBatch) { System.out.println("Element: " + element.get()); } }; // 阻塞策略 BlockingWaitStrategy strategy = new BlockingWaitStrategy(); // 指定RingBuffer的大小 int bufferSize = 16; // 建立disruptor,採用單生產者模式 Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy); // 設置EventHandler disruptor.handleEventsWith(handler); // 啓動disruptor的線程 disruptor.start(); RingBuffer<Element> ringBuffer = disruptor.getRingBuffer(); for (int l = 0; true; l++) { // 獲取下一個可用位置的下標 long sequence = ringBuffer.next(); try { // 返回可用位置的元素 Element event = ringBuffer.get(sequence); // 設置該位置元素的值 event.set(l); } finally { ringBuffer.publish(sequence); } Thread.sleep(10); } } }
如下面這些模式測試性能:
吞吐量測試數據(每秒的數量)以下。
環境:
ABQ | Disruptor | |
---|---|---|
Unicast: 1P – 1C | 5,339,256 | 25,998,336 |
Pipeline: 1P – 3C | 2,128,918 | 16,806,157 |
Sequencer: 3P – 1C | 5,539,531 | 13,403,268 |
Multicast: 1P – 3C | 1,077,384 | 9,377,871 |
Diamond: 1P – 3C | 2,113,941 | 16,143,613 |
環境:
ABQ | Disruptor | |
---|---|---|
Unicast: 1P – 1C | 4,057,453 | 22,381,378 |
Pipeline: 1P – 3C | 2,006,903 | 15,857,913 |
Sequencer: 3P – 1C | 2,056,118 | 14,540,519 |
Multicast: 1P – 3C | 260,733 | 10,860,121 |
Diamond: 1P – 3C | 2,082,725 | 15,295,197 |
依據併發競爭的激烈程度的不一樣,Disruptor比ArrayBlockingQueue吞吐量快4~7倍。
按照Pipeline: 1P – 3C的鏈接模式測試延遲,生產者兩次寫入之間的延遲爲1ms。
運行環境:
Array Blocking Queue (ns) | Disruptor (ns) | |
---|---|---|
99% observations less than | 2,097,152 | 128 |
99.99% observations less than | 4,194,304 | 8,192 |
Max Latency | 5,069,086 | 175,567 |
Mean Latency | 32,757 | 52 |
Min Latency | 145 | 29 |
可見,平均延遲差了3個數量級。
暫時只有休眠1ns。
LockSupport.parkNanos(1);
名稱 | 措施 | 適用場景 |
---|---|---|
BlockingWaitStrategy | 加鎖 | CPU資源緊缺,吞吐量和延遲並不重要的場景 |
BusySpinWaitStrategy | 自旋 | 經過不斷重試,減小切換線程致使的系統調用,而下降延遲。推薦在線程綁定到固定的CPU的場景下使用 |
PhasedBackoffWaitStrategy | 自旋 + yield + 自定義策略 | CPU資源緊缺,吞吐量和延遲並不重要的場景 |
SleepingWaitStrategy | 自旋 + yield + sleep | 性能和CPU資源之間有很好的折中。延遲不均勻 |
TimeoutBlockingWaitStrategy | 加鎖,有超時限制 | CPU資源緊缺,吞吐量和延遲並不重要的場景 |
YieldingWaitStrategy | 自旋 + yield + 自旋 | 性能和CPU資源之間有很好的折中。延遲比較均勻 |
Log4j 2相對於Log4j 1最大的優點在於多線程併發場景下性能更優。該特性源自於Log4j 2的異步模式採用了Disruptor來處理。
在Log4j 2的配置文件中能夠配置WaitStrategy,默認是Timeout策略。下面是Log4j 2中對WaitStrategy的配置官方文檔:
System Property | Default Value | Description |
---|---|---|
AsyncLogger.WaitStrategy | Timeout | Valid values: Block, Timeout, Sleep, Yield. Block is a strategy that uses a lock and condition variable for the I/O thread waiting for log events. Block can be used when throughput and low-latency are not as important as CPU resource. Recommended for resource constrained/virtualised environments. Timeout is a variation of the Block strategy that will periodically wake up from the lock condition await() call. This ensures that if a notification is missed somehow the consumer thread is not stuck but will recover with a small latency delay (default 10ms). Sleep is a strategy that initially spins, then uses a Thread.yield(), and eventually parks for the minimum number of nanos the OS and JVM will allow while the I/O thread is waiting for log events. Sleep is a good compromise between performance and CPU resource. This strategy has very low impact on the application thread, in exchange for some additional latency for actually getting the message logged. Yield is a strategy that uses a Thread.yield() for waiting for log events after an initially spinning. Yield is a good compromise between performance and CPU resource, but may use more CPU than Sleep in order to get the message logged to disk sooner. |
loggers all async採用的是Disruptor,而Async Appender採用的是ArrayBlockingQueue隊列。
由圖可見,單線程狀況下,loggers all async與Async Appender吞吐量相差不大,可是在64個線程的時候,loggers all async的吞吐量比Async Appender增長了12倍,是Sync模式的68倍。
美團點評在公司內部統一推行日誌接入規範,要求必須使用Log4j 2,使普通單機QPS的上限再也不只停留在幾千,極高地提高了服務性能。