Disruptor
Disruptor是java圈子裏著名的併發隊列,它是一個基於生產者-消費者模型,並優化了cpu僞共享的高性能隊列。要理解disruptor須要理解一下幾個概念:RingBuffer,Sequence,Sequencer,SequenceBarrier。 java
RingBuffer
RingBuffer是disruptor中用來存數據的環形數組。Disruptor的基本數據結構就是一個循環隊列。學習過數據結構的同窗都知道循環隊列是一個基於數組的隊列,用一個變量來表示隊頭位置下標,另外一個變量來表示隊尾位置下標。當位置下標到達數組末尾的時候,下標的下一個位置就移動到數組開頭,例如jdk中的java.util.concurrent.ArrayBlockingQueue,它用putIndex來表示隊尾位置,用takeIndex來表示隊頭位置,當隊頭或隊尾到達數組末尾的時候,被置爲數組開頭位置。下面是java.util.concurrent.ArrayBlockingQueue中的代碼。數組
/** items index for next take, poll, peek or remove */ int takeIndex; 隊頭位置 /** items index for next put, offer, or add */ int putIndex; 隊尾位置
private void enqueue(E x) { 入隊方法 // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; 隊尾入隊數據 if (++putIndex == items.length) 若是已經到了數組的最後一個位置 putIndex = 0; 位置置於數組開頭 count++; notEmpty.signal(); }
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; 隊頭出隊數據 items[takeIndex] = null; 若是已經到了數組的最後一個位置 if (++takeIndex == items.length) 位置置於數組開頭 takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
Sequence
Sequence是disruptor對隊列中表示位置的下標位置的抽象。爲何要用一個類而不是一個整型來表示數組的下標位置呢?這是由於disruptor在試圖解決cpu僞共享問題。CPU僞共享簡單講就是在不一樣cpu核的多個線程他們分別在本身的cache中緩存了同一個變量,當一個線程修改了這個變量將會使用MESI協議將別的線程緩存了相同變量的cache-line失效。若是多個線程高頻修改一個變量可能會相互影響使得cpu緩存的做用大打折扣。那怎麼才能儘可能避免這種狀況呢?disruptor的作法是讓每一個消費者都維護着本身的sequence,而且sequence作了cache-line填充,使得每一個sequence將佔用整個cache-line。通常來講一個cache-line是64個字節,用一個long來表示位置,那麼就須要8個long。所以,disruptor在表示下標的long變量先後都放置了7個long,這樣當讀取value時,不管從哪一個方向讀取64個字節都能保證cache-line被填充。下面是com.lmax.disruptor.Sequence的代碼。緩存
class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; 填充 } class Value extends LhsPadding { protected volatile long value; 真正的值 } class RhsPadding extends Value { protected long p9, p10, p11, p12, p13, p14, p15; 填充 }
Sequencer
Sequencer是用來協調生產者進度和消費者進度的。消費者不能跑到生產者前面去了,生產者也不能超過消費者一圈。AbstractSequencer有3個重要的參數,cursor表示的生產者的位置,gatingSequences表示的是末端消費者的位置,waitstrategy表示當沒有數據給消費者時,消費者的等待行爲。下面是com.lmax.disruptor.AbstractSequencer的代碼。數據結構
protected final WaitStrategy waitStrategy; 等待策略 protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); 生產者位置 protected volatile Sequence[] gatingSequences = new Sequence[0]; 消費者位置
生產者在生產數據的時候有2個步驟,第一步,獲取新數據的位置;第二步,插入數據併發布,發佈後的數據就能夠被消費了。其中第一步對應着next方法,第二步對應着publish方法。依據生產者是單線程的仍是多線程的,Sequencer被分爲MultiProducerSequencer和SingleProducerSequencer,這2種Sequencer大致邏輯類似但又有差異。
MultiProducerSequencer的next方法中,首先獲取生產者位置並加上n再減去buffersize,將他和消費者位置比較,若是大於則代表生產者超過了消費者一圈,這是不可行的,不然是可行的就用cas更新生產者位置。獲取消費者位置的時候並非從gatingSequences中直接獲取最小的那個,而是經過一個gatingSequenceCache來獲取的,這是由於sequence是一個頻繁改變,被多個線程操做的對象,而且每次去獲取都要去找最小值,爲了減小沒必要要的獲取,每次從gatingSequences中獲取一次最小值時將其緩存起來,在生產者沒有追到這個緩存的最小值前,能夠不用去獲取最新的最小值。當追上這個最小值的時候,就須要從gatingSequences中獲取最小值,若是生產者仍是超過了一圈那麼就暫停一下,再重複以上操做,不然就將最小值賦值給gatingSequenceCache並重復以上操做。下面是com.lmax.disruptor.MultiProducerSequencer#next(int)的代碼。多線程
do { current = cursor.get(); 生產者當前位置 next = current + n; 插入n個新數據後的位置 long wrapPoint = next - bufferSize; 新位置減去ringbuffer長度 long cachedGatingSequence = gatingSequenceCache.get(); 消費者位置 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) wrapPoint > cachedGatingSequence表示生產者超過了消費者一圈 { long gatingSequence = Util.getMinimumSequence(gatingSequences, current); 獲取最新的消費者位置 if (wrapPoint > gatingSequence) 若是仍是超過一圈則等待並重試 { LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } gatingSequenceCache.set(gatingSequence); 更新緩存的消費者位置 } else if (cursor.compareAndSet(current, next)) 更新生產者位置 { break; } } while (true);
SingleProducerSequencer的next方法的邏輯和MultiProducerSequencer差很少。不一樣的是因爲只有一個生產者線程,所以SingleProducerSequencer直接使用了一個long的nextValue來表示可生產數據的位置,一個long的cachedValue來表示消費者位置緩存。下面是com.lmax.disruptor.SingleProducerSequencer#next(int)的代碼。併發
long nextValue = this.nextValue; 獲取可生產數據的最小位置 long nextSequence = nextValue + n; 插入n個新數據後的位置 long wrapPoint = nextSequence - bufferSize; 新位置減去ringbuffer長度 long cachedGatingSequence = this.cachedValue; 消費者位置 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) wrapPoint > cachedGatingSequence表示生產者超過了消費者一圈 { cursor.setVolatile(nextValue); // StoreLoad fence long minSequence; while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) 獲取消費者最新位置,若是仍是超過一圈則等待並重試 { LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } this.cachedValue = minSequence; 保存消費者位置 } this.nextValue = nextSequence; 保存生產者位置
2種Sequencer的獲取新數據位置的邏輯類似,可是發佈數據的邏輯卻徹底不同。SingleProducerSequencer的發佈邏輯較爲簡單,publish方法中直接更新生產者位置cursor,注意cursor和nextValue的差異,nextValue用來獲取新數據的位置,而cursor是已經發布的數據的位置,對於消費者來講cursor纔是真正的生產者位置。下面是com.lmax.disruptor.SingleProducerSequencer#publish(long)的代碼。ide
@Override public void publish(long sequence) { cursor.set(sequence); 更新生成者位置 waitStrategy.signalAllWhenBlocking(); 通知等待的消費者消費 }
這種方式用在MultiProducerSequencer上顯然是不合適的,由於一個生產者發佈可能會致使其餘生產者也發佈了。事實上,MultiProducerSequencer在next方法中就直接更新了cursor。MultiProducerSequencer用一個長度和ringbuffer相同的數組availableBuffer來跟蹤數據的發佈狀態。下面是com.lmax.disruptor.MultiProducerSequencer發佈相關的代碼。函數
@Override public void publish(final long sequence) { setAvailable(sequence); 設置改位置的狀態 waitStrategy.signalAllWhenBlocking(); } private void setAvailable(final long sequence) { setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence)); } private void setAvailableBufferValue(int index, int flag) { long bufferAddress = (index * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); }
能夠看到使用cursor來獲取消費者可消費的最大位置是不合適的了。在Sequencer中有個getHighestPublishedSequence(long lowerBound, long availableSequence)方法用來返回能夠被消費的最大位置。對於SingleProducerSequencer因爲是發佈時更新cursor,所以能夠直接返回availableSequence;對於MultiProducerSequencer是在availableBuffer的[lowerBound,availableSequence]區間上找到最小的已發佈位置。性能
SequenceBarrier
SequenceBarrier是協調消費者的進度和它依賴的進度的。這裏說依賴是由於消費者自己是有層級的,第一層的消費者依賴(不超過)生產者的進度,第二層的消費者依賴(不超過)第一層的消費進度。從構造方法能夠看出當傳入一個長度爲0的dependentSequences數組時,該barrier的dependentSequence就是生產者的位置。若是大於0就用FixedSequenceGroup包裝一下dependentSequences數組,FixedSequenceGroup的get方法返回的就是dependentSequences數組的最小值。下面是com.lmax.disruptor.ProcessingSequenceBarrier的構造函數。學習
ProcessingSequenceBarrier( final Sequencer sequencer, final WaitStrategy waitStrategy, final Sequence cursorSequence, final Sequence[] dependentSequences) { this.sequencer = sequencer; this.waitStrategy = waitStrategy; this.cursorSequence = cursorSequence; if (0 == dependentSequences.length) 若是dependentSequences長度爲0,就依賴生產者進度 { dependentSequence = cursorSequence; } else { dependentSequence = new FixedSequenceGroup(dependentSequences); } }
SequenceBarrier的核心方法就是waitFor(final long sequence)。該方法是用來等待入參sequence變成可消費狀態的。使用waitStrategy來等待並獲取一個有效的sequence,在waitstrategy的全部實現中,這個返回值其實就是dependentSequence。最後經過Sequencer的getHighestPublishedSequence方法獲取[sequence,dependentSequence]區間內可消費的最大位置。下面是com.lmax.disruptor.ProcessingSequenceBarrier#waitFor(final long sequence)的代碼。
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException { checkAlert(); 等待sequence有效並返回dependentSequence位置 long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } 返回[sequence,dependentSequence]最小已發佈位置 return sequencer.getHighestPublishedSequence(sequence, availableSequence); }
WaitStrategy是消費者等待消費的動做。判斷sequence是否有效的方法是和dependentSequence比較,當且僅當sequence小於等於dependentSequence時有效。 好比以下代碼:
while ((availableSequence = dependentSequence.get()) < sequence) { 等待 } return availableSequence;
消費者
Disruptor的消費者有2種一種是WokerHandler一種是EventHandler。能夠設置多個WokerHandler,多個WokerHandler會一塊兒去處理全部的數據,也能夠設置多個EventHandler,多個EventHandler會分別處理全部的數據。
WokerHandler是由com.lmax.disruptor.dsl.Disruptor#handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)方法用來建立。該方法會建立一個workPool,workPool裏面有sequenceBarrier,除此外workPool裏還有一個workSequence。每一個workHandler會建立一個workProcessor,workSequence也會傳入workProcessor的構造方法。下面是com.lmax.disruptor.WorkerPool部分代碼。
private final AtomicBoolean started = new AtomicBoolean(false); private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); private final RingBuffer<T> ringBuffer; // WorkProcessors are created to wrap each of the provided WorkHandlers private final WorkProcessor<?>[] workProcessors; @SafeVarargs public WorkerPool( final RingBuffer<T> ringBuffer, final SequenceBarrier sequenceBarrier, final ExceptionHandler<? super T> exceptionHandler, final WorkHandler<? super T>... workHandlers) { this.ringBuffer = ringBuffer; final int numWorkers = workHandlers.length; workProcessors = new WorkProcessor[numWorkers]; for (int i = 0; i < numWorkers; i++) { workProcessors[i] = new WorkProcessor<>( 建立WorkHandler執行器 ringBuffer, sequenceBarrier, 協調消費進度 workHandlers[i], exceptionHandler, workSequence); 全部WorkHandler使用一個workSequence } }
WorkProcessor能夠當作是workHandler的執行者,他的核心方法是run。run方法中使用CAS從workSequence中獲取要消費的下標,能夠看出workPool中的全部workProcessor是從同一個workSequence中獲取的,所以一個workPool裏的workHandler是共同消費數據的。當成功獲取到須要nextSequence後,將其於cachedAvailableSequence比較,若是小於等於cachedAvailableSequence表示能夠消費,不然使用sequenceBarrier等待並從新獲取依賴(能夠先理解爲生產者)的最大可消費位置。這裏cachedAvailableSequence和sequencer中提到了gatingSequenceCache思路是同樣的,爲了避免用每次都去獲取,每次獲取後將其保存一塊兒來,消費者還沒消費到這個位置的時候,能夠不用去獲取,由於這時消費者必定沒有超過依賴。
while (true) { try { 省略註解 if (processedSequence) { processedSequence = false; do { nextSequence = workSequence.get() + 1L; 從workSequence的下一個位置 sequence.set(nextSequence - 1L); } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence)); 競爭這個位置 } if (cachedAvailableSequence >= nextSequence) 若是這個位置小於緩存的依賴的位置 { event = ringBuffer.get(nextSequence); 獲取這個位置的數據並消費 workHandler.onEvent(event); processedSequence = true; } else { cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); 等待並獲取最大可消費位置 } } 省略部分代碼 }
EventHandler是由com.lmax.disruptor.dsl.Disruptor#handleEventsWith(final EventHandler<? super T>... handlers)方法建立的。每個EventHandler會被建立爲一個BatchEventProcessor。BatchEventProcessor的核心方法是processEvents方法。該方法中就是使用sequencerBarrier去獲取了依賴的最新位置,而後從直接當前位置一直消費到依賴最新的位置。這和WorkProcessor是不一樣的,由於BatchEventProcessor中的sequence各自增加互不影響,而WorkProcessor的sequence都是從workSequence中去爭搶,因此多個EventHandler是分別消費全部的數據。下面是com.lmax.disruptor.BatchEventProcessor#processEvents方法代碼。
while (true) { try { final long availableSequence = sequenceBarrier.waitFor(nextSequence); 直接獲取最大可消費位置 if (batchStartAware != null) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } while (nextSequence <= availableSequence) 直接消費到最大可消費位置 { event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } sequence.set(availableSequence); 一匹消費完後才更新消費進度 }
Disruptor中handleEventsWit方法和handleEventsWithWorkerPool方法建立ProcessingSequenceBarrier時傳入的dependentSequences都是長度爲0的Sequence數組,這樣建立的ProcessingSequenceBarrier的dependentSequences就是生產者的位置,這樣建立出來的消費者就是依賴於生產者進度的。這2個方法都返回EventHandlerGroup,它包含了表示消費者進度的Sequence數組,當使用EventHandlerGroup建立消費者時就會使用該Sequence數組做爲參數建立ProcessingSequenceBarrier,這樣建立出來的消費者就會依賴前一個消費者的消費進度。Disruptor總體運行以下圖所示:
其中生產者的位置是7,生產者的gatingSeq指向消費者依賴圖中的最末端的消費者的seq,表示生產者不能超過最末端的消費者;workpool有2個workhandler分別在2,3,workpool的seqbarrier指向生產者,表示workpool不能超過生產者;eventhandler目前消費到位置是1,他的seqbarrier指向workpool表示其消費進度不能超過workpool。