java中Queue家族簡介java
java中Collection集合有三你們族List,Set和Queue。固然Map也算是一種集合類,但Map並不繼承Collection接口。安全
List,Set在咱們的工做中會常常使用,一般用來存儲結果數據,而Queue因爲它的特殊性,一般用在生產者消費者模式中。數據結構
如今很火的消息中間件好比:Rabbit MQ等都是Queue這種數據結構的展開。多線程
今天這篇文章將帶你們進入Queue家族。app
先看下Queue的繼承關係和其中定義的方法:ide
Queue繼承自Collection,Collection繼承自Iterable。spa
Queue有三類主要的方法,咱們用個表格來看一下他們的區別:線程
方法類型 | 方法名稱 | 方法名稱 | 區別 |
---|---|---|---|
Insert | add | offer | 兩個方法都表示向Queue中添加某個元素,不一樣之處在於添加失敗的狀況,add只會返回true,若是添加失敗,會拋出異常。offer在添加失敗的時候會返回false。因此對那些有固定長度的Queue,優先使用offer方法。 |
Remove | remove | poll | 若是Queue是空的狀況下,remove會拋出異常,而poll會返回null。 |
Examine | element | peek | 獲取Queue頭部的元素,但不從Queue中刪除。二者的區別仍是在於Queue爲空的狀況下,element會拋出異常,而peek返回null。 |
注意,由於對poll和peek來講null是有特殊含義的,因此通常來講Queue中禁止插入null,可是在實現中仍是有一些類容許插入null好比LinkedList。儘管如此,咱們在使用中仍是要避免插入null元素。code
通常來講Queue能夠分爲BlockingQueue,Deque和TransferQueue三種。中間件
BlockingQueue是Queue的一種實現,它提供了兩種額外的功能:
BlockingQueue的操做能夠分爲下面四類:
操做類型 | Throws exception | Special value | Blocks | Times out |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
第一類是會拋出異常的操做,當遇到插入失敗,隊列爲空的時候拋出異常。
第二類是不會拋出異常的操做。
第三類是會Block的操做。當Queue爲空或者達到最大容量的時候。
第四類是time out的操做,在給定的時間裏會Block,超時會直接返回。
BlockingQueue是線程安全的Queue,能夠在生產者消費者模式的多線程中使用,以下所示:
class Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { while (true) { queue.put(produce()); } } catch (InterruptedException ex) { ... handle ...} } Object produce() { ... } } class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { while (true) { consume(queue.take()); } } catch (InterruptedException ex) { ... handle ...} } void consume(Object x) { ... } } class Setup { void main() { BlockingQueue q = new SomeQueueImplementation(); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }
最後,在一個線程中向BlockQueue中插入元素以前的操做happens-before另一個線程中從BlockQueue中刪除或者獲取的操做。
Deque是Queue的子類,它表明double ended queue,也就是說能夠從Queue的頭部或者尾部插入和刪除元素。
一樣的,咱們也能夠將Deque的方法用下面的表格來表示,Deque的方法能夠分爲對頭部的操做和對尾部的操做:
方法類型 | Throws exception | Special value | Throws exception | Special value |
---|---|---|---|---|
Insert | addFirst(e) | offerFirst(e) | addLast(e) | offerLast(e) |
Remove | removeFirst() | pollFirst() | removeLast() | pollLast() |
Examine | getFirst() | peekFirst() | getLast() | peekLast() |
和Queue的方法描述基本一致,這裏就很少講了。
當Deque以 FIFO (First-In-First-Out)的方法處理元素的時候,Deque就至關於一個Queue。
當Deque以LIFO (Last-In-First-Out)的方式處理元素的時候,Deque就至關於一個Stack。
TransferQueue繼承自BlockingQueue,爲何叫Transfer呢?由於TransferQueue提供了一個transfer的方法,生產者能夠調用這個transfer方法,從而等待消費者調用take或者poll方法從Queue中拿取數據。
還提供了非阻塞和timeout版本的tryTransfer方法以供使用。
咱們舉個TransferQueue實現的生產者消費者的問題。
先定義一個生產者:
@Slf4j @Data @AllArgsConstructor class Producer implements Runnable { private TransferQueue<String> transferQueue; private String name; private Integer messageCount; public static final AtomicInteger messageProduced = new AtomicInteger(); @Override public void run() { for (int i = 0; i < messageCount; i++) { try { boolean added = transferQueue.tryTransfer( "第"+i+"個", 2000, TimeUnit.MILLISECONDS); log.info("transfered {} 是否成功: {}","第"+i+"個",added); if(added){ messageProduced.incrementAndGet(); } } catch (InterruptedException e) { log.error(e.getMessage(),e); } } log.info("total transfered {}",messageProduced.get()); } }
在生產者的run方法中,咱們調用了tryTransfer方法,等待2秒鐘,若是沒成功則直接返回。
再定義一個消費者:
@Slf4j @Data @AllArgsConstructor public class Consumer implements Runnable { private TransferQueue<String> transferQueue; private String name; private int messageCount; public static final AtomicInteger messageConsumed = new AtomicInteger(); @Override public void run() { for (int i = 0; i < messageCount; i++) { try { String element = transferQueue.take(); log.info("take {}",element ); messageConsumed.incrementAndGet(); Thread.sleep(500); } catch (InterruptedException e) { log.error(e.getMessage(),e); } } log.info("total consumed {}",messageConsumed.get()); } }
在run方法中,調用了transferQueue.take方法來取消息。
下面先看一下一個生產者,零個消費者的狀況:
@Test public void testOneProduceZeroConsumer() throws InterruptedException { TransferQueue<String> transferQueue = new LinkedTransferQueue<>(); ExecutorService exService = Executors.newFixedThreadPool(10); Producer producer = new Producer(transferQueue, "ProducerOne", 5); exService.execute(producer); exService.awaitTermination(50000, TimeUnit.MILLISECONDS); exService.shutdown(); }
輸出結果:
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個 是否成功: false [pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個 是否成功: false [pool-1-thread-1] INFO com.flydean.Producer - transfered 第2個 是否成功: false [pool-1-thread-1] INFO com.flydean.Producer - transfered 第3個 是否成功: false [pool-1-thread-1] INFO com.flydean.Producer - transfered 第4個 是否成功: false [pool-1-thread-1] INFO com.flydean.Producer - total transfered 0
能夠看到,由於沒有消費者,因此消息並無發送成功。
再看下一個有消費者的狀況:
@Test public void testOneProduceOneConsumer() throws InterruptedException { TransferQueue<String> transferQueue = new LinkedTransferQueue<>(); ExecutorService exService = Executors.newFixedThreadPool(10); Producer producer = new Producer(transferQueue, "ProducerOne", 2); Consumer consumer = new Consumer(transferQueue, "ConsumerOne", 2); exService.execute(producer); exService.execute(consumer); exService.awaitTermination(50000, TimeUnit.MILLISECONDS); exService.shutdown(); }
輸出結果:
[pool-1-thread-2] INFO com.flydean.Consumer - take 第0個 [pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個 是否成功: true [pool-1-thread-2] INFO com.flydean.Consumer - take 第1個 [pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個 是否成功: true [pool-1-thread-1] INFO com.flydean.Producer - total transfered 2 [pool-1-thread-2] INFO com.flydean.Consumer - total consumed 2
能夠看到Producer和Consumer是一個一個來生產和消費的。
本文介紹了Queue接口和它的三大分類,這三大分類又有很是多的實現類,咱們將會在後面的文章中再詳細介紹。
歡迎關注個人公衆號:程序那些事,更多精彩等着您!
更多內容請訪問 www.flydean.com