一文弄懂java中的Queue家族

java中Queue家族簡介java

簡介

java中Collection集合有三你們族List,Set和Queue。固然Map也算是一種集合類,但Map並不繼承Collection接口。安全

List,Set在咱們的工做中會常常使用,一般用來存儲結果數據,而Queue因爲它的特殊性,一般用在生產者消費者模式中。數據結構

如今很火的消息中間件好比:Rabbit MQ等都是Queue這種數據結構的展開。多線程

今天這篇文章將帶你們進入Queue家族。app

Queue接口

先看下Queue的繼承關係和其中定義的方法:ide

Queue繼承自Collection,Collection繼承自Iterable。spa

Queue有三類主要的方法,咱們用個表格來看一下他們的區別:線程

方法類型 方法名稱 方法名稱 區別
Insert add offer 兩個方法都表示向Queue中添加某個元素,不一樣之處在於添加失敗的狀況,add只會返回true,若是添加失敗,會拋出異常。offer在添加失敗的時候會返回false。因此對那些有固定長度的Queue,優先使用offer方法。
Remove remove poll 若是Queue是空的狀況下,remove會拋出異常,而poll會返回null。
Examine element peek 獲取Queue頭部的元素,但不從Queue中刪除。二者的區別仍是在於Queue爲空的狀況下,element會拋出異常,而peek返回null。
注意,由於對poll和peek來講null是有特殊含義的,因此通常來講Queue中禁止插入null,可是在實現中仍是有一些類容許插入null好比LinkedList。

儘管如此,咱們在使用中仍是要避免插入null元素。code

Queue的分類

通常來講Queue能夠分爲BlockingQueue,Deque和TransferQueue三種。中間件

BlockingQueue

BlockingQueue是Queue的一種實現,它提供了兩種額外的功能:

  1. 噹噹前Queue是空的時候,從BlockingQueue中獲取元素的操做會被阻塞。
  2. 噹噹前Queue達到最大容量的時候,插入BlockingQueue的操做會被阻塞。

BlockingQueue的操做能夠分爲下面四類:

操做類型 Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

第一類是會拋出異常的操做,當遇到插入失敗,隊列爲空的時候拋出異常。

第二類是不會拋出異常的操做。

第三類是會Block的操做。當Queue爲空或者達到最大容量的時候。

第四類是time out的操做,在給定的時間裏會Block,超時會直接返回。

BlockingQueue是線程安全的Queue,能夠在生產者消費者模式的多線程中使用,以下所示:

class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

最後,在一個線程中向BlockQueue中插入元素以前的操做happens-before另一個線程中從BlockQueue中刪除或者獲取的操做。

Deque

Deque是Queue的子類,它表明double ended queue,也就是說能夠從Queue的頭部或者尾部插入和刪除元素。

一樣的,咱們也能夠將Deque的方法用下面的表格來表示,Deque的方法能夠分爲對頭部的操做和對尾部的操做:

方法類型 Throws exception Special value Throws exception Special value
Insert addFirst(e) offerFirst(e) addLast(e) offerLast(e)
Remove removeFirst() pollFirst() removeLast() pollLast()
Examine getFirst() peekFirst() getLast() peekLast()

和Queue的方法描述基本一致,這裏就很少講了。

當Deque以 FIFO (First-In-First-Out)的方法處理元素的時候,Deque就至關於一個Queue。

當Deque以LIFO (Last-In-First-Out)的方式處理元素的時候,Deque就至關於一個Stack。

TransferQueue

TransferQueue繼承自BlockingQueue,爲何叫Transfer呢?由於TransferQueue提供了一個transfer的方法,生產者能夠調用這個transfer方法,從而等待消費者調用take或者poll方法從Queue中拿取數據。

還提供了非阻塞和timeout版本的tryTransfer方法以供使用。

咱們舉個TransferQueue實現的生產者消費者的問題。

先定義一個生產者:

@Slf4j
@Data
@AllArgsConstructor
class Producer implements Runnable {
    private TransferQueue<String> transferQueue;

    private String name;

    private Integer messageCount;

    public static final AtomicInteger messageProduced = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                boolean added = transferQueue.tryTransfer( "第"+i+"個", 2000, TimeUnit.MILLISECONDS);
                log.info("transfered {} 是否成功: {}","第"+i+"個",added);
                if(added){
                    messageProduced.incrementAndGet();
                }
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
        log.info("total transfered {}",messageProduced.get());
    }
}

在生產者的run方法中,咱們調用了tryTransfer方法,等待2秒鐘,若是沒成功則直接返回。

再定義一個消費者:

@Slf4j
@Data
@AllArgsConstructor
public class Consumer implements Runnable {

    private TransferQueue<String> transferQueue;

    private String name;

    private int messageCount;

    public static final AtomicInteger messageConsumed = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                String element = transferQueue.take();
                log.info("take {}",element );
                messageConsumed.incrementAndGet();
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
        log.info("total consumed {}",messageConsumed.get());
    }

}

在run方法中,調用了transferQueue.take方法來取消息。

下面先看一下一個生產者,零個消費者的狀況:

@Test
    public void testOneProduceZeroConsumer() throws InterruptedException {

        TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
        ExecutorService exService = Executors.newFixedThreadPool(10);
        Producer producer = new Producer(transferQueue, "ProducerOne", 5);

        exService.execute(producer);

        exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
        exService.shutdown();
    }

輸出結果:

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第2個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第3個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第4個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 0

能夠看到,由於沒有消費者,因此消息並無發送成功。

再看下一個有消費者的狀況:

@Test
    public void testOneProduceOneConsumer() throws InterruptedException {

        TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
        ExecutorService exService = Executors.newFixedThreadPool(10);
        Producer producer = new Producer(transferQueue, "ProducerOne", 2);
        Consumer consumer = new Consumer(transferQueue, "ConsumerOne", 2);

        exService.execute(producer);
        exService.execute(consumer);

        exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
        exService.shutdown();
    }

輸出結果:

[pool-1-thread-2] INFO com.flydean.Consumer - take 第0個
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個 是否成功: true
[pool-1-thread-2] INFO com.flydean.Consumer - take 第1個
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個 是否成功: true
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 2
[pool-1-thread-2] INFO com.flydean.Consumer - total consumed 2

能夠看到Producer和Consumer是一個一個來生產和消費的。

總結

本文介紹了Queue接口和它的三大分類,這三大分類又有很是多的實現類,咱們將會在後面的文章中再詳細介紹。

歡迎關注個人公衆號:程序那些事,更多精彩等着您!
更多內容請訪問 www.flydean.com
相關文章
相關標籤/搜索