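The previous post introduced what Disruptor is and briefly summarized why it is so fast. Now let's get straight into the source code, short and to the point.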
Previous post: Why is the high-performance queue Disruptor so fast?
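Disruptor: a wrapper around the RingBuffer.
RingBuffer: the ring queue. It is backed by an array whose memory is reused lap after lap, which avoids repeated memory allocation, garbage collection and resizing.
EventProcessor: the event processor; it implements Runnable. BatchEventProcessor processes events in batches on a single thread, and WorkProcessor processes them across multiple threads.
Sequencer: the interface through which producers access sequences; it is the producer-side parent interface behind RingBuffer. It is implemented, via the abstract class AbstractSequencer, by SingleProducerSequencer and MultiProducerSequencer.
EventSequencer: declares no methods of its own (it only extends other interfaces) and is kept mainly for future extension.
SequenceBarrier: the consumer barrier, i.e. the controller consumers use to access the buffer.
WaitStrategy: decides how a consumer waits, according to the chosen implementation, when there is no event to consume.
SingleProducerSequencer: the Sequencer implementation for a single producer.
MultiProducerSequencer: the Sequencer implementation for multiple producers.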
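That was a brief introduction to the commonly used classes. Don't worry if they don't make sense yet; they will once you've read the source.
My analysis of the core classes has been uploaded to git: https://github.com/Sonion/disroptor
Let's kick off with a diagram.
To analyze Disruptor, start with the producer side. Here is a class diagram of the producer-related classes that I put together.
Publishing a message starts from Disruptor's publishEvent method: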
```java
// Disruptor.java
public <A> void publishEvent(final EventTranslatorOneArg<T, A> eventTranslator, final A arg) {
    ringBuffer.publishEvent(eventTranslator, arg);
}
```
This actually calls RingBuffer.publishEvent, which does just two things. First it claims an available slot on the RingBuffer, and then it publishes the data into that slot.
```java
// RingBuffer.java
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {
    final long sequence = sequencer.next();
    translateAndPublish(translator, sequence, arg0);
}
```
Let's look at claiming an available slot on the RingBuffer, starting with the single-producer SingleProducerSequencer.next() method.
```java
// SingleProducerSequencer.java
/**
 * @see Sequencer#next()
 */
@Override
public long next() {
    return next(1);
}

/**
 * @see Sequencer#next(int)
 */
@Override
public long next(int n) {
    if (n < 1 || n > bufferSize) {
        throw new IllegalArgumentException("n must be > 0 and < bufferSize");
    }
    // Last sequence claimed by the previous call
    long nextValue = this.nextValue;
    // Sequence to claim this time (nextValue + 1 when n = 1)
    long nextSequence = nextValue + n;
    // Wrap point: the claimed sequence minus one full lap of the ring
    long wrapPoint = nextSequence - bufferSize;
    // Cached minimum gating sequence, i.e. the slowest consumer
    long cachedGatingSequence = this.cachedValue;

    // Sequences start at -1. If wrapPoint > cachedGatingSequence, the producer has lapped the
    // slowest consumer and would overwrite an unconsumed slot: no free slot, so wait.
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        cursor.setVolatile(nextValue);  // StoreLoad fence

        long minSequence;
        // Only when a consumer moves forward can we leave the loop;
        // the minimum consumer sequence is re-read on every pass
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
            LockSupport.parkNanos(1L);
        }
        // Consumers have moved on: refresh the cached minimum
        this.cachedValue = minSequence;
    }
    // Record the successfully claimed sequence
    this.nextValue = nextSequence;
    return nextSequence;
}
```
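The wrap-around test in next() is plain arithmetic and can be isolated. The sketch below is a simplified, hypothetical helper (`WrapCheck.mustWait` is not a Disruptor API). It assumes the slowest consumer's sequence is already known, and it omits the cache, the `cachedGatingSequence > nextValue` condition and the parking loop.

```java
// Hypothetical helper isolating the wrap-point arithmetic of SingleProducerSequencer.next(n).
final class WrapCheck {
    /**
     * @param nextValue      last sequence claimed by the producer (starts at -1)
     * @param n              number of slots to claim
     * @param bufferSize     ring size
     * @param minConsumerSeq sequence of the slowest gating consumer (starts at -1)
     * @return true if claiming n more slots would overwrite a slot not yet consumed
     */
    static boolean mustWait(long nextValue, int n, int bufferSize, long minConsumerSeq) {
        long nextSequence = nextValue + n;          // sequence we want to claim
        long wrapPoint = nextSequence - bufferSize; // same slot, one lap earlier
        return wrapPoint > minConsumerSeq;          // reusable only once the consumer has passed it
    }

    public static void main(String[] args) {
        // Ring of 8, nothing consumed yet (-1): sequences 0..7 can be claimed freely.
        System.out.println(mustWait(-1, 1, 8, -1)); // false: claims 0, wrapPoint = -8
        System.out.println(mustWait(6, 1, 8, -1));  // false: claims 7, wrapPoint = -1
        // Claiming 8 would reuse slot 0, which the consumer has not read yet.
        System.out.println(mustWait(7, 1, 8, -1));  // true: wrapPoint = 0 > -1
        // Once the consumer has processed sequence 0, the producer may continue.
        System.out.println(mustWait(7, 1, 8, 0));   // false: wrapPoint = 0, not > 0
    }
}
```

The comparison is `>` rather than `>=` because a slot one lap behind may be reused as soon as the consumer has finished that exact sequence.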
next() returns the sequence that may be written. Back in RingBuffer.publishEvent, translateAndPublish is then called to do the publishing.
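```java
// RingBuffer.java
// The overload matching the publishEvent(translator, sequence, arg0) call above
private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
    try {
        translator.translateTo(get(sequence), sequence, arg0);
    } finally {
        sequencer.publish(sequence);
    }
}
```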
translator.translateTo() is the user's implementation of the EventTranslator interface. It puts the data in place, and then the sequence is published.
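Every slot holds a preallocated event object, so publishing means mutating an existing object in place instead of allocating a new one. The toy single-threaded ring below mirrors the claim, translate, publish-in-finally shape of translateAndPublish. `MiniRing`, `LongEvent` and `Translator` are hypothetical names, not the Disruptor classes, and the sketch has no consumers and no wrap check.

```java
import java.util.function.Supplier;

// Hypothetical mutable event type; Disruptor users define their own.
final class LongEvent {
    long value;
}

// Hypothetical stand-in for EventTranslatorOneArg: copies arg into an existing event.
interface Translator<E, A> {
    void translateTo(E event, long sequence, A arg);
}

// Toy single-producer ring: only the publish path, no consumers and no wrap check.
final class MiniRing<E> {
    private final Object[] entries;
    private final int mask;
    private long nextValue = -1; // last claimed sequence
    private long cursor = -1;    // last published sequence

    MiniRing(int bufferSize, Supplier<E> factory) {
        if (Integer.bitCount(bufferSize) != 1) throw new IllegalArgumentException("power of 2 required");
        entries = new Object[bufferSize];
        mask = bufferSize - 1;
        for (int i = 0; i < bufferSize; i++) entries[i] = factory.get(); // preallocated once, reused forever
    }

    @SuppressWarnings("unchecked")
    E get(long sequence) { return (E) entries[(int) sequence & mask]; }

    <A> void publishEvent(Translator<E, A> translator, A arg) {
        long sequence = ++nextValue; // step 1: claim a slot
        try {
            translator.translateTo(get(sequence), sequence, arg); // step 2: fill the existing object
        } finally {
            cursor = sequence; // publish even if the translator threw, so no slot stays claimed but unpublished
        }
    }

    long cursor() { return cursor; }

    public static void main(String[] args) {
        MiniRing<LongEvent> ring = new MiniRing<>(4, LongEvent::new);
        LongEvent slot1 = ring.get(1);
        for (long i = 0; i < 6; i++) {
            ring.publishEvent((event, seq, v) -> event.value = v, i * 10);
        }
        System.out.println(ring.get(5) == slot1); // true: sequence 5 reused the object of sequence 1
        System.out.println(ring.get(5).value);    // 50
    }
}
```

The `finally` matters most with multiple producers. There, a claimed slot that is never published would block consumers at that sequence.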
```java
// SingleProducerSequencer.java
/**
 * @see Sequencer#publish(long)
 */
@Override
public void publish(long sequence) {
    // Advance the Sequencer's internal cursor
    cursor.set(sequence);
    // A new event has been published: wake the waiting EventProcessors so they can consume it
    waitStrategy.signalAllWhenBlocking();
}

// BlockingWaitStrategy.java
@Override
public void signalAllWhenBlocking() {
    synchronized (mutex) {
        mutex.notifyAll();
    }
}
```
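That completes the single-producer publishing flow. As I said, it's simple: two steps. First claim an available slot on the RingBuffer, then publish the data into that slot.
Now the multi-producer case. The first part matches the single-producer flow. MultiProducerSequencer also extends the abstract class AbstractSequencer, and again we start with next().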
```java
// MultiProducerSequencer.java
/**
 * @see Sequencer#next()
 */
@Override
public long next() {
    return next(1);
}

/**
 * @see Sequencer#next(int)
 */
@Override
public long next(int n) {
    if (n < 1 || n > bufferSize) {
        throw new IllegalArgumentException("n must be > 0 and < bufferSize");
    }

    long current;
    long next;

    do {
        // Current cursor value; -1 after initialization
        current = cursor.get();
        next = current + n;

        long wrapPoint = next - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.get();

        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

            if (wrapPoint > gatingSequence) {
                LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                continue;
            }

            gatingSequenceCache.set(gatingSequence);
        } else if (cursor.compareAndSet(current, next)) {
            // Several producers may race here; only the thread whose CAS succeeds owns current+1..next
            break;
        }
    } while (true);

    return next;
}
```
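The key difference from the single-producer version is the compareAndSet retry loop. Several producer threads race on one shared cursor, and exactly one thread wins each value. Below is a JDK-only sketch that assumes an AtomicLong in place of Disruptor's Sequence. A caller-supplied `minGatingSequence` stands in for the live consumer sequences. `CasClaimer` is a hypothetical name, and unlike the real code the sketch does not cache the gating value.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.LongSupplier;

// Hypothetical multi-producer claimer modelled on MultiProducerSequencer.next(n).
final class CasClaimer {
    private final AtomicLong cursor = new AtomicLong(-1); // highest claimed sequence
    private final int bufferSize;
    private final LongSupplier minGatingSequence;          // slowest consumer, supplied by the caller

    CasClaimer(int bufferSize, LongSupplier minGatingSequence) {
        this.bufferSize = bufferSize;
        this.minGatingSequence = minGatingSequence;
    }

    long next(int n) {
        while (true) {
            long current = cursor.get();
            long next = current + n;
            long wrapPoint = next - bufferSize;
            if (wrapPoint > minGatingSequence.getAsLong()) {
                LockSupport.parkNanos(1); // ring full: back off, then retry
                continue;
            }
            if (cursor.compareAndSet(current, next)) {
                return next; // this thread now owns sequences current+1 .. next
            }
            // CAS failed: another producer claimed first; re-read the cursor and retry
        }
    }

    long cursor() { return cursor.get(); }

    public static void main(String[] args) {
        CasClaimer claimer = new CasClaimer(8, () -> -1L); // ring of 8, consumer has read nothing
        System.out.println(claimer.next(1)); // 0
        System.out.println(claimer.next(7)); // 7: sequences 1..7 claimed at once, the ring is now full
    }
}
```

A further `next(1)` in this example would spin until the supplied gating sequence advances. That is the same back-pressure the real sequencer applies.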
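As you can see, the multi-producer flow differs little from the single-producer one. The main difference is that the claim is made by a compareAndSet on the shared cursor, retried until it succeeds, because several producers may claim at the same time. In the end it still calls publish, which differs slightly: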
```java
// MultiProducerSequencer.java
/**
 * @see Sequencer#publish(long)
 */
@Override
public void publish(final long sequence) {
    // With multiple producers, the slot is marked in the availableBuffer array
    setAvailable(sequence);
    waitStrategy.signalAllWhenBlocking();
}

/**
 * @see Sequencer#publish(long, long)
 */
@Override
public void publish(long lo, long hi) {
    for (long l = lo; l <= hi; l++) {
        setAvailable(l);
    }
    waitStrategy.signalAllWhenBlocking();
}
```
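Compared with SingleProducerSequencer.publish, MultiProducerSequencer.publish does not set the cursor, which the CAS in next() has already advanced. Instead, it marks the corresponding entry of its internal availableBuffer array. availableBuffer records the state of every RingBuffer slot. The slot index is sequence & (bufferSize - 1). Shifting the sequence right by log2(bufferSize) gives the lap number (how many times the ring has wrapped), and that lap number is stored as the flag. Bitwise AND and shifts are cheaper than modulo and division.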
```java
private void setAvailable(final long sequence) {
    // calculateIndex uses a bitwise AND, calculateAvailabilityFlag uses a shift
    setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}

private void setAvailableBufferValue(int index, int flag) {
    // Unsafe writes memory directly, so compute the memory address of the array element
    long bufferAddress = (index * SCALE) + BASE;
    // availableBuffer is an int array marking published slots; every entry starts at -1
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}

private int calculateAvailabilityFlag(final long sequence) {
    return (int) (sequence >>> indexShift);
}

private int calculateIndex(final long sequence) {
    return ((int) sequence) & indexMask;
}
```
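To see what the index and flag encode, here is a standalone sketch of the same arithmetic. It assumes a plain int[] instead of Unsafe.putOrderedInt, so it is only meaningful on a single thread; the real class relies on ordered memory writes. `AvailabilityBuffer` is a hypothetical name, and `Integer.numberOfTrailingZeros` stands in for log2 of a power-of-two size.

```java
import java.util.Arrays;

// Hypothetical single-threaded model of MultiProducerSequencer's availableBuffer.
final class AvailabilityBuffer {
    private final int[] availableBuffer;
    private final int indexMask;  // bufferSize - 1
    private final int indexShift; // log2(bufferSize)

    AvailabilityBuffer(int bufferSize) {
        if (Integer.bitCount(bufferSize) != 1) throw new IllegalArgumentException("bufferSize must be a power of 2");
        availableBuffer = new int[bufferSize];
        Arrays.fill(availableBuffer, -1); // -1: nothing published in this slot yet
        indexMask = bufferSize - 1;
        indexShift = Integer.numberOfTrailingZeros(bufferSize);
    }

    int calculateIndex(long sequence) { return ((int) sequence) & indexMask; }              // slot = sequence mod size

    int calculateAvailabilityFlag(long sequence) { return (int) (sequence >>> indexShift); } // lap = sequence / size

    void setAvailable(long sequence) {
        availableBuffer[calculateIndex(sequence)] = calculateAvailabilityFlag(sequence);
    }

    // Published only if the slot holds the flag for this sequence's own lap, not an older one.
    boolean isAvailable(long sequence) {
        return availableBuffer[calculateIndex(sequence)] == calculateAvailabilityFlag(sequence);
    }

    public static void main(String[] args) {
        AvailabilityBuffer b = new AvailabilityBuffer(8);
        b.setAvailable(2);
        System.out.println(b.isAvailable(2));  // true: slot 2 holds lap 0
        System.out.println(b.isAvailable(10)); // false: 10 also maps to slot 2 but needs lap 1
        b.setAvailable(10);
        System.out.println(b.isAvailable(10)); // true
    }
}
```

Storing the lap number instead of a boolean means the buffer never has to be cleared. A value left over from the previous lap simply fails the comparison.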
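That wraps up the multi-producer publishing flow. Pretty easy, right? If you have questions, let's discuss them in the comments.
As usual, here is another diagram; this one is simpler.
EventProcessor is the consumer-side event-processing framework. The EventProcessor interface extends Runnable and has two main implementations: BatchEventProcessor, which processes events in batches on a single thread, and WorkProcessor, which processes them across multiple threads.
When consumers are built through the Disruptor helper class, passing several EventHandlers to handleEventsWith creates one BatchEventProcessor per handler, each running on its own thread. This resembles the publish-subscribe model in JMS: every consumer processes the same event, in parallel. It suits cases where one event should trigger several different actions.
Passing several WorkHandlers to handleEventsWithWorkerPool instead creates one WorkProcessor per handler, each on its own thread. This resembles the JMS point-to-point model: each event is consumed by exactly one consumer in the group. It suits cases where you want to increase consumer-side parallelism.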
```java
// EventHandlerGroup.java
public EventHandlerGroup<T> then(final EventHandler<? super T>... handlers) {
    return handleEventsWith(handlers);
}

public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
    return disruptor.createEventProcessors(sequences, handlers);
}
```
```java
// Disruptor.java
// barrierSequences are the sequences of this EventHandlerGroup, i.e. of the previous handler group
EventHandlerGroup<T> createEventProcessors(
    final Sequence[] barrierSequences,
    final EventHandler<? super T>[] eventHandlers) {
    checkNotStarted();

    // processorSequences: the sequences of the handler group being created now
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
        final EventHandler<? super T> eventHandler = eventHandlers[i];

        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null) {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    // After adding the processors, update the gating sequences; later links in the chain build on them
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

// Gating: consumers further down the chain may never get ahead of the ones before them
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
    if (processorSequences.length > 0) {
        // The ring buffer's gating sequences always hold the sequences of the last group in the chain
        ringBuffer.addGatingSequences(processorSequences);
        for (final Sequence barrierSequence : barrierSequences) {
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        // The previous group is no longer the end of the chain
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }
}
```
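As a result of updateGatingSequencesForNextInChain, the ring buffer only ever gates on the last group in the chain. Earlier groups are covered implicitly, because later groups cannot overtake them. The hypothetical model below uses plain objects; `GatingModel` and `Seq` are illustrative names, while the real code stores Sequence objects and updates an array atomically.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Hypothetical model: which consumer sequences does the ring buffer gate on?
final class GatingModel {
    static final class Seq { // stand-in for a consumer's Sequence; compared by identity
        final String name;
        Seq(String name) { this.name = name; }
    }

    private final Set<Seq> gatingSequences = Collections.newSetFromMap(new IdentityHashMap<>());

    // Mirrors updateGatingSequencesForNextInChain: add the new group, drop the group it follows.
    void addGroup(List<Seq> barrierSequences, List<Seq> processorSequences) {
        if (!processorSequences.isEmpty()) {
            gatingSequences.addAll(processorSequences);
            gatingSequences.removeAll(barrierSequences);
        }
    }

    List<String> gatingNames() {
        List<String> names = new ArrayList<>();
        for (Seq s : gatingSequences) names.add(s.name);
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        GatingModel model = new GatingModel();
        Seq a = new Seq("A"), b = new Seq("B"), c = new Seq("C");
        model.addGroup(List.of(), List.of(a, b));  // like handleEventsWith(A, B)
        model.addGroup(List.of(a, b), List.of(c)); // like .then(C)
        System.out.println(model.gatingNames());  // [C]
    }
}
```

Gating only on the tail keeps the producer's wrap check cheap. It scans one group's sequences, not every consumer in the pipeline.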
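All the logic for building a consumer chain of BatchEventProcessors lives in the createEventProcessors method.
First, a word on ConsumerRepository. This class stores the relationships between consumers, for example looking up the EventProcessorInfo for an EventHandler reference, or the ConsumerInfo for a Sequence.
Because object references are used as keys, the data structure is an IdentityHashMap.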
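IdentityHashMap compares keys with ==, whereas HashMap uses equals(). HashMap locates a bucket with hashCode(), whereas IdentityHashMap uses System.identityHashCode(object). In theory, IdentityHashMap is therefore slightly faster than HashMap. The other difference is that an IdentityHashMap can hold several keys that are equal() to each other. Because keys are compared with ==, two "identical" keys are stored separately as long as they are different objects (different addresses).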
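A short JDK demonstration of the difference: two distinct String objects with equal contents collapse to one HashMap entry but stay separate in an IdentityHashMap. `IdentityDemo` is just an illustrative class name.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

final class IdentityDemo {
    public static void main(String[] args) {
        String k1 = new String("handler"); // two distinct objects...
        String k2 = new String("handler"); // ...that are equals() to each other

        Map<String, Integer> hash = new HashMap<>();
        hash.put(k1, 1);
        hash.put(k2, 2); // equals() match: overwrites the first entry

        Map<String, Integer> identity = new IdentityHashMap<>();
        identity.put(k1, 1);
        identity.put(k2, 2); // different references: stored as a second entry

        System.out.println(hash.size());     // 1
        System.out.println(identity.size()); // 2
        System.out.println(identity.get(new String("handler"))); // null: a third object is yet another key
    }
}
```

For a registry keyed by handler and processor objects, identity semantics are the safe choice. A user's EventHandler might override equals(), and two separate handler instances must still count as two consumers.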
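The createEventProcessors method takes two parameters. barrierSequences is the array of barrier sequences for the current consumer group. If this is the first group, it is an empty sequence array; otherwise it is the sequence array of the previous consumer group. The other parameter, eventHandlers, is the array of EventHandlers that contain the consumption logic.
Disruptor creates a corresponding BatchEventProcessor for every EventHandler, and all consumers in the group share one SequenceBarrier.
A BatchEventProcessor is constructed with three arguments. dataProvider is the data store, such as the RingBuffer. sequenceBarrier tracks the producer cursor and coordinates processing. eventHandler is the user-implemented event handler, i.e. the actual consumer.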
```java
// BatchEventProcessor.java
@Override
public void run() {
    if (running.compareAndSet(IDLE, RUNNING)) {
        sequenceBarrier.clearAlert();

        notifyStart();
        try {
            if (running.get() == RUNNING) {
                processEvents();
            }
        } finally {
            notifyShutdown();
            running.set(IDLE);
        }
    } else {
        // This is a little bit of guess work. The running state could have changed to HALTED by
        // this point. However, Java does not have compareAndExchange which is the only way
        // to get it exactly correct.
        if (running.get() == RUNNING) {
            throw new IllegalStateException("Thread is already running");
        } else {
            earlyExit();
        }
    }
}

private void processEvents() {
    T event = null;
    long nextSequence = sequence.get() + 1L;

    while (true) {
        try {
            // Highest sequence currently available; the configured
            // wait strategy is used to wait until nextSequence becomes available
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);
            if (batchStartAware != null) {
                batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
            }

            // Batch processing: handle every sequence from the next unconsumed one up to availableSequence
            while (nextSequence <= availableSequence) {
                event = dataProvider.get(nextSequence);
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }

            // The eventHandler is done with the batch: update this consumer's sequence
            sequence.set(availableSequence);
        } catch (final TimeoutException e) {
            notifyTimeout(sequence.get());
        } catch (final AlertException ex) {
            if (running.get() != RUNNING) {
                break;
            }
        } catch (final Throwable ex) {
            exceptionHandler.handleEventException(ex, nextSequence, event);
            sequence.set(nextSequence);
            nextSequence++;
        }
    }
}
```
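The batching in processEvents comes from waitFor returning the highest available sequence, not just the next one. The handler drains that whole range, and the consumer's own sequence is written once per batch. Below is a single-threaded sketch of only that inner loop; `BatchLoop` and its `Handler` interface are hypothetical, and waiting, alerts and exception handling are left out.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchLoop {
    // Hypothetical stand-in for EventHandler.onEvent(event, sequence, endOfBatch).
    interface Handler<T> {
        void onEvent(T event, long sequence, boolean endOfBatch);
    }

    /**
     * Processes nextSequence..availableSequence and returns the value the consumer
     * would store in its Sequence (one write per batch, not per event).
     */
    static <T> long drain(List<T> ring, long nextSequence, long availableSequence, Handler<T> handler) {
        int mask = ring.size() - 1; // ring size assumed to be a power of 2
        while (nextSequence <= availableSequence) {
            T event = ring.get((int) nextSequence & mask);
            handler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
        }
        return availableSequence;
    }

    public static void main(String[] args) {
        List<String> ring = List.of("a", "b", "c", "d");
        List<String> log = new ArrayList<>();
        // The consumer is at 1 (so next is 2) and the barrier reports that 5 is available.
        long newSequence = drain(ring, 2, 5, (e, seq, end) -> log.add(seq + ":" + e + (end ? "*" : "")));
        System.out.println(log);         // [2:c, 3:d, 4:a, 5:b*]
        System.out.println(newSequence); // 5
    }
}
```

The endOfBatch flag (marked `*` here) lets a handler defer expensive work, such as flushing I/O, until the last event of the batch.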
Next, let's see how ProcessingSequenceBarrier, the SequenceBarrier implementation, implements the waitFor method.
```java
final class ProcessingSequenceBarrier implements SequenceBarrier {
    /**
     * Wait strategy used while waiting for consumable events
     */
    private final WaitStrategy waitStrategy;
    /**
     * Sequence of the group this consumer depends on. For the first group it is cursorSequence
     * (the producer's publish cursor); otherwise a FixedSequenceGroup wrapping the previous group's sequences
     */
    private final Sequence dependentSequence;
    /**
     * Set to true when halt is triggered
     */
    private volatile boolean alerted = false;
    /**
     * Reference to the cursor in AbstractSequencer: the latest position published by the producer
     */
    private final Sequence cursorSequence;
    /**
     * MultiProducerSequencer or SingleProducerSequencer
     */
    private final Sequencer sequencer;

    ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences) {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        // Length of the previous group's sequence array; 0 for the first group
        if (0 == dependentSequences.length) {
            dependentSequence = cursorSequence;
        }
        // Copy the previous group's sequences into a new array; the Sequence objects themselves are shared
        else {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }

    @Override
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException {
        // Check whether the processor has been halted
        checkAlert();

        // Highest available sequence. `sequence` is the requested one, usually current + 1;
        // cursorSequence records the producer's latest position
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence) {
            return availableSequence;
        }

        // Return the highest sequence actually published; each sequence in the range is checked
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

    // ... remaining methods (checkAlert, alert, clearAlert, etc.) omitted
}
```
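The final call to getHighestPublishedSequence matters with multiple producers. There, the cursor only says which sequences have been claimed, and a later sequence can be published before an earlier one. SingleProducerSequencer simply returns availableSequence, since its cursor only moves on publish. MultiProducerSequencer scans its availableBuffer and stops at the first gap. A sketch of that scan, assuming a `LongPredicate` in place of the real isAvailable check (`HighestPublished` is a hypothetical name):

```java
import java.util.Set;
import java.util.function.LongPredicate;

final class HighestPublished {
    // Same shape as the multi-producer scan: walk upward and stop at the first unpublished sequence.
    static long highestPublished(long lowerBound, long availableSequence, LongPredicate isAvailable) {
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
            if (!isAvailable.test(sequence)) {
                return sequence - 1; // everything before the gap is safe to consume
            }
        }
        return availableSequence;
    }

    public static void main(String[] args) {
        // Producers claimed 5..8 (cursor = 8), but the producer holding 7 has not published yet.
        Set<Long> published = Set.of(5L, 6L, 8L);
        System.out.println(highestPublished(5, 8, published::contains)); // 6
    }
}
```

Returning `lowerBound - 1` when even the first sequence is missing tells the consumer that nothing new is ready yet.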
Next, the wait strategy, WaitStrategy#waitFor:
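```java
// BlockingWaitStrategy.java
public final class BlockingWaitStrategy implements WaitStrategy {
    private final Object mutex = new Object();

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException {
        long availableSequence;
        // The producer cursor is behind the requested sequence: no event available yet
        if (cursorSequence.get() < sequence) {
            // Only this blocking strategy takes a lock; the sequencing itself relies on CAS,
            // which is the efficiency point made in the previous post
            synchronized (mutex) {
                // Wait while the requested sequence is ahead of the producer cursor;
                // publish() in the Sequencer wakes us via signalAllWhenBlocking()
                while (cursorSequence.get() < sequence) {
                    barrier.checkAlert();
                    mutex.wait();
                }
            }
        }

        // Then spin until the group we depend on has also reached the sequence
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
            ThreadHints.onSpinWait();
        }

        return availableSequence;
    }

    // ... signalAllWhenBlocking() and the rest of the class omitted
}
```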
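The handshake between publish() and the blocking waitFor() can be reproduced with plain Java monitors. The sketch below assumes an AtomicLong cursor and a hypothetical `MiniBlockingWait` class, and it omits the dependent-sequence spin and the alert checks. The consumer re-checks the cursor while holding the lock, so a publish that lands between the check and `wait()` cannot be lost. Before it can notify, the producer must first acquire the mutex, and the consumer only releases it inside `wait()`.

```java
import java.util.concurrent.atomic.AtomicLong;

final class MiniBlockingWait {
    private final Object mutex = new Object();
    final AtomicLong cursor = new AtomicLong(-1);

    // Consumer side: block until the producer cursor reaches the requested sequence.
    long waitFor(long sequence) throws InterruptedException {
        if (cursor.get() < sequence) {            // fast path: skip the lock when data is ready
            synchronized (mutex) {
                while (cursor.get() < sequence) { // loop also guards against spurious wake-ups
                    mutex.wait();
                }
            }
        }
        return cursor.get(); // may exceed the requested sequence, which lets the consumer batch
    }

    // Producer side: publish, then wake blocked consumers (like signalAllWhenBlocking).
    void publish(long sequence) {
        cursor.set(sequence);
        synchronized (mutex) {
            mutex.notifyAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniBlockingWait strategy = new MiniBlockingWait();
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("available up to " + strategy.waitFor(2));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (long s = 0; s <= 2; s++) strategy.publish(s);
        consumer.join();
    }
}
```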
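WorkProcessor works much like BatchEventProcessor, except for an extra workSequence that holds the processing sequence shared by the whole group. Because several threads update workSequence, it is updated with CAS.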
```java
// WorkProcessor.java
@Override
public void run() {
    if (!running.compareAndSet(false, true)) {
        throw new IllegalStateException("Thread is already running");
    }
    sequenceBarrier.clearAlert();

    notifyStart();

    boolean processedSequence = true;
    long cachedAvailableSequence = Long.MIN_VALUE;
    long nextSequence = sequence.get();
    T event = null;
    while (true) {
        try {
            // if previous sequence was processed - fetch the next sequence and set
            // that we have successfully processed the previous sequence
            // typically, this will be true
            // this prevents the sequence getting too far forward if an exception
            // is thrown from the WorkHandler
            // processedSequence says whether nextSequence has been handled (normally or via an
            // exception); only then may the next sequence be claimed
            if (processedSequence) {
                processedSequence = false;
                do {
                    // Several consumer threads in the group may compete for one sequence; CAS avoids a lock.
                    // The group shares one workSequence; a WorkProcessor consumes a sequence only after its CAS succeeds
                    nextSequence = workSequence.get() + 1L;
                    sequence.set(nextSequence - 1L);
                } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
            }

            // Process only once the cached available sequence has reached the claimed one
            if (cachedAvailableSequence >= nextSequence) {
                event = ringBuffer.get(nextSequence);
                workHandler.onEvent(event);
                processedSequence = true;
            }
            // Otherwise refresh the cached available sequence. The cache is local to this
            // WorkProcessor instance, so different instances may hold different values
            else {
                // As in the single-threaded case, this returns the highest available sequence
                cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
            }
        } catch (final TimeoutException e) {
            notifyTimeout(sequence.get());
        } catch (final AlertException ex) {
            if (!running.get()) {
                break;
            }
        } catch (final Throwable ex) {
            // handle, mark as processed, unless the exception handler threw an exception
            exceptionHandler.handleEventException(ex, nextSequence, event);
            processedSequence = true;
        }
    }

    notifyShutdown();

    running.set(false);
}
```
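The do/while CAS on workSequence is what guarantees that each sequence goes to exactly one worker in the group. Below is a JDK-only sketch using hypothetical names (`WorkClaimDemo`, `claimAndProcess`). It assumes the events are already published, so there is no barrier and no waiting. It also skips the per-worker `sequence.set(...)` that the real code performs.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;

final class WorkClaimDemo {
    /**
     * Runs `workers` threads that share one workSequence and claim sequences 0..total-1.
     * Returns how many times each sequence was handled (exactly 1 each if claiming is correct).
     */
    static AtomicIntegerArray claimAndProcess(int workers, int total) throws InterruptedException {
        AtomicLong workSequence = new AtomicLong(-1); // shared by the whole worker group
        AtomicIntegerArray handled = new AtomicIntegerArray(total);
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            threads[w] = new Thread(() -> {
                while (true) {
                    long nextSequence;
                    do { // same pattern as WorkProcessor.run(): read, +1, CAS, retry on contention
                        nextSequence = workSequence.get() + 1L;
                    } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                    if (nextSequence >= total) {
                        return; // nothing left to claim in this demo
                    }
                    handled.incrementAndGet((int) nextSequence); // the "onEvent" for this sequence
                }
            });
            threads[w].start();
        }
        for (Thread t : threads) t.join();
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicIntegerArray handled = claimAndProcess(4, 10_000);
        boolean exactlyOnce = true;
        for (int i = 0; i < handled.length(); i++) exactlyOnce &= handled.get(i) == 1;
        System.out.println(exactlyOnce); // true
    }
}
```

Compare this with BatchEventProcessor, where every processor keeps its own sequence and therefore sees every event. Here the shared counter splits the stream among the workers.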
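To sum up:
BatchEventProcessor runs each handler on its own thread, with the handlers working in parallel. Different consumers in the same group receive the same events. An event moves on to the next consumer group only after every consumer in the current group has processed it (rather like Phaser, CyclicBarrier or CountDownLatch in java.util.concurrent). WorkerPool manages several WorkProcessors to process events on multiple threads, and the WorkProcessors in one group never process the same event. By choosing a different WaitStrategy implementation, you control how consumers wait when there is no event to process.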
(Figures: BatchEventProcessor and WorkProcessor)
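For a detailed analysis of the core classes, see https://github.com/Sonion/disroptor, which contains my analysis and comments. Write a demo, set a few breakpoints and step through the core classes and methods; the Disruptor source is fairly simple. The essentials are the ring buffer, which is written cyclically and reuses its entries so they never need garbage collection. Also worth studying and thinking about are the cache-line padding and the lock-free techniques.
1. For a reference demo, see: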
http://www.javashuo.com/article/p-acgvopbo-dv.html
2. Brief reference material