阻塞隊列(BlockingQueue)是指當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿;當隊列空時,隊列會阻塞得到元素的線程,直到隊列變非空。阻塞隊列就是生產者用來存放元素、消費者用來獲取元素的容器。php
當線程 插入/獲取 動做因爲隊列 滿/空 阻塞後,隊列也提供了一些機制去處理,或拋出異常,或返回特殊值,或者線程一直等待...html
方法/處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
移除方法 | remove(o) | poll() | take() | poll(timeout, unit) |
檢查方法 | element() | peek() — 不移除元素 | 不可用 | 不可用 |
tips: 若是是無界阻塞隊列,則 put 方法永遠不會被阻塞;offer 方法始終返回 true。java
Java 中的阻塞隊列:python
ArrayBlockingQueue 是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序,默認狀況下不保證線程公平的訪問。c++
經過可重入的獨佔鎖 ReentrantLock 來控制併發,Condition 來實現阻塞。編程
public class ArrayBlockingQueueTest { /** * 1. 因爲是有界阻塞隊列,須要設置初始大小 * 2. 默認不保證阻塞線程的公平訪問,可設置公平性 */ private static ArrayBlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(2, true); public static void main(String[] args) throws InterruptedException { Thread put = new Thread(() -> { // 3. 嘗試插入元素 try { QUEUE.put("java"); QUEUE.put("javaScript"); // 4. 元素已滿,會阻塞線程 QUEUE.put("c++"); } catch (InterruptedException e) { e.printStackTrace(); } }); put.start(); Thread take = new Thread(() -> { try { // 5. 獲取一個元素 System.out.println(QUEUE.take()); } catch (InterruptedException e) { e.printStackTrace(); } }); take.start(); // 6 javaScript、c++ System.out.println(QUEUE.take()); System.out.println(QUEUE.take()); } }
LinkedBlockingQueue 是一個用單向鏈表實現的有界阻塞隊列。此隊列的默認最大長度爲 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序,吞吐量一般要高於ArrayBlockingQueue。Executors.newFixedThreadPool() 就使用了這個隊列。數組
和 ArrayBlockingQueue 同樣,採用 ReentrantLock 來控制併發,不一樣的是它使用了兩個獨佔鎖來控制消費和生產,經過 takeLock 和 putLock 兩個鎖來控制生產和消費,互不干擾,只要隊列未滿,生產線程能夠一直生產;只要隊列不空,消費線程能夠一直消費,不會相互由於獨佔鎖而阻塞。緩存
tips:由於使用了雙鎖,避免併發計算不許確,使用了一個 AtomicInteger 變量統計元素總量。併發
LinkedBlockingDeque 是一個由雙向鏈表結構組成的有界阻塞隊列,能夠從隊列的兩端插入和移出元素。它實現了BlockingDeque接口,多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以 First 單詞結尾的方法,表示插入、獲取或移除雙端隊列的第一個元素。以 Last 單詞結尾的方法,表示插入、獲取或移除雙端隊列的最後一個元素。dom
LinkedBlockingDeque 的 Node 實現多了指向前一個節點的變量 prev,以此實現雙向隊列。併發控制上和 ArrayBlockingQueue 相似,採用單個 ReentrantLock 來控制併發。由於雙端隊列頭尾均可以消費和生產,因此使用了一個共享鎖。
雙向阻塞隊列能夠運用在「工做竊取」模式中。
public class LinkedBlockingDequeTest { private static LinkedBlockingDeque<String> DEQUE = new LinkedBlockingDeque<>(2); public static void main(String[] args) { DEQUE.addFirst("java"); DEQUE.addFirst("c++"); // java System.out.println(DEQUE.peekLast()); // java System.out.println(DEQUE.pollLast()); DEQUE.addLast("php"); // c++ System.out.println(DEQUE.pollFirst()); } }
tips: take() 方法調用的是 takeFirst(),使用時候需注意。
PriorityBlockingQueue 是一個底層由數組實現的無界阻塞隊列,並帶有排序功能。因爲是無界隊列,因此插入永遠不會被阻塞。默認狀況下元素採起天然順序升序排列。也能夠自定義類實現 compareTo()方法來指定元素排序規則,或者初始化 PriorityBlockingQueue 時,指定構造參數 Comparator 來對元素進行排序。
底層一樣採用 ReentrantLock 來控制併發,因爲只有獲取會阻塞,因此只採用一個Condition(只通知消費)來實現。
public class PriorityBlockingQueueTest { private static PriorityBlockingQueue<String> QUEUE = new PriorityBlockingQueue<>(); public static void main(String[] args) { QUEUE.add("java"); QUEUE.add("javaScript"); QUEUE.add("c++"); QUEUE.add("python"); QUEUE.add("php"); Iterator<String> it = QUEUE.iterator(); while (it.hasNext()) { // c++ javaScript java python php // 同優先級不保證排序順序 System.out.print(it.next() + " "); } } }
DelayQueue 是一個支持延時獲取元素的無界阻塞隊列。隊列使用 PriorityQueue 來實現。隊列中的元素必須實現 Delayed 接口(Delayed 接口的設計能夠參考 ScheduledFutureTask 類),元素按延遲優先級排序,延遲時間短的排在前面,只有在延遲期滿時才能從隊列中提取元素。
DelayQueue 中的 PriorityQueue 會對隊列中的任務進行排序。排序時,time 小的排在前面(時間早的任務將被先執行)。若是兩個任務的 time 相同,就比較 sequenceNumber,sequenceNumber 小的排在前面(也就是說,若是兩個任務的執行時間相同,那麼先提交的任務將被先執行)。
和 PriorityBlockingQueue 類似,底層也是數組,採用一個 ReentrantLock 來控制併發。
應用場景:
public class DelayElement implements Delayed, Runnable { private static final AtomicLong SEQUENCER = new AtomicLong(); /** * 標識元素前後順序 */ private final long sequenceNumber; /** * 延遲時間,單位納秒 */ private long time; public DelayElement(long time) { this.time = System.nanoTime() + time; this.sequenceNumber = SEQUENCER.getAndIncrement(); } @Override public long getDelay(TimeUnit unit) { return unit.convert(time - System.nanoTime(), NANOSECONDS); } @Override public int compareTo(Delayed other) { // compare zero if same object if (other == this) { return 0; } if (other instanceof DelayElement) { DelayElement x = (DelayElement) other; long diff = time - x.time; if (diff < 0) { return -1; } else if (diff > 0) { return 1; } else if (sequenceNumber < x.sequenceNumber) { return -1; } else { return 1; } } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } @Override public void run() { System.out.println("sequenceNumber" + sequenceNumber); } @Override public String toString() { return "DelayElement{" + "sequenceNumber=" + sequenceNumber + ", time=" + time + '}'; } }
public class DelayQueueTest { private static DelayQueue<DelayElement> QUEUE = new DelayQueue<>(); public static void main(String[] args) { // 1. 添加 10 個參數 for (int i = 1; i < 10; i++) { // 2. 5 秒內隨機延遲 int nextInt = new Random().nextInt(5); long convert = TimeUnit.NANOSECONDS.convert(nextInt, TimeUnit.SECONDS); QUEUE.offer(new DelayElement(convert)); } // 3. 查詢元素排序 —— 延遲短的排在前面 Iterator<DelayElement> iterator = QUEUE.iterator(); while (iterator.hasNext()) { System.out.println(iterator.next()); } // 4. 可觀察到元素延遲輸出 while (!QUEUE.isEmpty()) { Thread thread = new Thread(QUEUE.poll()); thread.start(); } } }
LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。
併發控制上採用了大量的 CAS 操做,沒有使用鎖。
相對於其餘阻塞隊列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
SynchronousQueue 是一個不存儲元素的阻塞隊列。每個 put 操做必須等待一個 take 操做,不然繼續 put 操做會被阻塞。Executors.newCachedThreadPool 就使用了這個隊列。
SynchronousQueue 默認狀況下線程採用非公平性策略訪問隊列,未使用鎖,所有經過 CAS 操做來實現併發,吞吐量很是高,高於 LinkedBlockingQueue 和 ArrayBlockingQueue,很是適合用來處理一些高效的傳遞性場景。Executors.newCachedThreadPool() 就使用了 SynchronousQueue 進行任務傳遞。
public class SynchronousQueueTest { private static class SynchronousQueueProducer implements Runnable { private BlockingQueue<String> blockingQueue; private SynchronousQueueProducer(BlockingQueue<String> queue) { this.blockingQueue = queue; } @Override public void run() { while (true) { try { String data = UUID.randomUUID().toString(); System.out.println(Thread.currentThread().getName() + " Put: " + data); blockingQueue.put(data); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } private static class SynchronousQueueConsumer implements Runnable { private BlockingQueue<String> blockingQueue; private SynchronousQueueConsumer(BlockingQueue<String> queue) { this.blockingQueue = queue; } @Override public void run() { while (true) { try { System.out.println(Thread.currentThread().getName() + " take(): " + blockingQueue.take()); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { final BlockingQueue<String> synchronousQueue = new SynchronousQueue<>(); SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(synchronousQueue); new Thread(queueProducer, "producer - 1").start(); SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(synchronousQueue); new Thread(queueConsumer1, "consumer — 1").start(); SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(synchronousQueue); new Thread(queueConsumer2, "consumer — 2").start(); } }