本文首發於一世流雲專欄: https://segmentfault.com/blog...
從本節開始,咱們將介紹juc-collections框架中的「阻塞隊列」部分。阻塞隊列在實際應用中很是普遍,許多消息中間件中定義的隊列,一般就是一種「阻塞隊列」。算法
那麼「阻塞隊列」和咱們以前討論過的ConcurrentLinkedQueue、ConcurrentLinkedDeque有什麼不一樣呢?
ConcurrentLinkedQueue
和ConcurrentLinkedDeque
是以非阻塞算法實現的高性能隊列,其使用場景通常是在併發環境下,須要「隊列」/「棧」這類數據結構時纔會使用;而「阻塞隊列」一般利用了「鎖」來實現,也就是會阻塞調用線程,其使用場景通常是在「生產者-消費者」模式中,用於線程之間的數據交換或系統解耦。segmentfault
在Java多線程基礎(七)——Producer-Consumer模式中,咱們曾簡要的談到過「生產者-消費者」這種模式。在這種模式中,「生產者」和「消費者」是相互獨立的,二者之間的通訊須要依靠一個隊列。這個隊列,其實就是本文中的「阻塞隊列」。安全
引入「阻塞隊列」的最大好處就是解耦,在軟件工程中,「高內聚,低耦合」是進行模塊設計的準則之一,這樣「生產者」和「消費者」實際上是互不影響的,未來任意一方須要升級時,能夠保證系統的平滑過渡。數據結構
BlockingQueue
是在JDK1.5時,隨着J.U.C引入的一個接口:多線程
BlockingQueue繼承了Queue接口,提供了一些阻塞方法,主要做用以下:併發
既然BlockingQueue是一種隊列,因此也具有隊列的三種基本方法:插入、刪除、讀取:框架
操做類型 | 拋出異常 | 返回特殊值 | 阻塞線程 | 超時 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
刪除 | remove() | poll() | take() | poll(time, unit) |
讀取 | element() | peek() | / | / |
能夠看到,對於每種基本方法,「拋出異常」和「返回特殊值」的方法定義和Queue是徹底同樣的。BlockingQueue只是增長了兩類和阻塞相關的方法:put(e)
、take()
;offer(e, time, unit)
、poll(time, unit)
。dom
put(e)和take()方法會一直阻塞調用線程,直到線程被中斷或隊列狀態可用;
offer(e, time, unit)和poll(time, unit)方法會限時阻塞調用線程,直到超時或線程被中斷或隊列狀態可用。ide
public interface BlockingQueue<E> extends Queue<E> { /** * 插入元素e至隊尾, 若是隊列已滿, 則阻塞調用線程直到隊列有空閒空間. */ void put(E e) throws InterruptedException; /** * 插入元素e至隊列, 若是隊列已滿, 則限時阻塞調用線程,直到隊列有空閒空間或超時. */ boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; /** * 從隊首刪除元素,若是隊列爲空, 則阻塞調用線程直到隊列中有元素. */ E take() throws InterruptedException; /** * 從隊首刪除元素,若是隊列爲空, 則限時阻塞調用線程,直到隊列中有元素或超時. */ E poll(long timeout, TimeUnit unit) throws InterruptedException; // ... }
除此以外,BlockingQueue還具備如下特色:性能
最後,咱們來看下如何利用BlockingQueue來實現生產者-消費者模式。在生產者-消費者模式中,一共有四類角色:生產者、消費者、消息隊列、消息體。咱們利用BlockingQueue來實現消息隊列,其他部分沒有什麼變化。
生產者生產消息體(Data),並將消息體(Data)傳遞給通道(Channel)。
/** * 生產者 */ public class Producer implements Runnable { private Channel channel; public Producer(Channel channel) { this.channel = channel; } @Override public void run() { while (true) { String v = String.valueOf(ThreadLocalRandom.current().nextInt()); Data data = new Data(v); try { channel.put(data); System.out.println(Thread.currentThread().getName() + " produce :" + data); } catch (InterruptedException e) { e.printStackTrace(); } Thread.yield(); } } }
消費者從通道(Channel)中獲取數據,進行處理。
/** * 消費者 */ public class Consumer implements Runnable { private final Channel channel; public Consumer(Channel channel) { this.channel = channel; } @Override public void run() { while (true) { try { Object obj = channel.take(); System.out.println(Thread.currentThread().getName() + " consume :" + obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } Thread.yield(); } } }
至關於消息的隊列,對消息進行排隊,控制消息的傳輸。
/** * 通道類 */ public class Channel { private final BlockingQueue blockingQueue; public Channel(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } public Object take() throws InterruptedException { return blockingQueue.take(); } public void put(Object o) throws InterruptedException { blockingQueue.put(o); } }
Data表明了實際生產或消費的數據。
/** * 數據/消息 */ public class Data<T> implements Serializable { private T data; public Data(T data) { this.data = data; } public T getData() { return data; } public void setData(T data) { this.data = data; } @Override public String toString() { return "Data{" + "data=" + data + '}'; } }
調用以下:
public class Main { public static void main(String[] args) { BlockingQueue blockingQueue = new SomeQueueImplementation(); Channel channel = new Channel(blockingQueue); Producer p = new Producer(channel); Consumer c1 = new Consumer(channel); Consumer c2 = new Consumer(channel); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }