ArrayBlcokingQueue,LinkedBlockingQueue與Disruptor三種隊列對比與分析

1、基本介紹node

ArrayBlcokingQueue,LinkedBlockingQueue是jdk中內置的阻塞隊列,網上對它們的分析已經不少,主要有如下幾點:數組

一、底層實現機制不一樣,ArrayBlcokingQueue是基於數組的,LinkedBlockingQueue是基於鏈表的;併發

二、初始化方式不一樣,ArrayBlcokingQueue是有界的,初始化時必須指定隊列的大小;LinkedBlockingQueue能夠是無界的,但若是初始化時指定了隊列大小,也能夠作爲有界隊列使用;框架

三、鎖機制實現不一樣,ArrayBlcokingQueue生產和消費使用的是同一把鎖,並無作鎖分離;LinkedBlockingQueue中生產、消費分別經過putLock與takeLock保證同步,進行了鎖的分離;ide

使用的過程當中,根據應該場景提供了可選插入和刪除策略,咱們須要掌握和區分性能

一、插入操做測試

//隊列未滿時,返回true;隊列滿則拋出IllegalStateException(「Queue full」)異常
add(e);
//隊列未滿時,直接插入沒有返回值;隊列滿時會阻塞等待,一直等到隊列未滿時再插入。 put(e); //隊列未滿時,返回true;隊列滿時返回false。非阻塞當即返回。 offer(e); //設定等待的時間,若是在指定時間內還不能往隊列中插入數據則返回false,插入成功返回true。 offer(e, timeout, unit);

二、刪除操做this

//隊列不爲空時,返回隊首值並移除;隊列爲空時拋出NoSuchElementException()異常
remove();
//隊列不爲空返回隊首值並移除;當隊列爲空時會阻塞等待,一直等到隊列不爲空時再返回隊首值。
queue.take();
//隊列不爲空時返回隊首值並移除;隊列爲空時返回null。非阻塞當即返回。  queue.poll(); //設定等待的時間,若是在指定時間內隊列還未孔則返回null,不爲空則返回隊首值  queue.poll(timeout, unit)

 

Disruptor框架是由LMAX公司開發的一款高效的無鎖內存隊列。 spa

Disruptor的最大特色就是高性能,它的內部不同凡響的使用了環形隊列(RingBuffer)來代替普通的線型隊列,相比普通隊列環形隊列不須要針對性的同步head和tail頭尾指針,減小了線程協做的複雜度,再加上它自己基於無鎖操做的特性,從而能夠達到了很是高的性能;操作系統

在使用Disruptor框架時,咱們須要注意如下幾個方面

一、Disruptor的構造

    /**
     * 
     *
     * @param eventFactory   定義的事件工廠
     * @param ringBufferSize  環形隊列RingBuffer的大小,必須是2的N次方
     * @param threadFactory  消費者線程工廠
     * @param producerType 生產者線程的設置,當你只有一個生產者線程時設置爲 ProducerType.SINGLE,多個生產者線程ProducerType.MULTI
     * @param waitStrategy  消費者的等待策略
     */
    public Disruptor(
            final EventFactory<T> eventFactory,
            final int ringBufferSize,
            final ThreadFactory threadFactory,
            final ProducerType producerType,
            final WaitStrategy waitStrategy)
    {
        this(
            RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
            new BasicExecutor(threadFactory));
    }

上面的消費者等待策略有如下:

BlockingWaitStrategy: 使用鎖和條件變量。CPU資源的佔用少,延遲大;

SleepingWaitStrategy: 在屢次循環嘗試不成功後,選擇讓出CPU,等待下次調度,屢次調度後仍不成功,嘗試前睡眠一個納秒級別的時間再嘗試。這種策略平衡了延遲和CPU資源佔用,但延遲不均勻。

YieldingWaitStrategy: 在屢次循環嘗試不成功後,經過Thread.yield()讓出CPU,等待下次調度。性能和CPU資源佔用上較爲平衡,但要注意使用該策略時消費者線程最好小於CPU的核心數

BusySpinWaitStrategy: 性能最高的一種,一直不停的自旋等待,獲取資源。能夠壓榨出最高的性能,但會佔用最多的CPU資源

PhasedBackoffWaitStrategy: 上面多種策略的綜合,CPU資源的佔用少,延遲大。

 

二、handleEventsWith與handleEventsWithWorkerPool的區別

這兩個方法區別主要就是在因而否重複消費隊列中的消息,前者加載的不一樣消費者會各自對消息進行消費,各個消費者之間不存在競爭。後者消費者對於隊列中的同一條消息不重複消費;

 

2、性能對比

上面咱們對三種阻塞隊列作了一個基本的介紹,下面咱們分別對它們進行性能上的測試與比對,看下ArrayBlcokingQueue與LinkedBlockingQueue性能上有哪些差異,而Disruptor是否像說的那樣具有很高的併發性能

首先咱們構造一個加單的消息事件實體

    public class InfoEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    private long id;
    private String value;

    public InfoEvent() {

    }

    public InfoEvent(long id, String value) {
        this.id = id;
        this.value = value;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

定義事件工廠

public class InfoEventFactory implements EventFactory<InfoEvent>{
    public InfoEvent newInstance() {
        return new InfoEvent();
    }
 
}

定義Disruptor的消費者

public class InfoEventConsumer implements WorkHandler<InfoEvent> {
    private long startTime;
    private int cnt;

    public InfoEventConsumer() {
        this.startTime = System.currentTimeMillis();
    }

    @Override
    public void onEvent(InfoEvent event) throws Exception {
        // TODO Auto-generated method stub
        cnt++;

        if (cnt == DisruptorTest.infoNum) {
            long endTime = System.currentTimeMillis();
            System.out.println(" 消耗時間: " + (endTime - startTime) + "毫秒");
        }

    }
}

接下來分別針對ArrayBlockingQueue、LinkedBlockingQueue與Disruptor編寫測試程序

ArrayBlcokingQueueTest

public class ArrayBlcokingQueueTest {
    public static int infoNum = 5000000;
    public static void main(String[] args) {
        final BlockingQueue<InfoEvent> queue = new ArrayBlockingQueue<InfoEvent>(100);
        final long startTime = System.currentTimeMillis();
        new Thread(new Runnable() {

            @Override
            public void run() {
                int pcnt = 0;
                while (pcnt < infoNum) {
                    InfoEvent kafkaInfoEvent = new InfoEvent(pcnt, pcnt+"info");
                    try {
                        queue.put(kafkaInfoEvent);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    pcnt++;
                }
            }
        }).start();

        new Thread(new Runnable() {

            @Override
            public void run() {
                int cnt = 0;
                while (cnt < infoNum) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    cnt++;
                }
                long endTime = System.currentTimeMillis();
                System.out.println("消耗時間 : " + (endTime - startTime) + "毫秒");
            }
        }).start();
    }
}

LinkedBlockingQueueTest

public class LinkedBlockingQueueTest {
    
    public static int infoNum = 50000000;

    public static void main(String[] args) {
        final BlockingQueue<InfoEvent> queue = new LinkedBlockingQueue<InfoEvent>();
        final long startTime = System.currentTimeMillis();
        new Thread(new Runnable() {
            @Override
            public void run() {
                int pcnt = 0;
                while (pcnt < infoNum) {
                    InfoEvent kafkaInfoEvent = new InfoEvent(pcnt, pcnt + "info");
                    try {
                        queue.put(kafkaInfoEvent);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    pcnt++;
                }
            }
        }).start();

        new Thread(new Runnable() {

            @Override
            public void run() {
                int cnt = 0;
                while (cnt < infoNum) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    cnt++;
                }
                long endTime = System.currentTimeMillis();
                System.out.println("消耗時間: " + (endTime - startTime) + "毫秒");
            }
        }).start();
    }

}

DisruptorTest

public class DisruptorTest {
    public static int infoNum = 5000000;
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        InfoEventFactory factory = new InfoEventFactory();
        int ringBufferSize = 65536; //數據緩衝區的大小 必須爲2的次冪
        
        /**
         * 
         *  factory,定義的事件工廠
         *  ringBufferSize,環形隊列RingBuffer的大小,必須是2的N次方
         *  ProducerType,生產者線程的設置,當你只有一個生產者線程時設置爲 ProducerType.SINGLE,多個生產者線程ProducerType.MULTI
         *  waitStrategy,消費者的等待策略  
         *  
         */
        final Disruptor<InfoEvent> disruptor = new Disruptor<InfoEvent>(factory, ringBufferSize,
                DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new YieldingWaitStrategy());

        InfoEventConsumer consumer = new InfoEventConsumer();
        disruptor.handleEventsWithWorkerPool(consumer);
        disruptor.start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                RingBuffer<InfoEvent> ringBuffer = disruptor.getRingBuffer();
                for (int i = 0; i < infoNum; i++) {
                    long seq = ringBuffer.next();
                    InfoEvent infoEvent = ringBuffer.get(seq);
                    infoEvent.setId(i);
                    infoEvent.setValue("info" + i);
                    ringBuffer.publish(seq);
                }
            }
        }).start();
    }
}

咱們在十萬、百萬、千萬三個數量級上,分別對ArrayBlockingQueue,LinkedBlockingQueue初始化爲無界和有界隊列,Disruptor的BlockingWaitStrategy和YieldingWaitStrategy,進行三次測試,生產者與消費者均在單線程模式下運行,對結果進行統計記錄;

測試環境:

操做系統:win7 64位,CPU:Intel Core i7-3250M 2.9GHz ,內存:8G,JDK:1.8,disruptor版本:3.4.2

五十萬數據

 

第一次

第二次

第三次

ArrayBlcokingQueue

229ms

233ms

253ms

LinkedBlockingQueue(無界)

211ms

207ms

202ms

LinkedBlockingQueue(有界)

265ms

207ms

256ms

DisruptorBlockingWaitStrategy

71ms

56ms

65ms

DisruptorYieldingWaitStrategy

56ms

48ms

49ms

五百萬數據

 

第一次

第二次

第三次

ArrayBlcokingQueue

1530ms

1603ms

1576ms

LinkedBlockingQueue(無界)

1369ms

1390ms

1409ms

LinkedBlockingQueue(有界)

1408ms

1397ms

1494ms

DisruptorBlockingWaitStrategy

345ms

363ms

357ms

DisruptorYieldingWaitStrategy

104ms

108ms

107ms

五千萬數據

 

第一次

第二次

第三次

ArrayBlcokingQueue

14799ms

14928ms

15122ms

LinkedBlockingQueue(無界)

14226ms

14008ms

13518ms

LinkedBlockingQueue(有界)

14039ms

14434ms

13839ms

DisruptorBlockingWaitStrategy

2972ms

2910ms

2848ms

DisruptorYieldingWaitStrategy

699ms

742ms

698ms


而後我對程序進行了修改,讓測試程序持續運行,每五千萬輸出一次,對運行期間CPU和內存使用狀況進行了記錄

ArrayBlcokingQueue



LinkedBlockingQueue(無界)



LinkedBlockingQueue(有界)



Disruptor(BlockingWaitStrategy)


Disruptor(YieldingWaitStrategy)


從上面的測試中咱們能夠看到ArrayBlcokingQueue與LinkedBlockingQueue性能上區別不是很大,LinkedBlockingQueue因爲讀寫鎖的分離,平均性能會稍微好些,但差距並不明顯。
而Disruptor
性能表現突出,特別是隨着數據量的增大,優點會愈加明顯。同時在單線程生產和消費的應用場景下,相比jdk內置的阻塞隊列,CPU和GC的壓力反而更小。

3、總結

 一、ArrayBlcokingQueue與LinkedBlockingQueue,通常認爲前者基於數組實現,初始化後不須要再建立新的對象,但沒有進行鎖分離,因此內存GC壓力較小,但性能會相對較低;後者基於鏈表實現,每次都須要建立  一個node對象,會存在頻繁的建立銷燬操做,GC壓力較大,但插入和刪除數據是不一樣的鎖,進行了鎖分離,性能會相對較好;從測試結果上看,其實二者性能和GC上差異都不大,在實際運用過程當中,我認爲通常場景下ArrayBlcokingQueue的性能已經足夠應對,處於對GC壓力的考慮,及潛在的OOM的風險我建議普通狀況下使用ArrayBlcokingQueue便可。固然你也可使用LinkedBlockingQueue,從測試結果上看,它相比ArrayBlcokingQueue性能上有有所提高但並不明顯,結合gc的壓力和潛在OOM的風險,因此結合應用的場景須要綜合考慮。

二、Disruptor作爲一款高性能隊列框架,確實足夠優秀,在測試中咱們能夠看到不管是性能和GC壓力都遠遠好過ArrayBlcokingQueue與LinkedBlockingQueue;若是你追求更高的性能,那麼Disruptor是一個很好的選擇。
但須要注意的是,你須要結合本身的硬件配置和業務場景,正確配置Disruptor,選擇合適的消費策略,這樣不只能夠獲取較高的性能,同時能夠保證硬件資源的合理分配。
三、對這三種阻塞隊列的測試,並非爲了比較孰優孰劣,主要是爲了增強理解,實際的業務應用須要根據狀況合理進行選擇。這裏只是結合本身的使用,對它們進行一個簡單的總結,並無進行較深刻的探究,若有錯誤的的地方還請指正與海涵。
相關文章
相關標籤/搜索