你應該知道的高性能無鎖隊列Disruptor

1.何爲隊列

聽到隊列相信你們對其並不陌生,在咱們現實生活中隊列隨處可見,去超市結帳,你會看見你們都會一排排的站得好好的,等待結帳,爲何要站得一排排的,你想象一下你們都沒有素質,一窩蜂的上去結帳,不只讓這個超市崩潰,還會容易形成各類踩踏事件,固然這些事其實在咱們現實中也是會常常發生。java

固然在計算機世界中,隊列是屬於一種數據結構,隊列採用的FIFO(first in firstout),新元素(等待進入隊列的元素)老是被插入到尾部,而讀取的時候老是從頭部開始讀取。在計算中隊列通常用來作排隊(如線程池的等待排隊,鎖的等待排隊),用來作解耦(生產者消費者模式),異步等等。git

2.jdk中的隊列

在jdk中的隊列都實現了java.util.Queue接口,在隊列中又分爲兩類,一類是線程不安全的,ArrayDeque,LinkedList等等,還有一類都在java.util.concurrent包下屬於線程安全,而在咱們真實的環境中,咱們的機器都是屬於多線程,當多線程對同一個隊列進行排隊操做的時候,若是使用線程不安全會出現,覆蓋數據,數據丟失等沒法預測的事情,因此咱們這個時候只能選擇線程安全的隊列。在jdk中提供的線程安全的隊列下面簡單列舉部分隊列:github

隊列名字 是否加鎖 數據結構 關鍵技術點 是否有鎖 是否有界
ArrayBlockingQueue 數組array ReentrantLock 有鎖 有界
LinkedBlockingQueue 鏈表 ReentrantLock 有鎖 有界
LinkedTransferQueue 鏈表 CAS 無鎖 無界
ConcurrentLinkedQueue 鏈表 CAS 無鎖 無界

咱們能夠看見,咱們無鎖的隊列是無界的,有鎖的隊列是有界的,這裏就會涉及到一個問題,咱們在真正的線上環境中,無界的隊列,對咱們系統的影響比較大,有可能會致使咱們內存直接溢出,因此咱們首先得排除無界隊列,固然並非無界隊列就沒用了,只是在某些場景下得排除。其次還剩下ArrayBlockingQueue,LinkedBlockingQueue兩個隊列,他們兩個都是用ReentrantLock控制的線程安全,他們兩個的區別一個是數組,一個是鏈表,在隊列中,通常獲取這個隊列元素以後緊接着會獲取下一個元素,或者一次獲取多個隊列元素都有可能,而數組在內存中地址是連續的,在操做系統中會有緩存的優化(下面也會介紹緩存行),因此訪問的速度會略勝一籌,咱們也會盡可能去選擇ArrayBlockingQueue。而事實證實在不少第三方的框架中,好比早期的log4j異步,都是選擇的ArrayBlockingQueue。數組

固然ArrayBlockingQueue,也有本身的弊端,就是性能比較低,爲何jdk會增長一些無鎖的隊列,其實就是爲了增長性能,很苦惱,又須要無鎖,又須要有界,這個時候恐怕會忍不住說一句你咋不上天呢?可是還真有人上天了。緩存

3.Disruptor

Disruptor就是上面說的那個天,Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,而且是一個開源的併發框架,並得到2011Duke’s程序框架創新獎。可以在無鎖的狀況下實現網絡的Queue併發操做,基於Disruptor開發的系統單線程能支撐每秒600萬訂單。目前,包括Apache Storm、Camel、Log4j2等等知名的框架都在內部集成了Disruptor用來替代jdk的隊列,以此來得到高性能。安全

3.1爲何這麼牛逼?

上面已經把Disruptor吹出了花了,你確定會產生疑問,他真的能有這麼牛逼嗎,個人回答是固然的,在Disruptor中有三大殺器:bash

  • CAS
  • 消除僞共享
  • RingBuffer 有了這三大殺器,Disruptor才變得如此牛逼。

3.1.1鎖和CAS

咱們ArrayBlockingQueue爲何會被拋棄的一點,就是由於用了重量級lock鎖,在咱們加鎖過程當中咱們會把鎖掛起,解鎖後,又會把線程恢復,這一過程會有必定的開銷,而且咱們一旦沒有獲取鎖,這個線程就只能一直等待,這個線程什麼事也不能作。網絡

CAS(compare and swap),顧名思義先比較在交換,通常是比較是不是老的值,若是是的進行交換設置,你們熟悉樂觀鎖的人都知道CAS能夠用來實現樂觀鎖,CAS中沒有線程的上下文切換,減小了沒必要要的開銷。 這裏使用JMH,用兩個線程,每次1一次調用,在我本機上進行測試,代碼以下:數據結構

@BenchmarkMode({Mode.SampleTime})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations=3, time = 5, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations=1,batchSize = 100000000)
@Threads(2)
@Fork(1)
@State(Scope.Benchmark)
public class Myclass {
    Lock lock = new ReentrantLock();
    long i = 0;
    AtomicLong atomicLong = new AtomicLong(0);
    @Benchmark
    public void measureLock() {
        lock.lock();
        i++;
        lock.unlock();
    }
    @Benchmark
    public void measureCAS() {
        atomicLong.incrementAndGet();
    }
    @Benchmark
    public void measureNoLock() {
        i++;
    }
}
複製代碼

測試出來結果以下:多線程

測試項目 測試結果
Lock 26000ms
CAS 4840ms
無鎖 197ms

能夠看見Lock是五位數,CAS是四位數,無鎖更小是三位數。 由此咱們能夠知道Lock>CAS>無鎖。

而咱們的Disruptor中使用的就是CAS,他利用CAS進行隊列中的一些下標設置,減小了鎖的衝突,提升了性能。

另外對於jdk中其餘的無鎖隊列也是使用CAS,原子類也是使用CAS。

3.1.2僞共享

談到了僞共享就不得不說計算機CPU緩存,緩存大小是CPU的重要指標之一,並且緩存的結構和大小對CPU速度的影響很是大,CPU內緩存的運行頻率極高,通常是和處理器同頻運做,工做效率遠遠大於系統內存和硬盤。實際工做時,CPU每每須要重複讀取一樣的數據塊,而緩存容量的增大,能夠大幅度提高CPU內部讀取數據的命中率,而不用再到內存或者硬盤上尋找,以此提升系統性能。可是從CPU芯片面積和成本的因素來考慮,緩存都很小。

CPU緩存能夠分爲一級緩存,二級緩存,現在主流CPU還有三級緩存,甚至有些CPU還有四級緩存。每一級緩存中所儲存的所有數據都是下一級緩存的一部分,這三種緩存的技術難度和制形成本是相對遞減的,因此其容量也是相對遞增的。

爲何CPU會有L一、L二、L3這樣的緩存設計?主要是由於如今的處理器太快了,而從內存中讀取數據實在太慢(一個是由於內存自己速度不夠,另外一個是由於它離CPU太遠了,總的來講須要讓CPU等待幾十甚至幾百個時鐘週期),這個時候爲了保證CPU的速度,就須要延遲更小速度更快的內存提供幫助,而這就是緩存。對這個感興趣能夠把電腦CPU拆下來,本身把玩一下。

每一次你聽見intel發佈新的cpu什麼,好比i7-7700k,8700k,都會對cpu緩存大小進行優化,感興趣能夠自行下來搜索,這些的發佈會或者發佈文章。

Martin和Mike的 QConpresentation演講中給出了一些每一個緩存時間:

從CPU到 大約須要的CPU週期 大約須要的時間
主存 約60-80納秒
QPI 總線傳輸(between sockets, not drawn) 約20ns
L3 cache 約40-45 cycles 約15ns
L2 cache 約10 cycles 約3ns
L1 cache 約3-4 cycles 約1ns
寄存器 1 cycle

緩存行

在cpu的多級緩存中,並非以獨立的項來保存的,而是相似一種pageCahe的一種策略,以緩存行來保存,而緩存行的大小一般是64字節,在Java中Long是8個字節,因此能夠存儲8個Long,舉個例子,你訪問一個long的變量的時候,他會把幫助再加載7個,咱們上面說爲何選擇數組不選擇鏈表,也就是這個緣由,在數組中能夠依靠緩衝行獲得很快的訪問。

緩存行是萬能的嗎?NO,由於他依然帶來了一個缺點,我在這裏舉個例子說明這個缺點,能夠想象有個數組隊列,ArrayQueue,他的數據結構以下:

class ArrayQueue{
    long maxSize;
    long currentIndex;
}
複製代碼

對於maxSize是咱們一開始就定義好的,數組的大小,對於currentIndex,是標誌咱們當前隊列的位置,這個變化比較快,能夠想象你訪問maxSize的時候,是否是把currentIndex也加載進來了,這個時候,其餘線程更新currentIndex,就會把cpu中的緩存行置位無效,請注意這是CPU規定的,他並非只吧currentIndex置位無效,若是此時又繼續訪問maxSize他依然得繼續從內存中讀取,可是MaxSize倒是咱們一開始定義好的,咱們應該訪問緩存便可,可是卻被咱們常常改變的currentIndex所影響。

Padding的魔法

爲了解決上面緩存行出現的問題,在Disruptor中採用了Padding的方式,

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}
複製代碼

其中的Value就被其餘一些無用的long變量給填充了。這樣你修改Value的時候,就不會影響到其餘變量的緩存行。

最後順便一提,在jdk8中提供了@Contended的註解,固然通常來講只容許Jdk中內部,若是你本身使用那就得配置Jvm參數 -RestricContentended = fase,將限制這個註解置位取消。不少文章分析了ConcurrentHashMap,可是都把這個註解給忽略掉了,在ConcurrentHashMap中就使用了這個註解,在ConcurrentHashMap每一個桶都是單獨的用計數器去作計算,而這個計數器因爲時刻都在變化,因此被用這個註解進行填充緩存行優化,以此來增長性能。

3.1.3RingBuffer

在Disruptor中採用了數組的方式保存了咱們的數據,上面咱們也介紹了採用數組保存咱們訪問時很好的利用緩存,可是在Disruptor中進一步選擇採用了環形數組進行保存數據,也就是RingBuffer。在這裏先說明一下環形數組並非真正的環形數組,在RingBuffer中是採用取餘的方式進行訪問的,好比數組大小爲 10,0訪問的是數組下標爲0這個位置,其實10,20等訪問的也是數組的下標爲0的這個位置。

實際上,在這些框架中取餘並非使用%運算,都是使用的&與運算,這就要求你設置的大小通常是2的N次方也就是,10,100,1000等等,這樣減去1的話就是,1,11,111,就能很好的使用index & (size -1),這樣利用位運算就增長了訪問速度。 若是在Disruptor中你不用2的N次方進行大小設置,他會拋出buffersize必須爲2的N次方異常。

固然其不只解決了數組快速訪問的問題,也解決了不須要再次分配內存的問題,減小了垃圾回收,由於咱們0,10,20等都是執行的同一片內存區域,這樣就不須要再次分配內存,頻繁的被JVM垃圾回收器回收。

自此三大殺器已經說完了,有了這三大殺器爲Disruptor如此高性能墊定了基礎。接下來還會在講解如何使用Disruptor和Disruptor的具體的工做原理。

3.2Disruptor怎麼使用

下面舉了一個簡單的例子:

ublic static void main(String[] args) throws Exception {
        // 隊列中的元素
        class Element {
            @Contended
            private String value;


            public String getValue() {
                return value;
            }

            public void setValue(String value) {
                this.value = value;
            }
        }

        // 生產者的線程工廠
        ThreadFactory threadFactory = new ThreadFactory() {
            int i = 0;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread" + String.valueOf(i++));
            }
        };

        // RingBuffer生產工廠,初始化RingBuffer的時候使用
        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        // 處理Event的handler
        EventHandler<Element> handler = new EventHandler<Element>() {
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch) throws InterruptedException {
                System.out.println("Element: " + Thread.currentThread().getName() + ": " + element.getValue() + ": " + sequence);
//                Thread.sleep(10000000);
            }
        };


        // 阻塞策略
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        // 指定RingBuffer的大小
        int bufferSize = 8;

        // 建立disruptor,採用單生產者模式
        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

        // 設置EventHandler
        disruptor.handleEventsWith(handler);

        // 啓動disruptor的線程
        disruptor.start();
        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent((element, sequence) -> {
                System.out.println("以前的數據" + element.getValue() + "當前的sequence" + sequence);
                element.setValue("我是第" + sequence + "個");
            });

        }
    }
複製代碼

在Disruptor中有幾個比較關鍵的: ThreadFactory:這是一個線程工廠,用於咱們Disruptor中生產者消費的時候須要的線程。 EventFactory:事件工廠,用於產生咱們隊列元素的工廠,在Disruptor中,他會在初始化的時候直接填充滿RingBuffer,一次到位。 EventHandler:用於處理Event的handler,這裏一個EventHandler能夠看作是一個消費者,可是多個EventHandler他們都是獨立消費的隊列。 WorkHandler:也是用於處理Event的handler,和上面區別在於,多個消費者都是共享同一個隊列。 WaitStrategy:等待策略,在Disruptor中有多種策略,來決定消費者獲消費時,若是沒有數據採起的策略是什麼?下面簡單列舉一下Disruptor中的部分策略

  • BlockingWaitStrategy:經過線程阻塞的方式,等待生產者喚醒,被喚醒後,再循環檢查依賴的sequence是否已經消費。

  • BusySpinWaitStrategy:線程一直自旋等待,可能比較耗cpu

  • LiteBlockingWaitStrategy:線程阻塞等待生產者喚醒,與BlockingWaitStrategy相比,區別在signalNeeded.getAndSet,若是兩個線程同時訪問一個訪問waitfor,一個訪問signalAll時,能夠減小lock加鎖次數.

  • LiteTimeoutBlockingWaitStrategy:與LiteBlockingWaitStrategy相比,設置了阻塞時間,超過期間後拋異常。

  • YieldingWaitStrategy:嘗試100次,而後Thread.yield()讓出cpu

EventTranslator:實現這個接口能夠將咱們的其餘數據結構轉換爲在Disruptor中流通的Event。

3.3工做原理

上面已經介紹了CAS,減小僞共享,RingBuffer三大殺器,介紹下來講一下Disruptor中生產者和消費者的整個流程。

3.3.1生產者

對於生產者來講,能夠分爲多生產者和單生產者,用ProducerType.Single,和ProducerType.MULTI區分,多生產者和單生產者來講多了CAS,由於單生產者因爲是單線程,因此不須要保證線程安全。

在disruptor中一般用disruptor.publishEvent和disruptor.publishEvents()進行單發和羣發。

在disruptor發佈一個事件進入隊列須要下面幾個步驟:

  1. 首先獲取RingBuffer中下一個在RingBuffer上能夠發佈的位置,這個能夠分爲兩類:
  • 歷來沒有寫過的位置
  • 已經被全部消費者讀過,能夠在寫的位置。 若是沒有讀取到會一直嘗試去讀,disruptor作的很巧妙,並無一直佔據CPU,而是經過LockSuport.park(),進行了一下將線程阻塞掛起操做,爲的是不讓CPU一直進行這種空循環,否則其餘線程都搶不到CPU時間片。
    獲取位置以後會進行cas進行搶佔,若是是單線程就不須要。
  1. 接下來調用咱們上面所介紹的EventTranslator將第一步中RingBuffer中那個位置的event交給EventTranslator進行重寫。
  2. 進行發佈,在disruptor還有一個額外的數組用來記錄當前ringBuffer所在位置目前最新的序列號是多少,拿上面那個0,10,20舉例,寫到10的時候這個avliableBuffer就在對應的位置上記錄目前這個是屬於10,有什麼用呢後面會介紹。進行發佈的時候須要更新這個avliableBuffer,而後進行喚醒全部阻塞的生產者。

下面簡單畫一下流程,上面咱們拿10舉例是不對的,由於bufferSize必需要2的N次方,因此咱們這裏拿Buffersize=8來舉例:下面介紹了當咱們已經push了8個event也就是一圈的時候,接下來再push 3條消息的一些過程: 1.首先調用next(3),咱們當前在7這個位置上因此接下來三條是8,9,10,取餘也就是0,1,2。 2.重寫0,1,2這三個內存區域的數據。 3.寫avaliableBuffer。

對了不知道你們對上述流程是否是很熟悉呢,對的那就是相似咱們的2PC,兩階段提交,先進行RingBuffer的位置鎖定,而後在進行提交和通知消費者。具體2PC的介紹能夠參照個人另一篇文章再有人問你分佈式事務,給他看這篇文章

3.3.1消費者

對於消費者來講,上面介紹了分爲兩種,一種是多個消費者獨立消費,一種是多個消費者消費同一個隊列,這裏介紹一下較爲複雜的多個消費者消費同一個隊列,能理解這個也就能理解獨立消費。 在咱們的disruptor.strat()方法中會啓動咱們的消費者線程以此來進行後臺消費。在消費者中有兩個隊列須要咱們關注,一個是全部消費者共享的進度隊列,還有個是每一個消費者獨立消費進度隊列。 1.對消費者共享隊列進行下一個Next的CAS搶佔,以及對本身消費進度的隊列標記當前進度。 2.爲本身申請可讀的RingBuffer的Next位置,這裏的申請不只僅是申請到next,有可能會申請到比Next大的一個範圍,阻塞策略的申請的過程以下:

  • 獲取生產者對RingBuffer最新寫的位置
  • 判斷其是否小於我要申請讀的位置
  • 若是大於則證實這個位置已經寫了,返回給生產者。
  • 若是小於證實尚未寫到這個位置,在阻塞策略中會進行阻塞,其會在生產者提交階段進行喚醒。 3.對這個位置進行可讀校驗,由於你申請的位置多是連續的,好比生產者目前在7,接下來申請讀,若是消費者已經把8和10這個序列號的位置寫進去了,可是9這個位置還沒來得及寫入,因爲第一步會返回10,可是9實際上是不能讀的,因此得把位置向下收縮到8。
    4.若是收縮完了以後比當前next要小,則繼續循環申請。 5.交給handler.onEvent()處理

同樣的咱們舉個例子,咱們要申請next=8這個位置。 1.首先在共享隊列搶佔進度8,在獨立隊列寫入進度7 2.獲取8的可讀的最大位置,這裏根據不一樣的策略進行,咱們選擇阻塞,因爲生產者生產了8,9,10,因此返回的是10,這樣和後續就不須要再次和avaliableBuffer進行對比了。 3.最後交給handler進行處理。

4.Log4j中的Disruptor

下面的圖展示了Log4j使用Disruptor,ArrayBlockingQueue以及同步的Log4j吞吐量的對比,能夠看見使用了Disruptor完爆了其餘的,固然還有更多的框架使用了Disruptor,這裏就不作介紹了。

最後

本文介紹了傳統的阻塞隊列的缺點,後文重點吹逼了下Disruptor,以及他這麼牛逼的緣由,以及具體的工做流程。

若是之後有人問你叫你設計一個高效無鎖隊列,須要怎麼設計?相信你能從文章中總結出答案,若是對其有疑問或者想和我交流思路,能夠關注個人公衆號,加我好友和我一塊兒討論。

最後這篇文章被我收錄於JGrowing,一個全面,優秀,由社區一塊兒共建的Java學習路線,若是您想參與開源項目的維護,能夠一塊兒共建,github地址爲:github.com/javagrowing… 麻煩給個小星星喲。

若是你們以爲這篇文章對你有幫助,或者你有什麼疑問想提供1v1免費vip服務,均可以關注個人公衆號,你的關注和轉發是對我最大的支持,O(∩_∩)O:

相關文章
相關標籤/搜索