這篇博客將主要經過幾個示例,簡單講述 Disruptor 的使用方法;java
Disruptor 是英國外匯交易公司 LMAX 開發的一個無鎖高性能的線程間消息傳遞的框架。目前包括 Apache Storm、Camel、Log4j2 等知名項目都是用了 Disruptor;git
由於 Disruptor 中的一個很重要的結構 RingBuffer
和 JDK 中的 ArrayBlockingQueue
很類似,其內部都是一個環形數組,因此常常將他們放在一塊兒比較,如下是官網公佈測試結果github
從圖中能夠明顯看到他們之間性能的巨大差別;數組
此外在使用 Disruptor 的項目中也能看到其性能的差別,例如 Log4j多線程
其中 Loggers all async
採用的是 Disruptor,Async Appender
採用的是 ArrayBlockingQueue, Sync
是同步模式;從圖中能夠看到,線程越多競爭越激烈的時候 Disruptor 的性能優點越明顯,其緣由很很容易想到,由於 ArrayBlockingQueue 的進出由同一把鎖控制,因此競爭對其性能有巨大的影響;框架
此外個人筆記本配置爲 「i7-8550U 8G」,使用的版本爲:async
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.2</version> </dependency>
如下經過一個單線程的 demo,演示Disruptor 的基本用法,並個 ArrayBlockingQueue 作簡單對比;ide
public class Contrast { public static final int count = 50000000; public static final int size = 1024; private static CountDownLatch latch = new CountDownLatch(1); public void testDisruptor() throws InterruptedException { long start = System.currentTimeMillis(); final Disruptor<Event> disruptor = new Disruptor<>( () -> new Event(), // 綁定事件工廠,主要用於初始化 RingBuffer size, // RingBuffer 大小 DaemonThreadFactory.INSTANCE, // 指定生產者線程工廠,也能夠直接傳入線程池 ProducerType.SINGLE, // 指定生產者爲單線程,也支持多線程模式 new YieldingWaitStrategy() // 等待策略 // new BlockingWaitStrategy() ); Handler handler = new Handler(); disruptor.handleEventsWith(handler); // 綁定事件處理程序 disruptor.start(); RingBuffer<Event> ringBuffer = disruptor.getRingBuffer(); // 開始以後 RingBuffer 的全部位置就已經初始化完成 for (int i = 0; i < count; i++) { long seq = ringBuffer.next(); // 獲取下一個放置位置 Event event = ringBuffer.get(seq); // 等到指定位置的槽 event.seId(i); // 更新事件,注意這裏是更新,不是放入新的,因此不會有 GC 產生 ringBuffer.publish(seq); // 發佈事件 } latch.await(); System.out.println("time: " + (System.currentTimeMillis() - start)); } private void testQueue() throws InterruptedException { long start = System.currentTimeMillis(); final BlockingQueue<Event> queue = new ArrayBlockingQueue<>(size); new Thread(() -> { for (int i = 0; i < count; i++) { try { queue.put(new Event(i)); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(() -> { for (int i = 0; i < count; i++) { try { Event event = queue.take(); if (i == count - 1) { System.out.println("last: " + event.getLogId()); latch.countDown(); } } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); latch.await(); System.out.println("time: " + (System.currentTimeMillis() - start)); } class Event { private long id; Event() {} Event(long id) { this.id = id; } public long getLogId() { return id; } public void seId(int id) { this.id = id; } } class Handler implements EventHandler<Event> { private int i = 0; @Override public void onEvent(Event event, long seq, boolean bool) { if (++i == count) { System.out.println("last: " + event.getLogId()); latch.countDown(); } } } public static void main(String[] args) throws InterruptedException { Contrast contrast = new Contrast(); contrast.testDisruptor(); // contrast.testQueue(); } }
Disruptor-YieldingWaitStrategy: 919
Disruptor-BlockingWaitStrategy: 3142
ArrayBlockingQueue : 4307工具其中 BlockingWaitStrategy 等待策略和 ArrayBlockingQueue 大體相識性能
上面的例子在使用多個消費這時,會出現重複消費的狀況,若是想要一條消息只消費一次,能夠參照下面的代碼:
public class MoreConsumer { public static final int count = 5000; public static final int size = 16; public void testDisruptor() { long start = System.currentTimeMillis(); final Disruptor<Event> disruptor = new Disruptor<>( () -> new Event(), size, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy() ); disruptor.handleEventsWithWorkerPool(new Handler("h1"), new Handler("h2"), new Handler("h3")); disruptor.start(); RingBuffer<Event> ringBuffer = disruptor.getRingBuffer(); for (int i = 0; i < count; i++) { long seq = ringBuffer.next(); Event event = ringBuffer.get(seq); event.id = i; ringBuffer.publish(seq); } System.out.println("time: " + (System.currentTimeMillis() - start)); } class Event { public long id; } class Handler implements WorkHandler<Event> { private String name; Handler(String name) { this.name = name; } @Override public void onEvent(Event event) { System.out.println(name + ": " + event.id); } } public static void main(String[] args) { MoreConsumer moreConsumer = new MoreConsumer(); moreConsumer.testDisruptor(); } }
如上面的代碼所示使用 WorkHandler
便可,同時還須要注意選擇等待策略,策略不一樣也可能致使重複消費的問題,同時官網也只出須要在代碼裏面保證重複消費問題;
不少也業務邏輯會出現如下的相似狀況,第三個消費者,須要等待前面的任務完成後才能繼續執行的狀況;一般咱們會使用鎖、同步工具以及一些其餘的方式,但都顯得比較麻煩,並且效率比較低,這裏若是咱們使用 Disruptor 就能很方便的解決;
disruptor.handleEventsWith(c1Handler, c2Handler); disruptor.after(c1Handler, c2Handler).handleEventsWith(c3Handler);
如此僅需兩行代碼,就能將上面的關係表述清楚,對於更復雜的狀況一樣;
對於更多的使用技巧就須要你根據實際狀況分析了,下一篇博客將主要分析 Disruptor 爲何會那麼快;