高性能高併發隊列-Disruptor

Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題(在性能測試中發現居然與I/O操做處於一樣的數量級)。基於Disruptor開發的系統單線程能支撐每秒600萬訂單,2010年在QCon演講後,得到了業界關注。2011年,企業應用軟件專家Martin Fowler專門撰寫長文介紹。同年它還得到了Oracle官方的Duke大獎。html

Java內置隊列

介紹Disruptor以前,咱們先來看一看經常使用的線程安全的內置隊列有什麼問題。Java的內置隊列以下表所示。java

隊列 有界性 數據結構
ArrayBlockingQueue bounded 加鎖 arraylist
LinkedBlockingQueue optionally-bounded 加鎖 linkedlist
ConcurrentLinkedQueue unbounded 無鎖 linkedlist
LinkedTransferQueue unbounded 無鎖 linkedlist
PriorityBlockingQueue unbounded 加鎖 heap
DelayQueue unbounded 加鎖 heap

隊列的底層通常分紅三種:數組、鏈表和堆。其中,堆通常狀況下是爲了實現帶有優先級特性的隊列,暫且不考慮。數組

咱們就從數組和鏈表兩種數據結構來看,基於數組線程安全的隊列,比較典型的是ArrayBlockingQueue,它主要經過加鎖的方式來保證線程安全;基於鏈表的線程安全隊列分紅LinkedBlockingQueue和ConcurrentLinkedQueue兩大類,前者也經過鎖的方式來實現線程安全,然後者以及上面表格中的LinkedTransferQueue都是經過原子變量compare and swap(如下簡稱「CAS」)這種不加鎖的方式來實現的。緩存

經過不加鎖的方式實現的隊列都是無界的(沒法保證隊列的長度在肯定的範圍內);而加鎖的方式,能夠實現有界隊列。在穩定性要求特別高的系統中,爲了防止生產者速度過快,致使內存溢出,只能選擇有界隊列;同時,爲了減小Java的垃圾回收對系統性能的影響,會盡可能選擇array/heap格式的數據結構。這樣篩選下來,符合條件的隊列就只有ArrayBlockingQueue。安全

Disruptor論文中講述了一個實驗:服務器

  • 這個測試程序調用了一個函數,該函數會對一個64位的計數器循環自增5億次。
  • 機器環境:2.4G 6核
  • 運算: 64位的計數器累加5億次
Method Time (ms)
Single thread 300
Single thread with CAS 5,700
Single thread with lock 10,000
Single thread with volatile write 4,700
Two threads with CAS 30,000
Two threads with lock 224,000

CAS操做比單線程無鎖慢了1個數量級;有鎖且多線程併發的狀況下,速度比單線程無鎖慢3個數量級。可見無鎖速度最快。數據結構

單線程狀況下,不加鎖的性能 > CAS操做的性能 > 加鎖的性能。多線程

在多線程狀況下,爲了保證線程安全,必須使用CAS或鎖,這種狀況下,CAS的性能超過鎖的性能,前者大約是後者的8倍。併發

綜上可知,加鎖的性能是最差的。分佈式

 

Disruptor的設計方案

Disruptor經過如下設計來解決隊列速度慢的問題:

  • 環形數組結構

爲了不垃圾回收,採用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。

  • 元素位置定位

數組長度2^n,經過位運算,加快定位的速度。下標採起遞增的形式。不用擔憂index溢出的問題。index是long類型,即便100萬QPS的處理速度,也須要30萬年才能用完。

  • 無鎖設計

每一個生產者或者消費者線程,會先申請能夠操做的元素在數組中的位置,申請到以後,直接在該位置寫入或者讀取數據。

一個生產者

寫數據

生產者單線程寫數據的流程比較簡單:

  1. 申請寫入m個元素;
  2. 如果有m個元素能夠寫入,則返回最大的序列號。這兒主要判斷是否會覆蓋未讀的元素;
  3. 如果返回的正確,則生產者開始寫入元素。




圖5 單個生產者生產過程示意圖

多個生產者

多個生產者的狀況下,會遇到「如何防止多個線程重複寫同一個元素」的問題。Disruptor的解決方法是,每一個線程獲取不一樣的一段數組空間進行操做。這個經過CAS很容易達到。只須要在分配元素的時候,經過CAS判斷一下這段空間是否已經分配出去便可。

可是會遇到一個新問題:如何防止讀取的時候,讀到還未寫的元素。Disruptor在多個生產者的狀況下,引入了一個與Ring Buffer大小相同的buffer:available Buffer。當某個位置寫入成功的時候,便把availble Buffer相應的位置置位,標記爲寫入成功。讀取的時候,會遍歷available Buffer,來判斷元素是否已經就緒。

下面分讀數據和寫數據兩種狀況介紹。

讀數據

生產者多線程寫入的狀況會複雜不少:

  1. 申請讀取到序號n;
  2. 若writer cursor >= n,這時仍然沒法肯定連續可讀的最大下標。從reader cursor開始讀取available Buffer,一直查到第一個不可用的元素,而後返回最大連續可讀元素的位置;
  3. 消費者讀取元素。

以下圖所示,讀線程讀到下標爲2的元素,三個線程Writer1/Writer2/Writer3正在向RingBuffer相應位置寫數據,寫線程被分配到的最大元素下標是11。

讀線程申請讀取到下標從3到11的元素,判斷writer cursor>=11。而後開始讀取availableBuffer,從3開始,日後讀取,發現下標爲7的元素沒有生產成功,因而WaitFor(11)返回6。

而後,消費者讀取下標從3到6共計4個元素。




圖6 多個生產者狀況下,消費者消費過程示意圖

寫數據

多個生產者寫入的時候:

  1. 申請寫入m個元素;
  2. 如果有m個元素能夠寫入,則返回最大的序列號。每一個生產者會被分配一段獨享的空間;
  3. 生產者寫入元素,寫入元素的同時設置available Buffer裏面相應的位置,以標記本身哪些位置是已經寫入成功的。

以下圖所示,Writer1和Writer2兩個線程寫入數組,都申請可寫的數組空間。Writer1被分配了下標3到下表5的空間,Writer2被分配了下標6到下標9的空間。

Writer1寫入下標3位置的元素,同時把available Buffer相應位置置位,標記已經寫入成功,日後移一位,開始寫下標4位置的元素。Writer2一樣的方式。最終都寫入完成。




圖7 多個生產者狀況下,生產者生產過程示意圖

下面忽略數組的環形結構,介紹一下如何實現無鎖設計。整個過程經過原子變量CAS,保證操做的線程安全。

防止不一樣生產者對同一段空間寫入的代碼,以下所示:

public long tryNext(int n) throws InsufficientCapacityException
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long current;
    long next;

    do
    {
        current = cursor.get();
        next = current + n;

        if (!hasAvailableCapacity(gatingSequences, n, current))
        {
            throw InsufficientCapacityException.INSTANCE;
        }
    }
    while (!cursor.compareAndSet(current, next));

    return next;
}

經過do/while循環的條件cursor.compareAndSet(current, next),來判斷每次申請的空間是否已經被其餘生產者佔據。假如已經被佔據,該函數會返回失敗,While循環從新執行,申請寫入空間。

消費者的流程與生產者很是相似,這兒就很少描述了。

總結

Disruptor經過精巧的無鎖設計實現了在高併發情形下的高性能。

在美團點評內部,不少高併發場景借鑑了Disruptor的設計,減小競爭的強度。其設計思想能夠擴展到分佈式場景,經過無鎖設計,來提高服務性能。

 

代碼樣例

/**
 * @description disruptor代碼樣例。每10ms向disruptor中插入一個元素,消費者讀取數據,並打印到終端
 */
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;

public class DisruptorMain
{
    public static void main(String[] args) throws Exception
    {
        // 隊列中的元素
        class Element {

            private int value;

            public int get(){
                return value;
            }

            public void set(int value){
                this.value= value;
            }

        }

        // 生產者的線程工廠
        ThreadFactory threadFactory = new ThreadFactory(){
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread");
            }
        };

        // RingBuffer生產工廠,初始化RingBuffer的時候使用
        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        // 處理Event的handler
        EventHandler<Element> handler = new EventHandler<Element>(){
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch)
            {
                System.out.println("Element: " + element.get());
            }
        };

        // 阻塞策略
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        // 指定RingBuffer的大小
        int bufferSize = 16;

        // 建立disruptor,採用單生產者模式
        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

        // 設置EventHandler
        disruptor.handleEventsWith(handler);

        // 啓動disruptor的線程
        disruptor.start();

        RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

        for (int l = 0; true; l++)
        {
            // 獲取下一個可用位置的下標
            long sequence = ringBuffer.next();  
            try
            {
                // 返回可用位置的元素
                Element event = ringBuffer.get(sequence); 
                // 設置該位置元素的值
                event.set(l); 
            }
            finally
            {
                ringBuffer.publish(sequence);
            }
            Thread.sleep(10);
        }
    }
}

  切記:必定要在設置值的地方加上

  try{

  }finally{

  },

      不然若是數據發佈不成功,最後數據會逐漸填滿ringbuffer,最後後面來的數據根本沒有辦法調用可用空間,致使方法阻塞,佔用CPU和內存,沒法釋放資源,最後致使服務器死機

      注意,最後的 ringBuffer.publish 方法必須包含在 finally 中以確保必須獲得調用;若是某個請求的 sequence 未被提交,將會堵塞後續的發佈操做或者其它的 producer。

Disruptor 還提供另一種形式的調用來簡化以上操做,並確保 publish 老是獲得調用。

public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value;
    }
}
static class Translator implements EventTranslatorOneArg<LongEvent, Long>{
    @Override
    publicvoid translateTo(LongEvent event, long sequence, Long data) {
        event.set(data);
    }    
}
    
public static Translator TRANSLATOR = new Translator();
    
public staticvoid publishEvent2(Disruptor<LongEvent> disruptor) {
    // 發佈事件;
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    long data = getEventDataxxxx();//獲取要經過事件傳遞的業務數據;    
    ringBuffer.publishEvent(TRANSLATOR,data);
}

多個生產者

    在構建Disruptor實例的時候,須要指定生產者是單生產者ProducerType.SINGLE)仍是多生產者ProducerType.MULTI

多個消費者

   (類型1)   多個消費者每一個消費者都有機會消費相同數據,使用handleEventsWith方法

class ComsumerHandler implements EventHandler<ResultTxt>{
    
    private int no;

    public  ComsumerHandler(int no) {
        this.no=no;
    }

    @Override
    public void onEvent(ResultTxt resultTxt, long sequence, boolean endOfBatch)
        throws Exception {
        System.out.println(no+" data commming......"+resultTxt.getBarCode());
    }
}

//設置多個消費者
disruptor.handleEventsWith(new ComsumerHandler(1),new ComsumerHandler(2));

     (類型2)  多個消費者,每一個消費者消費不一樣數據。也就是說每一個消費者競爭數據,競爭到消費,其餘消費者沒有機會。使用handleEventsWithWorkerPool方法

class ComsumerHandler implements WorkHandler<ResultTxt>{

    private int no;

    public  ComsumerHandler(int no) {
        this.no=no;
    }

    @Override
    public void onEvent(ResultTxt event)
        throws Exception {
        System.out.println(no+"  data commming......"+event.getBarCode());
    }
}

//多個消費者,每一個消費者競爭消費不一樣數據
disruptor.handleEventsWithWorkerPool(new ComsumerHandler(1),new ComsumerHandler(2));

 

等待策略

生產者的等待策略

暫時只有休眠1ns。

LockSupport.parkNanos(1);

消費者的等待策略

名稱 措施 適用場景
BlockingWaitStrategy 加鎖 CPU資源緊缺,吞吐量和延遲並不重要的場景
BusySpinWaitStrategy 自旋 經過不斷重試,減小切換線程致使的系統調用,而下降延遲。推薦在線程綁定到固定的CPU的場景下使用
PhasedBackoffWaitStrategy 自旋 + yield + 自定義策略 CPU資源緊缺,吞吐量和延遲並不重要的場景
SleepingWaitStrategy 自旋 + yield + sleep 性能和CPU資源之間有很好的折中。延遲不均勻
TimeoutBlockingWaitStrategy 加鎖,有超時限制 CPU資源緊缺,吞吐量和延遲並不重要的場景
YieldingWaitStrategy 自旋 + yield + 自旋 性能和CPU資源之間有很好的折中。延遲比較均勻

更多詳細信息請參考: http://tech.meituan.com/disruptor.html

相關文章
相關標籤/搜索