disruptor 高性能之道

disruptor是一個高性能的線程間異步通訊的框架,即在同一個JVM進程中的多線程間消息傳遞。應用disruptor知名項目有以下的一些:Storm, Camel, Log4j2,還有目前的美團點評技術團隊也有不少很多的應用,或者說有一些借鑑了它的設計機制。 下面就跟着筆者一塊兒去領略下disruptor高性能之道吧~html

disruptor是一款開源的高性能隊列框架,github地址爲 https://github.com/LMAX-Exchange/disruptorgit

分析disruptor,只要把event的生產和消費流程弄懂,基本上disruptor的七寸就已經抓住了。話很少說,趕忙上車,筆者如下面代碼爲例講解disruptor:github

public static void main(String[] args) {
    Disruptor<StringEvent> disruptor = new Disruptor<>(StringEvent::new, 1024,
            new PrefixThreadFactory("consumer-pool-", new AtomicInteger(0)), ProducerType.MULTI,
            new BlockingWaitStrategy());
 
    // 註冊consumer並啓動
    disruptor.handleEventsWith((EventHandler<StringEvent>) (event, sequence, endOfBatch) -> {
        System.out.println(Util.threadName() + "onEvent " + event);
    });
    disruptor.start();
 
    // publisher邏輯
    Executor executor = Executors.newFixedThreadPool(2,
            new PrefixThreadFactory("publisher-pool-", new AtomicInteger(0)));
    while (true) {
        for (int i = 0; i < 2; i++) {
            executor.execute(() -> {
                Util.sleep(1);
                disruptor.publishEvent((event, sequence, arg0) -> {
                    event.setValue(arg0 + " " + sequence);
                }, "hello world");
            });
        }
 
        Util.sleep(1000);
    }
}
class StringEvent {
    private String value;

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "StringEvent:{value=" + value + "}";
    }
}

class PrefixThreadFactory implements ThreadFactory {
    private String prefix;
    private AtomicInteger num;

    public PrefixThreadFactory(String prefix, AtomicInteger num) {
        this.prefix = prefix;
        this.num = num;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + num.getAndIncrement());
    }

}

class Util {

    static String threadName() {
        return String.format("%-16s", Thread.currentThread().getName()) + ": ";
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
測試相關類

event生產流程

event的生產是從 RingBuffer.publishEvent 開始的,event生產流程步驟以下:
  • 獲取待插入(到ringBuffer的)位置,至關於先佔個位
  • 往該位置上設置event
  • 設置sequence對應event的標誌,通知consumer
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
{
    // 獲取當前要設置的sequence序號,而後進行設置並通知消費者
    final long sequence = sequencer.next();
    translateAndPublish(translator, sequence, arg0);
}
 
// 獲取下一個sequence,直到獲取到位置才返回
public long next(int n) {
    long current;
    long next;
     
    do {
        // 獲取當前ringBuffer的可寫入sequence
        current = cursor.get();
        next = current + n;
 
        long wrapPoint = next - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.get();
 
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
            // 若是當前沒有空位置寫入,獲取多個consumer中消費進度最小的那個的消費進度
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
 
            if (wrapPoint > gatingSequence) {
                // 阻塞1ns,而後continue
                LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                continue;
            }
 
            gatingSequenceCache.set(gatingSequence);
        }
        // cas設置ringBuffer的sequence
        else if (cursor.compareAndSet(current, next)) {
            break;
        }
    } while (true);
 
    return next;
}
 
private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
    try {
        // 設置event
        translator.translateTo(get(sequence), sequence, arg0);
    } finally {
        sequencer.publish(sequence);
    }
}
public void publish(final long sequence) {
    // 1. 設置availableBuffer,表示對應的event是否設置完成,consumer線程中會用到
    //   - 注意,到這裏時,event已經設置完成,可是consumer還不知道該sequence對應的event是否設置完成,
    //   - 因此須要設置availableBuffer中sequence對應event的sequence number
    // 2. 通知consumer
    setAvailable(sequence);
    waitStrategy.signalAllWhenBlocking();
}

從translateAndPublish中看,若是用戶的設置event方法拋出異常,這時event對象是不完整的,那麼publish到consumer端,consumer消費的不是完整的數據怎麼辦呢?在translateAndPublish中需不須要在異常狀況下reset event對象呢?關於這個問題筆者以前是有疑問的,關於這個問題筆者提了一個issue,可點擊 https://github.com/LMAX-Exchange/disruptor/issues/244 進行查看。數組

筆者建議在consumer消費完event以後,進行reset event操做,這樣避免下次設置event異常consumer時取到不完整的數據,好比log4j2中的AsyncLogger中處理完log4jEvent以後就會調用clear方法進行重置event。緩存

event消費流程

event消費流程入口是BatchEventProcessor.processEvents,event消費流程步驟:
  • 獲取當前consumer線程消費的offset,即nextSequence
  • 從ringBuffer獲取可用的sequence,沒有新的event時,會根據consmer阻塞策略進行執行某些動做
  • 獲取event,而後執行event回調
  • 設置當前consumer線程的消費進度
private void processEvents() {
    T event = null;
    long nextSequence = sequence.get() + 1L;
 
    while (true) {
        try {
            // 獲取可用的sequence,默認直到有可用sequence時才返回
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);
            if (batchStartAware != null) {
                batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
            }
 
            // 執行消費回調動做,注意,這裏獲取到一個批次event,可能有多個,個數爲availableSequence-nextSequence + 1
            // nextSequence == availableSequence表示該批次只有一個event
            while (nextSequence <= availableSequence) {
                // 獲取nextSequence位置上的event
                event = dataProvider.get(nextSequence);
                // 用戶自定義的event 回調
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }
 
            // 設置當前consumer線程的消費進度sequence
            sequence.set(availableSequence);
        } catch (final Throwable ex) {
            exceptionHandler.handleEventException(ex, nextSequence, event);
            sequence.set(nextSequence);
            nextSequence++;
        }
    }
}
 
public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException{
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
 
    if (availableSequence < sequence) {
        return availableSequence;
    }
 
    // 獲取ringBuffer中可安全讀的最大的sequence number,該信息存在availableBuffer中的sequence
    // 在MultiProducerSequencer.publish方法中會設置
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
 
// 默認consumer阻塞策略 BlockingWaitStrategy
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
    throws AlertException, InterruptedException
{
    long availableSequence;
    if (cursorSequence.get() < sequence) {
        // 當前ringBuffer的sequence小於sequence,阻塞等待
        // event生產以後會喚醒
        synchronized (mutex) {
            while (cursorSequence.get() < sequence) {
                barrier.checkAlert();
                mutex.wait();
            }
        }
    }
 
    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
        ThreadHints.onSpinWait();
    }
 
    return availableSequence;
}

從上面的event消費流程來看,消費線程會讀取ringBuffer的sequence,而後更新本消費線程內的offset(消費進度sequence),若是有多個event的話,那麼就是廣播消費模式了(單consumer線程內仍是順序消費),若是不想讓event被廣播消費(重複消費),可以使用以下方法添加consumer線程(WorkHandler是集羣消費,EventHandler是廣播消費):安全

disruptor.handleEventsWithWorkerPool((WorkHandler<StringEvent>) event -> {
    System.out.println(Util.threadName() + "onEvent " + event);
});

disruptor高性能之道

棄用鎖機制改用CAS

event生產流程中獲取並自增sequence時用的就是CAS,獲取以後該sequence對應位置的操做只會在單線程,沒有了併發問題。數據結構

集羣消費模式下獲取sequence以後也會使用CAS設置爲sequence新值,設置本地消費進度,而後再執行獲取event並執行回調邏輯。多線程

注意,disruptor中較多地方使用了CAS,但並不表明徹底沒有了鎖機制,好比默認consumer阻塞策略 BlockingWaitStrategy發揮做用時,consumer消費線程就會阻塞,只不過這隻會出如今event生產能力不足是纔會存在。若是consumer消費不足,大量event生產致使ringBuffer爆滿,這時event生產線程就會輪詢調用LockSupport.parkNanos(1),這裏的成本也不容小覷(涉及到線程切換損耗)。併發

 
避免僞共享引入緩衝行填充

僞共享講的是多個CPU時的123級緩存的問題,一般,緩存是以緩存行的方式讀取數據,若是A、B兩個變量被緩衝在同一行以內,那麼對於其中一個的更新會致使另外一個緩衝無效,須要從內存中讀取,這種沒法充分利用緩存行的問題就是僞共享。disruptor相關代碼以下:框架

class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding {
    protected volatile long value;
}
 
使用RingBuffer做爲數據存儲容器

ringBuffer是一個環形隊列,本質是一個數組,size爲2的冪次方(方便作&操做),數據位置sequence值會和size作&操做得出數組下標,而後進行數據的讀寫操做(只在同一個線程內,無併發問題)。

 
小結

disruptor初衷是爲了解決內存隊列的延遲問題,做爲一個高性能隊列,包括Apache Storm、Camel、Log4j 2在內的不少知名項目都在使用。disruptor的重要機制就是CAS和RingBuffer,藉助於它們兩個實現數據高效的生產和消費

disruptor多生產者多消費者模式下,由於RingBuffer數據的寫入是分爲2步的(先獲取到個sequence,而後寫入數據),若是獲取到sequence以後,生產者寫入RingBuffer較慢,consumer消費較快,那麼生產者最終會拖慢consumer消費進度,這一點需注意(若是已經消費到生產者佔位的前一個數據了,那麼consumer會執行對應的阻塞策略)。在實際使用過程當中,若是consumer消費邏輯耗時較長,能夠封裝成任務交給線程池來處理,避免consumer端拖慢生成者的寫入速度。

disruptor的設計對於開發者來講有哪些借鑑的呢?儘可能減小競爭,避免多線程對同一數據作操做,好比disruptor使用CAS獲取只會在一個線程內進行讀寫的event對象,這種思想其實已經在JDK的thread本地內存中有所體現;儘可能複用對象,避免大量的內存申請釋放,增長GC損耗,disruptor經過複用event對象來保證讀寫時不會產生對象GC問題;選擇合適數據結構,disruptor使用ringBuffer,環形數組來實現數據高效讀寫。

 

參考資料:

一、https://tech.meituan.com/disruptor.html

相關文章
相關標籤/搜索