Disruptor和LinkedBlockingQueue性能對比以及分析

Disruptor和LinkedBlockingQueue簡介
Disruptor是Java實現的用於線程間通訊的消息組件,其核心是一個Lock-free(無鎖)的Ringbuffer;LinkedBlockingQueue是java.util.concurrent包中提供的一個阻塞隊列;由於兩者之間有不少相同的地方,因此在此進行一次性能的對比。java

壓力測試
1.針對LinkedBlockingQueue的壓測類git

public class LinkedBlockingQueueTest {

    public static int eventNum = 5000000;

    public static void main(String[] args) {
        final BlockingQueue<LogEvent> queue = new LinkedBlockingQueue<LogEvent>();
        final long startTime = System.currentTimeMillis();
        new Thread(new Runnable() {

            @Override
            public void run() {
                int i = 0;
                while (i < eventNum) {
                    LogEvent logEvent = new LogEvent(i, "c" + i);
                    try {
                        queue.put(logEvent);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    i++;
                }
            }
        }).start();

        new Thread(new Runnable() {

            @Override
            public void run() {
                int k = 0;
                while (k < eventNum) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    k++;
                }
                long endTime = System.currentTimeMillis();
                System.out
                        .println("costTime = " + (endTime - startTime) + "ms");
            }
        }).start();
    }
}

LinkedBlockingQueueTest 實現了一個簡單的生產者-消費者模式,一條線程負責插入,另一條線程負責讀取。github

public class LogEvent implements Serializable {

    private static final long serialVersionUID = 1L;
    private long logId;
    private String content;
    
    public LogEvent(){
        
    }
    
    public LogEvent(long logId, String content){
        this.logId = logId;
        this.content = content;
    }

    public long getLogId() {
        return logId;
    }

    public void setLogId(long logId) {
        this.logId = logId;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

LogEvent實體類,Disruptor的壓測類中也一樣會用到數組

2.下面是針對Disruptor的壓測類,須要引入Disruptor的jar包緩存

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.6</version>
</dependency>

Disruptor的壓測類多線程

public class DisruptorTest {

    public static void main(String[] args) {
        LogEventFactory factory = new LogEventFactory();
        int ringBufferSize = 65536;
        final Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(factory,
                ringBufferSize, DaemonThreadFactory.INSTANCE,
                ProducerType.SINGLE, new BusySpinWaitStrategy());

        LogEventConsumer consumer = new LogEventConsumer();
        disruptor.handleEventsWith(consumer);
        disruptor.start();
        new Thread(new Runnable() {

            @Override
            public void run() {
                RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
                for (int i = 0; i < LinkedBlockingQueueTest.eventNum; i++) {
                    long seq = ringBuffer.next();
                    LogEvent logEvent = ringBuffer.get(seq);
                    logEvent.setLogId(i);
                    logEvent.setContent("c" + i);
                    ringBuffer.publish(seq);
                }
            }
        }).start();
    }
}

一樣爲了保證測試數據的準確性,Disruptor使用了ProducerType.SINGLE(單生產者)模式,同時也只使用了一個LogEventConsumer(消費者)ide

public class LogEventConsumer implements EventHandler<LogEvent> {

    private long startTime;
    private int i;

    public LogEventConsumer() {
        this.startTime = System.currentTimeMillis();
    }

    public void onEvent(LogEvent logEvent, long seq, boolean bool)
            throws Exception {
        i++;
        if (i == LinkedBlockingQueueTest.eventNum) {
            long endTime = System.currentTimeMillis();
            System.out.println(" costTime = " + (endTime - startTime) + "ms");
        }
    }

}

LogEventConsumer 中負責記錄開始時間和結束時間以及接受消息的數量,方便統計時間性能

壓測結果統計
測試環境:
操做系統:win7 32位
CPU:Intel Core i3-2350M 2.3GHz 4核
內存:3G
JDK:1.6測試

分別運行以上兩個實例,運行屢次取平均值,結果以下:this

結果顯示Disruptor是LinkedBlockingQueue的1.65倍,測試環境是本人的筆記本電腦,配置有點低全部差距並非特別明顯;一樣在公司臺式機(win7 64位 – Intel Core i5 4核 – 4g內存 – jdk1.7)顯示的結果是3-4倍左右;官方提供的數據是在5倍左右:https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results

性能差距緣由分析
1.lock和cas的差距
LinkedBlockingQueue中使用了鎖,以下所示:

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

而Disruptor中提供了cas的無鎖支持,提供了BusySpinWaitStrategy策略的支持

2.避免僞共享
緩存系統中是以緩存行(cache line)爲單位存儲的。緩存行是2的整數冪個連續字節,通常爲32-256個字節。最多見的緩存行大小是64個字節。當多線程修改互相獨立的變量時,若是這些變量共享同一個緩存行,就會無心中影響彼此的性能,這就是僞共享。

看一個實例:

public class FalseSharing implements Runnable {
    public final static int NUM_THREADS = 4;
    public final static long ITERATIONS = 50000000;
    private final int arrayIndex;

    private static VolatileLong[] longs = new VolatileLong[NUM_THREADS];
    static {
        for (int i = 0; i < longs.length; i++) {
            longs[i] = new VolatileLong();
        }
    }

    public FalseSharing(final int arrayIndex) {
        this.arrayIndex = arrayIndex;
    }

    public static void main(final String[] args) throws Exception {
        final long start = System.currentTimeMillis();
        runTest();
        System.out.println("costTime = " + (System.currentTimeMillis() - start) + "ms");
    }

    private static void runTest() throws InterruptedException {
        Thread[] threads = new Thread[NUM_THREADS];

        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new FalseSharing(i));
        }

        for (Thread t : threads) {
            t.start();
        }

        for (Thread t : threads) {
            t.join();
        }
    }

    @Override
    public void run() {
        long i = ITERATIONS + 1;
        while (0 != --i) {
            longs[arrayIndex].value = i;
        }
    }

    public final static class VolatileLong {
        public volatile long value = 0L;
        public long p1, p2, p3, p4, p5, p6;
    }
}

分別註釋掉VolatileLong 中的public long p1, p2, p3, p4, p5, p6;和不註釋掉進行對比,發現不註釋掉的性能竟然是註釋掉性能的4倍,緣由就是緩存行大小是64個字節,不註釋掉說明一個VolatileLong 對象恰好佔用一個緩存行;註釋掉的話一個緩存行會被多個變量佔用,就會無心中影響彼此的性能。

查看Disruptor的源碼會發現不少地方避免了僞共享,好比:

abstract class SingleProducerSequencerPad extends AbstractSequencer
{
    protected long p1, p2, p3, p4, p5, p6, p7;

    public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }
}

3.Ringbuffer的使用
Disruptor選擇使用Ringbuffer來構造lock-free隊列,什麼事Ringbuffer,能夠參考wiki:https://zh.wikipedia.org/wiki/%E7%92%B0%E5%BD%A2%E7%B7%A9%E8%A1%9D%E5%8D%80
數組是預分配的,這樣避免了Java GC帶來的運行開銷。生產者在生產消息或產生事件的時候對Ringbuffer元素中的屬性進行更新,而不是替換Ringbuffer中的元素。

佔時先整理這三條,確定還有其餘緣由

總結
Disruptor的高性能早就被用在了一些第三方庫中,好比log4j2,讓log4j2在性能上有質的飛越,以前對三種主流日誌性能對比:https://my.oschina.net/OutOfMemory/blog/789267

相關文章
相關標籤/搜索