本文將介紹Disruptor的工做機制,並分析Disruptor的主要源碼html
基於的版本是3.3.7(發佈於2017.09.28)git
水平有限,若有謬誤請留言指正github
Disruptor是一個開源的併發框架,提供了相似於Java中有界隊列的功能,主要用於生產消費者場景。編程
與Java原生併發隊列不一樣的是,Disruptor高度優化,在單機上能夠輕鬆跑到千萬級別的tps與ns級別的延時數組
a. 使用環形隊列做爲底層存儲(存儲空間連續,能夠充分利用cache)緩存
b. 環形隊列中存儲的對象都是預先創建好的,減小了頻繁建立/釋放對象帶來的開銷安全
c. 生產者使用兩階段提交的方式來發布事件(第一階段是先在環形隊列中預佔一個空位,第二階段是向這個空位中寫入數據,競爭只發生在第一階段),並使用CAS操做來解決衝突,而不是使用昂貴的Lock併發
d. 用cache line padding(緩衝區填充)的思想來解決false sharing(僞共享)的問題框架
e. 使用了Java底層的Unsafe操做ide
RingBuffer
環形緩衝區,本質是一個定長Object數組(後續稱裏面的格子爲slot),爲了不僞共享,在這個數組的兩端額外填充了若干空位(這也致使訪問RingBuffer數據的方式比較崎嶇,具體請自行參見源碼)
Sequence
相似於AtomicLong,用於標記事件id
全部生產者共用一個Sequence,用於不衝突的將事件放到RingBuffer上
每一個消費者本身維護一個Sequence,用於標記本身當前正在處理的事件的id
Sequencer
生產者訪問RingBuffer時的控制器,主要實現有兩種:SingleProducerSequencer與MultiProducerSequencer,分別用於單生產者和多生產者的場景
SequenceBarrier
只有一個實現類爲ProcessingSequenceBarrier,用於協調生產者與消費者(若是某個slot中的事件尚未被全部消費者消費完畢,那麼這個slot是不能被複用的,須要等待)
WaitStrategy
消費者等待下一個可用事件的策略,Disruptor自帶了多種WaitStrategy的實現,能夠根據場景自行選擇。
示例代碼以下:
long sequence = ringBuffer.next(); // 第一階段,獲取RingBuffer上下一個可用的slot的序列號,這裏可能會有爭用 try { Event event = ringBuffer.get(sequence); // 根據序列號直接去RingBuffer上獲取對應的slot上存儲的事件 event.setData(data); // 寫入數據 } finally { ringBuffer.publish(sequence); // 第二階段,將這個事件正式發佈到RingBuffer中 }
須要重點關注的是next()與publish()方法
RingBuffer的next方法直接調用關聯的Sequencer的next方法,Sequencer的實現又分爲SingleProducerSequencer與MultiProducerSequencer這兩種
先從相對簡單的單生產者SingleProducerSequencer看起:
SingleProducerSequencer.next() @Override public long next() { return next(1); } @Override public long next(int n) { if (n < 1)//參數檢驗 { throw new IllegalArgumentException("n must be > 0"); } long nextValue = this.nextValue;//上一次返回的seq long nextSequence = nextValue + n;//此次應該返回的序列值,這個序列值還未被產生,對應的slot裏的元素的seq須要減去RingBuffer的大小 long wrapPoint = nextSequence - bufferSize;//這個序列值對應的slot上正在存儲的元素的seq,這個slot可能已經被消費了,也可能沒有 long cachedGatingSequence = this.cachedValue;//獲取消費者未消費的元素的seq最小值,這個值不是實時的 //wrapPoint > cachedGatingSequence,檢查將要被放入元素的slot是否已經沒有消費者佔用了 //cachedGatingSequence > nextValue,用於來應對seq發生溢出的狀況 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { cursor.setVolatile(nextValue); // StoreLoad fence,更新RingBuffer的遊標,用到了Unsafe方法 long minSequence; //Util.getMinimumSequence能夠得到全部消費者未消費事件的seq最小值,在比這個值更小的slot裏發佈元素是安全的 //若是這個判斷成立,說明生產者正在試圖將元素放到消費者未消費完畢的slot裏,這個操做是不安全的,生產者須要在這裏被阻塞 while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { waitStrategy.signalAllWhenBlocking();//激活全部的消費者(避免有的消費者睡死過去了) LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? 自旋等待 } this.cachedValue = minSequence;//更新cachedValue } this.nextValue = nextSequence; return nextSequence; }
邏輯比較難懂,關鍵之處以下:
a. 返回的seq對應的slot必須已經被全部消費者消費完畢
b. Util.getMinimumSequence會遍歷全部消費者使用的Sequence,並獲取其最小值,這是一個比較昂貴的操做,因此將其緩存在本地的cachedValue變量中
c. 若是seq對應的slot還沒被消費者消費完畢,說明生產速度快於消費速度,生產者須要原地自旋等待,同時向消費者發送信號,避免消費者睡死過去的狀況
再來看多生產者版本:
@Override public long next(int n) { if (n < 1)//參數檢驗 { throw new IllegalArgumentException("n must be > 0"); } long current; long next; do { current = cursor.get();//獲取最新返回的seq next = current + n;//本次返回的seq long wrapPoint = next - bufferSize;//本次返回的seq對應的slot裏的元素的seq long cachedGatingSequence = gatingSequenceCache.get();//有多個生產者,gatingSequenceCache其實是SingleProducerSequencer裏的cachedValue的Atomic版本 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)//檢查將要被放入元素的slot是否已經沒有消費者佔用了 { long gatingSequence = Util.getMinimumSequence(gatingSequences, current);//得到全部消費者未消費事件的seq最小值 if (wrapPoint > gatingSequence)//slot仍被消費者佔用,生產者自旋等待 { waitStrategy.signalAllWhenBlocking(); LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } gatingSequenceCache.set(gatingSequence);//更新gatingSequenceCache的值 } else if (cursor.compareAndSet(current, next))//用CAS操做更新cursor的值,若是失敗了說明有其餘生產者在爭用,進入下一輪循環 { break; } } while (true); return next; }
與單生產者版本相似,可是當前序列值以及緩存住的安全序列值都使用了原子變量,以解決線程安全問題。能夠說是很是精巧了。
SingleProducerSequencer.publish @Override public void publish(long sequence) { cursor.set(sequence);//更新RingBuffer的遊標 waitStrategy.signalAllWhenBlocking();//給消費者發送signal信號,具體怎麼作與waitStrategy的實現有關 } MultiProducerSequencer.publish @Override public void publish(final long sequence) { setAvailable(sequence);//這裏用了一個額外的availableBuffer數組,來標記RingBuffer的某個slot是否已經被publish成功,後面生產者消費的時候會用到 waitStrategy.signalAllWhenBlocking();//給消費者發送signal信號,具體怎麼作與waitStrategy的實現有關 }
單生產者/多生產者之間微妙的區別:
單生產者publish一個seq,那麼這個seq以前全部的seq都被publish了
多生產者publish一個seq,那麼只有這一個seq被publish
消費者是經過調用Disruptor的handlerEventsWith方法被添加到系統中的,其調用鏈以下:
Disruptor.handleEventsWith() public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) { return createEventProcessors(new Sequence[0], handlers);//添加事件處理器, } Disruptor.createEventProcessors() EventHandlerGroup<T> createEventProcessors( final Sequence[] barrierSequences,//默認狀況下,barrierSequences是不存在的 final EventHandler<? super T>[] eventHandlers) { checkNotStarted(); final Sequence[] processorSequences = new Sequence[eventHandlers.length];//爲每一個消費者建立一個Sequence final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);//建立一個與當前RingBuffer的生產者有關的Barrier for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)//遍歷消費者 { final EventHandler<? super T> eventHandler = eventHandlers[i]; final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);//爲每一個消費者建立一個BatchEventProcessor if (exceptionHandler != null) { batchEventProcessor.setExceptionHandler(exceptionHandler);//異常處理器 } consumerRepository.add(batchEventProcessor, eventHandler, barrier);//將EventHandler與BatchEventProcessor關聯起來 processorSequences[i] = batchEventProcessor.getSequence(); } updateGatingSequencesForNextInChain(barrierSequences, processorSequences);//將消費者的sequence添加爲RingBuffer的gating sequence return new EventHandlerGroup<T>(this, consumerRepository, processorSequences); }
而後調用Disruptor的start方法啓動系統,其調用鏈以下:
Disruptor.start() public RingBuffer<T> start() { checkOnlyStartedOnce(); for (final ConsumerInfo consumerInfo : consumerRepository)//遍歷註冊的全部消費者 { consumerInfo.start(executor); } return ringBuffer; } EventProcessorInfo.start() @Override public void start(final Executor executor) { executor.execute(eventprocessor);//這裏是將以前在createEventProcessors中爲消費者註冊的BatchEventProcessor放到線程池裏運行起來了 } BatchEventProcessor.run() @Override public void run() { if (!running.compareAndSet(false, true))//避免重複運行 { throw new IllegalStateException("Thread is already running"); } sequenceBarrier.clearAlert(); notifyStart(); T event = null; long nextSequence = sequence.get() + 1L; try { while (true)//在死循環中處理事件 { try { final long availableSequence = sequenceBarrier.waitFor(nextSequence);//從RingBuffer中獲取一批能夠處理的事件的seq,策略由以前設置的waitStrategy決定,返回的seq可能會大於nextSequence(批量,提升效率) if (batchStartAware != null) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } while (nextSequence <= availableSequence)//在循環中消耗返回的availableSequence { event = dataProvider.get(nextSequence);//從RingBuffer中讀取數據 eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);//調用用戶傳入的EventHandler中的onEvent方法來處理事件 nextSequence++; } sequence.set(availableSequence);//這一批seq消耗完了,更新當前消費者關聯的sequence,讓生產者能夠知道,這裏調用的是相對廉價的putOrderedLong方法,由於不須要很高的實時性 } catch (final TimeoutException e) { notifyTimeout(sequence.get()); } catch (final AlertException ex) { if (!running.get()) { break; } } catch (final Throwable ex) { exceptionHandler.handleEventException(ex, nextSequence, event); sequence.set(nextSequence); nextSequence++; } } } finally { notifyShutdown(); running.set(false); } } ProcessingSequenceBarrier.waitFor() public long waitFor()(final long sequence) throws AlertException, InterruptedException, TimeoutException { checkAlert(); long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);//調用消費者的waitStrategy來等待sequence變得可用 if (availableSequence < sequence) { return availableSequence; } return sequencer.getHighestPublishedSequence(sequence, availableSequence);//從RingBuffer中找到最大的已經被publish事件的slot,尋找策略根據單生產者/多生產者有不一樣 } SingleProducerSequencer.getHighestPublishedSequence()//單生產者的狀況,簡單 public long getHighestPublishedSequence(long lowerBound, long availableSequence) { return availableSequence; } MultiProducerSequencer.getHighestPublishedSequence()//多生產者 public long getHighestPublishedSequence(long lowerBound, long availableSequence) { for (long sequence = lowerBound; sequence <= availableSequence; sequence++)//從lowerBound開始遍歷,在RingBuffer中找到最大的已經被publish事件的slot { if (!isAvailable(sequence)) { return sequence - 1; } } return availableSequence; } public boolean isAvailable(long sequence) { int index = calculateIndex(sequence);//在RingBuffer中定位 int flag = calculateAvailabilityFlag(sequence);//後面的3行是在availableBuffer數組中尋找對應位置的元素是否被標記爲available。在MultiProducerSequencer的publish方法中會作這一操做 long bufferAddress = (index * SCALE) + BASE; return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag; }
這一塊的代碼也比較難,作個小結:
a. 每一個消費者都與一個BatchEventProcessor關聯,BatchEventProcessor初始化的時候會建立一個sequence,這個sequence記錄的是消費者已經處理的事件的seq
b. Disruptor初始化的時候會建立一個SequenceBarrier,這個barrier與生產者有關
c. 消費者會無限的調用barrier的waitFor方法,以嘗試獲取最新publish的事件,一旦waitFor方法返回了可用的seq,就在循環中調用消費者的onEvent方法將這些事件消耗掉
d. 在waitFor方法中,會根據設定的WaitStrategy來在RingBuffer上查找最新publish的事件的seq
WaitStrategy在消費速度快於生產速度,消費者等待生產者publish新的事件時會被用到。
若是生產速度快於消費速度,生產者等待消費者時,用的是LockSupport.parkNanos(1),相似於自旋等待。
WaitStrategy很是重要,不一樣的WaitStrategy會直接影響到響應事件和CPU佔用,值得專門開一節來分析
WaitStrategy是一個接口,其中只含有兩個方法:
//sequence:消費者等待這個sequence關聯的事件產生 //cursor:RingBuffer上,生產者關聯的Sequence //dependentSequence:默認狀況下與cursor相同 //barrier:與生產者關聯的barrier long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException, TimeoutException; void signalAllWhenBlocking();//若是有消費者阻塞等待,將其喚醒
WaitStrategy有多種實現,我這裏只分析兩種最有表明性的:BlockingWaitStrategy與BusySpinWaitStrategy
private final Lock lock = new ReentrantLock();//鎖與關聯的條件 private final Condition processorNotifyCondition = lock.newCondition(); @Override public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException { long availableSequence; if (cursorSequence.get() < sequence)//若是生產者尚未生產出足夠的事件,那麼在鎖上等待 { lock.lock();//嘗試佔有鎖 try { while (cursorSequence.get() < sequence) { barrier.checkAlert(); processorNotifyCondition.await();//等待 } } finally { lock.unlock();//釋放鎖 } } while ((availableSequence = dependentSequence.get()) < sequence) { barrier.checkAlert(); } return availableSequence; } @Override public void signalAllWhenBlocking() { lock.lock(); try { processorNotifyCondition.signalAll();//喚醒全部在鎖上等待的生產者 } finally { lock.unlock(); } }
代碼很簡單,用JDK自帶的ReentrantLock與Condition來完成消費者的等待控制,只要消費者拿不到可用的事件,就調用Condition.await方法等待
好處:CPU佔用少
壞處:在生產的事件足夠後,消費者沒法在第一時間醒來,須要生產者調用signalAll才行,因爲此時消費者線程可能已經被OS切走了,這會帶來必定的延時
@Override public long waitFor( final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException { long availableSequence; while ((availableSequence = dependentSequence.get()) < sequence)//只要生產者沒有生產出足夠的事件,消費者就一直自旋等待 { barrier.checkAlert(); } return availableSequence; } @Override public void signalAllWhenBlocking()//消費者根本不被阻塞,因此無需喚醒 { }
這個等待策略就更簡單了,甚至能夠說是喪心病狂,全部消費者都在無限自旋等待(Busy Spin),直到生產者生產了足夠的事件爲止。
好處:延時極低(消費者線程一直是熱的),生產出來的事件會第一時間被消費掉
壞處:有多少個消費者,就會吃滿多少個Core
小結一下:
WaitStrategy其實是延時與CPU資源佔用的權衡
若是你追求最低的延時(ns級別),那就必須保證消費者一直是是熱的,不能被系統調度走,所以你須要BusySpinWaitStrategy
若是你不須要那麼低的延時,那麼基於鎖的BlockingWaitStrategy可能更加適合你
ArrayBlockingQueue的源碼分析能夠參見這篇博客,其核心想法以下:
a. 內置一個ReentrantLock與兩個Condition
b. 任何對Queue的讀寫操做均用ReentrantLock加鎖 -> 實現了線程安全的語義
c. 在隊列空/滿的狀況下若是繼續取出/插入元素,則利用Condition將工做線程阻塞,在符合條件的時候再將被阻塞的線程喚醒 -> 實現了阻塞隊列的語義
很明顯,這種實如今高併發的狀況下存在必定的問題:
a. 在任一時刻,只能有一個讀/寫線程在工做,其餘的線程都被ReentrantLock所阻塞
b. takeIndex與putIndex這兩個被頻繁訪問的域在內存上距離很近,容易引發僞共享問題
Disruptor則很好的解決了這些問題,具體請參加本文第一節
Disruptor是一個設計很是精巧的框架,爲了追求極致性能,作了不少底層優化,值得學習參考