聽到隊列相信你們對其並不陌生,在咱們現實生活中隊列隨處可見,去超市結帳,你會看見你們都會一排排的站得好好的,等待結帳,爲何要站得一排排的,你想象一下你們都沒有素質,一窩蜂的上去結帳,不只讓這個超市崩潰,還會容易形成各類踩踏事件,固然這些事其實在咱們現實中也是會常常發生。java
固然在計算機世界中,隊列是屬於一種數據結構,隊列採用的FIFO(first in firstout),新元素(等待進入隊列的元素)老是被插入到尾部,而讀取的時候老是從頭部開始讀取。在計算中隊列通常用來作排隊(如線程池的等待排隊,鎖的等待排隊),用來作解耦(生產者消費者模式),異步等等。git
在jdk中的隊列都實現了java.util.Queue接口,在隊列中又分爲兩類,一類是線程不安全的,ArrayDeque,LinkedList等等,還有一類都在java.util.concurrent包下屬於線程安全,而在咱們真實的環境中,咱們的機器都是屬於多線程,當多線程對同一個隊列進行排隊操做的時候,若是使用線程不安全會出現,覆蓋數據,數據丟失等沒法預測的事情,因此咱們這個時候只能選擇線程安全的隊列。在jdk中提供的線程安全的隊列下面簡單列舉部分隊列:github
隊列名字 | 是否加鎖 | 數據結構 | 關鍵技術點 | 是否有鎖 | 是否有界 |
---|---|---|---|---|---|
ArrayBlockingQueue | 是 | 數組array | ReentrantLock | 有鎖 | 有界 |
LinkedBlockingQueue | 是 | 鏈表 | ReentrantLock | 有鎖 | 有界 |
LinkedTransferQueue | 否 | 鏈表 | CAS | 無鎖 | 無界 |
ConcurrentLinkedQueue | 否 | 鏈表 | CAS | 無鎖 | 無界 |
咱們能夠看見,咱們無鎖的隊列是無界的,有鎖的隊列是有界的,這裏就會涉及到一個問題,咱們在真正的線上環境中,無界的隊列,對咱們系統的影響比較大,有可能會致使咱們內存直接溢出,因此咱們首先得排除無界隊列,固然並非無界隊列就沒用了,只是在某些場景下得排除。其次還剩下ArrayBlockingQueue,LinkedBlockingQueue兩個隊列,他們兩個都是用ReentrantLock控制的線程安全,他們兩個的區別一個是數組,一個是鏈表,在隊列中,通常獲取這個隊列元素以後緊接着會獲取下一個元素,或者一次獲取多個隊列元素都有可能,而數組在內存中地址是連續的,在操做系統中會有緩存的優化(下面也會介紹緩存行),因此訪問的速度會略勝一籌,咱們也會盡可能去選擇ArrayBlockingQueue。而事實證實在不少第三方的框架中,好比早期的log4j異步,都是選擇的ArrayBlockingQueue。數組
固然ArrayBlockingQueue,也有本身的弊端,就是性能比較低,爲何jdk會增長一些無鎖的隊列,其實就是爲了增長性能,很苦惱,又須要無鎖,又須要有界,這個時候恐怕會忍不住說一句你咋不上天呢?可是還真有人上天了。緩存
Disruptor就是上面說的那個天,Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,而且是一個開源的併發框架,並得到2011Duke’s程序框架創新獎。可以在無鎖的狀況下實現網絡的Queue併發操做,基於Disruptor開發的系統單線程能支撐每秒600萬訂單。目前,包括Apache Storm、Camel、Log4j2等等知名的框架都在內部集成了Disruptor用來替代jdk的隊列,以此來得到高性能。安全
上面已經把Disruptor吹出了花了,你確定會產生疑問,他真的能有這麼牛逼嗎,個人回答是固然的,在Disruptor中有三大殺器:bash
咱們ArrayBlockingQueue爲何會被拋棄的一點,就是由於用了重量級lock鎖,在咱們加鎖過程當中咱們會把鎖掛起,解鎖後,又會把線程恢復,這一過程會有必定的開銷,而且咱們一旦沒有獲取鎖,這個線程就只能一直等待,這個線程什麼事也不能作。網絡
CAS(compare and swap),顧名思義先比較在交換,通常是比較是不是老的值,若是是的進行交換設置,你們熟悉樂觀鎖的人都知道CAS能夠用來實現樂觀鎖,CAS中沒有線程的上下文切換,減小了沒必要要的開銷。 這裏使用JMH,用兩個線程,每次1一次調用,在我本機上進行測試,代碼以下:數據結構
@BenchmarkMode({Mode.SampleTime})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations=3, time = 5, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations=1,batchSize = 100000000)
@Threads(2)
@Fork(1)
@State(Scope.Benchmark)
public class Myclass {
Lock lock = new ReentrantLock();
long i = 0;
AtomicLong atomicLong = new AtomicLong(0);
@Benchmark
public void measureLock() {
lock.lock();
i++;
lock.unlock();
}
@Benchmark
public void measureCAS() {
atomicLong.incrementAndGet();
}
@Benchmark
public void measureNoLock() {
i++;
}
}
複製代碼
測試出來結果以下:多線程
測試項目 | 測試結果 |
---|---|
Lock | 26000ms |
CAS | 4840ms |
無鎖 | 197ms |
能夠看見Lock是五位數,CAS是四位數,無鎖更小是三位數。 由此咱們能夠知道Lock>CAS>無鎖。
而咱們的Disruptor中使用的就是CAS,他利用CAS進行隊列中的一些下標設置,減小了鎖的衝突,提升了性能。
另外對於jdk中其餘的無鎖隊列也是使用CAS,原子類也是使用CAS。
談到了僞共享就不得不說計算機CPU緩存,緩存大小是CPU的重要指標之一,並且緩存的結構和大小對CPU速度的影響很是大,CPU內緩存的運行頻率極高,通常是和處理器同頻運做,工做效率遠遠大於系統內存和硬盤。實際工做時,CPU每每須要重複讀取一樣的數據塊,而緩存容量的增大,能夠大幅度提高CPU內部讀取數據的命中率,而不用再到內存或者硬盤上尋找,以此提升系統性能。可是從CPU芯片面積和成本的因素來考慮,緩存都很小。
CPU緩存能夠分爲一級緩存,二級緩存,現在主流CPU還有三級緩存,甚至有些CPU還有四級緩存。每一級緩存中所儲存的所有數據都是下一級緩存的一部分,這三種緩存的技術難度和制形成本是相對遞減的,因此其容量也是相對遞增的。
爲何CPU會有L一、L二、L3這樣的緩存設計?主要是由於如今的處理器太快了,而從內存中讀取數據實在太慢(一個是由於內存自己速度不夠,另外一個是由於它離CPU太遠了,總的來講須要讓CPU等待幾十甚至幾百個時鐘週期),這個時候爲了保證CPU的速度,就須要延遲更小速度更快的內存提供幫助,而這就是緩存。對這個感興趣能夠把電腦CPU拆下來,本身把玩一下。
每一次你聽見intel發佈新的cpu什麼,好比i7-7700k,8700k,都會對cpu緩存大小進行優化,感興趣能夠自行下來搜索,這些的發佈會或者發佈文章。
Martin和Mike的 QConpresentation演講中給出了一些每一個緩存時間:
從CPU到 | 大約須要的CPU週期 | 大約須要的時間 |
---|---|---|
主存 | 約60-80納秒 | |
QPI 總線傳輸(between sockets, not drawn) | 約20ns | |
L3 cache | 約40-45 cycles | 約15ns |
L2 cache | 約10 cycles | 約3ns |
L1 cache | 約3-4 cycles | 約1ns |
寄存器 | 1 cycle |
在cpu的多級緩存中,並非以獨立的項來保存的,而是相似一種pageCahe的一種策略,以緩存行來保存,而緩存行的大小一般是64字節,在Java中Long是8個字節,因此能夠存儲8個Long,舉個例子,你訪問一個long的變量的時候,他會把幫助再加載7個,咱們上面說爲何選擇數組不選擇鏈表,也就是這個緣由,在數組中能夠依靠緩衝行獲得很快的訪問。
緩存行是萬能的嗎?NO,由於他依然帶來了一個缺點,我在這裏舉個例子說明這個缺點,能夠想象有個數組隊列,ArrayQueue,他的數據結構以下:
class ArrayQueue{
long maxSize;
long currentIndex;
}
複製代碼
對於maxSize是咱們一開始就定義好的,數組的大小,對於currentIndex,是標誌咱們當前隊列的位置,這個變化比較快,能夠想象你訪問maxSize的時候,是否是把currentIndex也加載進來了,這個時候,其餘線程更新currentIndex,就會把cpu中的緩存行置位無效,請注意這是CPU規定的,他並非只吧currentIndex置位無效,若是此時又繼續訪問maxSize他依然得繼續從內存中讀取,可是MaxSize倒是咱們一開始定義好的,咱們應該訪問緩存便可,可是卻被咱們常常改變的currentIndex所影響。
爲了解決上面緩存行出現的問題,在Disruptor中採用了Padding的方式,
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
protected volatile long value;
}
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
複製代碼
其中的Value就被其餘一些無用的long變量給填充了。這樣你修改Value的時候,就不會影響到其餘變量的緩存行。
最後順便一提,在jdk8中提供了@Contended的註解,固然通常來講只容許Jdk中內部,若是你本身使用那就得配置Jvm參數 -RestricContentended = fase,將限制這個註解置位取消。不少文章分析了ConcurrentHashMap,可是都把這個註解給忽略掉了,在ConcurrentHashMap中就使用了這個註解,在ConcurrentHashMap每一個桶都是單獨的用計數器去作計算,而這個計數器因爲時刻都在變化,因此被用這個註解進行填充緩存行優化,以此來增長性能。
在Disruptor中採用了數組的方式保存了咱們的數據,上面咱們也介紹了採用數組保存咱們訪問時很好的利用緩存,可是在Disruptor中進一步選擇採用了環形數組進行保存數據,也就是RingBuffer。在這裏先說明一下環形數組並非真正的環形數組,在RingBuffer中是採用取餘的方式進行訪問的,好比數組大小爲 10,0訪問的是數組下標爲0這個位置,其實10,20等訪問的也是數組的下標爲0的這個位置。
實際上,在這些框架中取餘並非使用%運算,都是使用的&與運算,這就要求你設置的大小通常是2的N次方也就是,10,100,1000等等,這樣減去1的話就是,1,11,111,就能很好的使用index & (size -1),這樣利用位運算就增長了訪問速度。 若是在Disruptor中你不用2的N次方進行大小設置,他會拋出buffersize必須爲2的N次方異常。
固然其不只解決了數組快速訪問的問題,也解決了不須要再次分配內存的問題,減小了垃圾回收,由於咱們0,10,20等都是執行的同一片內存區域,這樣就不須要再次分配內存,頻繁的被JVM垃圾回收器回收。
自此三大殺器已經說完了,有了這三大殺器爲Disruptor如此高性能墊定了基礎。接下來還會在講解如何使用Disruptor和Disruptor的具體的工做原理。
下面舉了一個簡單的例子:
ublic static void main(String[] args) throws Exception {
// 隊列中的元素
class Element {
@Contended
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
// 生產者的線程工廠
ThreadFactory threadFactory = new ThreadFactory() {
int i = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "simpleThread" + String.valueOf(i++));
}
};
// RingBuffer生產工廠,初始化RingBuffer的時候使用
EventFactory<Element> factory = new EventFactory<Element>() {
@Override
public Element newInstance() {
return new Element();
}
};
// 處理Event的handler
EventHandler<Element> handler = new EventHandler<Element>() {
@Override
public void onEvent(Element element, long sequence, boolean endOfBatch) throws InterruptedException {
System.out.println("Element: " + Thread.currentThread().getName() + ": " + element.getValue() + ": " + sequence);
// Thread.sleep(10000000);
}
};
// 阻塞策略
BlockingWaitStrategy strategy = new BlockingWaitStrategy();
// 指定RingBuffer的大小
int bufferSize = 8;
// 建立disruptor,採用單生產者模式
Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
// 設置EventHandler
disruptor.handleEventsWith(handler);
// 啓動disruptor的線程
disruptor.start();
for (int i = 0; i < 10; i++) {
disruptor.publishEvent((element, sequence) -> {
System.out.println("以前的數據" + element.getValue() + "當前的sequence" + sequence);
element.setValue("我是第" + sequence + "個");
});
}
}
複製代碼
在Disruptor中有幾個比較關鍵的: ThreadFactory:這是一個線程工廠,用於咱們Disruptor中生產者消費的時候須要的線程。 EventFactory:事件工廠,用於產生咱們隊列元素的工廠,在Disruptor中,他會在初始化的時候直接填充滿RingBuffer,一次到位。 EventHandler:用於處理Event的handler,這裏一個EventHandler能夠看作是一個消費者,可是多個EventHandler他們都是獨立消費的隊列。 WorkHandler:也是用於處理Event的handler,和上面區別在於,多個消費者都是共享同一個隊列。 WaitStrategy:等待策略,在Disruptor中有多種策略,來決定消費者獲消費時,若是沒有數據採起的策略是什麼?下面簡單列舉一下Disruptor中的部分策略
BlockingWaitStrategy:經過線程阻塞的方式,等待生產者喚醒,被喚醒後,再循環檢查依賴的sequence是否已經消費。
BusySpinWaitStrategy:線程一直自旋等待,可能比較耗cpu
LiteBlockingWaitStrategy:線程阻塞等待生產者喚醒,與BlockingWaitStrategy相比,區別在signalNeeded.getAndSet,若是兩個線程同時訪問一個訪問waitfor,一個訪問signalAll時,能夠減小lock加鎖次數.
LiteTimeoutBlockingWaitStrategy:與LiteBlockingWaitStrategy相比,設置了阻塞時間,超過期間後拋異常。
YieldingWaitStrategy:嘗試100次,而後Thread.yield()讓出cpu
EventTranslator:實現這個接口能夠將咱們的其餘數據結構轉換爲在Disruptor中流通的Event。
上面已經介紹了CAS,減小僞共享,RingBuffer三大殺器,介紹下來講一下Disruptor中生產者和消費者的整個流程。
對於生產者來講,能夠分爲多生產者和單生產者,用ProducerType.Single,和ProducerType.MULTI區分,多生產者和單生產者來講多了CAS,由於單生產者因爲是單線程,因此不須要保證線程安全。
在disruptor中一般用disruptor.publishEvent和disruptor.publishEvents()進行單發和羣發。
在disruptor發佈一個事件進入隊列須要下面幾個步驟:
下面簡單畫一下流程,上面咱們拿10舉例是不對的,由於bufferSize必需要2的N次方,因此咱們這裏拿Buffersize=8來舉例:下面介紹了當咱們已經push了8個event也就是一圈的時候,接下來再push 3條消息的一些過程: 1.首先調用next(3),咱們當前在7這個位置上因此接下來三條是8,9,10,取餘也就是0,1,2。 2.重寫0,1,2這三個內存區域的數據。 3.寫avaliableBuffer。
對了不知道你們對上述流程是否是很熟悉呢,對的那就是相似咱們的2PC,兩階段提交,先進行RingBuffer的位置鎖定,而後在進行提交和通知消費者。具體2PC的介紹能夠參照個人另一篇文章再有人問你分佈式事務,給他看這篇文章。
對於消費者來講,上面介紹了分爲兩種,一種是多個消費者獨立消費,一種是多個消費者消費同一個隊列,這裏介紹一下較爲複雜的多個消費者消費同一個隊列,能理解這個也就能理解獨立消費。 在咱們的disruptor.strat()方法中會啓動咱們的消費者線程以此來進行後臺消費。在消費者中有兩個隊列須要咱們關注,一個是全部消費者共享的進度隊列,還有個是每一個消費者獨立消費進度隊列。 1.對消費者共享隊列進行下一個Next的CAS搶佔,以及對本身消費進度的隊列標記當前進度。 2.爲本身申請可讀的RingBuffer的Next位置,這裏的申請不只僅是申請到next,有可能會申請到比Next大的一個範圍,阻塞策略的申請的過程以下:
同樣的咱們舉個例子,咱們要申請next=8這個位置。 1.首先在共享隊列搶佔進度8,在獨立隊列寫入進度7 2.獲取8的可讀的最大位置,這裏根據不一樣的策略進行,咱們選擇阻塞,因爲生產者生產了8,9,10,因此返回的是10,這樣和後續就不須要再次和avaliableBuffer進行對比了。 3.最後交給handler進行處理。
下面的圖展示了Log4j使用Disruptor,ArrayBlockingQueue以及同步的Log4j吞吐量的對比,能夠看見使用了Disruptor完爆了其餘的,固然還有更多的框架使用了Disruptor,這裏就不作介紹了。
本文介紹了傳統的阻塞隊列的缺點,後文重點吹逼了下Disruptor,以及他這麼牛逼的緣由,以及具體的工做流程。
若是之後有人問你叫你設計一個高效無鎖隊列,須要怎麼設計?相信你能從文章中總結出答案,若是對其有疑問或者想和我交流思路,能夠關注個人公衆號,加我好友和我一塊兒討論。
最後這篇文章被我收錄於JGrowing,一個全面,優秀,由社區一塊兒共建的Java學習路線,若是您想參與開源項目的維護,能夠一塊兒共建,github地址爲:github.com/javagrowing… 麻煩給個小星星喲。
若是你們以爲這篇文章對你有幫助,或者你有什麼疑問想提供1v1免費vip服務,均可以關注個人公衆號,你的關注和轉發是對我最大的支持,O(∩_∩)O: