Disruptor Ringbuffer

 系列譯文: http://ifeve.com/disruptor/html

 當有多個消費者時,(按Disruptor的設計)每一個消費者各自控制本身的指針,依次讀取每一個Slot(也就是每一個消費者都會讀取到全部的產品),這時只須要保證生產者指針不會超過最慢的消費者(超過最後一個消費者「一圈」)便可,也不須要鎖。java

重複消費http://blog.csdn.net/zero__007/article/details/49684363

 

 

Disruptor多個消費者不重複處理生產者發送的消息的demo

Disruptor多個消費者獨立處理生產者消息的簡單demo

 

http://abc08010051.iteye.com/blog/2246976git

 

http://www.360doc.com/content/15/0330/20/11962419_459384128.shtml程序員

 

    • EventProcessor
      EventProcessor 持有特定消費者(Consumer)的 Sequence,並提供用於調用事件處理實現的事件循環(Event Loop)。
    • EventHandler
      Disruptor 定義的事件處理接口,由用戶實現,用於處理事件,是 Consumer 的真正實現

 

 

Disruptor 極速體驗

 

 https://www.cnblogs.com/haiq/p/4112689.html

 

 LMAX架構github

併發框架Disruptor淺析

 

一、引言ajax

  Disruptor是一個開源的Java框架,它被設計用於在生產者—消費者(producer-consumer problem,簡稱PCP)問題上得到儘可能高的吞吐量(TPS)和儘可能低的延遲。Disruptor是LMAX在線交易平臺的關鍵組成部分,LMAX平臺使用該框架對訂單處理速度能達到600萬TPS,除金融領域以外,其餘通常的應用中均可以用到Disruptor,它能夠帶來顯著的性能提高。其實Disruptor與其說是一個框架,不如說是一種設計思路,這個設計思路對於存在「併發、緩衝區、生產者—消費者模型、事務處理」這些元素的程序來講,Disruptor提出了一種大幅提高性能(TPS)的方案數據庫

  如今有不少人寫過關於Disruptor文章,可是我仍是想寫這篇淺析,畢竟不一樣人的理解是不一樣的,但願沒接觸過它的人能經過本文對Disruptor有個初步的瞭解,本文後面給出了一些相關連接供參考。編程

二、什麼是Disruptor?爲何速度更快?設計模式

  簡單的說,Disruptor是一個高性能的Buffer,並提供了使用這個Buffer的框架。爲何說是它性能更好呢?這得從PCP和傳統解決辦法的缺點開始提及。數組

  咱們知道,PCP又稱Bounded-Buffer問題,其核心就是保證對一個Buffer的存取操做在多線程環境下不會出錯。使用Java中的ArrayBlockingQueue和LinkedBlockingQueue類能輕鬆的完成PCP模型,這對於通常程序已經沒問題了,可是對於併發度高、TPS要求較大的系統則否則。(kafka經過 底層,和 增長 partition(分區)實現)

  *BlockingQueue使用的是package java.util.concurrent.locks中實現的鎖,當多個線程(例如生產者)同時寫入Queue時,鎖的爭搶會致使只有一個生產者能夠執行,其餘線程都中斷了,也就是線程的狀態從RUNNING切換到BLOCKED,直到某個生產者線程使用完Buffer後釋放鎖,其餘線程狀態才從BLOCKED切換到RUNNABLE,而後時間片到其餘線程後再進行鎖的爭搶。上述過程當中,通常來講生產者存放一個數據到Buffer中所需時間是很是短的,操做系統切換線程上下文的速度也是很是快的,可是當線程數量增多後,OS切換線程所帶來的開銷逐漸增多,鎖的反覆申請和釋放成爲性能瓶頸。*BlockingQueue除了使用鎖帶來的性能損失外,還可能由於線程爭搶的順序問題形成性能再次損失:實際使用中發現線程的調度順序並不理想,可能出現短期內OS頻繁調度出生產者或消費者的狀況,這樣形成緩衝區可能短期內被填滿或被清空的極端狀況。(理想狀況應該是緩衝區長度適中,生產和消費速度基本一致)

  對於上面的問題Disruptor的解決方案是:不用鎖。

 

  Disruptor使用一個Ring Buffer存放生產者的「產品」,環形緩衝區實際上仍是一段連續內存,之因此稱做環形是由於它對數據存放位置的處理,生產者和消費者各有一個指針(數組下標),消費者的指針指向下一個要讀取的Slot,生產者指針指向下一個要放入的Slot,消費或生產後,各自的指針值p = (p +1) % n,n是緩衝區長度,這樣指針在緩衝區上反覆遊走,故能夠將緩衝區當作環狀。(如右圖)(Ring Buffer並不是Disruptor原創,Linux內核中就有環形緩衝區的實現。)使用Ring Buffer時:

當生產者和消費者都只有一個時,因爲兩個線程分別操做不一樣的指針,因此不須要鎖。

當有多個消費者時,(按Disruptor的設計)每一個消費者各自控制本身的指針,依次讀取每一個Slot(也就是每一個消費者都會讀取到全部的產品),這時只須要保證生產者指針不會超過最慢的消費者(超過最後一個消費者「一圈」)便可,也不須要鎖。

當有多個生產者時,多個線程共用一個寫指針,此處須要考慮多線程問題,例如兩個生產者線程同時寫數據,當前寫指針=0,運行後其中一個線程應得到緩衝區0號Slot,另外一個應該得到1號,寫指針=2。對於這種狀況,Disruptor使用CAS來保證多線程安全。

  CAS(Compare and Swap/Set)是如今CPU廣泛支持的一種指令(例如cmpxchg系類指令),CAS操做包含3個操做數:CAS(A,B,C),其功能是:取地址A的值與B比較,若是相同,則將C賦值到地址A。CAS特色是它是由硬件實現的極輕量級指令,同時CPU也保證此操做的原子性。在考慮線程間同步問題時,可使用Unsafe類的boolean compareAndSwapInt(java.lang.Object arg0, long arg1, int arg2, int arg3);系列方法,對於一個int變量(例如,Ring Buffer的寫指針),使用CAS能夠避免多線程訪問帶來的混亂,當compareAndSwap方法true時代表CAS操做成功賦值,返回false則代表地址A處的值並不等於B,此時從新試一遍便可,使用CAS移動寫指針的邏輯以下:  

複製代碼
  1 //寫指針向後移動n
 2 public long next(int n)
 3 {
 4     //......
 5     long current,next;
 6     do
 7     {
 8         //此處先將寫指針的當前值備份一下
 9         current = pointer.get();
10         //預計寫指針將要移動到的位置
11         next = current + n;
12         //......省略:確保從current到current+n的Slot已經被消費者讀完......
13         //*原子操做*若是當前寫指針和剛纔同樣(說明9-12行的計算有效),那麼移動寫指針
14         if ( pointer.comapreAndSet(current,next) )
15             break;  
16     }while ( true )//若是CAS失敗或者還不能移動寫指針,則不斷嘗試
17     return next;
18 }
複製代碼

   OK,咱們如今有了一個使用CAS的Ring Buffer,這比用鎖快上很多,但CAS的效率並無想象的那麼快,根據連接[2]pdf中評測:和單一線程無鎖執行某簡單任務相比,使用鎖的時間比無鎖高出2個數量級,CAS也高出了一個數量級。那麼Disruptor還有什麼提升性能的地方呢?下面列舉一下除了無鎖編程外的其餘性能優化點。

  緩存行填充(Cache Line Padding):CPU緩存常以64bytes做爲一個緩存行大小,緩存由若干個緩存行組成,緩存寫回主存或主存寫入緩存均是以行爲單位,此外每一個CPU核心都有本身的緩存(可是若某個核心對某緩存行作出修改,其餘擁有一樣緩存的核心須要進行同步),生產者和消費者的指針用long型表示,假設如今只有一個生產者和一個消費者,那麼雙方的指針間沒有什麼直接聯繫,只要不「挨着」,應該能夠各改各的指針。OK前面說有點亂,下面問題來了:若是生產者和消費者的指針(加起來共16bytes)出如今同一個緩存行中會怎麼樣?例如CPU核心A運行的消費者修改了一下本身的指針值(P1),那麼其餘核心中全部緩存了P1的緩存行都將失效,並從主存從新調配。這樣作的缺點顯而易見,可是CPU和編譯器並未聰明到避免這個問題,因此須要緩存行填充。雖然問題產生的緣由很繞,可是解決方案卻很是簡單:對於一個long型的緩衝區指針,用一個長度爲8的long型數組代替。如此一來,一個緩存行被這個數組填充滿,線程對各自指針的修改不會干擾到他人。

  避免GC:寫Java程序的時候,不少人習慣隨手new各類對象,雖然Java的GC會負責回收,可是系統在高壓力狀況下頻繁的new一定致使更頻繁的GC,Disruptor避免這個問題的策略是:提早分配。在建立RingBuffer實例時,參數中要求給出緩衝區元素類型的Factory,建立實例時,Ring Buffer會首先將整個緩衝區填滿爲Factory所產生的實例,後面生產者生產時,再也不用傳統作法(順手new一個實例出來而後add到buffer中),而是得到以前已經new好的實例,而後設置其中的值。舉個形象的例子就是,若緩衝區是個放不少紙片的地方,紙片上記錄着信息,之前的作法是:每次加入緩衝區時,都從系統那現準備一張紙片,而後再寫好紙片放進緩衝區,消費完就隨手扔掉。如今的作法是:實現準備好全部的紙片,想放入時只須要擦掉原來的信息寫上新的便可。

  成批操做(Batch):Ring Buffer的核心操做是生產和消費,若是能減小這兩個操做的次數,性能必然相應地提升。Disruptor中使用成批操做來減小生產和消費的次數,下面具體說一下Disruptor的生產和消費過程當中如何體現Batch的。向RingBuffer生產東西的時候,須要通過2個階段:階段一爲申請空間,申請後生產者得到了一個指針範圍[low,high],而後再對緩衝區中[low,high]這段的全部對象進行setValue(見優化點②),階段2爲發佈(像這樣ringBuffer.publish(low,high);)。階段1結束後,其餘生產者再申請的話,會獲得另外一段緩衝區。階段2結束後,以前申請的這一段數據就能夠被消費者讀到。Disruptor推薦成批生產、成批發布,減小生產時的同步帶來的性能損失。從RingBuffer消費東西的時候也須要兩個階段,階段一爲等待生產者的(寫)指針值超過指定值(N,即N以前的數據已經生產過了),階段一執行完後,消費者會獲得一個指針值(R),表示Ring Buffer中下標R以前的值是能夠讀的。階段2就是具體讀取(略)。階段一返回值R頗有可能大於N,此時消費者應該進行成批讀取操做,將[R,N]範圍內的數據所有處理。

  LMAX架構:(注:指的是LMAX公司在作他們的交易平臺時使用的一些設計思想的集合,嚴格講是LMAX架構包含Disruptor,並不是其中的一部分,可是Disruptor的設計中或多或少體現了這些思想,因此在這仍是要提一下,關於LMAX架構應該能夠寫不少,但限於我的水平,在這隻能簡單說說。另外,這個架構是以及極端追求性能的產物,不必定適合大衆。)以下圖所示LMAX架構分爲三個部分,輸入/輸出Disruptor,和中間核心的業務邏輯處理器。全部的信息輸入進入Input Disruptor,被業務邏輯處理器讀取後送入Output Disruptor,最後輸出到其餘地方。

 

  對於通常由表現層+業務層+持久層組成的Web系統,LMAX架構指的是業務層,它有以下幾個特色:

    a)業務邏輯處理器(簡稱BLP)徹底的In-Memory:如上圖,業務邏輯處理器是處理全部業務邏輯的地方,Input Disruptor把輸入(例如訂單數據、用戶操做)以消息的形式(稱做Message或者Event均可以)發到BLP,BLP進行響應。通常系統中咱們可能會多線程執行一些業務邏輯代碼,而後這些代碼最終生成一些SQL語句,而後這些語句再去查數據庫,數據庫可能在其餘主機,數據庫查詢結果可能直接用了內存中的緩存,最壞狀況是數據庫從磁盤中讀取了想要的數據,最後再返回給業務邏輯代碼。這個過程有不少的時間浪費:首先,多線程訪問持久層會涉及到同步問題(鎖,有是這貨)。其次,生成*QL語句、查詢數據庫的耗時也是很是大的。最後,最壞狀況下還要進行一大串磁盤IO和內存IO才能取到數據。LMAX對此的解決方案是:把能用到的全部數據所有裝入內存,只有到極少數或週期性須要同步、持久化的時候再訪問數據庫。(這聽起來有點瘋狂,可是仔細想一想這些業務真的須要那麼大空間嗎?)這麼作的好處也是顯而易見,減小了網絡、磁盤的IO後In-Memory系統上的大部分業務邏輯全都變成一些加減乘除運算了。

    b)異步-事件驅動:通過a)的修改,若是還存在一些業務邏輯處理過程是須要長時間才能完成的,那麼就把它做爲一個事件,再拋給其餘組件(可能仍是Disruptor)等待。業務邏輯處理器須要時刻保持最快速度、最高效率,它不能等待任何事情。

    c)每一個業務邏輯處理器是單線程的:你沒有聽錯。其實有了a)b)做爲前提,會發現多線程所帶來的業務層面同步問題將會極大限制BLP效率、增大BLP的複雜度,和BLP的設計(Keep it simple, stupid.)相悖,若是實在想多線程,能夠參照d)。

    d)使用多級業務邏輯處理器:有些像管道模式,上圖的3塊結構能夠以多種方式組合,一個BLP能夠將輸出送往多個Output Disruptor,而這些Disruptor多是另外一些3塊結構的Input Disruptor,即有些BLP是起到分發做用的,另外一些是進行具體業務邏輯計算的。每一個BLP對應一個線程,整個架構可能比上圖複雜不少。

 三、Hello Disruptor

  Disruptor最初是由Java實現的,如今也有C/Cpp和.Net版本,Java版最全更新最快,代碼註釋較多比較好懂。說了這麼多,本節先給出一個測試例子,展現Disruptor的基本用法,例子中用LinkedBlockingQueue和Disruptor分別實現了單一輩子產者+單一消費者存取簡單對象的測試,統計了一下雙方消耗的時間,僅供參考。

  例子中使用Disruptor 3.2.1。不一樣版本間的Disruptor一些術語可能有變化,在該版本中,緩衝區裏的元素被稱做Event,指針(緩衝區的下標)被稱做Sequence,生產者的指針爲RingBuffer.sequencer(private成員),消費者的指針經過ringBufferInstance.newBarrier()獲得。
  測試代碼
//簡單對象:緩衝區中的元素,裏面只有一個value,提供setValue
private class TestObj {
    
    public long value;
    
    public TestObj(long value)
    {
        this.value = value;
    }
    
    public void setValue(long value)
    {
        this.value = value;
    }
    
}

public class Test {

    //待生產的對象個數
    final long objCount = 1000000;
    final long bufSize;//緩衝區大小
    {
        bufSize = getRingBufferSize(objCount);
    }
    
    //獲取RingBuffer的緩衝區大小(2的冪次!加速計算)
    static long getRingBufferSize(long num)
    {
        long s = 2;
        while ( s < num )
        {
            s <<= 1;
        }
        return s;
    }
    
    //使用LinkedBlockingQueue測試
    public void testBlocingQueue() throws Exception
    {
        final LinkedBlockingQueue<TestObj> queue = new LinkedBlockingQueue<TestObj>();
        Thread producer = new Thread(new Runnable() {//生產者
            @Override
            public void run() {
                try{
                    for ( long i=1;i<=objCount;i++ )
                    {
                        queue.put(new TestObj(i));//生產
                    }
                }catch ( InterruptedException e ){
                }
            }
        });
        Thread consumer = new Thread(new Runnable() {//消費者
            @Override
            public void run() {
                try{
                    TestObj readObj = null;
                    for ( long i=1;i<=objCount;i++ )
                    {
                        readObj = queue.take();//消費
                        //DoSomethingAbout(readObj);
                    }
                }catch ( InterruptedException e ){
                }
            }
        });
        
        long timeStart = System.currentTimeMillis();//統計時間
        producer.start();
        consumer.start();
        consumer.join();
        producer.join();
        long timeEnd = System.currentTimeMillis();
        DecimalFormat df = (DecimalFormat) DecimalFormat.getInstance();
        System.out.println((timeEnd - timeStart) + "/" + df.format(objCount) +
                " = " + df.format(objCount/(timeEnd - timeStart)*1000) );
    }
    
    //使用RingBuffer測試
    public void testRingBuffer() throws Exception
    {
        //建立一個單生產者的RingBuffer,EventFactory是填充緩衝區的對象工廠
        //            YieldingWaitStrategy等"等待策略"指出消費者等待數據變得可用前的策略
        final RingBuffer<TestObj> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TestObj>() {
            @Override
            public TestObj newInstance() {
                return new TestObj(0);
            }
        } , (int)bufSize, new YieldingWaitStrategy());
        //建立消費者指針
        final SequenceBarrier barrier = ringBuffer.newBarrier();
        
        Thread producer = new Thread(new Runnable() {//生產者
            @Override
            public void run() {
                for ( long i=1;i<=objCount;i++ )
                {
                    long index = ringBuffer.next();//申請下一個緩衝區Slot
                    ringBuffer.get(index).setValue(i);//對申請到的Slot賦值
                    ringBuffer.publish(index);//發佈,而後消費者能夠讀到
                }
            }
        });
        Thread consumer = new Thread(new Runnable() {//消費者
            @Override
            public void run() {
                TestObj readObj = null;
                int readCount = 0;
                long readIndex = Sequencer.INITIAL_CURSOR_VALUE;
                while ( readCount < objCount )//讀取objCount個元素後結束
                {
                    try{
                        long nextIndex = readIndex + 1;//當前讀取到的指針+1,即下一個該讀的位置
                        long availableIndex = barrier.waitFor(nextIndex);//等待直到上面的位置可讀取
                        while ( nextIndex <= availableIndex )//從下一個可讀位置到目前能讀到的位置(Batch!)
                        {
                            readObj = ringBuffer.get(nextIndex);//得到Buffer中的對象
                            //DoSomethingAbout(readObj);
                            readCount++;
                            nextIndex ++;
                        }
                        readIndex = availableIndex;//刷新當前讀取到的位置
                    }catch ( Exception ex)
                    {
                        ex.printStackTrace();
                    }
                }
            }
        });
        
        long timeStart = System.currentTimeMillis();//統計時間
        producer.start();
        consumer.start();
        consumer.join();
        producer.join();
        long timeEnd = System.currentTimeMillis();
        DecimalFormat df = (DecimalFormat) DecimalFormat.getInstance();
        System.out.println((timeEnd - timeStart) + "/" + df.format(objCount) +
                " = " + df.format(objCount/(timeEnd - timeStart)*1000) );
        
    }
    
    public static void main(String[] args) throws Exception {
        Test ins = new Test();
        //執行測試
        ins.testBlocingQueue();
        ins.testRingBuffer();
    }

}

測試代碼

 

測試結果:

319/1,000,000 = 3,134,000 //使用LinkedBlockingQueue在319毫秒內存取100萬個簡單對象,每秒鐘能執行313萬個

46/1,000,000 = 21,739,000 //使用Disruptor在46毫秒內存取100萬個簡單對象,每秒鐘能執行2173萬個

平均下來使用Disruptor速度能提升7倍。(不一樣電腦、應用環境下結果可能不一致)

四、隨想:Disruptor、完成端口與Mechanical Sympathy

When pushing performance like this, it starts to become important to take account of the way modern hardware is constructed.

                      —The LMAX Architecture

   「當對性能的追求達到這樣的程度,以至對現代硬件構成的理解變得愈來愈重要。」這句話恰當地形容了Disruptor/LMAX在對性能方面的追求和失敗。咦,失敗?爲何會這麼說呢?Disruptor固然是一個優秀的框架,我說的失敗指的是在開發它的過程當中,LMAX曽試圖提升併發程序效率,優化、使用鎖或藉助其餘模型,可是這些嘗試最終失敗了——而後他們構建了Disruptor。再提問:一個Java程序員在嘗試提升他的程序性能的時候,須要瞭解不少硬件知識嗎?我想不少人都會回答「不須要」,構建Disruptor的過程當中,最初開發人員對這個問題的回答可能也是「不須要」,可是嘗試失敗後他們決定另闢蹊徑。總的看下Disruptor的設計:鎖到CAS、緩衝行填充、避免GC等,我感受這些設計都在刻意「遷就」或者「依賴」硬件設計,這些設計更像是一種「(ugly)hack」(毫無疑問,Disruptor仍是目前最優秀的方案之一)。

  Disruptor我想到了完成端口,完成端口聽說能是Windows上最快的併發網絡「框架」:你只要經過API告訴Windows你想recv哪些socket,而後各個recv操做在內核層面上執行並加入到某個隊列中,最後再使用Worker線程進行處理,大部分工做Windows都爲你作好了,不使用鎖也沒有上下文切換和大量線程,是否是和Disruptor殊途同歸呢?完成端口和Disruptor在追求性能時,都避免使用並行、鎖、多線程等概念,這些概念的出現自有它們的緣由,這裏不用多說,可是爲了性能(考慮到硬件)卻不能充分使用它們,說明在處理併發、並行問題上,硬件和軟件的發展存在不協調,可能馮氏計算機仍是適合單「線程」順序處理信息吧。關於這種不協調,我認爲應該是硬件應該會逐步適應軟件,但也有人提出了有意思的Mechanical Sympathy(連接[6]),至於將來會如何發展就不是這篇blog能討論的了:) 。

(完)

連接:

[1]Disruptor介紹譯文:http://ifeve.com/disruptor/

原文https://code.google.com/p/disruptor/wiki/BlogsAndArticles

[2]Disruptor的GitHub:http://lmax-exchange.github.io/disruptor/

其中http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf這篇PDF對Disruptor做了很好的闡述。

[3]完成端口:http://blog.csdn.net/piggyxp/article/details/6922277

[4]致敬disruptor:CAS實現高效(僞)無鎖阻塞隊列實踐:http://www.majin163.com/2014/03/24/cas_queue/

[5]Disruptor 源碼分析:http://huangyunbin.iteye.com/blog/1944232

[6]Mechanical Sympathy:http://mechanical-sympathy.blogspot.com/

----------------------------------(我是分割線)----------------------------------

PS1: 轉載請註明做者。

PS2: 下載:Disruptor介紹PPT

PS3:這是我第5個博客(不過前4個都不是技術blog,笑),之後我會盡可能貼一些遇到的問題和思考到這裏,水平有限,歡迎各位指出不足!

 
標籤:  JAVA併發框架Disruptor
好文要頂  關注我  收藏該文   

 

http://ifeve.com/dissecting-disruptor-whats-so-special/

正如名字所說的同樣,它是一個環(首尾相接的環),你能夠把它用作在不一樣上下文(線程)間傳遞數據的buffer。

 

如何使用Disruptor(二)如何從Ringbuffer讀取

http://ifeve.com/dissecting_the_disruptor_how_doi_read_from_the_ring_buffer/

 

Disruptor多topic問題  : 

BatchEventProcessor 仍是 workpool ? 
     
2012-03-26 15:19
 
BatchEventProcessor 仍是 workpool ? :
 

看國外的不少介紹資料,以及官方資料,還有壇主翻譯的關於disruptor資料都有一個圖,就是將日誌/序列化/等等各類event塞到了一個ringbuffer裏邊,我如今困惑的就是這點,一個ringbuffer是否應該放多個topic或者多個類型的event。

例如我如今的流程有兩個:1 核心業務流程處理 (短小快速,併發量高) 2. 業務流程以後的日誌處理


由於日誌一般採用log4j等記錄,同類型事件多線程是無心義的。爲了記錄日誌不影響核心流程的處理速度,每每會把日誌丟到另一個異步環境處理,我之前用隊列解耦就是這麼幹的,可是看了
這個圖,你會發現裏邊有幾種事件,裏邊也包括了日誌event,這樣作還能作到經過解耦來平衡環節壓力嗎?

我看了disruptor的processor風格,一個是workpool,就是均衡負載,多個handler同質化,同一個時間只會被一個handler處理,另一個風格就是併發分支,同一個事件,全部的handler都有機會處理。
若是使用均衡負載,handler根據event類型走處理分支,日誌event可能會吃掉全部的handler線程池資源,那就跟ringbuffer的目的背道而馳了,至關於沒用,直接用線程池是同樣的效果。

仍是用不一樣的barrier?

我是否是有什麼地方理解錯了?還請各位理解disruptor的大俠給予指點^_^。

另外看這幅圖:

這個又是多個disruptor了,呵呵,除了分佈式形成了這種割裂之外,若是輸入和輸出/業務處理都在一臺主機上,是否就會使用同一個disruptor了?呵呵。
disruptor解決的thread之間的數據共享,若是按照這種角度出發,程序的分佈式與否不該該成爲disruptor的個數的依據。
[該貼被kuaiyuelaogong於2012-03-26 15:31修改過]

 

1
 
 
 

2012-04-05 18:33
 

就你的這個問題我是這麼理解的:

ringbuffer裏的事件依次會被數個handler處理,好比說先是核心業務處理,接下來是日子處理。 那麼就應該使用BatchEventProcessor+barrier而不是workpool.

好比說這個例子:



int consumerCount=4;
final SimpleEventHandler[] eventHandler = new SimpleEventHandler[consumerCount];
final BatchEventProcessor<SimpleEvent>[] eventProcessors = new BatchEventProcessor[consumerCount];
for (int i = 0; i < consumerCount; i++) {
eventHandler[i] = new SimpleEventHandler();

SequenceBarrier barrier = null;
if (i > 0) {
barrier = ringBuffer.newBarrier(eventProcessors[i - 1]
.getSequence());
} else {
barrier = ringBuffer.newBarrier();
}
eventProcessors[i] = new BatchEventProcessor<SimpleEvent>(
ringBuffer, barrier, eventHandler[i]);
}



有4個Processor, Processors[1]必須在Processors[0]以後執行。依次類推。。

 

 
 

2012-12-06 13:58
 

個人理解是 日誌event 不必定就在當前的線程裏寫入了,徹底能夠緩存起來或者異步處理,採用這種多消費者的模式,每一個環節的處理時間不該該有太大的差別,不然就可能出現瓶頸

相關文章
相關標籤/搜索