Disruptor 詳解 二

Disruptor 的大名從好久之前就據說了,可是一直沒有時間;看完之後才發現其內部的思想異常清晰,很容易就能前移到其餘的項目,因此仔細瞭解一下仍是頗有必要的這。篇博客將主要從源碼角度分析,Disruptor 爲何那麼快,在此以前能夠先查看 Disruptor 詳解 一 ,可以對 Disruptor 的使用有一個大體的瞭解;此外 Disruptor 一般會和 ArrayBlockingQueue 作對比,能夠參考 JDK源碼分析(11)之 BlockingQueue 相關html

1、Disruptor 簡介

首先能夠從下面兩張圖看到,Disruptor 的內部結構,只這裏我偷了一下懶,圖中的內容是老版本的,可能和新版本有點不同可是主要結構仍是同樣的;java

具體使用示例代碼我這裏就不貼,你們能夠看我上一篇博客;設計模式

初始化;首先在啓動的時候,須要預先初始化 RingBuffer,因此須要傳入 EventFactory;這裏和 JUC 裏面 Queue 很不同的地方地方是,RingBuffer 中的 Event 不會被取出,每次 publish 的時候都是覆蓋以前的內容,因此 RingBuffer 這裏是不會產生 GC 的;而生產者和消費者都持有一個 Sequence,指示當前的處理位置,當須要獲取 Event 的時候,能夠直接使用 sequence & ringBuffer.size - 1 除留餘數法快速找到對應的數組位置;數組

private void fill(EventFactory<E> eventFactory) {
  for (int i = 0; i < bufferSize; i++) {
    entries[BUFFER_PAD + i] = eventFactory.newInstance();
  }
}

生產者;同時能夠指定 Disruptor 是單生產者仍是多生產者:緩存

  • ProducerType.SINGLE - SingleProducerSequencer :由於只有一個生產者,因此者更新 sequence 的時候是不須要加鎖的;
  • ProducerType.MULTI - MultiProducerSequencer :多個生產者的時候,使用樂觀鎖機制更新 sequence,即 UNSAFE.compareAndSwapLong

當沒有空餘位置的時候他們都是使用 LockSupport.parkNanos(1L); 來阻塞線程的,若是有須要你也能夠改爲其餘的等待模式;併發

// RingBuffer
// 首先經過 Sequencer 拿到下一個可用的序列
public long next() { return sequencer.next(); }

// 而後用除留餘數發拿到對應的數組元素
public E get(long sequence) { return elementAt(sequence); }

// 這裏是使用 UNSAFE 直接獲取內存對象
protected final E elementAt(long sequence) {
  return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}

// 最後將拿到的數組元素修改成新的 Event,再發布
public void publish(long sequence) { sequencer.publish(sequence); }

// 這裏全部關於生產者併發的問題都封裝到了 Sequencer 裏面,後面最詳細講到

消費者;正由於上面說的 RingBuffer 中的對象不對像普通的 Queue 同樣,真正取出,因此在 Disruptor 中能夠很容易作到,同一個消息同時被多個消費者獲取的邏輯;這裏的關鍵就在於 每一個消費者所持有的 Sequence;框架

  • 當消息能夠被重複消費的時候,每一個消費者不須要管其餘的消費者,每次獲取新任務的時候,只須要和生產者的 Sequence 比較就能夠了,獲取成功後更新本身,這樣每一個消費者就能夠互不影響了;
  • 當消息不能被重複消費的時候,全部的消費者共享一個 Sequence,當發生競爭的時候使用指定的 WaitStrategy 解決衝突;

等待策略;Disruptor 提供了不少從等待策略,這裏須要根據實際的業務需求選擇使用;同時和 JDK 中的隊列相比,不管是阻塞隊列仍是併發隊列,其控制併發的方式都是固定的,而在 Disruptor 中則能夠很容易的定製這些策略,從這一點來看也能夠說是實現了策略模式;ide

  • BlockingWaitStrategy: 和 ArrayBlockingQueue 同樣使用加鎖的方式
  • BusySpinWaitStrategy: Busy Spin strategy 自旋等待
  • LiteBlockingWaitStrategy: BlockingWaitStrategy 的變種,也是使用加鎖方式
  • PhasedBackoffWaitStrategy:兩段式策略
  • SleepingWaitStrategy: 這是一個在性能和 CPU 佔用率作了平衡的一種策略,初始自旋,而後 Yield,最後 Sleep
  • TimeoutBlockingWaitStrategy:同名字
  • YieldingWaitStrategy: 使用自旋、Yield 方式

以上這些就是 Disruptor 的大體框架性內容了,另外有兩點是 Disruptor 很快的重要緣由;源碼分析

  • 緩存的引用
  • 併發的處理

2、Disruptor 對緩存的應用

首先計算機中各級存儲器的速度差別巨大,數量級描述大體以下:性能

存儲器 容量 速度
寄存器 * / B 1 ns
一級 Cache * / KB 5 ~ 10 ns
二級 Cache * / KB - M 40 ~ 60 ns
內存 */ M - G 100 ~ 150 ns
硬盤 * / G - T 3 ~ 15 ms

根據上圖的數據,直觀的反應若是想加快軟件的運行速度,固然是儘可能利用上層的緩存體系;在 JVM 中緩存不是以單字節存在的,而是以緩存行的形式,一般是 2 的整數冪個連續字節,通常爲 32-256 個字節。最多見的緩存行大小是 64 個字節;

在咱們的隊列,數則或者 Disruptor 中,理想狀態下就是生產者和消費的速度保持相對一致,這樣能避免阻塞的發生,其生產者和消費者就分別位於數組的頭部和尾部;

可是這樣的理想狀態很難到達,要麼是生產者快一些,要麼是消費者快一些,其結果以下圖;

因此頭和尾一般都位於同一個緩存行中,這樣者更新頭的時候,將對應的緩存標記爲失效,同時尾也被標記爲了失效,者就是僞緩存;

下面是一個緩存的測試例子;

public final class FalseSharing implements Runnable {
  private static final int NUM_THREADS = 4; // change
  private static final long ITERATIONS = 500L * 1000L * 1000L;
  private final int arrayIndex;
  private static VolatileLong[] longs = new VolatileLong[NUM_THREADS];

  static {
    for (int i = 0; i < longs.length; i++) {
      longs[i] = new VolatileLong();
    }
  }

  public FalseSharing(final int arrayIndex) {
    this.arrayIndex = arrayIndex;
  }

  public static void main(final String[] args) throws Exception {
    final long start = System.nanoTime();
    runTest();
    System.out.println("duration = " + (System.nanoTime() - start));
  }

  private static void runTest() throws InterruptedException {
    Thread[] threads = new Thread[NUM_THREADS];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(new FalseSharing(i));
    }
    for (Thread t : threads) {
      t.start();
    }

    for (Thread t : threads) {
      t.join();
    }
  }

  @Override
  public void run() {
    long i = ITERATIONS + 1;
    while (0 != --i) {
      longs[arrayIndex].value = i;
    }
  }

  public static final class VolatileLong {
    // public long p1, p2, p3, p4, p5, p6; // cache line padding
    public volatile long value = 0L;
    // public long p8, p9, p10, p11, p12, p13, p14, p15; // cache line padding
  }
}

這裏不一樣的機器測試的結果不一樣,你們能夠修改線程數,padding 數,和 padding 的前後順序;會獲得不一樣的結果;

我測試的結果:
無 padding :17988876300
有 padding :4667271000
能夠看到是查了一個數量級

這樣的緩存填充,在 Disruptor 中隨處可見:

abstract class RingBufferPad {
  protected long p1, p2, p3, p4, p5, p6, p7;
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
  public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
  protected long p1, p2, p3, p4, p5, p6, p7;
  ...
}

3、Disruptor 的併發處理

併發的處理,一樣的 Disruptor 中隨處可見,雖然在平時寫代碼的時候也會注意,可是當狀態變量多了之後,代碼就會變得很複雜,不容易讀懂;而在 Disruptor 中由 Sequence 串聯起來的各個部分,以及策略模式的應用,使得每部分的處理同樣的清晰;這裏的內容太多了就不一一分析了,好比 MultiProducerSequencer 和 SingleProducerSequencer;

// SingleProducerSequencer
public long next(int n) {
  if (n < 1) throw new IllegalArgumentException("n must be > 0");
  long nextValue = this.nextValue;
  long nextSequence = nextValue + n;
  long wrapPoint = nextSequence - bufferSize;
  long cachedGatingSequence = this.cachedValue;

  if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
    cursor.setVolatile(nextValue);  // StoreLoad fence

    long minSequence;
    while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
        LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
    }

    this.cachedValue = minSequence;
  }

  this.nextValue = nextSequence;
  return nextSequence;
}

// MultiProducerSequencer 
public long next(int n) {
  if (n < 1) throw new IllegalArgumentException("n must be > 0");
  long current;
  long next;

  do {
    current = cursor.get();
    next = current + n;

    long wrapPoint = next - bufferSize;
    long cachedGatingSequence = gatingSequenceCache.get();

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
      long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

      if (wrapPoint > gatingSequence) {
        LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
        continue;
      }
      gatingSequenceCache.set(gatingSequence);
    }
    else if (cursor.compareAndSet(current, next)) {
      break;
    }
  }
  while (true);
  return next;
}

總結

對 Disruptor 源碼查看的最大感受是,習覺得常的結構設計模式,均可以有更精妙的寫法,若是 Sequence 承擔的各部分邏輯串聯的角色,總體的消費者生產者模式,消費者部分能夠當作是觀察者模式,也能夠看出是事件監聽模式,以及併發控制的策略模式;兩外就是包括僞緩存在內的各細節優化;

相關文章
相關標籤/搜索