Disruptor高性能隊列實現原理

1、Disruptor簡介

  1. Disruptor是英國外匯交易公司LMAX開發的一個低延遲高性能=無鎖的有界循環數組。基於Disruptor開發的系統單線程能支撐每秒600萬訂單,目前已經開源的併發框架。Log4j2底層使用的併發框架
  2. Disruptor設計特色html

    1. 環形數據結構:底層使用的是數組而非連表
    2. 元素位置定位:數組的長度是2^n,下標是遞增的,能夠經過位運算快速定位
    3. 無鎖設計,生產者或消費者須要先申請位置,申請成功之後才能讀寫,申請過程當中經過CAS保證線程安全。
  3. 用於解決單機多線程之間的數據交換,而非相似於kafka的分佈式隊列。

2、JDK裏的隊列解決方案

隊裏 有界性 底層結構
ArrayBlockingQueue 有界 有鎖 數組
LinkedBlockingQueue 有界 有鎖 鏈表
ConcurrentLinkedQueue 無界 無鎖 鏈表

在高併發且要求較高的穩定性的系統場景下,非了防止生產者速度過快,只能選有界隊列;同時,爲了減小Java的垃圾回收對系統性能的影響儘可能選擇「數組」做爲隊列的底層結構,符合條件只有一個:ArrayBlockingQueuejava

2.1 ArrayBlockingQueue的問題

  1. 加鎖:不加鎖的性能 > CAS操做的性能 > 加鎖的性能。數組

    2.1.2 僞共享

    僞共享:緩存系統中是以緩存行(cache line)爲單位存儲的,當多線程修改互相獨立的變量時,若是這些變量共享同一個緩存行,就會無心中影響彼此的性能,
  2. CPU 和主內存之間有好幾層緩存,距離CPU越近,緩存空間越小,速度越快。
  3. CPU運算時,優先從最近的緩存尋找數據,找不到時再往上層去找。

image

  1. 緩存系中以 緩存行(cache line) 爲單位存儲,一個緩存行有64字節,能夠存儲8個long類型數據。當cpu訪問一個long類型的數組,當數組中的一個值被加載到緩存中,它會額外加載另外7個。當數組的一個值失效,則整個緩存行失效,它將換出其餘7個值。
  2. ArrayBlockingQueue有三個成員變量:緩存

      • takeIndex:須要被取走的元素下標
      • putIndex:可被元素插入的位置的下標
      • count:隊列中元素的數量

這三個變量很容易放到一個緩存行中,可是之間修改沒有太多的關聯。因此每次修改,都會使以前緩存的數據失效,從而不能徹底達到共享的效果。安全

public class ArrayBlockingQueue<E> {

    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

}
  1. 僞共享解決思路:增大數組元素的間隔使得由不一樣線程存取的元素位於不一樣的緩存行上,以空間換時間。
// value1和value2可能會產生僞共享
class ValueNoPadding {
    protected volatile long value1 = 0L;
    protected volatile long value2 = 0L;
}

// value1和value2中間插入無用值 p1~p14 
class ValuePadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
    protected volatile long value1 = 0L;
    protected long p9, p10, p11, p12, p13, p14;
    protected volatile long value2 = 0L;
}

3、Disruptor

RingBuffer

  1. ringBuffer是一個環,用作在不一樣線程間傳遞數據的空間
  2. ringBuffer擁有一個序號,整個序號是遞增的,用於指向下一個可用元素。
  3. 隊列空間在建立時就固定再也不改變,可用下降GC的壓力
    image

使用示例

  1. 準備數據容器數據結構

    // 數據容器,存放生產和消費的數據內容
    public class LongEvent {
     private long value;
    }
  2. 準備數據容器的生產工廠,用於RingBuffer初始化時的數據填充多線程

    // 數據容器生產工廠
    public class LongEventFactory implements EventFactory<LongEvent> {
     public LongEvent newInstance() {
        return new LongEvent();
     }
    }
  3. 準備消費者併發

    //消費者
    public class LongEventConsumer implements EventHandler<LongEvent> {
    
     /**
      *
      * @param longEvent
      * @param sequence 當前的序列
      * @param endOfBatch 是不是最後一個數據
      * @throws Exception
      */
     @Override
     public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception {
         String str = String.format("long event : %s l:%s b:%s", longEvent.getValue(), sequence, endOfBatch);
         System.out.println(str);
     }
    }
  4. 生產線程、主線程框架

    public class Main {
    
     public static void main(String[] args) throws Exception {
         
         // 線程工廠
         ThreadFactory threadFactory = (r) -> new Thread(r);
    
         // disruptor-建立一個disruptor
         // 設置數據容器的工廠類,ringBuffer的初始化大小,消費者線程的工廠類
         Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(new LongEventFactory(), 8, threadFactory);
         // disruptor-設置消費者
         disruptor.handleEventsWith(new LongEventConsumer());
         disruptor.start();
         
         // 獲取disruptor的RingBuffer
         RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    
         // 主線程開始生產
         for (long l = 0; l <= 8; l++) {
             long nextIndex = ringBuffer.next();
             
             LongEvent event = ringBuffer.get(nextIndex);
             event.setValue(l);
             ringBuffer.publish(nextIndex);
             
             Thread.sleep(1000);
         }
     }
    }

實現原理

單生產者生產數據的流程

  1. 生產者線程申請寫入M個數據
  2. disruptor從當前指針cursor順序去找M個可寫空間,返回找到的可用空間的最大序號
  3. 經過CAS比對返回的序號和申請的序號是否一致,判斷是否會覆蓋未讀的元素,若返回正確,直接寫入數據

image

多生產者生產數據的流程

  1. 引入一個與ringBuffer大小相同的buff:availableBuffer用於記錄ringBuffer每個空間的使用狀況,若生產者寫入數據,則將對應availableBuffer位置標記爲寫入成功,若消費者讀取了數據,則將對應的availableBuffer位置標記爲空閒。
  2. 多個生產者分配空間時,使用CAS給每個線程獲取不一樣的數組空間進行操做。
  3. 多個消費者在消費數據時,順序的從availableBuffer搜索一段連續可讀的空間,並返回該空間的最大序列號,並讀取數據,同時將availableBuffer的對應的位置進行標記空閒。

image

Disruptor 解決僞共享與線程可見性問題

// 數據左右兩邊插入多餘變量隔離真正的變量
class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding
{
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

 
    public Sequence(final long initialValue)
    {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

    public long get()
    {
        return value;
    }
    // 使用UNSAFE操做直接修改內存值
    public void set(final long value)
    {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }
}

4、參考文獻

  1. https://tech.meituan.com/2016...
  2. https://www.cnblogs.com/crazy...
相關文章
相關標籤/搜索