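As usual, we'll get to know the various concurrent queues through a series of examples. Reading along as if it were a story keeps the material from feeling dry and makes it easier to remember.
What a blocking queue does best is act as the intermediate store between producers and consumers, absorbing any mismatch between their rates. It gives tasks somewhere to wait while the rates differ, and it lets threads block when the queue is full or empty, giving up their CPU time. The word "block" is emphasized here because the Java thread is actually in the WAITING state at that moment, not BLOCKED, which is an easy source of confusion.
Let's write a program that compares blocking and waiting: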
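
    import java.util.concurrent.ArrayBlockingQueue;

    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test; // JUnit 4 assumed; with JUnit 5 use org.junit.jupiter.api.Test

    @Slf4j
    public class BlockVsWait {

        Object locker = new Object();
        // Filled in the test, so that a second put() has to wait
        ArrayBlockingQueue<Integer> arrayBlockingQueue1 = new ArrayBlockingQueue<>(1);
        // Stays empty, so that take() has to wait
        ArrayBlockingQueue<Integer> arrayBlockingQueue2 = new ArrayBlockingQueue<>(1);

        @Test
        public void test() throws InterruptedException {
            arrayBlockingQueue1.put(1);

            // Holds the locker monitor, then waits on the empty queue
            Thread waitOnTake = new Thread(() -> {
                synchronized (locker) {
                    try {
                        arrayBlockingQueue2.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            waitOnTake.setName("waitOnTake");
            waitOnTake.start();

            // Waits because the queue is already full
            Thread waitOnPut = new Thread(() -> {
                try {
                    arrayBlockingQueue1.put(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            waitOnPut.setName("waitOnPut");
            waitOnPut.start();

            // Cannot enter the synchronized block while waitOnTake holds the monitor
            Thread block = new Thread(() -> {
                synchronized (locker) {
                    log.info("OK");
                }
            });
            block.setName("block");
            block.start();

            // Never returns: the test hangs on purpose so the thread states can be inspected
            block.join();
        }
    }
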
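In the code above we start three threads: waitOnTake acquires the locker monitor and then calls take() on the empty arrayBlockingQueue2; waitOnPut calls put() on arrayBlockingQueue1, which is already full; block tries to enter a synchronized block on locker, which waitOnTake still holds.
After running the program, look at the thread states: waitOnTake and waitOnPut are WAITING, while block is BLOCKED.
Here is how Thread.State defines the two states: BLOCKED is the state of a thread that is waiting for a monitor lock in order to enter a synchronized block or method; WAITING is the state of a thread that waits indefinitely for another thread to perform a particular action, after calling Object.wait() or Thread.join() without a timeout, or LockSupport.park().
Put simply, BLOCKED means the thread wants to get on with its work but has no choice except to let someone else finish first, so it is blocked, passively. WAITING means the thread voluntarily gives up its CPU time and waits for someone to notify it at the right moment to carry on, so it is waiting, actively. A blocking queue actually puts threads into WAITING, not BLOCKED.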
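The difference can also be observed programmatically with Thread.getState(). The following is a minimal sketch, not part of the original test: the class name ThreadStateDemo and its helper methods are made up for illustration. It polls until the observed thread reaches the expected state (or a timeout expires) and is expected to report WAITING for a thread parked in take() and BLOCKED for a thread locked out of a synchronized block.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.LockSupport;

// Hypothetical helper class, for illustration only.
class ThreadStateDemo {

    // Polls until the thread reaches the expected state or the timeout expires,
    // then returns the state observed last.
    static Thread.State awaitState(Thread thread, Thread.State expected, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (thread.getState() != expected && System.currentTimeMillis() < deadline) {
            LockSupport.parkNanos(1_000_000L); // back off for about 1 ms
        }
        return thread.getState();
    }

    // State of a thread that called take() on an empty queue.
    static Thread.State stateWhileTaking() {
        BlockingQueue<Integer> empty = new ArrayBlockingQueue<>(1);
        Thread taker = new Thread(() -> {
            try {
                empty.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "taker");
        taker.setDaemon(true);
        taker.start();
        Thread.State state = awaitState(taker, Thread.State.WAITING, 2000);
        taker.interrupt(); // let the helper thread finish
        return state;
    }

    // State of a thread that tries to enter a synchronized block held by the caller.
    static Thread.State stateWhileLockedOut() {
        Object locker = new Object();
        Thread contender = new Thread(() -> {
            synchronized (locker) {
                // nothing to do, entering the block is the point
            }
        }, "contender");
        contender.setDaemon(true);
        synchronized (locker) {
            contender.start();
            return awaitState(contender, Thread.State.BLOCKED, 2000);
        }
    }

    public static void main(String[] args) {
        System.out.println("take() on an empty queue: " + stateWhileTaking());
        System.out.println("contended synchronized:   " + stateWhileLockedOut());
    }
}
```

Polling with a deadline is used instead of a fixed sleep because there is no guarantee how quickly the helper thread reaches the call that parks it.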
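Now let's use a blocking queue to implement producers and consumers.
First, a base class that uses a switch to control whether producers and consumers keep running: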

    import java.util.concurrent.BlockingQueue;

    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    public abstract class Worker implements Runnable {

        // volatile so that a stop() from another thread is visible to the worker loop
        protected volatile boolean enable = true;
        protected String name;
        protected BlockingQueue<Integer> queue;

        public Worker(String name, BlockingQueue<Integer> queue) {
            this.name = name;
            this.queue = queue;
        }

        public void stop() {
            this.enable = false;
            log.info("Stop:{}", name);
        }
    }

Then the producer:
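
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    public class Producer extends Worker {

        // Shared by all producers, so every produced value is unique
        private static AtomicInteger atomicInteger = new AtomicInteger(0);

        public Producer(String name, BlockingQueue<Integer> queue) {
            super(name, queue);
        }

        @Override
        public void run() {
            while (enable) {
                try {
                    int value = atomicInteger.incrementAndGet();
                    queue.put(value); // waits while the queue is full
                    log.info("size:{}, put:{}, enable:{}", queue.size(), value, enable);
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    // ignored in this demo; the loop re-checks the switch
                }
            }
            log.info("{} quit", name);
        }
    }
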
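As long as the switch is on, the producer keeps producing data and adding it to the queue, one item every 100 ms. A shared counter supplies the values to produce.
Next, the consumer: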
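
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    public class Consumer extends Worker {

        // Counts the items consumed after the switch has been turned off
        private static AtomicInteger totalConsumedAfterShutdown = new AtomicInteger();

        public Consumer(String name, BlockingQueue<Integer> queue) {
            super(name, queue);
        }

        public static int totalConsumedAfterShutdown() {
            return totalConsumedAfterShutdown.get();
        }

        @Override
        public void run() {
            while (enable || queue.size() > 0) {
                try {
                    Integer item = queue.take(); // waits while the queue is empty
                    log.info("size:{}, got:{}, enable:{}", queue.size(), item, enable);
                    if (!enable) {
                        totalConsumedAfterShutdown.incrementAndGet();
                    }
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (InterruptedException e) {
                    // ignored in this demo; the loop re-checks its condition
                }
            }
            log.info("{} quit", name);
        }
    }
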
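Likewise, the consumer keeps consuming as long as the switch is on or the queue still holds data. A counter records how many items are consumed after the switch is turned off. Each consumer takes one item every 200 ms, half the rate of a producer. With this setup and a bounded blocking queue, consumption is slower than production, so the queue should gradually fill up, after which the producer threads have to wait. Let's write a test to check: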
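
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test; // JUnit 4 assumed; with JUnit 5 use org.junit.jupiter.api.Test

    @Slf4j
    public class ArrayBlockingQueueTest {

        @Test
        public void test() throws InterruptedException {
            // Bounded queue with capacity 50 and no fairness policy
            BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(50, false);
            List<Worker> workers = new ArrayList<>();
            List<Thread> threads = new ArrayList<>();

            // 10 producers
            for (int i = 0; i < 10; i++) {
                String name = "Producer" + i;
                Producer worker = new Producer(name, queue);
                workers.add(worker);
                Thread thread = new Thread(worker);
                thread.setName(name);
                threads.add(thread);
                thread.start();
            }

            // 4 consumers
            for (int i = 0; i < 4; i++) {
                String name = "Consumer" + i;
                Consumer worker = new Consumer(name, queue);
                workers.add(worker);
                Thread thread = new Thread(worker);
                thread.setName(name);
                threads.add(thread);
                thread.start();
            }

            // Turn off every worker's switch after 2 seconds
            Executors.newSingleThreadScheduledExecutor().schedule(() -> {
                for (Worker worker : workers) {
                    worker.stop();
                }
            }, 2, TimeUnit.SECONDS);

            // Wait for all workers to exit
            for (Thread thread : threads) {
                thread.join();
            }
            log.info("totalConsumedAfterShutdown:{}", Consumer.totalConsumedAfterShutdown());
        }
    }
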
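In this code we create a bounded ArrayBlockingQueue with a capacity of 50, start 10 producer threads and 4 consumer threads, turn off every worker's switch after 2 seconds, wait for all threads to exit, and finally log how many items were consumed after the shutdown.
Part of the output is shown below: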
12:59:34.609 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:33, put:40, enable:true 12:59:34.609 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:36, put:37, enable:true 12:59:34.609 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:35, put:36, enable:true 12:59:34.609 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:36, put:38, enable:true 12:59:34.609 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:34, put:39, enable:true 12:59:34.683 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:33, got:7, enable:true 12:59:34.683 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:34, got:6, enable:true 12:59:34.683 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:33, got:5, enable:true 12:59:34.687 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:32, got:8, enable:true 12:59:34.701 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:33, put:41, enable:true 12:59:34.701 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:35, put:42, enable:true 12:59:34.701 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:35, put:44, enable:true 12:59:34.701 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:36, put:43, enable:true 12:59:34.711 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:37, put:45, enable:true 12:59:34.714 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:41, put:46, enable:true 12:59:34.714 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:39, put:48, enable:true 12:59:34.714 [Producer8] INFO 
me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:40, put:50, enable:true 12:59:34.714 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:42, put:49, enable:true 12:59:34.714 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:38, put:47, enable:true 12:59:34.805 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:45, put:53, enable:true 12:59:34.805 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:43, put:51, enable:true 12:59:34.805 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:44, put:52, enable:true 12:59:34.805 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:46, put:54, enable:true 12:59:34.814 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:47, put:55, enable:true 12:59:34.818 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:58, enable:true 12:59:34.818 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:57, enable:true 12:59:34.818 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:56, enable:true 12:59:34.888 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:12, enable:true 12:59:34.888 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:60, enable:true 12:59:34.888 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:59, enable:true 12:59:34.887 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:9, enable:true 12:59:34.887 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:10, enable:true 12:59:34.892 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:11, 
enable:true 12:59:34.909 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:62, enable:true 12:59:34.909 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:61, enable:true 12:59:35.093 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:64, enable:true 12:59:35.093 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:13, enable:true 12:59:35.094 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:16, enable:true 12:59:35.094 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:17, enable:true 12:59:35.094 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:65, enable:true 12:59:35.094 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:18, enable:true 12:59:35.094 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:66, enable:true 12:59:35.094 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:63, enable:true 12:59:35.297 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:19, enable:true 12:59:35.298 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:69, enable:true 12:59:35.298 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:47, put:68, enable:true 12:59:35.298 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:67, enable:true 12:59:35.298 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:46, got:20, enable:true 12:59:35.298 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:70, enable:true 12:59:35.298 [Consumer1] INFO 
me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:15, enable:true 12:59:35.298 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:14, enable:true 12:59:35.502 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:74, enable:true 12:59:35.502 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:24, enable:true 12:59:35.502 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:22, enable:true 12:59:35.502 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:23, enable:true 12:59:35.502 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:73, enable:true 12:59:35.502 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:72, enable:true 12:59:35.502 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:21, enable:true 12:59:35.502 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:71, enable:true 12:59:35.704 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:77, enable:true 12:59:35.704 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:30, enable:true 12:59:35.704 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:28, enable:true 12:59:35.704 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:75, enable:true 12:59:35.704 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:80, enable:true 12:59:35.704 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:27, enable:true 12:59:35.704 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:76, 
enable:true 12:59:35.704 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:29, enable:true 12:59:35.909 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:32, enable:true 12:59:35.909 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:84, enable:true 12:59:35.909 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:25, enable:true 12:59:35.909 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:79, enable:true 12:59:35.909 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:78, enable:true 12:59:35.909 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:33, enable:true 12:59:35.909 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:83, enable:true 12:59:35.909 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:26, enable:true 12:59:36.113 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:34, enable:true 12:59:36.113 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:35, enable:true 12:59:36.113 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:31, enable:true 12:59:36.113 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:38, enable:true 12:59:36.113 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:81, enable:true 12:59:36.113 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:82, enable:true 12:59:36.114 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:87, enable:true 12:59:36.114 [Producer8] INFO 
me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:85, enable:true 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer0 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer1 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer2 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer3 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer4 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer5 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer6 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer7 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer8 12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer9 12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Consumer0 12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Consumer1 12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Consumer2 12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Consumer3 12:59:36.317 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:39, enable:false 12:59:36.317 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:36, enable:false 12:59:36.317 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:46, got:37, enable:false 
12:59:36.317 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:40, enable:false 12:59:36.317 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:47, put:86, enable:false 12:59:36.317 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:88, enable:false 12:59:36.317 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:92, enable:false 12:59:36.317 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:91, enable:false 12:59:36.420 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer1 quit 12:59:36.420 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer6 quit 12:59:36.420 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer7 quit 12:59:36.420 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer4 quit 12:59:36.522 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:41, enable:false 12:59:36.522 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:44, enable:false 12:59:36.522 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:43, enable:false 12:59:36.522 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:96, enable:false 12:59:36.522 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:42, enable:false 12:59:36.522 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:90, enable:false 12:59:36.522 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:89, enable:false 12:59:36.522 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:93, enable:false 
12:59:36.626 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer2 quit 12:59:36.626 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer0 quit 12:59:36.626 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer8 quit 12:59:36.626 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer3 quit 12:59:36.725 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:45, enable:false 12:59:36.726 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:95, enable:false 12:59:36.726 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:94, enable:false 12:59:36.726 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:50, enable:false 12:59:36.725 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:47, enable:false 12:59:36.726 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:48, enable:false 12:59:36.829 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer5 quit 12:59:36.829 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer9 quit 12:59:36.930 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:46, got:49, enable:false 12:59:36.930 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:44, got:46, enable:false 12:59:36.930 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:45, got:51, enable:false 12:59:36.930 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:44, got:52, enable:false 12:59:37.133 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:42, got:54, enable:false 12:59:37.133 [Consumer0] INFO 
me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:40, got:57, enable:false 12:59:37.133 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:40, got:53, enable:false 12:59:37.133 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:41, got:55, enable:false 12:59:37.334 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:37, got:59, enable:false 12:59:37.334 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:36, got:56, enable:false 12:59:37.334 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:36, got:60, enable:false 12:59:37.334 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:37, got:58, enable:false 12:59:37.538 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:34, got:61, enable:false 12:59:37.538 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:32, got:63, enable:false 12:59:37.538 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:33, got:64, enable:false 12:59:37.539 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:32, got:62, enable:false 12:59:37.742 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:29, got:68, enable:false 12:59:37.742 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:30, got:65, enable:false 12:59:37.742 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:30, got:66, enable:false 12:59:37.742 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:28, got:67, enable:false 12:59:37.948 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:26, got:70, enable:false 12:59:37.948 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - 
size:24, got:69, enable:false 12:59:37.948 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:24, got:72, enable:false 12:59:37.948 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:25, got:71, enable:false 12:59:38.149 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:21, got:75, enable:false 12:59:38.149 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:20, got:76, enable:false 12:59:38.149 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:22, got:74, enable:false 12:59:38.149 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:20, got:73, enable:false 12:59:38.350 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:18, got:80, enable:false 12:59:38.350 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:18, got:77, enable:false 12:59:38.350 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:16, got:79, enable:false 12:59:38.350 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:17, got:78, enable:false 12:59:38.553 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:12, got:83, enable:false 12:59:38.553 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:14, got:84, enable:false 12:59:38.553 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:13, got:82, enable:false 12:59:38.553 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:12, got:81, enable:false 12:59:38.759 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:8, got:87, enable:false 12:59:38.759 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:8, got:88, enable:false 12:59:38.759 [Consumer2] INFO 
me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:8, got:86, enable:false 12:59:38.759 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:8, got:85, enable:false 12:59:38.960 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:7, got:92, enable:false 12:59:38.963 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:5, got:89, enable:false 12:59:38.963 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:4, got:90, enable:false 12:59:38.963 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:5, got:91, enable:false 12:59:39.161 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:3, got:96, enable:false 12:59:39.168 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:1, got:93, enable:false 12:59:39.168 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:0, got:94, enable:false 12:59:39.168 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:1, got:95, enable:false 12:59:39.168 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - Consumer2 quit 12:59:39.168 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - Consumer1 quit 12:59:39.168 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - Consumer3 quit
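The output shows a few things. The queue quickly fills up to its capacity of 50, after which a producer can only put an item once a consumer has taken one. After the stop signal, each producer that was waiting in put() still adds one more item (logged with enable:false) and then quits. The consumers keep draining the queue after the stop signal until it is empty. Consumer1, Consumer2 and Consumer3 log quit, but Consumer0 never does: it is stuck.
Admittedly this state is not easy to hit by chance: I ran the code more than 20 times before seeing it once. Moving the sleep to the start of the loop makes the problem more likely to appear.
Think carefully about why one consumer got stuck. Didn't we check that the queue had data before calling take() again? That is exactly the problem. At the moment of the check the queue did have data (the last size Consumer0 logged was 3), but in the instant that followed, the remaining 3 items were all consumed by other threads, so by the time the next line ran, take() had nothing to return and waited forever. When writing multithreaded programs it is easy to assume that a condition checked on one line still holds when the next line runs.
This bug is easy to overlook. We can fix it by changing the consumer to use poll() with a timeout: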

    @Override
    public void run() {
        while (enable || queue.size() > 0) {
            try {
                // Waits at most 1 second; returns null if no item arrived in that time
                Integer item = queue.poll(1, TimeUnit.SECONDS);
                log.info("size:{}, got:{}, enable:{}", queue.size(), item, enable);
                if (!enable && item != null) {
                    totalConsumedAfterShutdown.incrementAndGet();
                }
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                // ignored in this demo; the loop re-checks its condition
            }
        }
        log.info("{} quit", name);
    }

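The effect of the timed poll() can be checked in isolation. The sketch below is an illustration under simplifying assumptions, not the article's test: the class PollDrainDemo and its drain method are invented here, the queue is pre-filled, there are no producers, and the switch is turned off right after the consumers start. Because poll() gives up after its timeout, every consumer re-evaluates the loop condition and exits once the queue is empty, so drain() returns instead of hanging.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, for illustration only.
class PollDrainDemo {

    // Fills a queue with `items` elements, lets `consumers` threads drain it,
    // and returns how many elements were consumed once all threads have exited.
    static int drain(int items, int consumers) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < items; i++) {
            queue.add(i);
        }
        AtomicBoolean enable = new AtomicBoolean(true);
        AtomicInteger consumed = new AtomicInteger();

        Thread[] threads = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(() -> {
                // Same loop shape as the fixed consumer: check, then timed poll
                while (enable.get() || !queue.isEmpty()) {
                    try {
                        Integer item = queue.poll(50, TimeUnit.MILLISECONDS);
                        if (item != null) {
                            consumed.incrementAndGet();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }, "consumer" + i);
            threads[i].start();
        }

        enable.set(false); // turn the switch off while consumers are still draining
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed: " + drain(1000, 4));
    }
}
```

With take() in place of the timed poll(), a consumer that loses the race for the last element would wait forever and join() would never return.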
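After modifying the main program we get the following result:
A few points are worth noting:
As we saw above, an item can be taken from the queue with either take() or poll(). The operations differ as follows. Insertion: add(e) throws an exception if the queue is full, offer(e) returns false, put(e) waits, and offer(e, time, unit) waits up to the timeout and then returns false. Removal: remove() throws an exception if the queue is empty, poll() returns null, take() waits, and poll(time, unit) waits up to the timeout and then returns null. Inspection: element() throws an exception if the queue is empty, and peek() returns null.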
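These differences are easy to try out on a tiny queue. The following sketch is an illustration only (the class QueueOperationsDemo and its method names are not from the article). It calls each non-waiting operation on a full or an empty ArrayBlockingQueue of capacity 1 and records what happens; put() and take() are left out because they would simply wait forever here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class QueueOperationsDemo {

    // What each insertion method does when the queue is already full.
    static Map<String, String> insertIntoFull() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.add(1); // the queue is now full
        Map<String, String> result = new LinkedHashMap<>();
        try {
            queue.add(2);
            result.put("add", "ok");
        } catch (IllegalStateException e) {
            result.put("add", e.getClass().getSimpleName());
        }
        result.put("offer", String.valueOf(queue.offer(2)));
        try {
            result.put("timedOffer", String.valueOf(queue.offer(2, 10, TimeUnit.MILLISECONDS)));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            result.put("timedOffer", "interrupted");
        }
        return result;
    }

    // What each removal or inspection method does when the queue is empty.
    static Map<String, String> readFromEmpty() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        Map<String, String> result = new LinkedHashMap<>();
        try {
            result.put("remove", String.valueOf(queue.remove()));
        } catch (NoSuchElementException e) {
            result.put("remove", e.getClass().getSimpleName());
        }
        result.put("poll", String.valueOf(queue.poll()));
        try {
            result.put("timedPoll", String.valueOf(queue.poll(10, TimeUnit.MILLISECONDS)));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            result.put("timedPoll", "interrupted");
        }
        try {
            result.put("element", String.valueOf(queue.element()));
        } catch (NoSuchElementException e) {
            result.put("element", e.getClass().getSimpleName());
        }
        result.put("peek", String.valueOf(queue.peek()));
        return result;
    }

    public static void main(String[] args) {
        System.out.println("full queue:  " + insertIntoFull());
        System.out.println("empty queue: " + readFromEmpty());
    }
}
```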
Do these operations differ in performance? Let's write a simple program to test:

    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.LinkedTransferQueue;
    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.function.IntConsumer;
    import java.util.stream.IntStream;

    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test; // JUnit 4 assumed
    import org.springframework.util.StopWatch; // assumed: Spring's StopWatch, which offers prettyPrint()

    @Slf4j
    public class QueueBenchmark {

        int taskCount = 20000000;
        int threadCount = 10;

        @Test
        public void test() throws InterruptedException {
            List<Queue<Integer>> queues = getQueues();
            // The order matters: add fills the queues, poll empties them,
            // offer fills them again, and remove empties them again
            benchmark("add", queues, taskCount, threadCount);
            benchmark("poll", queues, taskCount, threadCount);
            benchmark("offer", queues, taskCount, threadCount);
            benchmark("size", queues, taskCount, threadCount);
            benchmark("remove", queues, taskCount, threadCount);
        }

        private List<Queue<Integer>> getQueues() {
            return Arrays.asList(new ConcurrentLinkedQueue<>(),
                    new LinkedBlockingQueue<>(),
                    new ArrayBlockingQueue<>(taskCount, false),
                    new LinkedTransferQueue<>(),
                    new PriorityBlockingQueue<>(),
                    new LinkedList<>());
        }

        private void benchmark(String operation, List<Queue<Integer>> queues, int taskCount, int threadCount) throws InterruptedException {
            StopWatch stopWatch = new StopWatch();
            queues.forEach(queue -> {
                stopWatch.start(queue.getClass().getSimpleName() + "-" + operation);
                try {
                    tasks(queue, taskCount, threadCount, operation);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                stopWatch.stop();
                log.info("queue:{}, operation:{}, size:{}, qps:{}",
                        queue.getClass().getSimpleName(), operation, queue.size(),
                        (long) taskCount * 1000 / stopWatch.getLastTaskTimeMillis());
            });
            log.info(stopWatch.prettyPrint());
        }

        private void tasks(Queue<Integer> queue, int taskCount, int threadCount, String operation) throws InterruptedException {
            ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
            // The parallel stream runs inside this pool because it is started from one of its tasks
            forkJoinPool.execute(() -> IntStream.rangeClosed(1, taskCount).parallel().forEach(i -> {
                IntConsumer opt = task(queue, operation);
                if (queue instanceof LinkedList) {
                    // LinkedList is not thread-safe, so guard it with a lock
                    synchronized (queue) {
                        opt.accept(i);
                    }
                } else {
                    opt.accept(i);
                }
            }));
            forkJoinPool.shutdown();
            forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
        }

        // Maps an operation name to the queue call to benchmark
        private IntConsumer task(Queue<Integer> queue, String name) {
            if (name.equals("add")) return queue::add;
            if (name.equals("offer")) return queue::offer;
            if (name.equals("poll")) return i -> queue.poll();
            if (name.equals("remove")) return i -> queue.remove();
            if (name.equals("size")) return i -> queue.size();
            return i -> { };
        }
    }

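In this code we measure, with 10 threads, how long it takes to run each method N times on each kind of queue.
The results are below. The values in the table are elapsed times in milliseconds, so smaller is better:
A few points are worth noting:
Next, let's tweak the code slightly to test put() and take() on BlockingQueue: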

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.LinkedTransferQueue;
    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.function.IntConsumer;
    import java.util.stream.IntStream;

    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test; // JUnit 4 assumed
    import org.springframework.util.StopWatch; // assumed: Spring's StopWatch, which offers prettyPrint()

    @Slf4j
    public class BlockingQueueBenchmark {

        int taskCount = 20000000;
        int threadCount = 10;

        @Test
        public void test() throws InterruptedException {
            List<BlockingQueue<Integer>> queues = getQueues();
            // put fills the queues first, then take empties them
            benchmark("put", queues, taskCount, threadCount);
            benchmark("take", queues, taskCount, threadCount);
        }

        private List<BlockingQueue<Integer>> getQueues() {
            return Arrays.asList(
                    new LinkedBlockingQueue<>(),
                    new LinkedTransferQueue<>(),
                    new ArrayBlockingQueue<>(taskCount, false),
                    new PriorityBlockingQueue<>());
        }

        private void benchmark(String operation, List<BlockingQueue<Integer>> queues, int taskCount, int threadCount) throws InterruptedException {
            StopWatch stopWatch = new StopWatch();
            queues.forEach(queue -> {
                stopWatch.start(queue.getClass().getSimpleName() + "-" + operation);
                try {
                    tasks(queue, taskCount, threadCount, operation);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                stopWatch.stop();
                log.info("queue:{}, operation:{}, size:{}",
                        queue.getClass().getSimpleName(), operation, queue.size());
            });
            log.info(stopWatch.prettyPrint());
        }

        private void tasks(BlockingQueue<Integer> queue, int taskCount, int threadCount, String operation) throws InterruptedException {
            ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
            forkJoinPool.execute(() -> IntStream.rangeClosed(1, taskCount).parallel().forEach(task(queue, operation)));
            forkJoinPool.shutdown();
            forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
        }

        // Maps an operation name to the blocking call to benchmark
        private IntConsumer task(BlockingQueue<Integer> queue, String name) {
            if (name.equals("put")) return i -> {
                try {
                    queue.put(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            if (name.equals("take")) return i -> {
                try {
                    queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            return i -> { };
        }
    }

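Adding these results to the earlier table gives:
As you can see, the blocking and non-blocking methods perform about the same, so choose whichever fits your needs. Reading the source also shows that many queues use essentially the same logic across their various put and take methods.
The queues appear to differ little in performance. I don't think this test is very well written, and the results may also be affected by thread-pool scheduling, so next we'll measure queue throughput in a different way.
This time we model each test scenario as follows: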
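
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class TestCase {
        private int elementCount;   // total number of elements to put and take
        private Mode mode;
        private int producerCount;  // number of producer threads
        private int consumerCount;  // number of consumer threads
    }
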
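We measure the total time each queue takes to complete a fixed number of put and take operations under different ratios of producer threads to consumer threads. We define three modes: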
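
    enum Mode {
        // Each thread both puts and takes (see ProducerTask)
        ProducerAndConsumerShareThread,
        // As the name suggests: produce first, then consume
        ProducerAndThenConsumer,
        // Producers and consumers run at the same time
        ConcurrentProducerAndConsumer
    }
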
All the test scenarios we define:

    int cores = Runtime.getRuntime().availableProcessors();
    List<TestCase> testCases = new ArrayList<>();

    // Producers and consumers running concurrently
    testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 1, 1));
    testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 10, 10));
    testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 100, 100));
    testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 1000, 1000));
    testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, cores, cores));
    testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 1, 100));
    testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 100, 1));

    // Put and take on the same thread, so no separate consumers
    testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, 1, 0));
    testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, 10, 0));
    testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, 100, 0));
    testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, 1000, 0));
    testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, cores, 0));

    // Produce first, then consume
    testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, 1, 1));
    testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, 10, 10));
    testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, 100, 100));
    testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, 1000, 1000));
    testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, cores, cores));

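These seventeen test cases cover the following scenarios:
We mainly test three kinds of queues, and request a GC before each run to reduce interference as far as possible: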
    LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>();
    for (TestCase testCase : testCases) {
        System.gc();
        benchmark(linkedBlockingQueue, testCase);
    }
    linkedBlockingQueue = null;

    LinkedTransferQueue<String> linkedTransferQueue = new LinkedTransferQueue<>();
    for (TestCase testCase : testCases) {
        System.gc();
        benchmark(linkedTransferQueue, testCase);
    }
    linkedTransferQueue = null;

    ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(element_count);
    for (TestCase testCase : testCases) {
        System.gc();
        benchmark(arrayBlockingQueue, testCase);
    }
    arrayBlockingQueue = null;
The producer:
    class ProducerTask implements Runnable {
        private String name;
        private BlockingQueue<String> queue;
        private TestCase testCase;
        private CountDownLatch startCountDownLatch;
        private CountDownLatch finishCountDownLatch;

        public ProducerTask(CountDownLatch startCountDownLatch, CountDownLatch finishCountDownLatch,
                            String name, BlockingQueue<String> queue, TestCase testCase) {
            this.startCountDownLatch = startCountDownLatch;
            this.finishCountDownLatch = finishCountDownLatch;
            this.name = name;
            this.queue = queue;
            this.testCase = testCase;
        }

        @Override
        public void run() {
            try {
                startCountDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int count = testCase.elementCount / testCase.getProducerCount();
            if (testCase.mode == Mode.ProducerAndConsumerShareThread) {
                for (int i = 0; i < count; i++) {
                    try {
                        queue.put(name + i);
                        queue.take();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            } else {
                for (int i = 0; i < count; i++) {
                    try {
                        queue.put(name + i);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            finishCountDownLatch.countDown();
        }
    }
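In this test we compute each thread's iteration count up front from the number of threads, instead of letting a thread pool schedule all the tasks as in the earlier tests. This makes it easier to measure the performance of the queue itself and keeps interference out. Note that in the shared-thread mode the producer performs both the put and the take, whereas in the other modes it only puts.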
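One side effect of splitting the work with integer division is worth seeing with numbers. The sketch below is not part of the original benchmark; it assumes the 10 million elements and 12 threads mentioned for the reported run, and the class and method names are made up for illustration.

```java
class WorkSplit {
    // Operations each thread performs when the total is split by integer division.
    static int perThread(int elementCount, int threadCount) {
        return elementCount / threadCount;
    }

    // Elements that no thread handles because the division truncates.
    static int unprocessed(int elementCount, int threadCount) {
        return elementCount % threadCount;
    }

    public static void main(String[] args) {
        int elementCount = 10_000_000; // assumed: the element count of the reported run
        int threads = 12;              // assumed: the core count of the test machine
        System.out.println(perThread(elementCount, threads));   // 833333
        System.out.println(unprocessed(elementCount, threads)); // 4
    }
}
```

When producers and consumers use the same thread count, both sides truncate identically, so no element is left stranded in the queue. The QPS figure is still computed from the full element count, so it is very slightly overstated in such cases.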
The consumer:
    class ConsumerTask implements Runnable {
        private BlockingQueue<String> queue;
        private TestCase testCase;
        private CountDownLatch startCountDownLatch;
        private CountDownLatch finishCountDownLatch;

        public ConsumerTask(CountDownLatch startCountDownLatch, CountDownLatch finishCountDownLatch,
                            BlockingQueue<String> queue, TestCase testCase) {
            this.startCountDownLatch = startCountDownLatch;
            this.finishCountDownLatch = finishCountDownLatch;
            this.queue = queue;
            this.testCase = testCase;
        }

        @Override
        public void run() {
            try {
                startCountDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int count = testCase.elementCount / testCase.getConsumerCount();
            if (testCase.mode != Mode.ProducerAndConsumerShareThread) {
                for (int i = 0; i < count; i++) {
                    try {
                        queue.take();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            finishCountDownLatch.countDown();
        }
    }
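Both the producer and the consumer are gated by two CountDownLatch instances. startCountDownLatch lets the main thread release all the threads at once after every one of them has been started, and finishCountDownLatch lets the main thread wait until they have all finished.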
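The two-latch gate can be shown in isolation. This is a minimal sketch rather than the benchmark code itself; `LatchGateDemo` and `runAll` are hypothetical names, and the worker body is reduced to incrementing a counter.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchGateDemo {
    // Starts threadCount workers, releases them together, waits for all of them,
    // and returns how many actually ran their body.
    static int runAll(int threadCount) {
        CountDownLatch start = new CountDownLatch(1);            // opened once by the caller
        CountDownLatch finish = new CountDownLatch(threadCount); // counted down once per worker
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    start.await();               // park until the gate opens
                    completed.incrementAndGet(); // stands in for the real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finish.countDown();
                }
            }).start();
        }
        start.countDown(); // open the gate for every worker at once
        try {
            finish.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(8)); // 8
    }
}
```

Counting down in a `finally` block means the waiting thread is released even if a worker fails, which the benchmark tasks above do not guarantee if an unchecked exception escapes.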
The main benchmark code is as follows:
    private void benchmark(BlockingQueue<String> queue, TestCase testCase) throws InterruptedException {
        long begin = System.currentTimeMillis();
        log.info("\r\n==========================\r\nBegin benchmark Queue:[{}], case:{}",
                queue.getClass().getSimpleName(), testCase.toString());
        CountDownLatch startCountDownLatch = new CountDownLatch(1);
        if (testCase.mode == Mode.ProducerAndConsumerShareThread) {
            // Only producer threads are started; each one puts and then takes.
            CountDownLatch finishCountDownLatch = new CountDownLatch(testCase.getProducerCount());
            for (int i = 0; i < testCase.getProducerCount(); i++) {
                new Thread(new ProducerTask(startCountDownLatch, finishCountDownLatch,
                        String.format("Thread_%d_", i), queue, testCase)).start();
            }
            startCountDownLatch.countDown();
            finishCountDownLatch.await();
        } else if (testCase.mode == Mode.ConcurrentProducerAndConsumer) {
            // Producers and consumers are released together and run concurrently.
            CountDownLatch finishCountDownLatch =
                    new CountDownLatch(testCase.getProducerCount() + testCase.getConsumerCount());
            for (int i = 0; i < testCase.getProducerCount(); i++) {
                new Thread(new ProducerTask(startCountDownLatch, finishCountDownLatch,
                        String.format("Thread_%d_", i), queue, testCase)).start();
            }
            for (int i = 0; i < testCase.getConsumerCount(); i++) {
                new Thread(new ConsumerTask(startCountDownLatch, finishCountDownLatch,
                        queue, testCase)).start();
            }
            startCountDownLatch.countDown();
            finishCountDownLatch.await();
        } else if (testCase.mode == Mode.ProducerAndThenConsumer) {
            // All producers finish first; only then are the consumers started.
            CountDownLatch finishCountDownLatch = new CountDownLatch(testCase.getProducerCount());
            for (int i = 0; i < testCase.getProducerCount(); i++) {
                new Thread(new ProducerTask(startCountDownLatch, finishCountDownLatch,
                        String.format("Thread_%d_", i), queue, testCase)).start();
            }
            startCountDownLatch.countDown();
            finishCountDownLatch.await();

            startCountDownLatch = new CountDownLatch(1);
            finishCountDownLatch = new CountDownLatch(testCase.getConsumerCount());
            for (int i = 0; i < testCase.getConsumerCount(); i++) {
                new Thread(new ConsumerTask(startCountDownLatch, finishCountDownLatch,
                        queue, testCase)).start();
            }
            startCountDownLatch.countDown();
            finishCountDownLatch.await();
        }
        long finish = System.currentTimeMillis();
        log.info("Finish benchmark Queue:[{}], case:{}, QPS:{}\r\n==========================\n",
                queue.getClass().getSimpleName(), testCase.toString(),
                (long) element_count * 1000 / (finish - begin));
    }
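The three modes are handled differently:
The results are summarized below (this test was run on a 12-core Alibaba Cloud instance with 10 million elements):
To be honest, the results were not what I expected. I had imagined that queue performance would drop sharply as concurrency increased, and that the queues would differ significantly from one another. That they do not suggests that all of these queues perform well, with no obvious weak spot.
We can draw a few rough conclusions: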
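In general, among blocking queues, choose LinkedBlockingQueue when you need an unbounded queue and ArrayBlockingQueue when you need a bounded one. The latter also takes a fairness parameter that enables fair ordering of waiting threads; we will observe fairness in action below.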
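The difference between the two is easy to see with the non-blocking `offer()`. The following is a small sketch under assumed sizes (a capacity of 2 and 5 attempts), with a hypothetical helper named `accepted`; the `true` argument is ArrayBlockingQueue's fairness flag.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BoundedVsUnbounded {
    // Offers `attempts` elements without blocking and returns how many were accepted.
    static int accepted(BlockingQueue<Integer> queue, int attempts) {
        int ok = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(i)) { // offer() returns false instead of blocking when full
                ok++;
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        // Bounded, with the fairness flag on: waiting threads are served in FIFO order.
        System.out.println(accepted(new ArrayBlockingQueue<>(2, true), 5)); // 2
        // No capacity given: the limit is Integer.MAX_VALUE, effectively unbounded.
        System.out.println(accepted(new LinkedBlockingQueue<>(), 5));       // 5
    }
}
```

With `put()` instead of `offer()`, the third insert into the bounded queue would park the calling thread until a consumer made room.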
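SynchronousQueue is a blocking queue with no capacity: an insert succeeds only when another thread removes the element. Here we write some code to test it, reusing the Consumer and Producer classes from before. The only change is at the point where the workers are stopped after 2 seconds: we add an interrupt() call, because otherwise the producers cannot exit: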
    @Slf4j
    public class SynchronousQueueTest {

        @Test
        public void test() throws InterruptedException {
            SynchronousQueue<Integer> queue = new SynchronousQueue<>(false);
            List<Worker> workers = new ArrayList<>();
            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                String name = "Producer" + i;
                Producer worker = new Producer(name, queue);
                workers.add(worker);
                Thread thread = new Thread(worker);
                thread.setName(name);
                threads.add(thread);
                thread.start();
            }
            for (int i = 0; i < 4; i++) {
                String name = "Consumer" + i;
                Consumer worker = new Consumer(name, queue);
                workers.add(worker);
                Thread thread = new Thread(worker);
                thread.setName(name);
                threads.add(thread);
                thread.start();
            }
            Executors.newSingleThreadScheduledExecutor().schedule(() -> {
                for (Worker worker : workers) {
                    worker.stop();
                }
                for (Thread thread : threads) {
                    thread.interrupt();
                }
            }, 2, TimeUnit.SECONDS);
            for (Thread thread : threads) {
                thread.join();
            }
        }
    }
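First we set the fairness parameter to false and look at the output:
Searching the log turns up no trace of the six producers Producer0 to Producer5. No consumer ever came to take their data, so they were stuck and starved; the smallest value put in the log is 7. Let's switch to fair mode:
This time the log contains entries from every producer. In fair mode, waiting threads get access to the queue in FIFO order: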
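The "no capacity" property can be checked directly, independent of the Producer and Consumer classes used above. This is an illustrative sketch with made-up names: a non-blocking `offer()` fails when nobody is waiting to take, while `put()` completes once a `take()` meets it.

```java
import java.util.concurrent.SynchronousQueue;

class SynchronousHandoff {
    // With no consumer waiting, a non-blocking offer has nowhere to put the element.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(true); // true = fair (FIFO)
        return queue.offer(1);
    }

    // A put() blocks until a take() arrives; the value goes straight across.
    static int handOff(int value) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);
        Thread producer = new Thread(() -> {
            try {
                queue.put(value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            int received = queue.take();
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer()); // false
        System.out.println(handOff(42));            // 42
    }
}
```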
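Here is an example of a delay queue. We submit delayed messages to the queue 10 times, 2 identical messages each time, with delays ranging from 1 to 10 seconds.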
    @Slf4j
    public class DelayQueueTest {

        @Test
        public void test() throws InterruptedException {
            DelayQueue<Message> delayQueue = new DelayQueue<>();
            IntStream.rangeClosed(1, 10).forEach(i -> {
                for (int j = 0; j < 2; j++) {
                    delayQueue.add(new Message(i * 1000));
                }
            });
            Executors.newFixedThreadPool(1).submit(() -> {
                while (true) {
                    Message message = delayQueue.take();
                    log.debug("Got:{}", message);
                }
            });
            TimeUnit.SECONDS.sleep(20);
        }

        @ToString
        class Message implements Delayed {
            private final long delay;
            private final long expire;

            public Message(long delay) {
                this.delay = delay;
                expire = System.currentTimeMillis() + delay;
            }

            @Override
            public long getDelay(TimeUnit unit) {
                //log.debug("getDelay called : {}", unit);
                return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }

            @Override
            public int compareTo(Delayed o) {
                // Long.compare avoids the overflow that casting a long difference to int can cause.
                return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
            }
        }
    }
The output is as follows:
    17:14:43.957 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=1000, expire=1563354883947)
    17:14:44.007 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=1000, expire=1563354883947)
    17:14:44.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=2000, expire=1563354884949)
    17:14:44.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=2000, expire=1563354884949)
    17:14:45.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=3000, expire=1563354885949)
    17:14:45.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=3000, expire=1563354885949)
    17:14:46.956 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=4000, expire=1563354886949)
    17:14:46.956 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=4000, expire=1563354886949)
    17:14:47.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=5000, expire=1563354887949)
    17:14:47.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=5000, expire=1563354887949)
    17:14:48.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=6000, expire=1563354888949)
    17:14:48.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=6000, expire=1563354888949)
    17:14:49.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=7000, expire=1563354889949)
    17:14:49.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=7000, expire=1563354889949)
    17:14:50.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=8000, expire=1563354890949)
    17:14:50.955 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=8000, expire=1563354890949)
    17:14:51.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=9000, expire=1563354891949)
    17:14:51.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=9000, expire=1563354891949)
    17:14:52.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=10000, expire=1563354892949)
    17:14:52.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=10000, expire=1563354892949)
Two log entries are printed every second, as expected.
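A DelayQueue releases elements by remaining delay, not by insertion order. The sketch below is a variation on the test above, not the author's code: the `Job` class, its `System.nanoTime()` clock and the 100 ms and 300 ms delays are all assumptions chosen to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayDemo {
    // Hypothetical delayed element, timed with the monotonic nanoTime clock.
    static final class Job implements Delayed {
        final String name;
        final long expireNanos;

        Job(String name, long delayMillis) {
            this.name = name;
            this.expireNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Compare as longs; casting a long difference to int can overflow.
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Adds two jobs out of order and returns their names in the order take() releases them.
    static List<String> releaseOrder() {
        DelayQueue<Job> queue = new DelayQueue<>();
        queue.add(new Job("late", 300));
        queue.add(new Job("early", 100));
        List<String> order = new ArrayList<>();
        try {
            order.add(queue.take().name); // blocks until the shortest delay has elapsed
            order.add(queue.take().name);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(releaseOrder()); // [early, late]
    }
}
```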
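We once hit an OOM problem in production that turned out to be caused by misuse of a queue. Let's look at that problem here. The code logic was:
What stands out is the use of the transfer() method. The developer probably thought LinkedTransferQueue looked cool and picked it for that reason, and used transfer() in the belief that handing a task directly to a consumer would perform better.
The code is as follows: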
    @Slf4j
    public class BlockingQueueMisuse {

        LinkedTransferQueue<String> linkedTransferQueue = new LinkedTransferQueue<>();

        @Test
        public void test() throws InterruptedException {
            int taskCount = 4000;
            StopWatch stopWatch = new StopWatch();
            stopWatch.start("misuse");
            ExecutorService threadPool = Executors.newFixedThreadPool(10);
            //ExecutorService threadPool = Executors.newCachedThreadPool();
            IntStream.rangeClosed(1, taskCount).forEach(i -> threadPool.submit(() -> {
                try {
                    linkedTransferQueue.transfer("message" + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));
            IntStream.rangeClosed(1, taskCount).forEach(i -> threadPool.submit(() -> {
                try {
                    log.debug("Got:{}", linkedTransferQueue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));
            threadPool.shutdown();
            threadPool.awaitTermination(1, TimeUnit.HOURS);
            stopWatch.stop();
            log.info(stopWatch.prettyPrint());
        }
    }
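After running the program there is no output at all. The pool has only 10 threads, while the producers need to store 4000 elements, far more than 10. Because transfer() blocks until a consumer receives the element, the first 10 producer tasks occupy all 10 threads, the consumer tasks never get a chance to run, and every thread ends up waiting:
So, without much thought, he changed the thread pool to newCachedThreadPool. The program then ran normally. Let's look at the result:
This code is frightening: it started several thousand threads while running. The cause is that newCachedThreadPool uses a SynchronousQueue and creates a new thread whenever no idle thread is available. Combine that with the blocking behavior of transfer(), and the pool creates thousands of threads.
Even if we change the code to use LinkedBlockingQueue, pairing it with newCachedThreadPool still creates dozens of threads (hundreds or thousands are possible if there are enough elements), because as soon as tasks block, newCachedThreadPool creates new threads without hesitation.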
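The thread growth can be reproduced without any queue of our own. The sketch below builds a ThreadPoolExecutor with the configuration documented for `Executors.newCachedThreadPool()` and submits tasks that all block on a latch; the class name, the latch and the task count of 50 are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolGrowth {
    // Submits `tasks` jobs that all block, and reports how many threads the pool created.
    static int threadsCreated(int tasks) {
        // Same settings as Executors.newCachedThreadPool(): no core threads,
        // no upper limit, and a SynchronousQueue that cannot buffer tasks.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // stands in for a blocked transfer() or put()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = pool.getLargestPoolSize();
        release.countDown(); // let every task finish
        pool.shutdown();
        return largest;
    }

    public static void main(String[] args) {
        System.out.println(threadsCreated(50)); // 50: one thread per blocked task
    }
}
```

Since every running task is blocked, no worker is idle to pick up the next submission, so the pool has no choice but to start another thread each time.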
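For producer-consumer work, I still recommend using threads directly, so that blocked producers and blocked consumers do not interfere with each other. A thread pool also manages its tasks with a queue, so adding one amounts to queuing twice, which is unnecessary.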
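As a rough illustration of that recommendation, here is one way to wire a dedicated consumer thread to a bounded blocking queue, with the calling thread acting as the producer. The `POISON` sentinel, the capacity of 16 and the summing consumer are assumptions made for this sketch, not something taken from the original project.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class DedicatedConsumer {
    private static final int POISON = -1; // hypothetical sentinel that tells the consumer to stop

    // Produces 1..n on the calling thread, consumes on a dedicated thread, returns the sum.
    static long produceAndConsume(int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(16); // bounded: producer waits when full
        AtomicLong sum = new AtomicLong();
        Thread consumer = new Thread(() -> {
            try {
                for (int v = queue.take(); v != POISON; v = queue.take()) {
                    sum.addAndGet(v);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();
        try {
            for (int i = 1; i <= n; i++) {
                queue.put(i); // blocks only this producer, never the consumer
            }
            queue.put(POISON);
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(100)); // 5050
    }
}
```

The thread count here is fixed by construction, so a slow consumer shows up as a full queue and a waiting producer rather than as a growing number of threads.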
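Let's review the blocking queues involved in this experiment:
DelayQueue, SynchronousQueue and PriorityBlockingQueue are special-purpose queues; choose them when you need what they offer.
LinkedTransferQueue is also something of a special-purpose queue. It can achieve an effect similar to back pressure, and is meant for specific scenarios.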
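The back-pressure flavor comes from the fact that a transfer only completes when a consumer receives the element. A small sketch using `tryTransfer()`, which reports the outcome instead of blocking indefinitely; the class and method names and the 5-second timeout are illustrative assumptions.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

class TransferDemo {
    // No consumer is waiting, so tryTransfer() returns false and enqueues nothing.
    static boolean handOffToNobody() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        return queue.tryTransfer("message");
    }

    // A consumer thread takes the element, so the timed tryTransfer() succeeds.
    static boolean handOffToConsumer() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            boolean delivered = queue.tryTransfer("message", 5, TimeUnit.SECONDS);
            consumer.join();
            return delivered;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(handOffToNobody());   // false
        System.out.println(handOffToConsumer()); // true
    }
}
```

A producer that sees `false` can slow down, drop the message or retry, which is the kind of feedback that an unbounded `put()` never gives.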
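ArrayBlockingQueue and LinkedBlockingQueue are backed by different data structures, and they are probably the queues we use most often. They differ as follows:
The non-blocking ConcurrentLinkedQueue is a special case. First, it is not a blocking queue; second, it uses CAS rather than locks. Under very high concurrency it can deliver better performance.
Finally, we reused the earlier code for one more comparison. We did not test the concurrent put-and-take mode here, because a consumer cannot tell when consumption is finished, and spinning in a loop when there is no data to consume is of little value:
So ConcurrentLinkedQueue may show its strength in specific scenarios, for example when producers have already put their data into the queue and many consumers need to consume it concurrently (although, as mentioned before, its size() is a trap). It is not well suited to a queue that is empty most of the time; a blocking queue fits better there.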
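That scenario, a queue filled in advance and then drained by many consumers, might look like the sketch below. The names and counts are assumptions for illustration. Each consumer simply polls until it gets `null`, which avoids calling `size()`, an O(n) traversal on this class.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class LockFreeDrain {
    // Fills the queue first, then lets `consumers` threads drain it concurrently.
    // Returns the total number of elements consumed.
    static int drain(int elements, int consumers) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < elements; i++) {
            queue.add(i);
        }
        AtomicInteger consumed = new AtomicInteger();
        Thread[] threads = new Thread[consumers];
        for (int t = 0; t < consumers; t++) {
            threads[t] = new Thread(() -> {
                // poll() never blocks: null means the queue is empty right now.
                while (queue.poll() != null) {
                    consumed.incrementAndGet();
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(drain(100_000, 8)); // 100000: every element is taken exactly once
    }
}
```

Stopping on `null` is only safe because production has finished before the consumers start; with a live producer the consumers would need some other completion signal.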
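So it seems that 90% of the time ArrayBlockingQueue and LinkedBlockingQueue are the safe choice: the former when you want a bounded queue, the latter when you need an unbounded one. But think carefully: do you really need unbounded? Our tests show that all of these queues reach over a million QPS under high concurrency, so in general none of them will become the bottleneck. What deserves more attention is the growth in thread count caused by blocking and the memory taken up by the queue's contents.
In this article we also tested queues in a variety of ways:
The point I want to make is that for producer-consumer work it is best to use a blocking queue with dedicated consumer threads. The producers can simply be the business threads. There is no need to bring in a thread pool.
As usual, the code is on my GitHub. Feel free to clone it and play with it, and likes are welcome.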