Disruptor系列(二)— disruptor使用

本文譯自Dirsruptor在github上的wiki中文章:Getting Startedhtml

獲取Disruptor

Disruptor jar包能夠從maven倉庫mvnrepository獲取,能夠將其集成進項目的依賴管理中。java

<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.4.2</version>
</dependency>


編寫事件處理生產者和消費者

爲了學習Disruptor的使用,這裏以很是簡單的例子入手:生產者生產單個long型value傳遞給消費者。這裏簡化消費者邏輯,只打印消費的value。首先定義攜帶數據的Event:git

public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value; 
    }
}

爲了容許Disruptor可以爲咱們預分配這些事件,咱們須要一個EventFactory用於構造事件:github

public class LongEventFactory implements EventFactory<LongEvent>
{
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

一旦咱們定義了事件,我便再須要建立事件消費者用於消費處理事件。在咱們的例子中,咱們只須要打印value值到控制檯便可:網絡

public class LongEventHandler implements EventHandler<LongEvent>
{
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println("Event: " + event);
    }
}

有了事件消費者,咱們還須要事件生產者產生事件。爲了簡單起見,咱們假設數據來源於I/O,如:網絡或者文件。因爲不一樣版本的Disruptor,提供了不一樣的方式編寫生產者。併發

隨着3.0版本,Disruptor經過將複雜邏輯囊括在RingBuffer中,從而提供了豐富的Lambda-style API幫助開發者構建Producer。所以從3.0以後,更偏心使用Event Publisher/Event Translator的API發佈消息:app

public class LongEventProducerWithTranslator
{
    private final RingBuffer<LongEvent> ringBuffer;
    
    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }
    
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
        new EventTranslatorOneArg<LongEvent, ByteBuffer>()
        {
            public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
            {
                event.set(bb.getLong(0));
            }
        };

    public void onData(ByteBuffer bb)
    {
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}

這種方式的另外一個優點在於Translator代碼能夠被分離在單獨的類中,同時也比較容易進行無依賴的單元測試。Disruptor提供了許多不一樣的接口(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, etc.),能夠經過實現這些接口提供translators。緣由是容許轉換器被表示爲靜態類或非捕獲lambda做爲轉換方法的參數經過Ring Buffer上的調用傳遞給轉換器。異步

另外一方式使用3.0版本以前的遺留API構建生產者發佈消息,這種方式比較原始:maven

public class LongEventProducer
{
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
                                                        // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}

從以上的代碼流程編寫能夠看出,事件的發佈比使用一個簡單的隊列要複雜。這是因爲須要對事件預分配致使。對於消息的發佈有兩個階段,首先在RingBuffer中聲明須要的槽位,而後再發布可用的數據。必須使用try/finally語句塊包裹消息的發佈。必須如今try塊中聲明使用RingBuffer的槽位,而後再finally塊中發佈使用的sequece。若是不這樣作,將可能致使Disruptor狀態的錯誤,特別是在多生產者的狀況下,若是不重啓Disruptor將不能恢復。所以推薦使用EventTranslator編寫producer。高併發

最後一步須要將以上編寫的組件鏈接起來。雖然能夠手動鏈接各個組件,然而那樣可能比較複雜,所以提供了一個DSL用於構造以便簡化過程。使用DSL帶來裝配的簡化,可是卻對於不少參數沒法作到更細緻的控制,然而對於大多數狀況,DSL仍是很是適合:

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            producer.onData(bb);
            Thread.sleep(1000);
        }
    }
}

關於對Disruptor的接口設計的影響之一是Java 8,由於它使用了Functional Interfaces去實現Java Lambdas。在Disruptor API的大多數接口都被定義成Functional Interfaces以便Lambdas能夠被使用。以上的LongEventMain可使用Lambdas進行簡化:

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

能夠看出使用Lambdas有大量的類將再也不須要,如:handler,translator等。也能夠看出使用Lambdas簡化publishEvent()只僅僅涉及到參數傳遞。

然而若是將代碼改爲這樣:

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
    bb.putLong(0, l);
    ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
    Thread.sleep(1000);
}

注意這裏使用了捕獲式的Lambda,意味着經過調用publishEvent()時可能須要實例化一個對象來持有ByteBuffer bb將其傳遞給lambda。這個將可能建立額外的垃圾,若是對GC壓力有嚴格要求的狀況下,經過傳遞參數的方式將更加受歡迎。

使用方法引用來代理上述的lambda將能進一步簡化上述的方式,也將更時髦:

public class LongEventMain
{
    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println(event);
    }

    public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
    {
        event.set(buffer.getLong(0));
    }

    public static void main(String[] args) throws Exception
    {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(LongEventMain::handleEvent);

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent(LongEventMain::translate, bb);
            Thread.sleep(1000);
        }
    }
}

這裏對ringBuffer.publishEvent的參數使用了方法引用替換了lambda,使其更進一步簡化。


基本的參數設置

對於大多數場景使用方式便可。然而,若是你能肯定硬件和軟件的環境即可以進一步對Disruptor的參數進行調整以提升性能。主要有兩種參數能夠被調整:

  • single vs. multiple producers
  • alternative wait strategies

Single vs. Multiple Producers

提升併發系統的性能的最好方式是遵循Single Writer Principle,這個也在Disruptor也被應用。若是在你的場景中只僅僅是單生產者,而後你能夠調優得到額外的性能提高:

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        //.....
        // Construct the Disruptor with a SingleProducerSequencer
        Disruptor<LongEvent> disruptor = new Disruptor(
            factory, bufferSize, ProducerType.SINGLE, new BlockingWaitStrategy(), DaemonThreadFactory.INSTANCE);
        //.....
    }
}

爲了說明經過這種技術方式能替身多少性能優點,這裏有一份測試類OneToOne performance test。在i7 Sandy Bridge MacBook Air的運行結果:

Multiple Producer:

Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec

Single Producer:

Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec

Alternative Wait Strategies

Disruptor默認使用的等待策略是BlockingWaitStrategy。內部的BlockingWaitStrategy使用典型的Lock和Condition處理線程的wake-up。BlockingWaitStrategy是等待策略中最慢的,可是在CPU使用率方面是最保守的,最普遍的適用於大多數場景。能夠經過調整等待策略參數獲取額外的性能。

1.SleepingWaitStrategy

相似BlockingWaitStrategy,SleepingWaitStrategy也試圖保持CPU使用率。經過使用簡單的忙等循環,可是在循環過程當中調用了LockSupport.parkNanos(1)。在典型的Linux系統上停頓線程60us。然而,它具備如下好處:生產線程不須要採起任何其餘增長適當計數器的動做,而且不須要發信號通知條件變量的成本。然而將增大生產者和消費者以前數據傳遞的延遲。在低延遲沒有被要求的場景中,這是一個很是好的策略。一個公共的使用場景是異步日誌。

2.YieldingWaitStrategy

YieldingWaitStrategy是一個低延遲系統中等待策略。經過犧牲CPU資源來下降延遲。YieldingWaitStrategy經過busy spin等待sequence增加到合適的值。在內部實現中,經過在循環內部使用Thread.yield()容許其餘的隊列線程運行。當須要很高的性能且事件處理線程少於CPU邏輯核數時這個策略被強烈推薦。如:啓用了超線程。

3.BusySpinWaitStrategy

BusySpinWaitStrategy是高新跟那個的等待策略,可是對環境有限制。若是事件處理器的數量小於物理核數時才使用這個策略。


清理RingBuffer中的對象

當經過Disruptor傳遞數據時,對象的存活時間可能超過預期。爲了可以避免這個發生,在事件處理結束後應當清理下事件對象。若是隻有單個生產者,在該生產者中清理對象便是最高效的。而後有時間處理鏈時,就須要特定的事件處理器被放置在鏈的最末尾用於清理事件。

class ObjectEvent<T>
{
    T val;

    void clear()
    {
        val = null;
    }
}

public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
{
    public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch)
    {
        // Failing to call clear here will result in the 
        // object associated with the event to live until
        // it is overwritten once the ring buffer has wrapped
        // around to the beginning.
        event.clear(); 
    }
}

public static void main(String[] args)
{
    Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
        () -> ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);

    disruptor
        .handleEventsWith(new ProcessingEventHandler())
        .then(new ClearingObjectHandler());
}
相關文章
相關標籤/搜索