今天來說講我所知道的單機最快的MQ,它叫Disruptor數組
先來介紹一下Disruptor,從翻譯上來看,Disruptor—分裂、瓦解,Disruptor是國外某個金融、股票交易所開發的,2011年得到Duke獎,爲成爲單機最快的MQ,性能及高,無鎖CAS,單機支持高併發併發
怎麼樣,心動了沒?來來來,讓我來帶你們學習一下今天的主角—Disruptoride
你們能夠把Disruptor當作是內存裏的高效的隊列高併發
無鎖(CAS)、高併發,使用環形Buffer,直接覆蓋(不用清除)舊數據,下降GC頻繁,實現了基於事件的生產者消費者模型(觀察者模式)性能
RingBuffer有一個序號sequence,指向下一個可用元素,採用數組實現,沒有首尾指針學習
首先,它是基於數組實現的,遍歷起來要比鏈表要快
其次不用維護首尾指針,固然他也沒有首尾指針,之須要維護一個sequence便可this
這時就會有小夥伴着急了,怎麼能覆蓋掉呢,那我數據不就丟失了嗎?**spa
那確定是不會就讓他這麼輕易滴把這數據覆蓋掉滴,當須要覆蓋數據時,會執行一個策略,Disruptor給提供多種策略,說說比較經常使用的線程
//定義Event消息(事件)類 public class LongEvent{ private long value; private String name; @Override public String toString() { return "LongEvent{" + "value=" + value + ", name='" + name + '\'' + '}'; } public String getName() { return name; } public void setName(String name) { this.name = name; } public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
//定義消息(事件)工廠 public class LongEventFactory implements EventFactory<LongEvent> { @Override public LongEvent newInstance() { return new LongEvent(); } }
//定義消息(事件)的消費方式 public class LongEventHandler implements EventHandler<LongEvent> { @Override public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { System.out.println(longEvent.getName()+"-----"+longEvent.getValue()); } }
//消息(事件)生產者 public class LongEventProducer { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(long val, String name) { long sequence = ringBuffer.next(); try { LongEvent event = ringBuffer.get(sequence); event.setValue(val); event.setName(name); } finally { ringBuffer.publish(sequence); } } }
public static void main(String[] args) { //new一個消息(事件)工廠 LongEventFactory factory = new LongEventFactory(); //設置環形Buffer的SIZE int size = 1024; //new Disruptor,參數是消息(事件)工廠,Buffer的Size,線程工廠 Disruptor<LongEvent> longEventDisruptor = new Disruptor<LongEvent>(factory, size, Executors.defaultThreadFactory()); //設置如何消費生產者產出的消息(事件) longEventDisruptor.handleEventsWith(new LongEventHandler()); //啓動--環形Buffer建立成功,全部的位置均已建立好Event對象 longEventDisruptor.start(); //獲取Disruptor的環形Buffer RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer(); //new 消息(事件)生產者 LongEventProducer producer = new LongEventProducer(ringBuffer); //循環調用-往裏添加消息 for(long l = 0; l<100; l++) { //TODO 調用producer的生產消息(事件)的方法 producer.onData(l,"MingLog-"+l); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } //將消息(事件)發佈出去 longEventDisruptor.shutdown(); }
好了,Disruptor講解到這裏就結束了,你們有什麼想要學習的均可以私信或評論告訴我哦\~ 我會盡全力知足你們滴,我學,你也學,咳咳\~廣告看多了翻譯
點贊、關注來一波好嗎,秋梨膏~