Disruptor 的大名從好久之前就據說了,可是一直沒有時間;看完之後才發現其內部的思想異常清晰,很容易就能前移到其餘的項目,因此仔細瞭解一下仍是頗有必要的這。篇博客將主要從源碼角度分析,Disruptor 爲何那麼快,在此以前能夠先查看 Disruptor 詳解 一 ,可以對 Disruptor 的使用有一個大體的瞭解;此外 Disruptor 一般會和 ArrayBlockingQueue 作對比,能夠參考 JDK源碼分析(11)之 BlockingQueue 相關 ;html
首先能夠從下面兩張圖看到,Disruptor 的內部結構,只這裏我偷了一下懶,圖中的內容是老版本的,可能和新版本有點不同可是主要結構仍是同樣的;java
具體使用示例代碼我這裏就不貼,你們能夠看我上一篇博客;設計模式
初始化;首先在啓動的時候,須要預先初始化 RingBuffer,因此須要傳入 EventFactory;這裏和 JUC 裏面 Queue 很不同的地方地方是,RingBuffer 中的 Event 不會被取出,每次 publish 的時候都是覆蓋以前的內容,因此 RingBuffer 這裏是不會產生 GC 的;而生產者和消費者都持有一個 Sequence,指示當前的處理位置,當須要獲取 Event 的時候,能夠直接使用 sequence & ringBuffer.size - 1
除留餘數法快速找到對應的數組位置;數組
private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); } }
生產者;同時能夠指定 Disruptor 是單生產者仍是多生產者:緩存
UNSAFE.compareAndSwapLong
;當沒有空餘位置的時候他們都是使用 LockSupport.parkNanos(1L);
來阻塞線程的,若是有須要你也能夠改爲其餘的等待模式;併發
// RingBuffer // 首先經過 Sequencer 拿到下一個可用的序列 public long next() { return sequencer.next(); } // 而後用除留餘數發拿到對應的數組元素 public E get(long sequence) { return elementAt(sequence); } // 這裏是使用 UNSAFE 直接獲取內存對象 protected final E elementAt(long sequence) { return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); } // 最後將拿到的數組元素修改成新的 Event,再發布 public void publish(long sequence) { sequencer.publish(sequence); } // 這裏全部關於生產者併發的問題都封裝到了 Sequencer 裏面,後面最詳細講到
消費者;正由於上面說的 RingBuffer 中的對象不對像普通的 Queue 同樣,真正取出,因此在 Disruptor 中能夠很容易作到,同一個消息同時被多個消費者獲取的邏輯;這裏的關鍵就在於 每一個消費者所持有的 Sequence;框架
等待策略;Disruptor 提供了不少從等待策略,這裏須要根據實際的業務需求選擇使用;同時和 JDK 中的隊列相比,不管是阻塞隊列仍是併發隊列,其控制併發的方式都是固定的,而在 Disruptor 中則能夠很容易的定製這些策略,從這一點來看也能夠說是實現了策略模式;ide
以上這些就是 Disruptor 的大體框架性內容了,另外有兩點是 Disruptor 很快的重要緣由;源碼分析
首先計算機中各級存儲器的速度差別巨大,數量級描述大體以下:性能
存儲器 | 容量 | 速度 |
---|---|---|
寄存器 | * / B | 1 ns |
一級 Cache | * / KB | 5 ~ 10 ns |
二級 Cache | * / KB - M | 40 ~ 60 ns |
內存 | */ M - G | 100 ~ 150 ns |
硬盤 | * / G - T | 3 ~ 15 ms |
根據上圖的數據,直觀的反應若是想加快軟件的運行速度,固然是儘可能利用上層的緩存體系;在 JVM 中緩存不是以單字節存在的,而是以緩存行的形式,一般是 2 的整數冪個連續字節,通常爲 32-256 個字節。最多見的緩存行大小是 64 個字節;
在咱們的隊列,數則或者 Disruptor 中,理想狀態下就是生產者和消費的速度保持相對一致,這樣能避免阻塞的發生,其生產者和消費者就分別位於數組的頭部和尾部;
可是這樣的理想狀態很難到達,要麼是生產者快一些,要麼是消費者快一些,其結果以下圖;
因此頭和尾一般都位於同一個緩存行中,這樣者更新頭的時候,將對應的緩存標記爲失效,同時尾也被標記爲了失效,者就是僞緩存;
下面是一個緩存的測試例子;
public final class FalseSharing implements Runnable { private static final int NUM_THREADS = 4; // change private static final long ITERATIONS = 500L * 1000L * 1000L; private final int arrayIndex; private static VolatileLong[] longs = new VolatileLong[NUM_THREADS]; static { for (int i = 0; i < longs.length; i++) { longs[i] = new VolatileLong(); } } public FalseSharing(final int arrayIndex) { this.arrayIndex = arrayIndex; } public static void main(final String[] args) throws Exception { final long start = System.nanoTime(); runTest(); System.out.println("duration = " + (System.nanoTime() - start)); } private static void runTest() throws InterruptedException { Thread[] threads = new Thread[NUM_THREADS]; for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(new FalseSharing(i)); } for (Thread t : threads) { t.start(); } for (Thread t : threads) { t.join(); } } @Override public void run() { long i = ITERATIONS + 1; while (0 != --i) { longs[arrayIndex].value = i; } } public static final class VolatileLong { // public long p1, p2, p3, p4, p5, p6; // cache line padding public volatile long value = 0L; // public long p8, p9, p10, p11, p12, p13, p14, p15; // cache line padding } }
這裏不一樣的機器測試的結果不一樣,你們能夠修改線程數,padding 數,和 padding 的前後順序;會獲得不一樣的結果;
我測試的結果:
無 padding :17988876300
有 padding :4667271000
能夠看到是查了一個數量級
這樣的緩存填充,在 Disruptor 中隨處可見:
abstract class RingBufferPad { protected long p1, p2, p3, p4, p5, p6, p7; } public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> { public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE; protected long p1, p2, p3, p4, p5, p6, p7; ... }
併發的處理,一樣的 Disruptor 中隨處可見,雖然在平時寫代碼的時候也會注意,可是當狀態變量多了之後,代碼就會變得很複雜,不容易讀懂;而在 Disruptor 中由 Sequence 串聯起來的各個部分,以及策略模式的應用,使得每部分的處理同樣的清晰;這裏的內容太多了就不一一分析了,好比 MultiProducerSequencer 和 SingleProducerSequencer;
// SingleProducerSequencer public long next(int n) { if (n < 1) throw new IllegalArgumentException("n must be > 0"); long nextValue = this.nextValue; long nextSequence = nextValue + n; long wrapPoint = nextSequence - bufferSize; long cachedGatingSequence = this.cachedValue; if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { cursor.setVolatile(nextValue); // StoreLoad fence long minSequence; while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } this.cachedValue = minSequence; } this.nextValue = nextSequence; return nextSequence; } // MultiProducerSequencer public long next(int n) { if (n < 1) throw new IllegalArgumentException("n must be > 0"); long current; long next; do { current = cursor.get(); next = current + n; long wrapPoint = next - bufferSize; long cachedGatingSequence = gatingSequenceCache.get(); if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { long gatingSequence = Util.getMinimumSequence(gatingSequences, current); if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } gatingSequenceCache.set(gatingSequence); } else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next; }
對 Disruptor 源碼查看的最大感受是,習覺得常的結構設計模式,均可以有更精妙的寫法,若是 Sequence 承擔的各部分邏輯串聯的角色,總體的消費者生產者模式,消費者部分能夠當作是觀察者模式,也能夠看出是事件監聽模式,以及併發控制的策略模式;兩外就是包括僞緩存在內的各細節優化;