阻塞隊列(BlockingQueue):支持2個附加操做的隊列。阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列中添加元素的線程,消費者是從隊列中獲取元素的線程。java
附加操做:數組
1)隊列爲空時,獲取元素的線程會等待隊列變爲非空ide
2)隊列爲滿時,存儲元素的線程會等待隊列可用。spa
ArrayBlockingQueue:由數組結構組成的 有界 阻塞隊列線程
LinkedBlockingQueue:由鏈表結構組成的 有界(但默認大小爲Integer.MAX_VALUE)阻塞隊列code
PriorityBlockingQueue:支持優先級排序的 無界 阻塞隊列blog
DelayQueue:使用優先級隊列實現的 延遲 無界 阻塞隊列排序
SynchronousQueue:不存儲元素 的阻塞隊列,也即單個元素的隊列隊列
LinkedTransferQueue:由鏈表結構組成的 無界 阻塞隊列element
LinkedBlockingDeque:由鏈表結構組成的 雙向 阻塞隊列
方法 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
檢查 | element() | peek() | 不可用 | 不可用 |
1)合法的add:add元素數量與規定元素數量一致
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueDemo { public static void main(String[] args) { // 構造方法傳參是規定其長度 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("1")); System.out.println(blockingQueue.add("2")); System.out.println(blockingQueue.add("3")); // true true true } }
2)異常( IllegalStateException )
當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException("Queue full")異常
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueDemo { public static void main(String[] args) { // 構造方法傳參是規定其長度 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("1")); System.out.println(blockingQueue.add("2")); System.out.println(blockingQueue.add("3")); System.out.println(blockingQueue.add("4")); // java.lang.IllegalStateException: Queue full } }
1)若是隊列已滿,再插入元素,則返回false,而不會拋出異常
public static void main(String[] args) { // 構造方法傳參是規定其長度 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("1")); System.out.println(blockingQueue.offer("2")); System.out.println(blockingQueue.offer("3")); // true true true System.out.println(blockingQueue.offer("4")); // false }
2)設置超時時間
private static void offer() throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); // 設置超時時間爲2s,TimeUnit是時間顆粒度轉換,TimeUnit.SECOND表明秒 System.out.println(blockingQueue.offer("1", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("2", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("3", 2L, TimeUnit.SECONDS)); // true true true System.out.println(blockingQueue.offer("4", 2L, TimeUnit.SECONDS)); // false }
若是隊列已滿,再插入元素,就會阻塞,直至能夠添加進去爲止
public static void main(String[] args) { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); try { blockingQueue.put("1"); blockingQueue.put("2"); blockingQueue.put("3"); blockingQueue.put("4"); } catch (InterruptedException e) { e.printStackTrace(); } }
合法的remove
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueDemo { public static void main(String[] args) { // 構造方法傳參是規定其長度 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.add("1"); blockingQueue.add("2"); blockingQueue.add("3"); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); // 1 2 3 System.out.println(blockingQueue.remove()); } }
異常(NoSuchElementException)
當隊列爲空時,從隊列裏獲取元素時會拋出NoSuchElementException異常
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueDemo { public static void main(String[] args) { // 構造方法傳參是規定其長度 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.add("1"); blockingQueue.add("2"); blockingQueue.add("3"); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); // java.util.NoSuchElementException } }
當隊列爲空時,從隊列裏獲取元素時直接返回null,不拋出異常
public static void main(String[] args) { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.offer("1"); blockingQueue.offer("2"); blockingQueue.offer("3"); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); // 1 2 3 System.out.println(blockingQueue.poll()); // null }
當隊列爲空時,從隊列裏獲取元素時阻塞,直至其中存在元素
public static void main(String[] args) throws Exception { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.put("1"); blockingQueue.put("2"); blockingQueue.put("3"); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); }
若是隊列爲空,則拋異常NoSuchElementException
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueDemo { public static void main(String[] args) { // 構造方法傳參是規定其長度 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.add("1"); blockingQueue.add("2"); blockingQueue.add("3"); System.out.println(blockingQueue.element()); // 1 } }
若是隊列爲空,則返回null
public static void main(String[] args) { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.offer("1"); blockingQueue.offer("2"); blockingQueue.offer("3"); System.out.println(blockingQueue.peek()); }
SynchronousQueue(同步隊列)模擬生產消費
import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; public class SynchronousQueueDemo { public static void main(String[] args) { // 同步隊列不存儲元素 BlockingQueue queue = new SynchronousQueue(); // 生產者線程 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "\t put 1"); queue.put("1"); System.out.println(Thread.currentThread().getName() + "\t put 2"); queue.put("2"); System.out.println(Thread.currentThread().getName() + "\t put 3"); queue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } }, "provider").start(); // 消費者線程 new Thread(() -> { try { // 每次消費前先睡眠2秒鐘 TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + "\t" + queue.take()); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + "\t" + queue.take()); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + "\t" + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }, "consumer").start(); // 程序執行結果: // provider put 1 // consumer 1 // provider put 2 // consumer 2 // provider put 3 // consumer 3 } }