和朱曄一塊兒複習Java併發(二):隊列

和朱曄一塊兒複習Java併發(二):隊列

老樣子,咱們仍是從一些例子開始慢慢熟悉各類併發隊列。以看小說看故事的心態來學習不會顯得那麼枯燥並且更容易記憶深入。java

阻塞隊列的等待?

阻塞隊列最適合作的事情就是作爲生產消費者的中間存儲,以抵抗生產者消費者速率不匹配的問題,不可是在速率不匹配的時候可以有地方暫存任務,並且能在隊列滿或空的時候讓線程進行阻塞,讓出CPU的時間。這裏對於阻塞兩字加粗,是由於其實Java的線程在這個時候是等待(WAITING)狀態而不是阻塞(BLOCKED),這個容易引發歧義。git

下面咱們來寫一個程序比較一下阻塞和等待:github

@Slf4j
public class BlockVsWait {

    Object locker = new Object();
    ArrayBlockingQueue<Integer> arrayBlockingQueue1 = new ArrayBlockingQueue<>(1);
    ArrayBlockingQueue<Integer> arrayBlockingQueue2 = new ArrayBlockingQueue<>(1);

    @Test
    public void test() throws InterruptedException {

        arrayBlockingQueue1.put(1);


        Thread waitOnTake = new Thread(() -> {
            synchronized (locker) {
                try {
                    arrayBlockingQueue2.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        waitOnTake.setName("waitOnTake");
        waitOnTake.start();

        Thread waitOnPut = new Thread(() -> {
            try {
                arrayBlockingQueue1.put(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        waitOnPut.setName("waitOnPut");
        waitOnPut.start();

        Thread block = new Thread(() -> {
            synchronized (locker) {
                log.info("OK");
            }
        });
        block.setName("block");
        block.start();

        block.join();
    }
}

在上面的代碼裏,咱們開啓了三個線程:數組

  • 一個是等待鎖
  • 一個是等待從隊列獲取數據
  • 一個是等待加入數據到隊列

運行程序以後,咱們看一下線程的狀態,能夠看到:安全

  • 等待鎖的block線程,處於BLOCKED狀態
  • 還有兩個被阻塞隊列阻塞的線程,處於WAITING狀態

image_1dfuu0lnapb61bek2rbmed19pp9.png-404.3kB

咱們來查看一下線程這兩種狀態的定義:微信

image_1dfuuun4k50e1cg13j017hp15jh16.png-229.8kB

通俗一點說,BLOCKED就是線程本身想作事情,可是很無奈只能等別人先把事情幹完,因此說是被阻塞,被動的,WAITING就是線程本身主動願意放棄CPU時間進行等待,等別人在合適的時候通知本身來繼續幹活,因此說是等待中,主動的。Blocking Queue實際上是讓線程Waiting而不是Block。數據結構

生產消費

如今,咱們使用阻塞隊列嘗試實現生產者消費者的功能。多線程

首先,實現一個基類,經過一個開關來控制生產者消費者的執行:併發

@Slf4j
public abstract class Worker implements Runnable {
    protected volatile boolean enable = true;
    protected String name;
    protected BlockingQueue<Integer> queue;

    public Worker(String name, BlockingQueue<Integer> queue) {
        this.name = name;
        this.queue = queue;
    }

    public void stop() {
        this.enable = false;
        log.info("Stop:{}", name);
    }
}

而後實現生產者:異步

@Slf4j
public class Producer extends Worker {
    private static AtomicInteger atomicInteger = new AtomicInteger(0);

    public Producer(String name, BlockingQueue<Integer> queue) {
        super(name, queue);
    }

    @Override
    public void run() {
        while (enable) {
            try {
                int value = atomicInteger.incrementAndGet();
                queue.put(value);
                log.info("size:{}, put:{}, enable:{}", queue.size(), value, enable);
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
            }
        }
        log.info("{} quit", name);
    }
}

只要開關開啓,生產者會無限進行數據生產,把數據加入隊列,生產者每100ms生產一個數據,這裏有一個計數器來提供要生產的數據。

下面實現消費者:

@Slf4j
public class Consumer extends Worker {

    private static AtomicInteger totalConsumedAfterShutdown = new AtomicInteger();

    public Consumer(String name, BlockingQueue<Integer> queue) {
        super(name, queue);
    }

    public static int totalConsumedAfterShutdown() {
        return totalConsumedAfterShutdown.get();
    }

    @Override
    public void run() {
        while (enable || queue.size() > 0) {
            try {
                Integer item = queue.take();
                log.info("size:{}, got:{}, enable:{}", queue.size(), item, enable);
                if (!enable) {
                    totalConsumedAfterShutdown.incrementAndGet();
                }
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
            }
        }
        log.info("{} quit", name);
    }
}

一樣,消費者也是在開關開啓或隊列中有數據的時候,會不斷進行數據消費。這裏咱們有一個計數器用來統計開關關閉以後,消費者還能消費多少數據。消費者消費速度是200ms消費一次,明顯比生產者慢一半。經過這個配置咱們能夠想到,若是使用有界阻塞隊列的話,由於消費速度比生產速度慢,因此隊列會慢慢堆積一直到隊列滿,而後生產者線程被阻塞,咱們來寫一個測試程序看看是否是這樣:

@Slf4j
public class ArrayBlockingQueueTest {

    @Test
    public void test() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(50, false);
        List<Worker> workers = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            String name = "Producer" + i;
            Producer worker = new Producer(name, queue);
            workers.add(worker);
            Thread thread = new Thread(worker);
            thread.setName(name);
            threads.add(thread);
            thread.start();
        }
        for (int i = 0; i < 4; i++) {
            String name = "Consumer" + i;
            Consumer worker = new Consumer(name, queue);
            workers.add(worker);
            Thread thread = new Thread(worker);
            thread.setName(name);
            threads.add(thread);
            thread.start();
        }

        Executors.newSingleThreadScheduledExecutor().schedule(() -> {
            for (Worker worker : workers) {
                worker.stop();
            }
        }, 2, TimeUnit.SECONDS);

        for (Thread thread : threads) {
            thread.join();
        }
        log.info("totalConsumedAfterShutdown:{}", Consumer.totalConsumedAfterShutdown());
    }
}

在這段代碼裏:

  • 咱們使用了容量爲50的有界阻塞隊列ArrayBlockingQueue做爲容器
  • 生產者10個線程
  • 消費者4個線程
  • 2秒後關閉生產者和消費者(這個時候生產者應該不會繼續生產,可是消費者還會繼續消費)
  • 主線程等待全部生產者消費者執行完成
  • 最後輸出關閉後,消費者還能消費多少數據

部分運行結果以下:

12:59:34.609 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:33, put:40, enable:true
12:59:34.609 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:36, put:37, enable:true
12:59:34.609 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:35, put:36, enable:true
12:59:34.609 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:36, put:38, enable:true
12:59:34.609 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:34, put:39, enable:true
12:59:34.683 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:33, got:7, enable:true
12:59:34.683 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:34, got:6, enable:true
12:59:34.683 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:33, got:5, enable:true
12:59:34.687 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:32, got:8, enable:true
12:59:34.701 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:33, put:41, enable:true
12:59:34.701 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:35, put:42, enable:true
12:59:34.701 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:35, put:44, enable:true
12:59:34.701 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:36, put:43, enable:true
12:59:34.711 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:37, put:45, enable:true
12:59:34.714 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:41, put:46, enable:true
12:59:34.714 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:39, put:48, enable:true
12:59:34.714 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:40, put:50, enable:true
12:59:34.714 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:42, put:49, enable:true
12:59:34.714 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:38, put:47, enable:true
12:59:34.805 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:45, put:53, enable:true
12:59:34.805 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:43, put:51, enable:true
12:59:34.805 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:44, put:52, enable:true
12:59:34.805 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:46, put:54, enable:true
12:59:34.814 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:47, put:55, enable:true
12:59:34.818 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:58, enable:true
12:59:34.818 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:57, enable:true
12:59:34.818 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:56, enable:true
12:59:34.888 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:12, enable:true
12:59:34.888 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:60, enable:true
12:59:34.888 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:59, enable:true
12:59:34.887 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:9, enable:true
12:59:34.887 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:10, enable:true
12:59:34.892 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:11, enable:true
12:59:34.909 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:62, enable:true
12:59:34.909 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:61, enable:true
12:59:35.093 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:64, enable:true
12:59:35.093 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:13, enable:true
12:59:35.094 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:16, enable:true
12:59:35.094 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:17, enable:true
12:59:35.094 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:65, enable:true
12:59:35.094 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:18, enable:true
12:59:35.094 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:66, enable:true
12:59:35.094 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:63, enable:true
12:59:35.297 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:19, enable:true
12:59:35.298 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:69, enable:true
12:59:35.298 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:47, put:68, enable:true
12:59:35.298 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:67, enable:true
12:59:35.298 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:46, got:20, enable:true
12:59:35.298 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:70, enable:true
12:59:35.298 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:15, enable:true
12:59:35.298 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:14, enable:true
12:59:35.502 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:74, enable:true
12:59:35.502 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:24, enable:true
12:59:35.502 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:22, enable:true
12:59:35.502 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:23, enable:true
12:59:35.502 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:73, enable:true
12:59:35.502 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:72, enable:true
12:59:35.502 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:21, enable:true
12:59:35.502 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:71, enable:true
12:59:35.704 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:77, enable:true
12:59:35.704 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:30, enable:true
12:59:35.704 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:28, enable:true
12:59:35.704 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:75, enable:true
12:59:35.704 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:80, enable:true
12:59:35.704 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:27, enable:true
12:59:35.704 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:76, enable:true
12:59:35.704 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:29, enable:true
12:59:35.909 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:32, enable:true
12:59:35.909 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:84, enable:true
12:59:35.909 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:25, enable:true
12:59:35.909 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:79, enable:true
12:59:35.909 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:78, enable:true
12:59:35.909 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:33, enable:true
12:59:35.909 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:83, enable:true
12:59:35.909 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:26, enable:true
12:59:36.113 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:34, enable:true
12:59:36.113 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:35, enable:true
12:59:36.113 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:31, enable:true
12:59:36.113 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:38, enable:true
12:59:36.113 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:81, enable:true
12:59:36.113 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:82, enable:true
12:59:36.114 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:87, enable:true
12:59:36.114 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:85, enable:true
12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer0
12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer1
12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer2
12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer3
12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer4
12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer5
12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer6
12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer7
12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer8
12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer9
12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Consumer0
12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Consumer1
12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Consumer2
12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Consumer3
12:59:36.317 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:39, enable:false
12:59:36.317 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:36, enable:false
12:59:36.317 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:46, got:37, enable:false
12:59:36.317 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:40, enable:false
12:59:36.317 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:47, put:86, enable:false
12:59:36.317 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:88, enable:false
12:59:36.317 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:92, enable:false
12:59:36.317 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:91, enable:false
12:59:36.420 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer1 quit
12:59:36.420 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer6 quit
12:59:36.420 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer7 quit
12:59:36.420 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer4 quit
12:59:36.522 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:41, enable:false
12:59:36.522 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:44, enable:false
12:59:36.522 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:43, enable:false
12:59:36.522 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:96, enable:false
12:59:36.522 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:42, enable:false
12:59:36.522 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:90, enable:false
12:59:36.522 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:89, enable:false
12:59:36.522 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:93, enable:false
12:59:36.626 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer2 quit
12:59:36.626 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer0 quit
12:59:36.626 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer8 quit
12:59:36.626 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer3 quit
12:59:36.725 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:45, enable:false
12:59:36.726 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:95, enable:false
12:59:36.726 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:94, enable:false
12:59:36.726 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:50, enable:false
12:59:36.725 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:47, enable:false
12:59:36.726 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:48, enable:false
12:59:36.829 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer5 quit
12:59:36.829 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer9 quit
12:59:36.930 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:46, got:49, enable:false
12:59:36.930 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:44, got:46, enable:false
12:59:36.930 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:45, got:51, enable:false
12:59:36.930 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:44, got:52, enable:false
12:59:37.133 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:42, got:54, enable:false
12:59:37.133 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:40, got:57, enable:false
12:59:37.133 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:40, got:53, enable:false
12:59:37.133 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:41, got:55, enable:false
12:59:37.334 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:37, got:59, enable:false
12:59:37.334 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:36, got:56, enable:false
12:59:37.334 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:36, got:60, enable:false
12:59:37.334 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:37, got:58, enable:false
12:59:37.538 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:34, got:61, enable:false
12:59:37.538 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:32, got:63, enable:false
12:59:37.538 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:33, got:64, enable:false
12:59:37.539 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:32, got:62, enable:false
12:59:37.742 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:29, got:68, enable:false
12:59:37.742 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:30, got:65, enable:false
12:59:37.742 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:30, got:66, enable:false
12:59:37.742 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:28, got:67, enable:false
12:59:37.948 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:26, got:70, enable:false
12:59:37.948 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:24, got:69, enable:false
12:59:37.948 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:24, got:72, enable:false
12:59:37.948 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:25, got:71, enable:false
12:59:38.149 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:21, got:75, enable:false
12:59:38.149 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:20, got:76, enable:false
12:59:38.149 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:22, got:74, enable:false
12:59:38.149 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:20, got:73, enable:false
12:59:38.350 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:18, got:80, enable:false
12:59:38.350 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:18, got:77, enable:false
12:59:38.350 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:16, got:79, enable:false
12:59:38.350 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:17, got:78, enable:false
12:59:38.553 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:12, got:83, enable:false
12:59:38.553 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:14, got:84, enable:false
12:59:38.553 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:13, got:82, enable:false
12:59:38.553 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:12, got:81, enable:false
12:59:38.759 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:8, got:87, enable:false
12:59:38.759 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:8, got:88, enable:false
12:59:38.759 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:8, got:86, enable:false
12:59:38.759 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:8, got:85, enable:false
12:59:38.960 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:7, got:92, enable:false
12:59:38.963 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:5, got:89, enable:false
12:59:38.963 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:4, got:90, enable:false
12:59:38.963 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:5, got:91, enable:false
12:59:39.161 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:3, got:96, enable:false
12:59:39.168 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:1, got:93, enable:false
12:59:39.168 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:0, got:94, enable:false
12:59:39.168 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:1, got:95, enable:false
12:59:39.168 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - Consumer2 quit
12:59:39.168 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - Consumer1 quit
12:59:39.168 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - Consumer3 quit

從結果看到幾個結論:

  • 在隊列滿以前,生產者能夠任意按照本身的速度生產,滿了以後只能等消費者消費後才能進行生產,符合預期
  • 關閉開啓設置後,生產者很快就都完成了,可是最後消費者只退出了3個,有一個卡住了,線程狀態以下:

image_1dfv50m5tfre14f0hh1k2gtda9.png-133.7kB

固然這個狀態不那麼容易碰巧遇到,我運行了20+次代碼才遇到一次,你也能夠把sleep移到前面去這樣更容易出現這樣的問題。
細細品味一下爲何有一個消費者卡住了,咱們不是判斷了隊列中有數據才繼續執行take()的嗎?問題就出在這裏,在判斷的時候隊列中的確有數據,看看Consumer0最後輸出了3,可是在這以後的瞬間,還有3條數據都被其它線程消費完了,等到執行下一行代碼的時候就卡住了。在編寫多線程程序的時候,咱們很容易去假設:

  • 兩行靠在一塊兒的代碼就是能在一個原子操做內完成的,不是這樣的,在以後的文章中咱們會繼續看到更有意思的一個錯覺
  • 既然使用了線程安全的隊列,那麼全部操做都是線程安全的一致的,這個說法也是一個誤區,首先,咱們沒法確保全部操做都是線程安全以及一致的,具體須要參考JDK的文檔說明,好比迭代操做,好比size()操做,很對線程安全的併發類型也沒法提供一致性的保證,有的時候只是估算;其次,所謂全部操做僅限於單個操做,通常而言容器沒法確保你兩個操做兩行代碼之間不能有其它線程來繼續操做這個容器

這個Bug是很容易忽略的,咱們能夠改一下消費者代碼,利用有超時等待的poll()來解決這個問題:

@Override
public void run() {
    while (enable || queue.size() > 0) {
        try {
            Integer item = queue.poll(1, TimeUnit.SECONDS);
            log.info("size:{}, got:{}, enable:{}", queue.size(), item, enable);
            if (!enable && item != null) {
                totalConsumedAfterShutdown.incrementAndGet();
            }
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
        }
    }
    log.info("{} quit", name);
}

修改主程序後能夠獲得下面的結果:
image_1dfv633k69a41g901hfvi911a5612.png-592.6kB
值得注意幾點:

  • 此次Consumer3沒有永遠卡住,而是在等待了1秒後超時了,沒有拿到數據
  • 最後輸出的totalConsumedAfterShutdown是60而不是最大隊列50,這個也很容易想到爲何,enable=false以後,以前那10個生產者當前的循環還會繼續執行,把數據加入隊列,可是這個結果永遠只會是60(50+10生產者)嗎?你能夠想一想

隊列各類方法執行速度比拼

前面咱們也看到了,隊列消費的操做能夠take()能夠poll(),各類操做的區別以下:

image_1dfv6drpq4c3pfu14j254p1m031i.png-39.8kB

  • 拋出異常就是在操做失敗的時候直接拋出異常
  • 特殊值就是不能執行操做的時候返回false或null
  • 阻塞就是線程進行等待狀態等待能夠操做爲止
  • 超時就是等待必定時間不行的話再放棄

這些操做之間的性能是否有區別呢,咱們寫一個簡單的程序測試一下

@Slf4j
public class QueueBenchmark {

    int taskCount = 20000000;
    int threadCount = 10;

    @Test
    public void test() throws InterruptedException {

        List<Queue<Integer>> queues = getQueues();
        benchmark("add", queues, taskCount, threadCount);
        benchmark("poll", queues, taskCount, threadCount);
        benchmark("offer", queues, taskCount, threadCount);
        benchmark("size", queues, taskCount, threadCount);
        benchmark("remove", queues, taskCount, threadCount);
    }

    private List<Queue<Integer>> getQueues() {
        return Arrays.asList(new ConcurrentLinkedQueue<>(),
                new LinkedBlockingQueue<>(),
                new ArrayBlockingQueue<>(taskCount, false),
                new LinkedTransferQueue<>(),
                new PriorityBlockingQueue<>(),
                new LinkedList<>());
    }

    private void benchmark(String operation, List<Queue<Integer>> queues, int taskCount, int threadCount) throws InterruptedException {
        StopWatch stopWatch = new StopWatch();
        queues.forEach(queue -> {
            stopWatch.start(queue.getClass().getSimpleName() + "-" + operation);
            try {
                tasks(queue, taskCount, threadCount, operation);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            stopWatch.stop();
            log.info("queue:{}, operation:{}, size:{}, qps:{}", queue.getClass().getSimpleName(), operation, queue.size(), (long) taskCount * 1000 / stopWatch.getLastTaskTimeMillis());
        });
        log.info(stopWatch.prettyPrint());
    }

    private void tasks(Queue<Integer> queue, int taskCount, int threadCount, String operation) throws InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, taskCount).parallel().forEach(i -> {
                    IntConsumer opt = task(queue, operation);
                    if (queue instanceof LinkedList) {
                        synchronized (queue) {
                            opt.accept(i);
                        }
                    } else {
                        opt.accept(i);
                    }
                }
        ));
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
    }

    private IntConsumer task(Queue<Integer> queue, String name) {
        if (name.equals("add")) return queue::add;
        if (name.equals("offer")) return queue::offer;
        if (name.equals("poll")) return i -> queue.poll();
        if (name.equals("remove")) return i -> queue.remove();
        if (name.equals("size")) return i -> queue.size();

        return i -> {
        };
    }
}

在代碼裏,咱們測試10個線程下,對各類隊列的各類方法執行N次操做的耗時。
結論以下,表格中數據的單位毫秒,也就是耗時,數字越小性能越好:
image_1dfv8o0so167fmdf1kticm1r5h2p.png-42.7kB

有幾個地方值得注意:

  • ConcurrentLinkedQueue以及LinkedTransferQueue的size()操做特別慢,見JDK說明:
    image_1dfv8bmqgpg82vf15m4os01qjp1v.png-54.8kB
    因此咱們在使用這兩種隊列的時候特別須要注意
  • 整體上來講,add相對於offer,poll相對於remove沒有什麼性能差別,根據本身的需求使用對應的方法便可

下面咱們稍微改下代碼測試一下BlockingQueue的put()和take():

@Slf4j
public class BlockingQueueBenchmark {

    int taskCount = 20000000;
    int threadCount = 10;

    @Test
    public void test() throws InterruptedException {

        List<BlockingQueue<Integer>> queues = getQueues();
        benchmark("put", queues, taskCount, threadCount);
        benchmark("take", queues, taskCount, threadCount);
    }

    private List<BlockingQueue<Integer>> getQueues() {
        return Arrays.asList(
                new LinkedBlockingQueue<>(),
                new LinkedTransferQueue<>(),
                new ArrayBlockingQueue<>(taskCount, false),
                new PriorityBlockingQueue<>());
    }

    private void benchmark(String operation, List<BlockingQueue<Integer>> queues, int taskCount, int threadCount) throws InterruptedException {
        StopWatch stopWatch = new StopWatch();
        queues.forEach(queue -> {
            stopWatch.start(queue.getClass().getSimpleName() + "-" + operation);
            try {
                tasks(queue, taskCount, threadCount, operation);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            stopWatch.stop();
            log.info("queue:{}, operation:{}, size:{}", queue.getClass().getSimpleName(), operation, queue.size());
        });
        log.info(stopWatch.prettyPrint());
    }

    private void tasks(BlockingQueue<Integer> queue, int taskCount, int threadCount, String operation) throws InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, taskCount).parallel().forEach(task(queue, operation)));
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
    }

    private IntConsumer task(BlockingQueue<Integer> queue, String name) {
        if (name.equals("put")) return i -> {
            try {
                queue.put(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        if (name.equals("take")) return i -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        return i -> {
        };
    }
}

把結果一塊兒完善到前面表格中:
image_1dfv9l78mf9a1khp1uqgn53199e36.png-51.9kB
能夠看到,阻塞的方法和非阻塞的性能差很少,也是根據須要選擇便可。看代碼實現的話也能夠看到不少隊列對於各類存取方法邏輯基本是一致的。
各個隊列之間的性能貌似區別不大,我感受這個測試寫的不是很好,可能和線程池的調度也有關係,咱們接下去再從新換一種測試方式來測試下各類隊列的吞吐。

各類場景下各類隊列的吞吐測試

在此次的測試中,咱們模擬一下場景:

@Data
@AllArgsConstructor
@NoArgsConstructor
class TestCase {
    private int elementCount;
    private Mode mode;
    private int producerCount;
    private int consumerCount;
}

模擬一下不一樣的消費者生產者線程數量配比的狀況下,各類隊列完成必定數量元素的存取操做總共的耗時。咱們定義三種模式:

  • ProducerAndConsumerShareThread:也就是存取操做在一個線程中完成,先存後取
  • ProducerAndThenConsumer:也就是先把隊列用生產者填充完畢,而後再用消費者去所有讀取出來
  • ConcurrentProducerAndConsumer:也就是生產者和消費者同時操做隊列,同時進行存和取操做
enum Mode {
    ProducerAndConsumerShareThread,
    ProducerAndThenConsumer,
    ConcurrentProducerAndConsumer
}

咱們定義的全部測試場景以下:

List<TestCase> testCases = new ArrayList<>();
        testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 1, 1));
        testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 10, 10));
        testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 100, 100));
        testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 1000, 1000));
        testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors()));

        testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 1, 100));
        testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 100, 1));

        testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, 1, 0));
        testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, 10, 0));
        testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, 100, 0));
        testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, 1000, 0));
        testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, Runtime.getRuntime().availableProcessors(), 0));


        testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, 1, 1));
        testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, 10, 10));
        testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, 100, 100));
        testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, 1000, 1000));
        testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors()));

十幾種測試,覆蓋這些場景:

  • 同時存取模式下不一樣生產者和消費者線程數量的狀況
  • 同時存取模式下生產者和消費者數量不均衡的狀況
  • 先存後取模式下不一樣生產者和消費者線程數量的狀況
  • 存取操做在一個線程依次操做模式下不一樣線程數量的狀況

主要測試三種隊列,每一種隊列測試之間GC一次儘可能排除干擾:

LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>();
for (TestCase testCase : testCases) {
    System.gc();
    benchmark(linkedBlockingQueue, testCase);
}
linkedBlockingQueue = null;

LinkedTransferQueue<String> linkedTransferQueue = new LinkedTransferQueue<>();
for (TestCase testCase : testCases) {
    System.gc();
    benchmark(linkedTransferQueue, testCase);
}
linkedTransferQueue = null;

ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(element_count);
for (TestCase testCase : testCases) {
    System.gc();
    benchmark(arrayBlockingQueue, testCase);
}
arrayBlockingQueue = null;

生產者:

class ProducerTask implements Runnable {

    private String name;
    private BlockingQueue<String> queue;
    private TestCase testCase;
    private CountDownLatch startCountDownLatch;
    private CountDownLatch finishCountDownLatch;

    public ProducerTask(CountDownLatch startCountDownLatch,
                        CountDownLatch finishCountDownLatch,
                        String name,
                        BlockingQueue<String> queue,
                        TestCase testCase) {
        this.startCountDownLatch = startCountDownLatch;
        this.finishCountDownLatch = finishCountDownLatch;
        this.name = name;
        this.queue = queue;
        this.testCase = testCase;
    }

    @Override
    public void run() {

        try {
            startCountDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        int count = testCase.elementCount / testCase.getProducerCount();

        if (testCase.mode == Mode.ProducerAndConsumerShareThread) {
            for (int i = 0; i < count; i++) {
                try {
                    queue.put(name + i);
                    queue.take();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } else {
            for (int i = 0; i < count; i++) {
                try {
                    queue.put(name + i);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        finishCountDownLatch.countDown();
    }
}

此次的測試,咱們預先根據線程數量算好執行次數,而不是像以前的測試同樣全部的任務統一由線程池調度,這樣更容易測試出隊列自己的性能,排除干擾。這裏能夠看到若是是存取共享模式的話,生產者直接作存取操做,其它模式的話,生產者僅僅作存的操做。

消費者:

class ConsumerTask implements Runnable {

    private BlockingQueue<String> queue;
    private TestCase testCase;
    private CountDownLatch startCountDownLatch;
    private CountDownLatch finishCountDownLatch;

    public ConsumerTask(CountDownLatch startCountDownLatch,
                        CountDownLatch finishCountDownLatch,
                        BlockingQueue<String> queue,
                        TestCase testCase) {
        this.startCountDownLatch = startCountDownLatch;
        this.finishCountDownLatch = finishCountDownLatch;
        this.queue = queue;
        this.testCase = testCase;
    }

    @Override
    public void run() {
        try {
            startCountDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int count = testCase.elementCount / testCase.getConsumerCount();

        if (testCase.mode != Mode.ProducerAndConsumerShareThread) {
            for (int i = 0; i < count; i++) {
                try {
                    queue.take();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        finishCountDownLatch.countDown();
    }
}

生產者和消費者咱們都用了兩個CountDownLatch來作攔截,一個startCountDownLatch用來在全部線程都啓動後由主線程通知一會兒放開全部的線程,一個finishCountDownLatch用來讓主線程等待線程的執行完畢。

主要的測試代碼以下:

private void benchmark(BlockingQueue<String> queue, TestCase testCase) throws InterruptedException {

        long begin = System.currentTimeMillis();
        log.info("\r\n==========================\r\nBegin benchmark Queue:[{}], case:{}", queue.getClass().getSimpleName(),
                testCase.toString());
        CountDownLatch startCountDownLatch = new CountDownLatch(1);

        if (testCase.mode == Mode.ProducerAndConsumerShareThread) {
            CountDownLatch finishCountDownLatch = new CountDownLatch(testCase.getProducerCount());
            for (int i = 0; i < testCase.getProducerCount(); i++) {
                new Thread(new ProducerTask(
                        startCountDownLatch,
                        finishCountDownLatch,
                        String.format("Thread_%d_", i),
                        queue,
                        testCase)).start();
            }
            startCountDownLatch.countDown();
            finishCountDownLatch.await();

        } else if (testCase.mode == Mode.ConcurrentProducerAndConsumer) {
            CountDownLatch finishCountDownLatch = new CountDownLatch(testCase.getProducerCount() + testCase.getConsumerCount());
            for (int i = 0; i < testCase.getProducerCount(); i++) {
                new Thread(new ProducerTask(
                        startCountDownLatch,
                        finishCountDownLatch,
                        String.format("Thread_%d_", i),
                        queue,
                        testCase)).start();
            }
            for (int i = 0; i < testCase.getConsumerCount(); i++) {
                new Thread(new ConsumerTask(
                        startCountDownLatch,
                        finishCountDownLatch,
                        queue,
                        testCase)).start();
            }
            startCountDownLatch.countDown();
            finishCountDownLatch.await();
        } else if (testCase.mode == Mode.ProducerAndThenConsumer) {
            CountDownLatch finishCountDownLatch = new CountDownLatch(testCase.getProducerCount());
            for (int i = 0; i < testCase.getProducerCount(); i++) {
                new Thread(new ProducerTask(
                        startCountDownLatch,
                        finishCountDownLatch,
                        String.format("Thread_%d_", i),
                        queue,
                        testCase)).start();
            }
            startCountDownLatch.countDown();
            finishCountDownLatch.await();

            startCountDownLatch = new CountDownLatch(1);
            finishCountDownLatch = new CountDownLatch(testCase.getConsumerCount());
            for (int i = 0; i < testCase.getConsumerCount(); i++) {
                new Thread(new ConsumerTask(
                        startCountDownLatch,
                        finishCountDownLatch,
                        queue,
                        testCase)).start();
            }
            startCountDownLatch.countDown();
            finishCountDownLatch.await();
        }

        long finish = System.currentTimeMillis();
        log.info("Finish benchmark Queue:[{}], case:{}, QPS:{}\r\n==========================\n", queue.getClass().getSimpleName(),
                testCase.toString(),
                (long) element_count * 1000 / (finish - begin));
    }

能夠看到三種模式的處理不一樣:

  • 對於存取共享線程的話,咱們只有生產者線程
  • 對於先存後取模式的話,在全部生產者線程執行完成後咱們再開啓消費者線程
  • 對於併發存取模式的話,咱們同時開啓兩組線程

整個測試結果彙總以下(這個測試是在12核阿里雲跑出來的,元素數1000萬):

image_1dfvhc5g31vrj1dtbehf35qove4q.png-129.9kB

說實話這個測試的結果不是我想象的那樣,我想象的是隨着併發的增多隊列性能會急劇降低,並且各類隊列之間有顯著的性能差別,這個結果是這樣這也能夠說明這些隊列性能都是很不錯的,沒有明顯的短板。

能夠大概得出幾個結論:

  • 隨着併發的增多會下降一些吞吐,不過也都還好,併發過小吞吐也上不去
  • ArrayBlockingQueue性能穩定,並且性能也幾乎是最好的
  • 在生產者數量大大小於消費者數量的時候,LinkedBlockingQueue表現出最好的吞吐,並且比其它兩個好不少,這點我還沒細究,有待研究是爲何

通常而言,阻塞隊列中,無界隊列能夠選擇LinkedBlockingQueue,有界隊列能夠選擇ArrayBlockingQueue,後者還有公平參數能夠開啓公平特性,有關這個特性下面咱們也會來觀察。

經過同步隊列觀察公平特性

SynchronousQueue是沒有容量的阻塞隊列,只有等另外一個線程移出元素後才能插入元素成功。這裏咱們寫一段代碼來測試,沿用以前的消費者和生產者類,只是修改了2秒後關閉隊列的地方,這裏咱們加上了interrupt()操做,不然生產者是沒法退出的:

@Slf4j
public class SynchronousQueueTest {

    @Test
    public void test() throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(false);
        List<Worker> workers = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            String name = "Producer" + i;
            Producer worker = new Producer(name, queue);
            workers.add(worker);
            Thread thread = new Thread(worker);
            thread.setName(name);
            threads.add(thread);
            thread.start();
        }
        for (int i = 0; i < 4; i++) {
            String name = "Consumer" + i;
            Consumer worker = new Consumer(name, queue);
            workers.add(worker);
            Thread thread = new Thread(worker);
            thread.setName(name);
            threads.add(thread);
            thread.start();
        }

        Executors.newSingleThreadScheduledExecutor().schedule(() -> {
            for (Worker worker : workers) {
                worker.stop();
            }
            for (Thread thread : threads) {
                thread.interrupt();
            }
        }, 2, TimeUnit.SECONDS);

        for (Thread thread : threads) {
            thread.join();
        }
    }
}

咱們先把公平參數設置爲false看看輸出:
image_1dfvik2puns51b2b1qnev0o8ni57.png-224kB
搜索日誌能夠發現找不到Producer0~Producer5這6個生產者的蹤影,由於沒有消費者來拉取它們的數據,它們都卡住了,這些生產者都餓死了,日誌中最小的put也是從7開始的。改成公平模式試試:
image_1dfviuu5revv1l4019ff1o971n6l74.png-233.5kB
此次能夠找到全部生產者的日誌,公平模式也就是全部等待的線程FIFO次序來訪問隊列:
image_1dfvj1gm410g8crvh661fpuuft7h.png-56.1kB

延遲隊列

這裏給出一個延遲隊列的例子,咱們往隊列提交10次延遲消息,每次提交2條同樣的消息,消息的絕對延遲時間從1到10秒。

@Slf4j
public class DelayQueueTest {

    @Test
    public void test() throws InterruptedException {
        DelayQueue<Message> delayQueue = new DelayQueue<>();
        IntStream.rangeClosed(1, 10).forEach(i -> {
            for (int __ = 0; __ < 2; __++)
                delayQueue.add(new Message(i * 1000));
        });

        Executors.newFixedThreadPool(1).submit(() -> {
            while (true) {
                Message message = delayQueue.take();
                log.debug("Got:{}", message);
            }
        });

        TimeUnit.SECONDS.sleep(20);
    }


    @ToString
    class Message implements Delayed {

        private final long delay;
        private final long expire;

        public Message(long delay) {
            this.delay = delay;
            expire = System.currentTimeMillis() + delay;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            //log.debug("getDelay called : {}", unit);
            return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}

輸出以下:

17:14:43.957 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=1000, expire=1563354883947)
17:14:44.007 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=1000, expire=1563354883947)
17:14:44.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=2000, expire=1563354884949)
17:14:44.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=2000, expire=1563354884949)
17:14:45.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=3000, expire=1563354885949)
17:14:45.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=3000, expire=1563354885949)
17:14:46.956 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=4000, expire=1563354886949)
17:14:46.956 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=4000, expire=1563354886949)
17:14:47.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=5000, expire=1563354887949)
17:14:47.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=5000, expire=1563354887949)
17:14:48.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=6000, expire=1563354888949)
17:14:48.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=6000, expire=1563354888949)
17:14:49.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=7000, expire=1563354889949)
17:14:49.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=7000, expire=1563354889949)
17:14:50.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=8000, expire=1563354890949)
17:14:50.955 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=8000, expire=1563354890949)
17:14:51.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=9000, expire=1563354891949)
17:14:51.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=9000, expire=1563354891949)
17:14:52.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=10000, expire=1563354892949)
17:14:52.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=10000, expire=1563354892949)

能夠看到每過1秒輸出2條日誌,符合預期。

一個真實的隊列誤用的血案

以前生產上遇到過一個OOM的問題,排查下來是隊列使用不當,這裏咱們就來看下這個問題,代碼邏輯是:

  • 咱們有一個10個線程的線程池
  • 咱們使用了LinkedTransferQueue阻塞隊列
  • 咱們經過線程池異步向這個隊列提交4000個任務
  • 咱們經過線程池異步從這個隊列獲取4000個任務

比較特殊的是,使用了transfer()方法,開發的小夥伴可能以爲LinkedTransferQueue比較酷炫,因此選擇了這個隊列,而且認爲transfer()能夠直接把任務交給消費者性能較高,因此使用了這個方法。

image_1dfvkan7url27sg3ig3of15j19u.png-140.7kB

代碼以下:

@Slf4j
public class BlockingQueueMisuse {

    LinkedTransferQueue<String> linkedTransferQueue = new LinkedTransferQueue<>();

    @Test
    public void test() throws InterruptedException {
        int taskCount = 4000;
        StopWatch stopWatch = new StopWatch();
        stopWatch.start("misuse");
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        //ExecutorService threadPool = Executors.newCachedThreadPool();
        IntStream.rangeClosed(1, taskCount).forEach(i -> threadPool.submit(() -> {
            try {
                linkedTransferQueue.transfer("message" + i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
        IntStream.rangeClosed(1, taskCount).forEach(i -> threadPool.submit(() -> {
            try {
                log.debug("Got:{}", linkedTransferQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
        threadPool.shutdown();
        threadPool.awaitTermination(1, TimeUnit.HOURS);
        stopWatch.stop();
        log.info(stopWatch.prettyPrint());
    }
}

運行程序後發現沒有任何輸出,其實這是由於只有10個線程,生產者須要存的元素數量是4000大大超過了10,全部線程都在等待:
image_1dfvknr245v01709123e94vmurar.png-261.9kB

因而,他沒多想把線程池修改成了newCachedThreadPool,程序能夠正常執行了,看看運行結果:

image_1dfvktfut1uj0h8gke2g2s1pg5b8.png-89.9kB
這個代碼是很嚇人的,運行過程當中開啓了幾千個線程。咱們想一下緣由,其實newCachedThreadPool使用的是SynchronousQueue,在沒有可用線程的狀況下就會新建線程,而這個特性趕上了transfer()的特性,就會致使線程池建立幾千個線程。

即便咱們把代碼修改成使用LinkedBlockingQueue,配合newCachedThreadPool也會建立幾十個線程(若是元素數量足夠多,幾百個幾千個也有可能)。由於一旦阻塞,newCachedThreadPool就會絕不猶豫建立新線程。

對於生產者消費者這種任務,仍是建議直接使用線程來實現,生產者消費者的阻塞不相互干擾,並且線程池也是使用隊列來管理任務的,用了線程池至關於兩次隊列,沒有必要。

回顧總結

咱們來看一下此次實驗涉及到的一些阻塞隊列:

  • ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
  • LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
  • PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
  • DelayQueue:一個使用優先級隊列PriorityQueue實現的無界阻塞隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。
  • LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。

DelayQueue、SynchronousQueue和PriorityBlockingQueue是特種隊列,有特殊用途根據須要選擇。
LinkedTransferQueue也算是特種隊列,它能夠實現相似背壓的效果,在特殊場景下使用。
ArrayBlockingQueue和LinkedBlockingQueue背後的數據結構不一樣,它們多是咱們最經常使用的隊列了,區別以下:

  • ArrayBlockingQueue有公平特性,開啓公平特性會下降吞吐,1000000次操做結果以下,前面一個是關閉公平,後面一個是開啓公平
    image_1dfvmkm9v1hmjto41e6p1ovs492bl.png-54.7kB
  • ArrayBlockingQueue會預分配存儲,可是這也意味着會一會兒佔用大塊內存,LinkedBlockingQueue不是這樣的
  • 若是須要無界的話只能選擇LinkedBlockingQueue(固然LinkedBlockingQueue也能夠有界)

非阻塞隊列ConcurrentLinkedQueue比較特殊,首先它不是阻塞隊列,其次它不使用鎖,而是使用CAS,在超高併發的場景下,顯然它能夠到達更好的性能。

這裏利用以前的代碼最後作了一次對比測試,這裏咱們沒有測試併發存取模式,由於消費者不知道什麼時候消費完畢,在消費不到數據的時候進行死循環意義不大:

image_1dfvo3359d6f14ah1qu21hu41bojc2.png-47.3kB

因此在特殊的場景下,好比生產者生產好了數據扔到隊列中,有N多個消費者須要併發消費這個時或許能夠發揮ConcurrentLinkedQueue的威力(可是,以前也說過了,它的size()比較坑爹),常年處於空的隊列不太適合,這個時候使用阻塞隊列更合適。

好吧,看來90%的時候仍是用ArrayBlockingQueue和LinkedBlockingQueue太平,有界用前者,須要無界用後者,可是認真考慮下,你真的須要無界嗎。經過咱們的測試能夠發現這些隊列在高併發下都有着百萬以上的QPS性能,通常而言用哪一個都不會出現瓶頸,反而是咱們更應該注意由於阻塞致使的線程數量增多和隊列的容量佔用的內存。

本文中,咱們還花式使用了各類方式來測試隊列:

  • 普通線程池
  • ForkJoin
  • 獨立線程

這裏想說的是,對於生產消費這樣的任務最好仍是使用阻塞隊列配置獨立的消費線程,生產者能夠直接是業務線程,而不是去使用線程池,沒有這個必要。

一樣,代碼見個人Github,歡迎clone後本身把玩,歡迎點贊。

歡迎關注個人微信公衆號:隨緣主人的園子

image_1dfvp8d55spm14t7erkr3mdbscf.png-45kB

相關文章
相關標籤/搜索