原文地址: haifeiWu和他朋友們的博客
博客地址:www.hchstudio.cn
歡迎轉載,轉載請註明做者及出處,謝謝!java
最近一直在研究隊列的一些問題,今天樓主要分享一個高性能的隊列 Disruptor 。
git
它是英國外匯交易公司 LMAX 開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題。基於 Disruptor 開發的系統單線程能支撐每秒600萬訂單。github
目前,包括 Apache Storm、Log4j2 在內的不少知名項目都應用了Disruptor以獲取高性能。在樓主公司內部使用 Disruptor 與 Netty 結合用來作 GPS 實時數據的處理,性能至關強悍。本文從實戰角度來大概瞭解一下 Disruptor 的實現原理。編程
Disruptor經過如下設計來解決隊列速度慢的問題:數組
經過上面的介紹,咱們大概能夠了解到 Disruptor 是一個高性能的無鎖隊列,那麼該如何使用呢,下面樓主經過 Disruptor 實現一個簡單的生產者消費者模型,介紹 Disruptor 的使用緩存
首先,根據 Disruptor 的事件驅動的編程模型,咱們須要定義一個事件來攜帶數據。服務器
public class DataEvent { private long value; public void set(long value) { this.value = value; } public long getValue() { return value; } }
爲了讓 Disruptor 爲咱們預先分配這些事件,咱們須要構造一個 EventFactory 來執行構造網絡
public class DataEventFactory implements EventFactory<DataEvent> { @Override public DataEvent newInstance() { return new DataEvent(); } }
一旦咱們定義了事件,咱們須要建立一個處理這些事件的消費者。 在咱們的例子中,咱們要作的就是從控制檯中打印出值。架構
public class DataEventHandler implements EventHandler<DataEvent> { @Override public void onEvent(DataEvent dataEvent, long l, boolean b) throws Exception { new DataEventConsumer(dataEvent); } }
接下來咱們須要初始化 Disruptor ,並定義一個生產者來生成消息併發
public class DisruptorManager { private final static Logger LOG = LoggerFactory.getLogger(DisruptorManager.class); /*消費者線程池*/ private static ExecutorService threadPool; private static Disruptor<DataEvent> disruptor; private static RingBuffer<DataEvent> ringBuffer; private static AtomicLong dataNum = new AtomicLong(); public static void init(EventHandler<DataEvent> eventHandler) { //初始化disruptor threadPool = Executors.newCachedThreadPool(); disruptor = new Disruptor<>(new DataEventFactory(), 8 * 1024, threadPool, ProducerType.MULTI, new BlockingWaitStrategy()); ringBuffer = disruptor.getRingBuffer(); disruptor.handleEventsWith(eventHandler); disruptor.start(); new Timer().schedule(new TimerTask() { @Override public void run() { LOG.info("放入隊列中數據編號{},隊列剩餘空間{}", dataNum.get(), ringBuffer.remainingCapacity()); } }, new Date(), 60 * 1000); } /** * * @param message */ public static void putDataToQueue(long message) { if (dataNum.get() == Long.MAX_VALUE) { dataNum.set(0L); } // 往隊列中加事件 long next = ringBuffer.next(); try { ringBuffer.get(next).set(message); dataNum.incrementAndGet(); } catch (Exception e) { LOG.error("向RingBuffer存入數據[{}]出現異常=>{}", message, e.getStackTrace()); } finally { ringBuffer.publish(next); } } public static void close() { threadPool.shutdown(); disruptor.shutdown(); } }
最後咱們來定義一個 Main 方法來執行代碼
public class EventMain { public static void main(String[] args) throws Exception { DisruptorManager.init(new DataEventHandler()); for (long l = 0; true; l++) { DisruptorManager.putDataToQueue(l); Thread.sleep(1000); } } }
上面代碼具體感興趣的小夥伴請移步 https://github.com/haifeiWu/disruptor-learn
而後咱們能夠看到控制檯打印出來的數據
Disruptor 經過精巧的無鎖設計實現了在高併發情形下的高性能。
另外在Log4j 2中的異步模式採用了Disruptor來處理。在這裏樓主遇到一個小問題,就是在使用Log4j 2經過 TCP 模式往 logstash 發日誌數據的時候,因爲網絡問題致使連接中斷,從而致使 Log4j 2 不停的往 ringbuffer 中寫數據,ringbuffer數據沒有消費者,致使服務器內存跑滿。解決方案是設置 Log4j 2 中 Disruptor 隊列有界,或者換成 UDP 模式來寫日誌數據(若是數據不重要的話)。