Disruptor爲什麼這麼快

Disruptor 是一個開源而且高效的生產者-消費者框架,很難直接解釋這個框架是作什麼的,可是能夠把這個框架理解成 Java 中的 BlockingQueue。這樣理解起來是否是輕鬆多了,這就是一個生產者-消費者隊列,只不過它的性能要比 BloockingQueue 好不少,號稱單機器能夠有百萬的 TPS。java

Disruptor 的特色

Disruptor 有如下主要三個特色:算法

  • 事件多播
  • 爲事件提早分配內存
  • 無鎖操做

通常咱們在使用隊列的時候,隊列中的消息只會被一個消費者使用,可是在 Disruptor 中,同一個消息能夠被多個消費者同時處理,多個消費者之間是並行的。編程

由於可能在同一時間,一份數據要在多個地方被用到,好比一個請求數據須要同時被存入日誌備份到遠程機器進行業務處理,以下圖所示:數組

若是是一條消息只能被一個消費者處理,那麼上面說的三個處理邏輯就得線性的完成或者放到一個消費者中再異步進行處理。若是能夠把這些操做拆成多個消費者來並行消費同一條消息,處理效率就會提升不少。緩存

這樣要確保一條消息在都被全部消費者處理以後才能開始處理下一條,不能出現消費者在同時處理不一樣的消息,因此須要相似 Java 中 CyclicBarrier 同樣的工具來保證全部消費者可以同時處理下一個消息,Disruptor 中實現了 SequenceBarrier 來完成這個功能。安全

Disruptor 目標是應用於低延遲的環境中。在低延遲的系統中須要減小或者徹底不進行內存分配,在 Java 中,就是要減小垃圾回收所帶來的停頓時間。在 Disruptor 中,使用 RingBuffer 來達成這個目標,在 RingBuffer 中提早建立好對象,後續經過反覆利用這些對象來避免垃圾回收,這個實現是線程安全的。微信

在 Disruptor 中,實現線程安全基本不使用鎖,而是使用 CAS 等無鎖機制來保證線程安全。數據結構

核心概念

在正式瞭解 Disruptor 以前,咱們須要瞭解一些核心的概念,Disruptor 的代碼並不複雜,基本是圍繞這些核心概念來展開的。併發

  • Producer: 數據的生產者,生產者自己與 Disruptor 無關,能夠是任何生產數據的代碼,甚至能夠是一個 for 循環
  • Event: Producer 產生的數據,用戶能夠根據本身的須要自行進行定義
  • RingBuffer: 用來存儲 Event 的數據結構,提早分配好內存,避免在程序運行的過程當中建立對象
  • EventHandler: 消費者,由用戶本身實現
  • Sequence: 用來標識 Disruptor 中的組件,多個組件之間的協同依靠它來實現
  • Sequencer: Disruptor 中的核心機制,實現了核心的併發算法,保證消息在生產者和消費之間正確的進行傳遞
  • SequenceBarrier: 用來保證全部的消費這可以同時處理新的消息
  • WaitStrategy: 消費者等待策略
  • EventProcessor: 將消息傳遞到消費者的具體實現

上面的這些組件組成了 Disruptor,整個框架的代碼量其實不多,應該不到 7000 行,並且代碼很乾淨。代碼基本沒有使用繼承,而是使用了面向接口編程以及組合,因此代碼之間的耦合度很低。框架

RingBuffer 和 Sequencer 是其中最重要的兩個組件,前者用於存儲消息,後者控制消息有序的生產和消費。

Disruptor 的核心目標就是提高程序的吞吐量,因此程序也是圍繞這些目標來實現的,主要作了以下的事情:

  • 減小垃圾回收
  • 讓消息能夠經過被多個消費者並行處理
  • 使用無鎖算法來實現併發
  • 緩存行填充

RingBuffer 是用來存儲消息的容器,內部實現使用了一個數組:

private final Object[] entries;
複製代碼

在 Disruptor 啓動以前,須要指定數據的大小以及,初始化這個數組:

private void fill(EventFactory<E> eventFactory) {
    for (int i = 0; i < bufferSize; i++)
    {
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}
複製代碼

數組在初始化以後就再也不回收了,全部的消息會循環利用這些已經建立好的對象,因此這是一個循環數組,RingBuffer 的實現以下圖所示:

那麼在對循環數組進行操做的時候,須要對生產者和消費者對數組的訪問進行控制。一方面,由於 Disruptor 支持多個生產者和多個消費者,因此要保證線程安全,爲了保證性能,並無使用鎖來保證線程安全(只有 BlockingWaitStrategy 使用了鎖),在對 RingBuffer 的訪問控制中,主要使用 CAS 來完成:

protected final E elementAt(long sequence) {
    return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
複製代碼

另外一方面對於生產者和消費者的速度進行訪問,生產者不能對未消費的消息進行寫入,這樣會形成消息的丟失,在 RingBuffer 中,沒有使用 head 和 tail 指針進行控制,而是經過 Sequence 來進行控制,生產者寫入數據的時候,會經過當前的序列號加上須要寫入的數據量,與消費者的位置進行對比,看看是否有足夠的空間進行寫入。

在 RingBuffer 中,有這樣一段代碼:

abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}
複製代碼

這段代碼就稱之爲緩存行填充,說到這個就須要瞭解 CPU 的緩存機制,由於內存的訪問速度與 CPU 的速度相差太遠,因此在 CPU 和內存之間還加上了 CPU 緩存,如今通常會加上 3 級,第一級和第二級是 CPU 核獨享的,第三級緩存則是多個核之間共享。

在不少狀況下,咱們想把一些不會變化的值緩存到 CPU 緩存中,好比 Java 中的 final 變量,這樣就能夠最大化的利用 CPU 緩存的速度,可是 CPU 緩存有一個特色,緩存數據的時候會以 CPU 緩存行爲單位,因此若是一個 final 變量的附近定義了會變化的變量,每次變量變化的時候,數據就會從新被寫回到內存中,那麼 final 變量一樣也不會再緩存在 CPU 緩存中了,因此在緩存行的先後部分都須要填充,確保不會緩存到其餘的數據:

abstract class RingBufferPad {
    // 填充緩存行的前部分
    protected long p1, p2, p3, p4, p5, p6, p7;
}
abstract class RingBufferFields extends RingBufferPad{ 
    ......
    // 下面須要被緩存到 CPU 緩存的數據
    private final long indexMask; 
    private final Object[] entries; 
    protected final int bufferSize;
    protected final Sequencer sequencer; 
    ...... 
}
public final class RingBuffer extends RingBufferFields implements Cursored, EventSequencer, EventSink{ 
    ...... 
    // 填充緩存行的後部分
    protected long p1, p2, p3, p4, p5, p6, p7; 
    ......
}
複製代碼

這樣 RingBufferFields 的數據被加載到 CPU 緩存中以後,就不會再須要從內存中讀取了。

Disruptor 經過各方面的措施來提高性能,這就是爲何這麼快的緣由。

原文

關注微信公衆號,聊點其餘的

相關文章
相關標籤/搜索