過氣網紅Disruptor源碼分析

Disruptor

Disruptor是java圈子裏著名的併發隊列,它是一個基於生產者-消費者模型,並優化了cpu僞共享的高性能隊列。要理解disruptor須要理解一下幾個概念:RingBufferSequenceSequencerSequenceBarrierdisruptorjava

RingBuffer

RingBuffer是disruptor中用來存數據的環形數組。Disruptor的基本數據結構就是一個循環隊列。學習過數據結構的同窗都知道循環隊列是一個基於數組的隊列,用一個變量來表示隊頭位置下標,另外一個變量來表示隊尾位置下標。當位置下標到達數組末尾的時候,下標的下一個位置就移動到數組開頭,例如jdk中的java.util.concurrent.ArrayBlockingQueue,它用putIndex來表示隊尾位置,用takeIndex來表示隊頭位置,當隊頭或隊尾到達數組末尾的時候,被置爲數組開頭位置。下面是java.util.concurrent.ArrayBlockingQueue中的代碼。數組

/** items index for next take, poll, peek or remove */
int takeIndex;  隊頭位置
/** items index for next put, offer, or add */
int putIndex;   隊尾位置
private void enqueue(E x) {                 入隊方法
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;                   隊尾入隊數據
    if (++putIndex == items.length)        若是已經到了數組的最後一個位置
        putIndex = 0;                      位置置於數組開頭
    count++;
    notEmpty.signal();
}
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];           隊頭出隊數據
    items[takeIndex] = null;              若是已經到了數組的最後一個位置
    if (++takeIndex == items.length)      位置置於數組開頭
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

Sequence

Sequence是disruptor對隊列中表示位置的下標位置的抽象。爲何要用一個類而不是一個整型來表示數組的下標位置呢?這是由於disruptor在試圖解決cpu僞共享問題。CPU僞共享簡單講就是在不一樣cpu核的多個線程他們分別在本身的cache中緩存了同一個變量,當一個線程修改了這個變量將會使用MESI協議將別的線程緩存了相同變量的cache-line失效。若是多個線程高頻修改一個變量可能會相互影響使得cpu緩存的做用大打折扣。那怎麼才能儘可能避免這種狀況呢?disruptor的作法是讓每一個消費者都維護着本身的sequence,而且sequence作了cache-line填充,使得每一個sequence將佔用整個cache-line。通常來講一個cache-line是64個字節,用一個long來表示位置,那麼就須要8個long。所以,disruptor在表示下標的long變量先後都放置了7個long,這樣當讀取value時,不管從哪一個方向讀取64個字節都能保證cache-line被填充。下面是com.lmax.disruptor.Sequence的代碼。緩存

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;            填充
}

class Value extends LhsPadding
{
    protected volatile long value;                        真正的值
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;      填充
}

Sequencer

Sequencer是用來協調生產者進度和消費者進度的。消費者不能跑到生產者前面去了,生產者也不能超過消費者一圈。AbstractSequencer有3個重要的參數,cursor表示的生產者的位置,gatingSequences表示的是末端消費者的位置,waitstrategy表示當沒有數據給消費者時,消費者的等待行爲。下面是com.lmax.disruptor.AbstractSequencer的代碼。數據結構

protected final WaitStrategy waitStrategy;                                           等待策略
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);      生產者位置
protected volatile Sequence[] gatingSequences = new Sequence[0];                     消費者位置

生產者在生產數據的時候有2個步驟,第一步,獲取新數據的位置;第二步,插入數據併發布,發佈後的數據就能夠被消費了。其中第一步對應着next方法,第二步對應着publish方法。依據生產者是單線程的仍是多線程的,Sequencer被分爲MultiProducerSequencer和SingleProducerSequencer,這2種Sequencer大致邏輯類似但又有差異。
MultiProducerSequencer的next方法中,首先獲取生產者位置並加上n再減去buffersize,將他和消費者位置比較,若是大於則代表生產者超過了消費者一圈,這是不可行的,不然是可行的就用cas更新生產者位置。獲取消費者位置的時候並非從gatingSequences中直接獲取最小的那個,而是經過一個gatingSequenceCache來獲取的,這是由於sequence是一個頻繁改變,被多個線程操做的對象,而且每次去獲取都要去找最小值,爲了減小沒必要要的獲取,每次從gatingSequences中獲取一次最小值時將其緩存起來,在生產者沒有追到這個緩存的最小值前,能夠不用去獲取最新的最小值。當追上這個最小值的時候,就須要從gatingSequences中獲取最小值,若是生產者仍是超過了一圈那麼就暫停一下,再重複以上操做,不然就將最小值賦值給gatingSequenceCache並重復以上操做。下面是com.lmax.disruptor.MultiProducerSequencer#next(int)的代碼。多線程

do
{
    current = cursor.get();                                                         生產者當前位置
    next = current + n;                                                             插入n個新數據後的位置

    long wrapPoint = next - bufferSize;                                             新位置減去ringbuffer長度
    long cachedGatingSequence = gatingSequenceCache.get();                          消費者位置

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)         wrapPoint > cachedGatingSequence表示生產者超過了消費者一圈
    {
        long gatingSequence = Util.getMinimumSequence(gatingSequences, current);    獲取最新的消費者位置

        if (wrapPoint > gatingSequence)                                             若是仍是超過一圈則等待並重試
        {
            LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
            continue;
        }

        gatingSequenceCache.set(gatingSequence);                                   更新緩存的消費者位置
    }
    else if (cursor.compareAndSet(current, next))                                  更新生產者位置
    {
        break;
    }
}
while (true);

SingleProducerSequencer的next方法的邏輯和MultiProducerSequencer差很少。不一樣的是因爲只有一個生產者線程,所以SingleProducerSequencer直接使用了一個long的nextValue來表示可生產數據的位置,一個long的cachedValue來表示消費者位置緩存。下面是com.lmax.disruptor.SingleProducerSequencer#next(int)的代碼。併發

long nextValue = this.nextValue;                                                  獲取可生產數據的最小位置               

long nextSequence = nextValue + n;                                                插入n個新數據後的位置
long wrapPoint = nextSequence - bufferSize;                                       新位置減去ringbuffer長度
long cachedGatingSequence = this.cachedValue;                                     消費者位置

if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)         wrapPoint > cachedGatingSequence表示生產者超過了消費者一圈
{
    cursor.setVolatile(nextValue);  // StoreLoad fence

    long minSequence;
    while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))  獲取消費者最新位置,若是仍是超過一圈則等待並重試
    {
        LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
    }

    this.cachedValue = minSequence;                                              保存消費者位置
}

this.nextValue = nextSequence;                                                   保存生產者位置

2種Sequencer的獲取新數據位置的邏輯類似,可是發佈數據的邏輯卻徹底不同。SingleProducerSequencer的發佈邏輯較爲簡單,publish方法中直接更新生產者位置cursor,注意cursor和nextValue的差異,nextValue用來獲取新數據的位置,而cursor是已經發布的數據的位置,對於消費者來講cursor纔是真正的生產者位置。下面是com.lmax.disruptor.SingleProducerSequencer#publish(long)的代碼。ide

@Override
public void publish(long sequence)
{
    cursor.set(sequence);                  更新生成者位置
    waitStrategy.signalAllWhenBlocking();  通知等待的消費者消費
}

這種方式用在MultiProducerSequencer上顯然是不合適的,由於一個生產者發佈可能會致使其餘生產者也發佈了。事實上,MultiProducerSequencer在next方法中就直接更新了cursor。MultiProducerSequencer用一個長度和ringbuffer相同的數組availableBuffer來跟蹤數據的發佈狀態。下面是com.lmax.disruptor.MultiProducerSequencer發佈相關的代碼。函數

@Override
public void publish(final long sequence)
{
    setAvailable(sequence);                             設置改位置的狀態
    waitStrategy.signalAllWhenBlocking();
}
private void setAvailable(final long sequence)
{
    setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
private void setAvailableBufferValue(int index, int flag)
{
    long bufferAddress = (index * SCALE) + BASE;
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}

能夠看到使用cursor來獲取消費者可消費的最大位置是不合適的了。在Sequencer中有個getHighestPublishedSequence(long lowerBound, long availableSequence)方法用來返回能夠被消費的最大位置。對於SingleProducerSequencer因爲是發佈時更新cursor,所以能夠直接返回availableSequence;對於MultiProducerSequencer是在availableBuffer的[lowerBound,availableSequence]區間上找到最小的已發佈位置。性能


SequenceBarrier

SequenceBarrier是協調消費者的進度和它依賴的進度的。這裏說依賴是由於消費者自己是有層級的,第一層的消費者依賴(不超過)生產者的進度,第二層的消費者依賴(不超過)第一層的消費進度。從構造方法能夠看出當傳入一個長度爲0的dependentSequences數組時,該barrier的dependentSequence就是生產者的位置。若是大於0就用FixedSequenceGroup包裝一下dependentSequences數組,FixedSequenceGroup的get方法返回的就是dependentSequences數組的最小值。下面是com.lmax.disruptor.ProcessingSequenceBarrier的構造函數。學習

ProcessingSequenceBarrier(
    final Sequencer sequencer,
    final WaitStrategy waitStrategy,
    final Sequence cursorSequence,
    final Sequence[] dependentSequences)
{
    this.sequencer = sequencer;
    this.waitStrategy = waitStrategy;
    this.cursorSequence = cursorSequence;
    if (0 == dependentSequences.length)            若是dependentSequences長度爲0,就依賴生產者進度
    {
        dependentSequence = cursorSequence;
    }
    else
    {
        dependentSequence = new FixedSequenceGroup(dependentSequences);
    }
}

SequenceBarrier的核心方法就是waitFor(final long sequence)。該方法是用來等待入參sequence變成可消費狀態的。使用waitStrategy來等待並獲取一個有效的sequence,在waitstrategy的全部實現中,這個返回值其實就是dependentSequence。最後經過Sequencer的getHighestPublishedSequence方法獲取[sequence,dependentSequence]區間內可消費的最大位置。下面是com.lmax.disruptor.ProcessingSequenceBarrier#waitFor(final long sequence)的代碼。

public long waitFor(final long sequence)
    throws AlertException, InterruptedException, TimeoutException
{
    checkAlert();
    等待sequence有效並返回dependentSequence位置
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);        

    if (availableSequence < sequence)
    {
        return availableSequence;
    }
    返回[sequence,dependentSequence]最小已發佈位置
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

WaitStrategy是消費者等待消費的動做。判斷sequence是否有效的方法是和dependentSequence比較,當且僅當sequence小於等於dependentSequence時有效。 好比以下代碼:

while ((availableSequence = dependentSequence.get()) < sequence)
{
    等待
}
return availableSequence;

消費者

Disruptor的消費者有2種一種是WokerHandler一種是EventHandler。能夠設置多個WokerHandler,多個WokerHandler會一塊兒去處理全部的數據,也能夠設置多個EventHandler,多個EventHandler會分別處理全部的數據。
WokerHandler是由com.lmax.disruptor.dsl.Disruptor#handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)方法用來建立。該方法會建立一個workPool,workPool裏面有sequenceBarrier,除此外workPool裏還有一個workSequence。每一個workHandler會建立一個workProcessor,workSequence也會傳入workProcessor的構造方法。下面是com.lmax.disruptor.WorkerPool部分代碼。

private final AtomicBoolean started = new AtomicBoolean(false);
private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final RingBuffer<T> ringBuffer;
// WorkProcessors are created to wrap each of the provided WorkHandlers
private final WorkProcessor<?>[] workProcessors;
@SafeVarargs
public WorkerPool(
    final RingBuffer<T> ringBuffer,
    final SequenceBarrier sequenceBarrier,
    final ExceptionHandler<? super T> exceptionHandler,
    final WorkHandler<? super T>... workHandlers)
{
    this.ringBuffer = ringBuffer;
    final int numWorkers = workHandlers.length;
    workProcessors = new WorkProcessor[numWorkers];

    for (int i = 0; i < numWorkers; i++)
    {
        workProcessors[i] = new WorkProcessor<>(            建立WorkHandler執行器
            ringBuffer,
            sequenceBarrier,                                協調消費進度
            workHandlers[i],
            exceptionHandler,
            workSequence);                                  全部WorkHandler使用一個workSequence
    }
}

WorkProcessor能夠當作是workHandler的執行者,他的核心方法是run。run方法中使用CAS從workSequence中獲取要消費的下標,能夠看出workPool中的全部workProcessor是從同一個workSequence中獲取的,所以一個workPool裏的workHandler是共同消費數據的。當成功獲取到須要nextSequence後,將其於cachedAvailableSequence比較,若是小於等於cachedAvailableSequence表示能夠消費,不然使用sequenceBarrier等待並從新獲取依賴(能夠先理解爲生產者)的最大可消費位置。這裏cachedAvailableSequence和sequencer中提到了gatingSequenceCache思路是同樣的,爲了避免用每次都去獲取,每次獲取後將其保存一塊兒來,消費者還沒消費到這個位置的時候,能夠不用去獲取,由於這時消費者必定沒有超過依賴。

while (true)
{
    try
    {
        省略註解
        if (processedSequence)
        {
            processedSequence = false;
            do
            {
                nextSequence = workSequence.get() + 1L;                            從workSequence的下一個位置
                sequence.set(nextSequence - 1L);
            }
            while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));  競爭這個位置
        }
                                                                           
        if (cachedAvailableSequence >= nextSequence)                               若是這個位置小於緩存的依賴的位置
        {
            event = ringBuffer.get(nextSequence);                                  獲取這個位置的數據並消費
            workHandler.onEvent(event);
            processedSequence = true;
        }
        else
        {
            cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);      等待並獲取最大可消費位置
        }
    }
    省略部分代碼
}

EventHandler是由com.lmax.disruptor.dsl.Disruptor#handleEventsWith(final EventHandler<? super T>... handlers)方法建立的。每個EventHandler會被建立爲一個BatchEventProcessor。BatchEventProcessor的核心方法是processEvents方法。該方法中就是使用sequencerBarrier去獲取了依賴的最新位置,而後從直接當前位置一直消費到依賴最新的位置。這和WorkProcessor是不一樣的,由於BatchEventProcessor中的sequence各自增加互不影響,而WorkProcessor的sequence都是從workSequence中去爭搶,因此多個EventHandler是分別消費全部的數據。下面是com.lmax.disruptor.BatchEventProcessor#processEvents方法代碼。

while (true)
{
    try
    {
        final long availableSequence = sequenceBarrier.waitFor(nextSequence);                直接獲取最大可消費位置
        if (batchStartAware != null)
        {
            batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
        }

        while (nextSequence <= availableSequence)                                           直接消費到最大可消費位置
        {
            event = dataProvider.get(nextSequence);
            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
        }

        sequence.set(availableSequence);                                                   一匹消費完後才更新消費進度
    }

Disruptor中handleEventsWit方法和handleEventsWithWorkerPool方法建立ProcessingSequenceBarrier時傳入的dependentSequences都是長度爲0的Sequence數組,這樣建立的ProcessingSequenceBarrier的dependentSequences就是生產者的位置,這樣建立出來的消費者就是依賴於生產者進度的。這2個方法都返回EventHandlerGroup,它包含了表示消費者進度的Sequence數組,當使用EventHandlerGroup建立消費者時就會使用該Sequence數組做爲參數建立ProcessingSequenceBarrier,這樣建立出來的消費者就會依賴前一個消費者的消費進度。Disruptor總體運行以下圖所示:
disruptor 其中生產者的位置是7,生產者的gatingSeq指向消費者依賴圖中的最末端的消費者的seq,表示生產者不能超過最末端的消費者;workpool有2個workhandler分別在2,3,workpool的seqbarrier指向生產者,表示workpool不能超過生產者;eventhandler目前消費到位置是1,他的seqbarrier指向workpool表示其消費進度不能超過workpool。

相關文章
相關標籤/搜索