Disruptor 詳解 一

這篇博客將主要經過幾個示例,簡單講述 Disruptor 的使用方法;java

1、disruptor 簡介

Disruptor 是英國外匯交易公司 LMAX 開發的一個無鎖高性能的線程間消息傳遞的框架。目前包括 Apache Storm、Camel、Log4j2 等知名項目都是用了 Disruptor;git

由於 Disruptor 中的一個很重要的結構 RingBuffer 和 JDK 中的 ArrayBlockingQueue 很類似,其內部都是一個環形數組,因此常常將他們放在一塊兒比較,如下是官網公佈測試結果github

從圖中能夠明顯看到他們之間性能的巨大差別;數組

此外在使用 Disruptor 的項目中也能看到其性能的差別,例如 Log4j多線程

其中 Loggers all async 採用的是 Disruptor,Async Appender 採用的是 ArrayBlockingQueue, Sync 是同步模式;從圖中能夠看到,線程越多競爭越激烈的時候 Disruptor 的性能優點越明顯,其緣由很很容易想到,由於 ArrayBlockingQueue 的進出由同一把鎖控制,因此競爭對其性能有巨大的影響;框架

此外個人筆記本配置爲 「i7-8550U 8G」,使用的版本爲:async

<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.4.2</version>
</dependency>

2、ArrayBlockingQueue 性能對比

如下經過一個單線程的 demo,演示Disruptor 的基本用法,並個 ArrayBlockingQueue 作簡單對比;ide

public class Contrast {
  public static final int count = 50000000;
  public static final int size = 1024;
  private static CountDownLatch latch = new CountDownLatch(1);

  public void testDisruptor() throws InterruptedException {
    long start = System.currentTimeMillis();
    final Disruptor<Event> disruptor = new Disruptor<>(
        () -> new Event(),             // 綁定事件工廠,主要用於初始化 RingBuffer
        size,                          // RingBuffer 大小
        DaemonThreadFactory.INSTANCE,  // 指定生產者線程工廠,也能夠直接傳入線程池
        ProducerType.SINGLE,           // 指定生產者爲單線程,也支持多線程模式
          new YieldingWaitStrategy()   // 等待策略
//        new BlockingWaitStrategy()
    );

    Handler handler = new Handler();
    disruptor.handleEventsWith(handler);  // 綁定事件處理程序
    disruptor.start();

    RingBuffer<Event> ringBuffer = disruptor.getRingBuffer();  // 開始以後 RingBuffer 的全部位置就已經初始化完成
    for (int i = 0; i < count; i++) {
      long seq = ringBuffer.next();       // 獲取下一個放置位置
      Event event = ringBuffer.get(seq);  // 等到指定位置的槽
      event.seId(i);                      // 更新事件,注意這裏是更新,不是放入新的,因此不會有 GC 產生
      ringBuffer.publish(seq);            // 發佈事件
    }

    latch.await();
    System.out.println("time: " + (System.currentTimeMillis() - start));
  }

  private void testQueue() throws InterruptedException {
    long start = System.currentTimeMillis();
    final BlockingQueue<Event> queue = new ArrayBlockingQueue<>(size);
    new Thread(() -> {
      for (int i = 0; i < count; i++) {
        try {
          queue.put(new Event(i));
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).start();

    new Thread(() -> {
      for (int i = 0; i < count; i++) {
        try {
          Event event = queue.take();
          if (i == count - 1) {
            System.out.println("last: " + event.getLogId());
            latch.countDown();
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).start();

    latch.await();
    System.out.println("time: " + (System.currentTimeMillis() - start));
  }

  class Event {
    private long id;
    Event() {}
    Event(long id) { this.id = id; }
    public long getLogId() { return id; }
    public void seId(int id) { this.id = id; }
  }

  class Handler implements EventHandler<Event> {
    private int i = 0;

    @Override
    public void onEvent(Event event, long seq, boolean bool) {
      if (++i == count) {
        System.out.println("last: " + event.getLogId());
        latch.countDown();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Contrast contrast = new Contrast();
    contrast.testDisruptor();
//    contrast.testQueue();
  }
}

Disruptor-YieldingWaitStrategy: 919
Disruptor-BlockingWaitStrategy: 3142
ArrayBlockingQueue : 4307工具

其中 BlockingWaitStrategy 等待策略和 ArrayBlockingQueue 大體相識性能

3、多消費者

上面的例子在使用多個消費這時,會出現重複消費的狀況,若是想要一條消息只消費一次,能夠參照下面的代碼:

public class MoreConsumer {
  public static final int count = 5000;
  public static final int size = 16;

  public void testDisruptor() {
    long start = System.currentTimeMillis();
    final Disruptor<Event> disruptor = new Disruptor<>(
        () -> new Event(),
        size, DaemonThreadFactory.INSTANCE,
        ProducerType.SINGLE,
        new BlockingWaitStrategy()
    );

    disruptor.handleEventsWithWorkerPool(new Handler("h1"), new Handler("h2"), new Handler("h3"));
    disruptor.start();

    RingBuffer<Event> ringBuffer = disruptor.getRingBuffer();
    for (int i = 0; i < count; i++) {
      long seq = ringBuffer.next();
      Event event = ringBuffer.get(seq);
      event.id = i;
      ringBuffer.publish(seq);
    }

    System.out.println("time: " + (System.currentTimeMillis() - start));
  }

  class Event { public long id; }

  class Handler implements WorkHandler<Event> {
    private String name;

    Handler(String name) { this.name = name; }
      
    @Override
    public void onEvent(Event event) { System.out.println(name + ": " + event.id); }
  }

  public static void main(String[] args) {
    MoreConsumer moreConsumer = new MoreConsumer();
    moreConsumer.testDisruptor();
  }
}

如上面的代碼所示使用 WorkHandler 便可,同時還須要注意選擇等待策略,策略不一樣也可能致使重複消費的問題,同時官網也只出須要在代碼裏面保證重複消費問題;

4、複雜業務邏輯

不少也業務邏輯會出現如下的相似狀況,第三個消費者,須要等待前面的任務完成後才能繼續執行的狀況;一般咱們會使用鎖、同步工具以及一些其餘的方式,但都顯得比較麻煩,並且效率比較低,這裏若是咱們使用 Disruptor 就能很方便的解決;

disruptor.handleEventsWith(c1Handler, c2Handler);
disruptor.after(c1Handler, c2Handler).handleEventsWith(c3Handler);

如此僅需兩行代碼,就能將上面的關係表述清楚,對於更復雜的狀況一樣;

對於更多的使用技巧就須要你根據實際狀況分析了,下一篇博客將主要分析 Disruptor 爲何會那麼快;

相關文章
相關標籤/搜索