disruptor是一個高性能的線程間異步通訊的框架,即在同一個JVM進程中的多線程間消息傳遞。應用disruptor知名項目有以下的一些:Storm, Camel, Log4j2,還有目前的美團點評技術團隊也有不少很多的應用,或者說有一些借鑑了它的設計機制。 下面就跟着筆者一塊兒去領略下disruptor高性能之道吧~html
disruptor是一款開源的高性能隊列框架,github地址爲 https://github.com/LMAX-Exchange/disruptor。git
分析disruptor,只要把event的生產和消費流程弄懂,基本上disruptor的七寸就已經抓住了。話很少說,趕忙上車,筆者如下面代碼爲例講解disruptor:github
public static void main(String[] args) { Disruptor<StringEvent> disruptor = new Disruptor<>(StringEvent::new, 1024, new PrefixThreadFactory("consumer-pool-", new AtomicInteger(0)), ProducerType.MULTI, new BlockingWaitStrategy()); // 註冊consumer並啓動 disruptor.handleEventsWith((EventHandler<StringEvent>) (event, sequence, endOfBatch) -> { System.out.println(Util.threadName() + "onEvent " + event); }); disruptor.start(); // publisher邏輯 Executor executor = Executors.newFixedThreadPool(2, new PrefixThreadFactory("publisher-pool-", new AtomicInteger(0))); while (true) { for (int i = 0; i < 2; i++) { executor.execute(() -> { Util.sleep(1); disruptor.publishEvent((event, sequence, arg0) -> { event.setValue(arg0 + " " + sequence); }, "hello world"); }); } Util.sleep(1000); } }
class StringEvent { private String value; public String getValue() { return value; } public void setValue(String value) { this.value = value; } @Override public String toString() { return "StringEvent:{value=" + value + "}"; } } class PrefixThreadFactory implements ThreadFactory { private String prefix; private AtomicInteger num; public PrefixThreadFactory(String prefix, AtomicInteger num) { this.prefix = prefix; this.num = num; } @Override public Thread newThread(Runnable r) { return new Thread(r, prefix + num.getAndIncrement()); } } class Util { static String threadName() { return String.format("%-16s", Thread.currentThread().getName()) + ": "; } static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) { // 獲取當前要設置的sequence序號,而後進行設置並通知消費者 final long sequence = sequencer.next(); translateAndPublish(translator, sequence, arg0); } // 獲取下一個sequence,直到獲取到位置才返回 public long next(int n) { long current; long next; do { // 獲取當前ringBuffer的可寫入sequence current = cursor.get(); next = current + n; long wrapPoint = next - bufferSize; long cachedGatingSequence = gatingSequenceCache.get(); if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { // 若是當前沒有空位置寫入,獲取多個consumer中消費進度最小的那個的消費進度 long gatingSequence = Util.getMinimumSequence(gatingSequences, current); if (wrapPoint > gatingSequence) { // 阻塞1ns,而後continue LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } gatingSequenceCache.set(gatingSequence); } // cas設置ringBuffer的sequence else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next; } private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) { try { // 設置event translator.translateTo(get(sequence), sequence, arg0); } finally { sequencer.publish(sequence); } } public void publish(final long sequence) { // 1. 設置availableBuffer,表示對應的event是否設置完成,consumer線程中會用到 // - 注意,到這裏時,event已經設置完成,可是consumer還不知道該sequence對應的event是否設置完成, // - 因此須要設置availableBuffer中sequence對應event的sequence number // 2. 通知consumer setAvailable(sequence); waitStrategy.signalAllWhenBlocking(); }
從translateAndPublish中看,若是用戶的設置event方法拋出異常,這時event對象是不完整的,那麼publish到consumer端,consumer消費的不是完整的數據怎麼辦呢?在translateAndPublish中需不須要在異常狀況下reset event對象呢?關於這個問題筆者以前是有疑問的,關於這個問題筆者提了一個issue,可點擊 https://github.com/LMAX-Exchange/disruptor/issues/244 進行查看。數組
筆者建議在consumer消費完event以後,進行reset event操做,這樣避免下次設置event異常consumer時取到不完整的數據,好比log4j2中的AsyncLogger中處理完log4jEvent以後就會調用clear方法進行重置event。緩存
private void processEvents() { T event = null; long nextSequence = sequence.get() + 1L; while (true) { try { // 獲取可用的sequence,默認直到有可用sequence時才返回 final long availableSequence = sequenceBarrier.waitFor(nextSequence); if (batchStartAware != null) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } // 執行消費回調動做,注意,這裏獲取到一個批次event,可能有多個,個數爲availableSequence-nextSequence + 1 // nextSequence == availableSequence表示該批次只有一個event while (nextSequence <= availableSequence) { // 獲取nextSequence位置上的event event = dataProvider.get(nextSequence); // 用戶自定義的event 回調 eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } // 設置當前consumer線程的消費進度sequence sequence.set(availableSequence); } catch (final Throwable ex) { exceptionHandler.handleEventException(ex, nextSequence, event); sequence.set(nextSequence); nextSequence++; } } } public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException{ long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } // 獲取ringBuffer中可安全讀的最大的sequence number,該信息存在availableBuffer中的sequence // 在MultiProducerSequencer.publish方法中會設置 return sequencer.getHighestPublishedSequence(sequence, availableSequence); } // 默認consumer阻塞策略 BlockingWaitStrategy public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException { long availableSequence; if (cursorSequence.get() < sequence) { // 當前ringBuffer的sequence小於sequence,阻塞等待 // event生產以後會喚醒 synchronized (mutex) { while (cursorSequence.get() < sequence) { barrier.checkAlert(); mutex.wait(); } } } while ((availableSequence = dependentSequence.get()) < sequence) { barrier.checkAlert(); ThreadHints.onSpinWait(); } return availableSequence; }
從上面的event消費流程來看,消費線程會讀取ringBuffer的sequence,而後更新本消費線程內的offset(消費進度sequence),若是有多個event的話,那麼就是廣播消費模式了(單consumer線程內仍是順序消費),若是不想讓event被廣播消費(重複消費),可以使用以下方法添加consumer線程(WorkHandler是集羣消費,EventHandler是廣播消費):安全
disruptor.handleEventsWithWorkerPool((WorkHandler<StringEvent>) event -> { System.out.println(Util.threadName() + "onEvent " + event); });
event生產流程中獲取並自增sequence時用的就是CAS,獲取以後該sequence對應位置的操做只會在單線程,沒有了併發問題。數據結構
集羣消費模式下獲取sequence以後也會使用CAS設置爲sequence新值,設置本地消費進度,而後再執行獲取event並執行回調邏輯。多線程
注意,disruptor中較多地方使用了CAS,但並不表明徹底沒有了鎖機制,好比默認consumer阻塞策略 BlockingWaitStrategy發揮做用時,consumer消費線程就會阻塞,只不過這隻會出如今event生產能力不足是纔會存在。若是consumer消費不足,大量event生產致使ringBuffer爆滿,這時event生產線程就會輪詢調用LockSupport.parkNanos(1),這裏的成本也不容小覷(涉及到線程切換損耗)。併發
僞共享講的是多個CPU時的123級緩存的問題,一般,緩存是以緩存行的方式讀取數據,若是A、B兩個變量被緩衝在同一行以內,那麼對於其中一個的更新會致使另外一個緩衝無效,須要從內存中讀取,這種沒法充分利用緩存行的問題就是僞共享。disruptor相關代碼以下:框架
class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding { protected volatile long value; }
ringBuffer是一個環形隊列,本質是一個數組,size爲2的冪次方(方便作&操做),數據位置sequence值會和size作&操做得出數組下標,而後進行數據的讀寫操做(只在同一個線程內,無併發問題)。
disruptor初衷是爲了解決內存隊列的延遲問題,做爲一個高性能隊列,包括Apache Storm、Camel、Log4j 2在內的不少知名項目都在使用。disruptor的重要機制就是CAS和RingBuffer,藉助於它們兩個實現數據高效的生產和消費。
disruptor多生產者多消費者模式下,由於RingBuffer數據的寫入是分爲2步的(先獲取到個sequence,而後寫入數據),若是獲取到sequence以後,生產者寫入RingBuffer較慢,consumer消費較快,那麼生產者最終會拖慢consumer消費進度,這一點需注意(若是已經消費到生產者佔位的前一個數據了,那麼consumer會執行對應的阻塞策略)。在實際使用過程當中,若是consumer消費邏輯耗時較長,能夠封裝成任務交給線程池來處理,避免consumer端拖慢生成者的寫入速度。
disruptor的設計對於開發者來講有哪些借鑑的呢?儘可能減小競爭,避免多線程對同一數據作操做,好比disruptor使用CAS獲取只會在一個線程內進行讀寫的event對象,這種思想其實已經在JDK的thread本地內存中有所體現;儘可能複用對象,避免大量的內存申請釋放,增長GC損耗,disruptor經過複用event對象來保證讀寫時不會產生對象GC問題;選擇合適數據結構,disruptor使用ringBuffer,環形數組來實現數據高效讀寫。
參考資料: