從構建分佈式秒殺系統聊聊Disruptor高性能隊列

前言

秒殺架構持續優化中,基於自身認知不足之處在所不免,也請你們指正,共同進步。文章標題來自碼友<tukangzheng>的建議,但願能夠把阻塞隊列ArrayBlockingQueue這個隊列替換成Disruptor,因爲以前曾接觸過這個東西,據說很不錯,正好藉此機會整合進來。算法

簡介緩存

LMAX Disruptor是一個高性能的線程間消息庫。它源於LMAX對併發性,性能和非阻塞算法的研究,現在構成了Exchange基礎架構的核心部分。bash

  • Disruptor它是一個開源的併發框架,並得到2011 Duke’s 程序框架創新獎,可以在無鎖的狀況下實現網絡的Queue併發操做。
  • Disruptor是一個高性能的異步處理框架,或者能夠認爲是最快的消息框架(輕量的JMS),也能夠認爲是一個觀察者模式的實現,或者事件監聽模式的實現。

在這裏你能夠跟BlockingQueue隊列做比對,簡單的理解爲它是一種高效的"生產者-消費者"模型,先了解後深刻底層原理。網絡

核心

寫代碼案例以前,你們最好先了解 Disruptor 的核心概念,至少知道它是如何運做的。架構

  • Ring Buffer
  • 如其名,環形的緩衝區。曾經 RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責被簡化爲僅僅負責對經過 Disruptor 進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 能夠由用戶的自定義實現來徹底替代。
  • Sequence Disruptor
  • 經過順序遞增的序號來編號管理經過其進行交換的數據(事件),對數據(事件)的處理過程老是沿着序號逐個遞增處理。一個 Sequence 用於跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一個 AtomicLong 也能夠用於標識進度,但定義 Sequence 來負責該問題還有另外一個目的,那就是防止不一樣的 Sequence 之間的CPU緩存僞共享(Flase Sharing)問題。
  • Sequencer
  • Sequencer 是 Disruptor 的真正核心。此接口有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞數據的併發算法。
  • Sequence Barrier
  • 用於保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。 Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。
  • Wait Strategy
  • 定義 Consumer 如何進行等待下一個事件的策略。 (注:Disruptor 定義了多種不一樣的策略,針對不一樣的場景,提供了不同的性能表現)
  • Event
  • 在 Disruptor 的語義中,生產者和消費者之間進行交換的數據被稱爲事件(Event)。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義並指定。
  • EventProcessor
  • EventProcessor 持有特定消費者(Consumer)的 Sequence,並提供用於調用事件處理實現的事件循環(Event Loop)。
  • EventHandler
  • Disruptor 定義的事件處理接口,由用戶實現,用於處理事件,是 Consumer 的真正實現。
  • Producer
  • 即生產者,只是泛指調用 Disruptor 發佈事件的用戶代碼,Disruptor 沒有定義特定接口或類型。
從構建分佈式秒殺系統聊聊Disruptor高性能隊列

優勢併發

  • 剖析Disruptor:爲何會這麼快?(一)鎖的缺點
  • 剖析Disruptor:爲何會這麼快?(二)神奇的緩存行填充
  • 剖析Disruptor:爲何會這麼快?(三)僞共享
  • 剖析Disruptor:爲何會這麼快?(四)揭祕內存屏障

使用案例框架

這裏以咱們系統中的秒殺做爲案例,後面有相對複雜的場景介紹。異步

定義秒殺事件對象:分佈式

/**
 * 事件對象(秒殺事件)
 * 建立者 科幫網
 */
public class SeckillEvent implements Serializable {
 private static final long serialVersionUID = 1L;
 private long seckillId;
 private long userId;
 public SeckillEvent(){
 }
 public long getSeckillId() {
 return seckillId;
 }
 public void setSeckillId(long seckillId) {
 this.seckillId = seckillId;
 }
 public long getUserId() {
 return userId;
 }
 public void setUserId(long userId) {
 this.userId = userId;
 }
}
複製代碼

爲了讓Disruptor爲咱們預先分配這些事件,咱們須要一個將執行構造的EventFactory:oop

/**
 * 事件生成工廠(用來初始化預分配事件對象)
 * 建立者 科幫網
 */
public class SeckillEventFactory implements EventFactory<SeckillEvent> {
 public SeckillEvent newInstance() {
 return new SeckillEvent();
 }
}
複製代碼

而後,咱們須要建立一個處理這些事件的消費者:

/**
 * 消費者(秒殺處理器)
 * 建立者 科幫網
 */
public class SeckillEventConsumer implements EventHandler<SeckillEvent> {
 //業務處理、這裏是沒法注入的,須要手動獲取,見源碼
 private ISeckillService seckillService = (ISeckillService) SpringUtil.getBean("seckillService");
 public void onEvent(SeckillEvent seckillEvent, long seq, boolean bool) throws Exception {
 seckillService.startSeckil(seckillEvent.getSeckillId(), seckillEvent.getUserId());
 }
}
複製代碼

既然有消費者,咱們將須要這些秒殺事件的來源:

/**
 * 使用translator方式生產者
 * 建立者 科幫網
 */
public class SeckillEventProducer {
 private final static EventTranslatorVararg<SeckillEvent> translator = new EventTranslatorVararg<SeckillEvent>() {
 public void translateTo(SeckillEvent seckillEvent, long seq, Object... objs) {
 seckillEvent.setSeckillId((Long) objs[0]);
 seckillEvent.setUserId((Long) objs[1]);
 }
 };
 private final RingBuffer<SeckillEvent> ringBuffer;
 public SeckillEventProducer(RingBuffer<SeckillEvent> ringBuffer){
 this.ringBuffer = ringBuffer;
 }
 public void seckill(long seckillId, long userId){
 this.ringBuffer.publishEvent(translator, seckillId, userId);
 }
}
複製代碼

最後,咱們來寫一個測試類,運行一下(跑不通,須要修改消費者):

/**
 * 測試類
 * 建立者 科幫網
 */
public class SeckillEventMain {
 public static void main(String[] args) {
 producerWithTranslator();
 }
 public static void producerWithTranslator(){
 SeckillEventFactory factory = new SeckillEventFactory();
 int ringBufferSize = 1024;
 ThreadFactory threadFactory = new ThreadFactory() {
 public Thread newThread(Runnable runnable) {
 return new Thread(runnable);
 }
 };
 //建立disruptor
 Disruptor<SeckillEvent> disruptor = new Disruptor<SeckillEvent>(factory, ringBufferSize, threadFactory);
 //鏈接消費事件方法
 disruptor.handleEventsWith(new SeckillEventConsumer());
 //啓動
 disruptor.start();
 RingBuffer<SeckillEvent> ringBuffer = disruptor.getRingBuffer();
 SeckillEventProducer producer = new SeckillEventProducer(ringBuffer);
 for(long i = 0; i<10; i++){
 producer.seckill(i, i);
 }
 disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至全部的事件都獲得處理;
 }
}
複製代碼

使用場景

  • PCP (生產者-消費者問題)
  • 網上搜了下國內實戰案例並很少,大廠可能有在使用

這裏舉一個你們平常的例子,停車場景。當汽車進入停車場時(A),系統首先會記錄汽車信息(B)。同時也會發送消息到其餘系統處理相關業務(C),最後發送短信通知車主收費開始(D)。

一個生產者A與三個消費者B、C、D,D的事件處理須要B與C先完成。則該模型結構以下:

從構建分佈式秒殺系統聊聊Disruptor高性能隊列

在這個結構下,每一個消費者擁有各自獨立的事件序號Sequence,消費者之間不存在共享競態。SequenceBarrier1監聽RingBuffer的序號cursor,消費者B與C經過SequenceBarrier1等待可消費事件。SequenceBarrier2除了監聽cursor,同時也監聽B與C的序號Sequence,從而將最小的序號返回給消費者D,由此實現了D依賴B與C的邏輯。

相關文章
相關標籤/搜索