秒殺架構持續優化中,基於自身認知不足之處在所不免,也請你們指正,共同進步。文章標題來自碼友<tukangzheng>的建議,但願能夠把阻塞隊列ArrayBlockingQueue這個隊列替換成Disruptor,因爲以前曾接觸過這個東西,據說很不錯,正好藉此機會整合進來。html
LMAX Disruptor是一個高性能的線程間消息庫。它源於LMAX對併發性,性能和非阻塞算法的研究,現在構成了Exchange基礎架構的核心部分。java
Disruptor它是一個開源的併發框架,並得到2011 Duke’s 程序框架創新獎,可以在無鎖的狀況下實現網絡的Queue併發操做。git
在這裏你能夠跟BlockingQueue隊列做比對,簡單的理解爲它是一種高效的"生產者-消費者"模型,先了解後深刻底層原理。github
寫代碼案例以前,你們最好先了解 Disruptor 的核心概念,至少知道它是如何運做的。算法
有興趣的參考:
https://coolshell.cn/articles/9169.htmlspring
https://www.cnblogs.com/daoqidelv/p/6995888.htmlshell
這裏以咱們系統中的秒殺做爲案例,後面有相對複雜的場景介紹。緩存
定義秒殺事件對象:網絡
/** * 事件對象(秒殺事件) * 建立者 科幫網 */ public class SeckillEvent implements Serializable { private static final long serialVersionUID = 1L; private long seckillId; private long userId; public SeckillEvent(){ } public long getSeckillId() { return seckillId; } public void setSeckillId(long seckillId) { this.seckillId = seckillId; } public long getUserId() { return userId; } public void setUserId(long userId) { this.userId = userId; } }
爲了讓Disruptor爲咱們預先分配這些事件,咱們須要一個將執行構造的EventFactory:架構
/** * 事件生成工廠(用來初始化預分配事件對象) * 建立者 科幫網 */ public class SeckillEventFactory implements EventFactory<SeckillEvent> { public SeckillEvent newInstance() { return new SeckillEvent(); } }
而後,咱們須要建立一個處理這些事件的消費者:
/** * 消費者(秒殺處理器) * 建立者 科幫網 */ public class SeckillEventConsumer implements EventHandler<SeckillEvent> { //業務處理、這裏是沒法注入的,須要手動獲取,見源碼 private ISeckillService seckillService = (ISeckillService) SpringUtil.getBean("seckillService"); public void onEvent(SeckillEvent seckillEvent, long seq, boolean bool) throws Exception { seckillService.startSeckil(seckillEvent.getSeckillId(), seckillEvent.getUserId()); } }
既然有消費者,咱們將須要這些秒殺事件的來源:
/** * 使用translator方式生產者 * 建立者 科幫網 */ public class SeckillEventProducer { private final static EventTranslatorVararg<SeckillEvent> translator = new EventTranslatorVararg<SeckillEvent>() { public void translateTo(SeckillEvent seckillEvent, long seq, Object... objs) { seckillEvent.setSeckillId((Long) objs[0]); seckillEvent.setUserId((Long) objs[1]); } }; private final RingBuffer<SeckillEvent> ringBuffer; public SeckillEventProducer(RingBuffer<SeckillEvent> ringBuffer){ this.ringBuffer = ringBuffer; } public void seckill(long seckillId, long userId){ this.ringBuffer.publishEvent(translator, seckillId, userId); } }
最後,咱們來寫一個測試類,運行一下(跑不通,須要修改消費者):
/** * 測試類 * 建立者 科幫網 */ public class SeckillEventMain { public static void main(String[] args) { producerWithTranslator(); } public static void producerWithTranslator(){ SeckillEventFactory factory = new SeckillEventFactory(); int ringBufferSize = 1024; ThreadFactory threadFactory = new ThreadFactory() { public Thread newThread(Runnable runnable) { return new Thread(runnable); } }; //建立disruptor Disruptor<SeckillEvent> disruptor = new Disruptor<SeckillEvent>(factory, ringBufferSize, threadFactory); //鏈接消費事件方法 disruptor.handleEventsWith(new SeckillEventConsumer()); //啓動 disruptor.start(); RingBuffer<SeckillEvent> ringBuffer = disruptor.getRingBuffer(); SeckillEventProducer producer = new SeckillEventProducer(ringBuffer); for(long i = 0; i<10; i++){ producer.seckill(i, i); } disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至全部的事件都獲得處理; } }
這裏舉一個你們平常的例子,停車場景。當汽車進入停車場時(A),系統首先會記錄汽車信息(B)。同時也會發送消息到其餘系統處理相關業務(C),最後發送短信通知車主收費開始(D)。
一個生產者A與三個消費者B、C、D,D的事件處理須要B與C先完成。則該模型結構以下:
在這個結構下,每一個消費者擁有各自獨立的事件序號Sequence,消費者之間不存在共享競態。SequenceBarrier1監聽RingBuffer的序號cursor,消費者B與C經過SequenceBarrier1等待可消費事件。SequenceBarrier2除了監聽cursor,同時也監聽B與C的序號Sequence,從而將最小的序號返回給消費者D,由此實現了D依賴B與C的邏輯。
代碼案例:從0到1構建分佈式秒殺系統
參考:
https://github.com/LMAX-Exchange/disruptor/wiki
https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
http://wiki.jikexueyuan.com/project/disruptor-getting-started/lmax-framework.html