秒殺架構持續優化中,基於自身認知不足之處在所不免,也請你們指正,共同進步。文章標題來自碼友<tukangzheng>的建議,但願能夠把阻塞隊列ArrayBlockingQueue這個隊列替換成Disruptor,因爲以前曾接觸過這個東西,據說很不錯,正好藉此機會整合進來。算法
簡介緩存
LMAX Disruptor是一個高性能的線程間消息庫。它源於LMAX對併發性,性能和非阻塞算法的研究,現在構成了Exchange基礎架構的核心部分。bash
在這裏你能夠跟BlockingQueue隊列做比對,簡單的理解爲它是一種高效的"生產者-消費者"模型,先了解後深刻底層原理。網絡
寫代碼案例以前,你們最好先了解 Disruptor 的核心概念,至少知道它是如何運做的。架構
優勢併發
使用案例框架
這裏以咱們系統中的秒殺做爲案例,後面有相對複雜的場景介紹。異步
定義秒殺事件對象:分佈式
/**
* 事件對象(秒殺事件)
* 建立者 科幫網
*/
public class SeckillEvent implements Serializable {
private static final long serialVersionUID = 1L;
private long seckillId;
private long userId;
public SeckillEvent(){
}
public long getSeckillId() {
return seckillId;
}
public void setSeckillId(long seckillId) {
this.seckillId = seckillId;
}
public long getUserId() {
return userId;
}
public void setUserId(long userId) {
this.userId = userId;
}
}
複製代碼
爲了讓Disruptor爲咱們預先分配這些事件,咱們須要一個將執行構造的EventFactory:oop
/**
* 事件生成工廠(用來初始化預分配事件對象)
* 建立者 科幫網
*/
public class SeckillEventFactory implements EventFactory<SeckillEvent> {
public SeckillEvent newInstance() {
return new SeckillEvent();
}
}
複製代碼
而後,咱們須要建立一個處理這些事件的消費者:
/**
* 消費者(秒殺處理器)
* 建立者 科幫網
*/
public class SeckillEventConsumer implements EventHandler<SeckillEvent> {
//業務處理、這裏是沒法注入的,須要手動獲取,見源碼
private ISeckillService seckillService = (ISeckillService) SpringUtil.getBean("seckillService");
public void onEvent(SeckillEvent seckillEvent, long seq, boolean bool) throws Exception {
seckillService.startSeckil(seckillEvent.getSeckillId(), seckillEvent.getUserId());
}
}
複製代碼
既然有消費者,咱們將須要這些秒殺事件的來源:
/**
* 使用translator方式生產者
* 建立者 科幫網
*/
public class SeckillEventProducer {
private final static EventTranslatorVararg<SeckillEvent> translator = new EventTranslatorVararg<SeckillEvent>() {
public void translateTo(SeckillEvent seckillEvent, long seq, Object... objs) {
seckillEvent.setSeckillId((Long) objs[0]);
seckillEvent.setUserId((Long) objs[1]);
}
};
private final RingBuffer<SeckillEvent> ringBuffer;
public SeckillEventProducer(RingBuffer<SeckillEvent> ringBuffer){
this.ringBuffer = ringBuffer;
}
public void seckill(long seckillId, long userId){
this.ringBuffer.publishEvent(translator, seckillId, userId);
}
}
複製代碼
最後,咱們來寫一個測試類,運行一下(跑不通,須要修改消費者):
/**
* 測試類
* 建立者 科幫網
*/
public class SeckillEventMain {
public static void main(String[] args) {
producerWithTranslator();
}
public static void producerWithTranslator(){
SeckillEventFactory factory = new SeckillEventFactory();
int ringBufferSize = 1024;
ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(Runnable runnable) {
return new Thread(runnable);
}
};
//建立disruptor
Disruptor<SeckillEvent> disruptor = new Disruptor<SeckillEvent>(factory, ringBufferSize, threadFactory);
//鏈接消費事件方法
disruptor.handleEventsWith(new SeckillEventConsumer());
//啓動
disruptor.start();
RingBuffer<SeckillEvent> ringBuffer = disruptor.getRingBuffer();
SeckillEventProducer producer = new SeckillEventProducer(ringBuffer);
for(long i = 0; i<10; i++){
producer.seckill(i, i);
}
disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至全部的事件都獲得處理;
}
}
複製代碼
使用場景
這裏舉一個你們平常的例子,停車場景。當汽車進入停車場時(A),系統首先會記錄汽車信息(B)。同時也會發送消息到其餘系統處理相關業務(C),最後發送短信通知車主收費開始(D)。
一個生產者A與三個消費者B、C、D,D的事件處理須要B與C先完成。則該模型結構以下:
在這個結構下,每一個消費者擁有各自獨立的事件序號Sequence,消費者之間不存在共享競態。SequenceBarrier1監聽RingBuffer的序號cursor,消費者B與C經過SequenceBarrier1等待可消費事件。SequenceBarrier2除了監聽cursor,同時也監聽B與C的序號Sequence,從而將最小的序號返回給消費者D,由此實現了D依賴B與C的邏輯。