Disruptor和LinkedBlockingQueue簡介
Disruptor是Java實現的用於線程間通訊的消息組件,其核心是一個Lock-free(無鎖)的Ringbuffer;LinkedBlockingQueue是java.util.concurrent包中提供的一個阻塞隊列;由於兩者之間有不少相同的地方,因此在此進行一次性能的對比。java
壓力測試
1.針對LinkedBlockingQueue的壓測類git
public class LinkedBlockingQueueTest { public static int eventNum = 5000000; public static void main(String[] args) { final BlockingQueue<LogEvent> queue = new LinkedBlockingQueue<LogEvent>(); final long startTime = System.currentTimeMillis(); new Thread(new Runnable() { @Override public void run() { int i = 0; while (i < eventNum) { LogEvent logEvent = new LogEvent(i, "c" + i); try { queue.put(logEvent); } catch (InterruptedException e) { e.printStackTrace(); } i++; } } }).start(); new Thread(new Runnable() { @Override public void run() { int k = 0; while (k < eventNum) { try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } k++; } long endTime = System.currentTimeMillis(); System.out .println("costTime = " + (endTime - startTime) + "ms"); } }).start(); } }
LinkedBlockingQueueTest 實現了一個簡單的生產者-消費者模式,一條線程負責插入,另一條線程負責讀取。github
public class LogEvent implements Serializable { private static final long serialVersionUID = 1L; private long logId; private String content; public LogEvent(){ } public LogEvent(long logId, String content){ this.logId = logId; this.content = content; } public long getLogId() { return logId; } public void setLogId(long logId) { this.logId = logId; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
LogEvent實體類,Disruptor的壓測類中也一樣會用到數組
2.下面是針對Disruptor的壓測類,須要引入Disruptor的jar包緩存
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.3.6</version> </dependency>
Disruptor的壓測類多線程
public class DisruptorTest { public static void main(String[] args) { LogEventFactory factory = new LogEventFactory(); int ringBufferSize = 65536; final Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(factory, ringBufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BusySpinWaitStrategy()); LogEventConsumer consumer = new LogEventConsumer(); disruptor.handleEventsWith(consumer); disruptor.start(); new Thread(new Runnable() { @Override public void run() { RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); for (int i = 0; i < LinkedBlockingQueueTest.eventNum; i++) { long seq = ringBuffer.next(); LogEvent logEvent = ringBuffer.get(seq); logEvent.setLogId(i); logEvent.setContent("c" + i); ringBuffer.publish(seq); } } }).start(); } }
一樣爲了保證測試數據的準確性,Disruptor使用了ProducerType.SINGLE(單生產者)模式,同時也只使用了一個LogEventConsumer(消費者)ide
public class LogEventConsumer implements EventHandler<LogEvent> { private long startTime; private int i; public LogEventConsumer() { this.startTime = System.currentTimeMillis(); } public void onEvent(LogEvent logEvent, long seq, boolean bool) throws Exception { i++; if (i == LinkedBlockingQueueTest.eventNum) { long endTime = System.currentTimeMillis(); System.out.println(" costTime = " + (endTime - startTime) + "ms"); } } }
LogEventConsumer 中負責記錄開始時間和結束時間以及接受消息的數量,方便統計時間性能
壓測結果統計
測試環境:
操做系統:win7 32位
CPU:Intel Core i3-2350M 2.3GHz 4核
內存:3G
JDK:1.6測試
分別運行以上兩個實例,運行屢次取平均值,結果以下:this
結果顯示Disruptor是LinkedBlockingQueue的1.65倍,測試環境是本人的筆記本電腦,配置有點低全部差距並非特別明顯;一樣在公司臺式機(win7 64位 – Intel Core i5 4核 – 4g內存 – jdk1.7)顯示的結果是3-4倍左右;官方提供的數據是在5倍左右:https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results
性能差距緣由分析
1.lock和cas的差距
LinkedBlockingQueue中使用了鎖,以下所示:
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock();
而Disruptor中提供了cas的無鎖支持,提供了BusySpinWaitStrategy策略的支持
2.避免僞共享
緩存系統中是以緩存行(cache line)爲單位存儲的。緩存行是2的整數冪個連續字節,通常爲32-256個字節。最多見的緩存行大小是64個字節。當多線程修改互相獨立的變量時,若是這些變量共享同一個緩存行,就會無心中影響彼此的性能,這就是僞共享。
看一個實例:
public class FalseSharing implements Runnable { public final static int NUM_THREADS = 4; public final static long ITERATIONS = 50000000; private final int arrayIndex; private static VolatileLong[] longs = new VolatileLong[NUM_THREADS]; static { for (int i = 0; i < longs.length; i++) { longs[i] = new VolatileLong(); } } public FalseSharing(final int arrayIndex) { this.arrayIndex = arrayIndex; } public static void main(final String[] args) throws Exception { final long start = System.currentTimeMillis(); runTest(); System.out.println("costTime = " + (System.currentTimeMillis() - start) + "ms"); } private static void runTest() throws InterruptedException { Thread[] threads = new Thread[NUM_THREADS]; for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(new FalseSharing(i)); } for (Thread t : threads) { t.start(); } for (Thread t : threads) { t.join(); } } @Override public void run() { long i = ITERATIONS + 1; while (0 != --i) { longs[arrayIndex].value = i; } } public final static class VolatileLong { public volatile long value = 0L; public long p1, p2, p3, p4, p5, p6; } }
分別註釋掉VolatileLong 中的public long p1, p2, p3, p4, p5, p6;和不註釋掉進行對比,發現不註釋掉的性能竟然是註釋掉性能的4倍,緣由就是緩存行大小是64個字節,不註釋掉說明一個VolatileLong 對象恰好佔用一個緩存行;註釋掉的話一個緩存行會被多個變量佔用,就會無心中影響彼此的性能。
查看Disruptor的源碼會發現不少地方避免了僞共享,好比:
abstract class SingleProducerSequencerPad extends AbstractSequencer { protected long p1, p2, p3, p4, p5, p6, p7; public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } }
3.Ringbuffer的使用
Disruptor選擇使用Ringbuffer來構造lock-free隊列,什麼事Ringbuffer,能夠參考wiki:https://zh.wikipedia.org/wiki/%E7%92%B0%E5%BD%A2%E7%B7%A9%E8%A1%9D%E5%8D%80
數組是預分配的,這樣避免了Java GC帶來的運行開銷。生產者在生產消息或產生事件的時候對Ringbuffer元素中的屬性進行更新,而不是替換Ringbuffer中的元素。
佔時先整理這三條,確定還有其餘緣由
總結
Disruptor的高性能早就被用在了一些第三方庫中,好比log4j2,讓log4j2在性能上有質的飛越,以前對三種主流日誌性能對比:https://my.oschina.net/OutOfMemory/blog/789267