系列譯文: http://ifeve.com/disruptor/html
當有多個消費者時,(按Disruptor的設計)每一個消費者各自控制本身的指針,依次讀取每一個Slot(也就是每一個消費者都會讀取到全部的產品),這時只須要保證生產者指針不會超過最慢的消費者(超過最後一個消費者「一圈」)便可,也不須要鎖。java
http://abc08010051.iteye.com/blog/2246976git
http://www.360doc.com/content/15/0330/20/11962419_459384128.shtml程序員
LMAX架構github
一、引言ajax
Disruptor是一個開源的Java框架,它被設計用於在生產者—消費者(producer-consumer problem,簡稱PCP)問題上得到儘可能高的吞吐量(TPS)和儘可能低的延遲。Disruptor是LMAX在線交易平臺的關鍵組成部分,LMAX平臺使用該框架對訂單處理速度能達到600萬TPS,除金融領域以外,其餘通常的應用中均可以用到Disruptor,它能夠帶來顯著的性能提高。其實Disruptor與其說是一個框架,不如說是一種設計思路,這個設計思路對於存在「併發、緩衝區、生產者—消費者模型、事務處理」這些元素的程序來講,Disruptor提出了一種大幅提高性能(TPS)的方案。數據庫
如今有不少人寫過關於Disruptor文章,可是我仍是想寫這篇淺析,畢竟不一樣人的理解是不一樣的,但願沒接觸過它的人能經過本文對Disruptor有個初步的瞭解,本文後面給出了一些相關連接供參考。編程
二、什麼是Disruptor?爲何速度更快?設計模式
簡單的說,Disruptor是一個高性能的Buffer,並提供了使用這個Buffer的框架。爲何說是它性能更好呢?這得從PCP和傳統解決辦法的缺點開始提及。數組
咱們知道,PCP又稱Bounded-Buffer問題,其核心就是保證對一個Buffer的存取操做在多線程環境下不會出錯。使用Java中的ArrayBlockingQueue和LinkedBlockingQueue類能輕鬆的完成PCP模型,這對於通常程序已經沒問題了,可是對於併發度高、TPS要求較大的系統則否則。(kafka經過 底層,和 增長 partition(分區)實現)
*BlockingQueue使用的是package java.util.concurrent.locks中實現的鎖,當多個線程(例如生產者)同時寫入Queue時,鎖的爭搶會致使只有一個生產者能夠執行,其餘線程都中斷了,也就是線程的狀態從RUNNING切換到BLOCKED,直到某個生產者線程使用完Buffer後釋放鎖,其餘線程狀態才從BLOCKED切換到RUNNABLE,而後時間片到其餘線程後再進行鎖的爭搶。上述過程當中,通常來講生產者存放一個數據到Buffer中所需時間是很是短的,操做系統切換線程上下文的速度也是很是快的,可是當線程數量增多後,OS切換線程所帶來的開銷逐漸增多,鎖的反覆申請和釋放成爲性能瓶頸。*BlockingQueue除了使用鎖帶來的性能損失外,還可能由於線程爭搶的順序問題形成性能再次損失:實際使用中發現線程的調度順序並不理想,可能出現短期內OS頻繁調度出生產者或消費者的狀況,這樣形成緩衝區可能短期內被填滿或被清空的極端狀況。(理想狀況應該是緩衝區長度適中,生產和消費速度基本一致)
對於上面的問題Disruptor的解決方案是:不用鎖。
Disruptor使用一個Ring Buffer存放生產者的「產品」,環形緩衝區實際上仍是一段連續內存,之因此稱做環形是由於它對數據存放位置的處理,生產者和消費者各有一個指針(數組下標),消費者的指針指向下一個要讀取的Slot,生產者指針指向下一個要放入的Slot,消費或生產後,各自的指針值p = (p +1) % n,n是緩衝區長度,這樣指針在緩衝區上反覆遊走,故能夠將緩衝區當作環狀。(如右圖)(Ring Buffer並不是Disruptor原創,Linux內核中就有環形緩衝區的實現。)使用Ring Buffer時:
①當生產者和消費者都只有一個時,因爲兩個線程分別操做不一樣的指針,因此不須要鎖。
②當有多個消費者時,(按Disruptor的設計)每一個消費者各自控制本身的指針,依次讀取每一個Slot(也就是每一個消費者都會讀取到全部的產品),這時只須要保證生產者指針不會超過最慢的消費者(超過最後一個消費者「一圈」)便可,也不須要鎖。
③當有多個生產者時,多個線程共用一個寫指針,此處須要考慮多線程問題,例如兩個生產者線程同時寫數據,當前寫指針=0,運行後其中一個線程應得到緩衝區0號Slot,另外一個應該得到1號,寫指針=2。對於這種狀況,Disruptor使用CAS來保證多線程安全。
CAS(Compare and Swap/Set)是如今CPU廣泛支持的一種指令(例如cmpxchg系類指令),CAS操做包含3個操做數:CAS(A,B,C),其功能是:取地址A的值與B比較,若是相同,則將C賦值到地址A。CAS特色是它是由硬件實現的極輕量級指令,同時CPU也保證此操做的原子性。在考慮線程間同步問題時,可使用Unsafe類的boolean compareAndSwapInt(java.lang.Object arg0, long arg1, int arg2, int arg3);系列方法,對於一個int變量(例如,Ring Buffer的寫指針),使用CAS能夠避免多線程訪問帶來的混亂,當compareAndSwap方法true時代表CAS操做成功賦值,返回false則代表地址A處的值並不等於B,此時從新試一遍便可,使用CAS移動寫指針的邏輯以下:
1 //寫指針向後移動n 2 public long next(int n) 3 { 4 //...... 5 long current,next; 6 do 7 { 8 //此處先將寫指針的當前值備份一下 9 current = pointer.get(); 10 //預計寫指針將要移動到的位置 11 next = current + n; 12 //......省略:確保從current到current+n的Slot已經被消費者讀完...... 13 //*原子操做*若是當前寫指針和剛纔同樣(說明9-12行的計算有效),那麼移動寫指針 14 if ( pointer.comapreAndSet(current,next) ) 15 break; 16 }while ( true )//若是CAS失敗或者還不能移動寫指針,則不斷嘗試 17 return next; 18 }
OK,咱們如今有了一個使用CAS的Ring Buffer,這比用鎖快上很多,但CAS的效率並無想象的那麼快,根據連接[2]pdf中評測:和單一線程無鎖執行某簡單任務相比,使用鎖的時間比無鎖高出2個數量級,CAS也高出了一個數量級。那麼Disruptor還有什麼提升性能的地方呢?下面列舉一下除了無鎖編程外的其餘性能優化點。
①緩存行填充(Cache Line Padding):CPU緩存常以64bytes做爲一個緩存行大小,緩存由若干個緩存行組成,緩存寫回主存或主存寫入緩存均是以行爲單位,此外每一個CPU核心都有本身的緩存(可是若某個核心對某緩存行作出修改,其餘擁有一樣緩存的核心須要進行同步),生產者和消費者的指針用long型表示,假設如今只有一個生產者和一個消費者,那麼雙方的指針間沒有什麼直接聯繫,只要不「挨着」,應該能夠各改各的指針。OK前面說有點亂,下面問題來了:若是生產者和消費者的指針(加起來共16bytes)出如今同一個緩存行中會怎麼樣?例如CPU核心A運行的消費者修改了一下本身的指針值(P1),那麼其餘核心中全部緩存了P1的緩存行都將失效,並從主存從新調配。這樣作的缺點顯而易見,可是CPU和編譯器並未聰明到避免這個問題,因此須要緩存行填充。雖然問題產生的緣由很繞,可是解決方案卻很是簡單:對於一個long型的緩衝區指針,用一個長度爲8的long型數組代替。如此一來,一個緩存行被這個數組填充滿,線程對各自指針的修改不會干擾到他人。
②避免GC:寫Java程序的時候,不少人習慣隨手new各類對象,雖然Java的GC會負責回收,可是系統在高壓力狀況下頻繁的new一定致使更頻繁的GC,Disruptor避免這個問題的策略是:提早分配。在建立RingBuffer實例時,參數中要求給出緩衝區元素類型的Factory,建立實例時,Ring Buffer會首先將整個緩衝區填滿爲Factory所產生的實例,後面生產者生產時,再也不用傳統作法(順手new一個實例出來而後add到buffer中),而是得到以前已經new好的實例,而後設置其中的值。舉個形象的例子就是,若緩衝區是個放不少紙片的地方,紙片上記錄着信息,之前的作法是:每次加入緩衝區時,都從系統那現準備一張紙片,而後再寫好紙片放進緩衝區,消費完就隨手扔掉。如今的作法是:實現準備好全部的紙片,想放入時只須要擦掉原來的信息寫上新的便可。
③成批操做(Batch):Ring Buffer的核心操做是生產和消費,若是能減小這兩個操做的次數,性能必然相應地提升。Disruptor中使用成批操做來減小生產和消費的次數,下面具體說一下Disruptor的生產和消費過程當中如何體現Batch的。向RingBuffer生產東西的時候,須要通過2個階段:階段一爲申請空間,申請後生產者得到了一個指針範圍[low,high],而後再對緩衝區中[low,high]這段的全部對象進行setValue(見優化點②),階段2爲發佈(像這樣ringBuffer.publish(low,high);)。階段1結束後,其餘生產者再申請的話,會獲得另外一段緩衝區。階段2結束後,以前申請的這一段數據就能夠被消費者讀到。Disruptor推薦成批生產、成批發布,減小生產時的同步帶來的性能損失。從RingBuffer消費東西的時候也須要兩個階段,階段一爲等待生產者的(寫)指針值超過指定值(N,即N以前的數據已經生產過了),階段一執行完後,消費者會獲得一個指針值(R),表示Ring Buffer中下標R以前的值是能夠讀的。階段2就是具體讀取(略)。階段一返回值R頗有可能大於N,此時消費者應該進行成批讀取操做,將[R,N]範圍內的數據所有處理。
④LMAX架構:(注:指的是LMAX公司在作他們的交易平臺時使用的一些設計思想的集合,嚴格講是LMAX架構包含Disruptor,並不是其中的一部分,可是Disruptor的設計中或多或少體現了這些思想,因此在這仍是要提一下,關於LMAX架構應該能夠寫不少,但限於我的水平,在這隻能簡單說說。另外,這個架構是以及極端追求性能的產物,不必定適合大衆。)以下圖所示LMAX架構分爲三個部分,輸入/輸出Disruptor,和中間核心的業務邏輯處理器。全部的信息輸入進入Input Disruptor,被業務邏輯處理器讀取後送入Output Disruptor,最後輸出到其餘地方。
對於通常由表現層+業務層+持久層組成的Web系統,LMAX架構指的是業務層,它有以下幾個特色:
a)業務邏輯處理器(簡稱BLP)徹底的In-Memory:如上圖,業務邏輯處理器是處理全部業務邏輯的地方,Input Disruptor把輸入(例如訂單數據、用戶操做)以消息的形式(稱做Message或者Event均可以)發到BLP,BLP進行響應。通常系統中咱們可能會多線程執行一些業務邏輯代碼,而後這些代碼最終生成一些SQL語句,而後這些語句再去查數據庫,數據庫可能在其餘主機,數據庫查詢結果可能直接用了內存中的緩存,最壞狀況是數據庫從磁盤中讀取了想要的數據,最後再返回給業務邏輯代碼。這個過程有不少的時間浪費:首先,多線程訪問持久層會涉及到同步問題(鎖,有是這貨)。其次,生成*QL語句、查詢數據庫的耗時也是很是大的。最後,最壞狀況下還要進行一大串磁盤IO和內存IO才能取到數據。LMAX對此的解決方案是:把能用到的全部數據所有裝入內存,只有到極少數或週期性須要同步、持久化的時候再訪問數據庫。(這聽起來有點瘋狂,可是仔細想一想這些業務真的須要那麼大空間嗎?)這麼作的好處也是顯而易見,減小了網絡、磁盤的IO後In-Memory系統上的大部分業務邏輯全都變成一些加減乘除運算了。
b)異步-事件驅動:通過a)的修改,若是還存在一些業務邏輯處理過程是須要長時間才能完成的,那麼就把它做爲一個事件,再拋給其餘組件(可能仍是Disruptor)等待。業務邏輯處理器須要時刻保持最快速度、最高效率,它不能等待任何事情。
c)每一個業務邏輯處理器是單線程的:你沒有聽錯。其實有了a)b)做爲前提,會發現多線程所帶來的業務層面同步問題將會極大限制BLP效率、增大BLP的複雜度,和BLP的設計(Keep it simple, stupid.)相悖,若是實在想多線程,能夠參照d)。
d)使用多級業務邏輯處理器:有些像管道模式,上圖的3塊結構能夠以多種方式組合,一個BLP能夠將輸出送往多個Output Disruptor,而這些Disruptor多是另外一些3塊結構的Input Disruptor,即有些BLP是起到分發做用的,另外一些是進行具體業務邏輯計算的。每一個BLP對應一個線程,整個架構可能比上圖複雜不少。
三、Hello Disruptor
Disruptor最初是由Java實現的,如今也有C/Cpp和.Net版本,Java版最全更新最快,代碼註釋較多比較好懂。說了這麼多,本節先給出一個測試例子,展現Disruptor的基本用法,例子中用LinkedBlockingQueue和Disruptor分別實現了單一輩子產者+單一消費者存取簡單對象的測試,統計了一下雙方消耗的時間,僅供參考。
//簡單對象:緩衝區中的元素,裏面只有一個value,提供setValue private class TestObj { public long value; public TestObj(long value) { this.value = value; } public void setValue(long value) { this.value = value; } } public class Test { //待生產的對象個數 final long objCount = 1000000; final long bufSize;//緩衝區大小 { bufSize = getRingBufferSize(objCount); } //獲取RingBuffer的緩衝區大小(2的冪次!加速計算) static long getRingBufferSize(long num) { long s = 2; while ( s < num ) { s <<= 1; } return s; } //使用LinkedBlockingQueue測試 public void testBlocingQueue() throws Exception { final LinkedBlockingQueue<TestObj> queue = new LinkedBlockingQueue<TestObj>(); Thread producer = new Thread(new Runnable() {//生產者 @Override public void run() { try{ for ( long i=1;i<=objCount;i++ ) { queue.put(new TestObj(i));//生產 } }catch ( InterruptedException e ){ } } }); Thread consumer = new Thread(new Runnable() {//消費者 @Override public void run() { try{ TestObj readObj = null; for ( long i=1;i<=objCount;i++ ) { readObj = queue.take();//消費 //DoSomethingAbout(readObj); } }catch ( InterruptedException e ){ } } }); long timeStart = System.currentTimeMillis();//統計時間 producer.start(); consumer.start(); consumer.join(); producer.join(); long timeEnd = System.currentTimeMillis(); DecimalFormat df = (DecimalFormat) DecimalFormat.getInstance(); System.out.println((timeEnd - timeStart) + "/" + df.format(objCount) + " = " + df.format(objCount/(timeEnd - timeStart)*1000) ); } //使用RingBuffer測試 public void testRingBuffer() throws Exception { //建立一個單生產者的RingBuffer,EventFactory是填充緩衝區的對象工廠 // YieldingWaitStrategy等"等待策略"指出消費者等待數據變得可用前的策略 final RingBuffer<TestObj> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TestObj>() { @Override public TestObj newInstance() { return new TestObj(0); } } , (int)bufSize, new YieldingWaitStrategy()); //建立消費者指針 final SequenceBarrier barrier = ringBuffer.newBarrier(); Thread producer = new Thread(new Runnable() {//生產者 @Override public void run() { for ( long i=1;i<=objCount;i++ ) { long index = ringBuffer.next();//申請下一個緩衝區Slot ringBuffer.get(index).setValue(i);//對申請到的Slot賦值 ringBuffer.publish(index);//發佈,而後消費者能夠讀到 } } }); Thread consumer = new Thread(new Runnable() {//消費者 @Override public void run() { TestObj readObj = null; int readCount = 0; long readIndex = Sequencer.INITIAL_CURSOR_VALUE; while ( readCount < objCount )//讀取objCount個元素後結束 { try{ long nextIndex = readIndex + 1;//當前讀取到的指針+1,即下一個該讀的位置 long availableIndex = barrier.waitFor(nextIndex);//等待直到上面的位置可讀取 while ( nextIndex <= availableIndex )//從下一個可讀位置到目前能讀到的位置(Batch!) { readObj = ringBuffer.get(nextIndex);//得到Buffer中的對象 //DoSomethingAbout(readObj); readCount++; nextIndex ++; } readIndex = availableIndex;//刷新當前讀取到的位置 }catch ( Exception ex) { ex.printStackTrace(); } } } }); long timeStart = System.currentTimeMillis();//統計時間 producer.start(); consumer.start(); consumer.join(); producer.join(); long timeEnd = System.currentTimeMillis(); DecimalFormat df = (DecimalFormat) DecimalFormat.getInstance(); System.out.println((timeEnd - timeStart) + "/" + df.format(objCount) + " = " + df.format(objCount/(timeEnd - timeStart)*1000) ); } public static void main(String[] args) throws Exception { Test ins = new Test(); //執行測試 ins.testBlocingQueue(); ins.testRingBuffer(); } } 測試代碼
測試結果:
319/1,000,000 = 3,134,000 //使用LinkedBlockingQueue在319毫秒內存取100萬個簡單對象,每秒鐘能執行313萬個
46/1,000,000 = 21,739,000 //使用Disruptor在46毫秒內存取100萬個簡單對象,每秒鐘能執行2173萬個
平均下來使用Disruptor速度能提升7倍。(不一樣電腦、應用環境下結果可能不一致)
四、隨想:Disruptor、完成端口與Mechanical Sympathy
When pushing performance like this, it starts to become important to take account of the way modern hardware is constructed.
「當對性能的追求達到這樣的程度,以至對現代硬件構成的理解變得愈來愈重要。」這句話恰當地形容了Disruptor/LMAX在對性能方面的追求和失敗。咦,失敗?爲何會這麼說呢?Disruptor固然是一個優秀的框架,我說的失敗指的是在開發它的過程當中,LMAX曽試圖提升併發程序效率,優化、使用鎖或藉助其餘模型,可是這些嘗試最終失敗了——而後他們構建了Disruptor。再提問:一個Java程序員在嘗試提升他的程序性能的時候,須要瞭解不少硬件知識嗎?我想不少人都會回答「不須要」,構建Disruptor的過程當中,最初開發人員對這個問題的回答可能也是「不須要」,可是嘗試失敗後他們決定另闢蹊徑。總的看下Disruptor的設計:鎖到CAS、緩衝行填充、避免GC等,我感受這些設計都在刻意「遷就」或者「依賴」硬件設計,這些設計更像是一種「(ugly)hack」(毫無疑問,Disruptor仍是目前最優秀的方案之一)。
Disruptor我想到了完成端口,完成端口聽說能是Windows上最快的併發網絡「框架」:你只要經過API告訴Windows你想recv哪些socket,而後各個recv操做在內核層面上執行並加入到某個隊列中,最後再使用Worker線程進行處理,大部分工做Windows都爲你作好了,不使用鎖也沒有上下文切換和大量線程,是否是和Disruptor殊途同歸呢?完成端口和Disruptor在追求性能時,都避免使用並行、鎖、多線程等概念,這些概念的出現自有它們的緣由,這裏不用多說,可是爲了性能(考慮到硬件)卻不能充分使用它們,說明在處理併發、並行問題上,硬件和軟件的發展存在不協調,可能馮氏計算機仍是適合單「線程」順序處理信息吧。關於這種不協調,我認爲應該是硬件應該會逐步適應軟件,但也有人提出了有意思的Mechanical Sympathy(連接[6]),至於將來會如何發展就不是這篇blog能討論的了:) 。
(完)
連接:
[1]Disruptor介紹譯文:http://ifeve.com/disruptor/
原文https://code.google.com/p/disruptor/wiki/BlogsAndArticles
[2]Disruptor的GitHub:http://lmax-exchange.github.io/disruptor/
其中http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf這篇PDF對Disruptor做了很好的闡述。
[3]完成端口:http://blog.csdn.net/piggyxp/article/details/6922277
[4]致敬disruptor:CAS實現高效(僞)無鎖阻塞隊列實踐:http://www.majin163.com/2014/03/24/cas_queue/
[5]Disruptor 源碼分析:http://huangyunbin.iteye.com/blog/1944232
[6]Mechanical Sympathy:http://mechanical-sympathy.blogspot.com/
----------------------------------(我是分割線)----------------------------------
PS1: 轉載請註明做者。
PS2: 下載:Disruptor介紹PPT
PS3:這是我第5個博客(不過前4個都不是技術blog,笑),之後我會盡可能貼一些遇到的問題和思考到這裏,水平有限,歡迎各位指出不足!
http://ifeve.com/dissecting-disruptor-whats-so-special/
正如名字所說的同樣,它是一個環(首尾相接的環),你能夠把它用作在不一樣上下文(線程)間傳遞數據的buffer。
如何使用Disruptor(二)如何從Ringbuffer讀取
http://ifeve.com/dissecting_the_disruptor_how_doi_read_from_the_ring_buffer/