前兩篇文章都是從github wiki中翻譯而來,旨在可以快速瞭解和上手使用Disruptor。可是爲了可以掌握該技術的核心思想,停留在使用層面還遠遠不夠,須要瞭解其設計思想,實現原理,故這篇從源碼層面分析其實現原理。html
Disruptor中的術語很是多,這個在系列的第一篇已經介紹disruptor介紹。爲了可以更加清晰而有條理的閱讀源碼,首先分析各個術語描述的組件的源碼,下篇文章再將其串聯起來分析:git
Disruptor做爲高性能的線程間傳遞數據的數據庫,必然須要存儲數據(即數據或者事件)。Disruptor中使用叫作RingBuffer的數據結構來存儲數據,並將其抽象爲類RingBuffer,提供操做數據的行爲。github
RingBuffer內部的數據結構是一個環形緩衝區,以下圖:算法
生產者將數據存入環形緩衝區,消費者隨後從環形緩衝區取出數據處理。數據庫
對於該數據結構的表示,相信你們應該再熟悉不過,數組。RingBuffer內部包含了Object[]來表示環形緩衝區,存儲Disruptor的數據:數組
private final long indexMask; // 該數組表示環形緩衝 private final Object[] entries; protected final int bufferSize; protected final Sequencer sequencer;
數組的自然優點就是順序訪問效率很是高,可是對於數組的讀寫訪問須要維護其下標,表示讀寫的位置。緩存
Disruptor將下標的表示抽象成Sequence類表示,用Sequence來記錄讀寫位置。爲何單獨抽象成Sequence來表示?緣由有兩點:安全
普通的Long類型沒法保證線程安全,單獨使用AtomicLong表示位置又存在僞共享問題(關於僞共享,這裏不作詳細介紹)。因此抽象Sequence類,幷包含實際記錄位置的value值。數據結構以下:數據結構
對於如何它是如何保證線程安全和解決僞共享問題能夠看Javadocs中的描述:併發
Concurrent sequence class used for tracking the progress of the ring buffer and event processors. Support a number of concurrent operations including CAS and order writes.
Also attempts to be more efficient with regards to false sharing by adding padding around the volatile field.
Sequence用於併發場景下追蹤RingBuffer和EventProcessor的進度。支持多種併發操做,如:CAS和順序寫。也嘗試利用填充方式包圍volatile value解決false sharing問題。
須要明白的是這裏的填充是填充緩存行,保證value能獨處一個緩存行中(即不和無依賴的變量同處一個緩存行中)。
再來看下它是如何進行填充的:
// 左邊填充 class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; } // volatile value,保證可見性和有序性 class Value extends LhsPadding { protected volatile long value; } // 右邊填充 class RhsPadding extends Value { protected long p9, p10, p11, p12, p13, p14, p15; } // 繼承RhsPadding,從而有緩存填充的volatile value public class Sequence extends RhsPadding{ ... }
經過以上方式保證:在一個緩存行中,只有填充的數據和value。其中value用於記錄RingBuffer的位置。關於其餘的行爲實現,基本徹底和AtomicLong的實現同樣,均使用Unsafe類提供的CAS操做實現去線程安全的操做,如:
public void set(final long value) { UNSAFE.putOrderedLong(this, VALUE_OFFSET, value); }
經過UNSAFE提供的順序寫API設置,putOrderLong將會在寫和以前的任何存儲之間插入Store/Store barrier,保證此次write不會被重排。
如相似AtomicLong的CAS操做:
public boolean compareAndSet(final long expectedValue, final long newValue) { return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue); }
Sequence提供兩種類型的構造函數:
// 使用默認值-1L初始化Sequence public Sequence() { this(INITIAL_VALUE); } // 使用指定的參數構造Sequence public Sequence(final long initialValue) { UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue); }
RingBuffer用於存儲數據,可是不少行爲,如:控制寫入數據至RingBuffer,控制讀取等等行爲,等待控制等都是在Sequencer中實現。Sequencer是Disruptor的核心。
其中控制圖以下:
Sequencer中有兩個很是重要的Sequence。一種是用來記錄生產者的位置cursor,另外一種用來記錄消費者的位置gatingSequence。Sequencer須要控制生產者遊標cursor沿着RingBuffer旋轉方向不能超過覆蓋消費者消費者的gatingSequence。
對於生產者而言,每次生產存入數據時,須要判斷cursor + 1是否會覆蓋gatingSequence。若是不覆蓋,則能夠寫入數據。若是抵達消費者的Sequence邊界,則須要使用相應的等待策略等待,等待有空的可用槽位寫入。
對於消費者而言,每次消費時,須要檢查是否有課消費的數據,只須要檢查自身的Sequence和cursor的大小關係便可。
Sequencer的協調做用看下圖描述:
Disrutpor中根據場景不一樣分爲兩類Sequencer。單生產者使用SingleProducerSequencer,多生產者使用MultiProducerSequencer。Sequencer的UML類圖以下:
最頂層抽象了Sequencer接口,定義了Sequencer的基本行爲:
首先分析Sequencer的數據結構,其中成員域都在AbstractSequencer中定義:
// 原子引用更新器 private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences"); // RingBuffer的大小 protected final int bufferSize; // 等待策略 protected final WaitStrategy waitStrategy; // RingBuffer遊標,初始化值爲-1L protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); // 門控Sequence,即消費者Sequence,消費者可能會存在多個 protected volatile Sequence[] gatingSequences = new Sequence[0];
從包含的成員域也能夠看出其實Disruptor的核心。cursor Sequence用於做爲RingBuffer的遊標,表示RingBuffer目前最大的可用的數據槽位。gatingSequences是消費者消費的位置Sequence,Sequencer依此進行控制生產者不能超過而覆蓋未被消費的數據。waitStrategy主要用於建立SequenceBarrier,消費者須要依此策略進行wait。
Sequencer在Disruptor中起到的核心做用即是控制協調Sequence,而且作追蹤使用。
SequenceBarrier的屏障主要做用於消費者的Sequence,控制消費者等待生產者生產可達的數據即cursor Sequence。
前一種模式是消費者依賴生產者最大可達數據。還有另外一種模式是消費者依賴圖(關於這個在第一篇Disruptor介紹中已經詳細介紹),此時SequenceBarrier將控制消費者的Sequence不超過另外被依賴消費者的Sequence。
在Disruptor中關於SequenceBarrier有兩個很是重要的行爲:
// 等待指定sequence的slot可用 long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException; // 獲取RingBuffer的cursor long getCursor();
Disruptor中關於SequenceBarrier的實現只有ProcessingSequenceBarrier其一個。其中結構以下:
SequenceBarrier主要處在消費者和Sequencer之間,用於協調消費者與cursor Sequence,使用waitStraty策略協調。
再來看下其包含的成員域:
// SequneceBarrier使用的等待策略 private final WaitStrategy waitStrategy; // SequenceBarrier依賴的Sequence,取決於消費者依賴圖 // 要麼是依賴cursor,要麼依賴其餘消費者Sequence private final Sequence dependentSequence; private volatile boolean alerted = false; // cursor Sequence private final Sequence cursorSequence; private final Sequencer sequencer;
主要屏障的算法流程:消費者每次請求消費指定Sequence位置的數據時,SequenceBarrier發揮屏障做用,對其Sequence與依賴的Sequence比較。若是dependentSequence大於當前消費的Sequence,則返回不大於dependentSequence的最大可消費的位置。
上圖中展現了上述全部的兩種模型,消費者依賴生產者和消費者依賴消費者。Consumer A的SequenceBarrier A依賴cursor Sequence;Consumer B的SequenceBarrier B依賴Consumer A的gating Sequence。
EventProcessor和EventHandler都是對消費端而言。其中EventProcessor由Disruptor內部使用,循環從RingBuffer中獲取EventData。EventHandler由用戶自實現的業務邏輯,處理消費的EventData。他們之間的關係是,EventHandler做爲回調接口,EventProcessor將從RingBuffer消費者的Event傳遞給Handler處理。
在Disruptor中,EventProcessor的實現有三類:
其中BatchEventProcessor是最頻繁使用,這裏具體看下它的數據結構:
// Sequence屏障,用於處理Sequene之間的依賴關係 private final SequenceBarrier sequenceBarrier; // 用於定義的EventHandler private final EventHandler<? super T> eventHandler; // 消費者端的Sequence,用於標記消費者的位置 private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
Disruptor中使用RingBuffer存儲數據,實現消費者和生產者之間的數據交互。生產-消費模型中必然有等待,Disruptor也不例外。只不過Disrutor的優異之處在於提供了多種等待方式供用戶針對各類應用場景進行選擇。
Note:
目前Disruptor只針對消費者等待生產者時應用了等待策略,對於生產者等待消費者並未使用WaitStrategy。
當消費者消費到可達的最大Sequence位置時,即須要等待生產者生產數據,這時各類WaitStrategy便油然而生:
關於各類策略的語義和使用場景,上篇文章disruptor使用中已經詳細介紹,這裏再也不贅述。這裏着重分析每種策略的實現原理。
BlockingWaitStrategy
當無可達事件消費時,使用該策略,消費者將發生阻塞直到有事件時,消費者再繼續運行:
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException long availableSequence; if (cursorSequence.get() < sequence) { lock.lock(); try { while (cursorSequence.get() < sequence) { barrier.checkAlert(); processorNotifyCondition.await(); } } finally { lock.unlock(); } } while ((availableSequence = dependentSequence.get()) < sequence) { barrier.checkAlert(); ThreadHints.onSpinWait(); } return availableSequence; }
策略中使用Java的工具Lock和Condition實現循環等待。當有生產者發佈事件時,將調用通知:
@Override public void signalAllWhenBlocking() { lock.lock(); try { processorNotifyCondition.signalAll(); } finally { lock.unlock(); } }
BusySpinWaitStrategy
該策略是忙等策略,當無事件消費者,將一直處於循環運行,檢測是否有事件:
@Override public long waitFor( final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException { long availableSequence; while ((availableSequence = dependentSequence.get()) < sequence) { barrier.checkAlert(); ThreadHints.onSpinWait(); } return availableSequence; }
SleepingWaitStrategy
使用該策略時,當無事件可消費,將睡眠指定的時間:
@Override public long waitFor( final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException { long availableSequence; int counter = retries; while ((availableSequence = dependentSequence.get()) < sequence) { counter = applyWaitMethod(barrier, counter); } return availableSequence; } private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException { barrier.checkAlert(); if (counter > 100) { --counter; } else if (counter > 0) { --counter; Thread.yield(); } else { LockSupport.parkNanos(sleepTimeNs); } return counter; }
其中使用LockSupport.parkNanos讓消費者線程睡眠指定時間。若是一直無事件可消費,將循環睡眠,直到有事件可消費爲止。
YieldingWaitStrategy
該策略使用Thread yield方式,置換出CPU給其餘線程的方式達到等待:
@Override public long waitFor( final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException { long availableSequence; int counter = SPIN_TRIES; while ((availableSequence = dependentSequence.get()) < sequence) { counter = applyWaitMethod(barrier, counter); } return availableSequence; } private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException { barrier.checkAlert(); if (0 == counter) { Thread.yield(); } else { --counter; } return counter; }
其中使用Thread.yield()放棄CPU的使用,讓其餘線程可以使用。
Disruptor中的策略不少,可是大多數狀況仍然使用Block策略,只有對嚴格要求低延時且CPU資源充足的狀況纔會使用忙等策略。
以上介紹了不少圍繞着RingBuffer的組件,若是將其裝配組合,讓其運行起來,估計要寫不少樣板代碼且比較複雜。爲了可以讓其簡單且易用,這裏使用了DSL(Driven Specific Language)風格構建了Disruptor類幫助可以快速構建。
PS:
實際上,使用了構造者模式外觀模式,可以藉助Disruptor快速構建RingBuffer及其組件。經過組合裝配這些組件,造成Disruptor。
Disruptor中持有RingBuffer和消費者信息,幫助完成快速構建高性能隊列。
Disruptor高性能隊列中涉及到衆多組件,本篇文章主要對其中的生產端和消費端以及處於中間的存儲RingBuffer作了原理性分析。在這篇文章的基礎上,下一篇將對其串聯起來,從源碼角度深刻分析其實現。