高性能隊列 Disruptor

偶然中看到一篇 關於disruptor的分析,高性能低延遲的特性吸引了我。html

disruptorjava

java實現的線程間通訊的高性能低延遲消息組件,也就是消息隊列,相比於BlockingQueue 最大的特色在於無鎖的機制,性能提升一倍以上,經常使用於金融行業。git

核心機制

1)無鎖 CAS

使用了CPU級別的CAS指令,比os級的 lock快不少 ,避免了上下文切換。github

2)數組減少GC開銷

使用環形數組,而不是鏈表的數組結構存儲數據,數組預分配,避免JavaGC的開銷r數組

3)避免僞共享

數組順序執行,利用了cpu的緩存特性,不至於 常常緩存失效,從新到內存讀,提升緩存命中率。disruptor會盡可能將兩個不相關的內存隔離到兩個緩存行上,不過要犧牲空間。一個緩存行可能128字節或者64字節。可重用大於替換。緩存

  • 環形數組結構

爲了不垃圾回收,採用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。安全

  • 元素位置定位

數組長度2^n,經過位運算,加快定位的速度。下標採起遞增的形式。不用擔憂index溢出的問題。index是long類型,即便100萬QPS的處理速度,也須要30萬年才能用完。併發

9 223 372 036 854 775 807
  • 無鎖設計

每一個生產者或者消費者線程,會先申請能夠操做的元素在數組中的位置,申請到以後,直接在該位置寫入或者讀取數據。性能

 

核心組件

RingBuffer

經過使用遞增的sequence 表示 訪問位置,寫入位置等。不用訪問位置由消費者維護,而ringBuffer 自身維護了一個當前的寫入值。經過CAS保證併發的安全性。而消費者維護消息指針,這樣能夠帶來不少的功能,各個消費者可互不影響的消費。但內部也提供瞭如順序執行,同步執行,等消費機制。經過取餘的方式來訪問數組的位置,且不須要考慮溢出的問題,由於long類型的自增,是短期內沒法一溢出。且其須要跟蹤消費者的位置,來肯定是否隊列已滿。ui

Sequence

最核心的組件,經過sequence達到多生產者互斥的訪問,生產者與消費者之間的協調,以及消納者之間的協調。其本質是一個遞增的序號(計數器),線程間引用傳遞,CAS更新。線程安全。而且經過padding避免僞共享。

下面給出一個多消費者如何獲取資源的源碼。當生產者須要的資源超過了可用資源時拋異常。

//多生產者時獲取n個生產位置
public long tryNext(int n) throws InsufficientCapacityException {
        if(n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        } else {
            long current;
            long next;
            do {
                current = this.cursor.get();
                next = current + (long)n;
                if(!this.hasAvailableCapacity(this.gatingSequences, n, current)) {//檢測是否超過最慢的消費者
                    throw InsufficientCapacityException.INSTANCE;
                }
            } while(!this.cursor.compareAndSet(current, next));

            return next;
        }
}
private boolean hasAvailableCapacity(Sequence[] gatingSequences, int requiredCapacity, long cursorValue) {
        long wrapPoint = cursorValue + (long)requiredCapacity - (long)this.bufferSize;
        long cachedGatingSequence = this.gatingSequenceCache.get();
        if(wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue) {
            long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
            this.gatingSequenceCache.set(minSequence);
            if(wrapPoint > minSequence) {
                return false;
            }
        }

        return true;
    }

SequenceBarrier

用來隊列(RingBuffer)與消費者以及消費者與消費與消費者之間的依賴關係,也就是一個執行順序的的定義。因此消費者必須 小於隊列中的curosr(當前隊列中隊首的位置).SequenceBarrier會收集所依賴的組件的Sequencr.

WaitStrategy

當消費者等待在SequenceBarrier上時,有多種策略,在延遲和CPU資源的佔用上各有不一樣。

  • BusySpinWaitStrategy : 自旋等待,相似Linux Kernel使用的自旋鎖。低延遲但同時對CPU資源的佔用也多。

  • BlockingWaitStrategy : 使用鎖和條件變量。CPU資源的佔用少,延遲大。

  • SleepingWaitStrategy : 在屢次循環嘗試不成功後,選擇讓出CPU,等待下次調度,屢次調度後仍不成功,嘗試前睡眠一個納秒級別的時間再嘗試。這種策略平衡了延遲和CPU資源佔用,但延遲不均勻。

  • YieldingWaitStrategy : 在屢次循環嘗試不成功後,選擇讓出CPU,等待下次調。平衡了延遲和CPU資源佔用,但延遲也比較均勻。

  • PhasedBackoffWaitStrategy : 上面多種策略的綜合,CPU資源的佔用少,延遲大。

BatchEvenProcessor

在Disruptor中,消費者是以EventProcessor的形式存在的。其中一類消費者是BatchEvenProcessor。每一個BatchEvenProcessor有一個Sequence,來記錄本身消費RingBuffer中消息的狀況。因此,一個消息必然會被每個BatchEvenProcessor消費。

WorkProcessor

另外一類消費者是WorkProcessor。每一個WorkProcessor也有一個Sequence,多個WorkProcessor還共享一個Sequence用於互斥的訪問RingBuffer。一個消息被一個WorkProcessor消費,就不會被共享一個Sequence的其餘WorkProcessor消費。這個被WorkProcessor共享的Sequence至關於尾指針。

WorkerPool

共享同一個Sequence的WorkProcessor可由一個WorkerPool管理,這時,共享的Sequence也由WorkerPool建立。

 

http://www.tuicool.com/articles/b2eUBn

http://tech.meituan.com/disruptor.html

http://ifeve.com/disruptor/

http://ziyue1987.github.io/pages/2013/09/22/disruptor-use-manual.html#onepublishertoonebatcheventprocessor

相關文章
相關標籤/搜索