Java阻塞隊列(BlockingQueue)實現 生產者/消費者 示例html
本文由 TonySpark 翻譯自 Javarevisited。轉載請參見文章末尾的要求。java
Java.util.concurrent.BlockingQueue 是一個隊列實現類,支持這樣的操做:當從隊列中獲取或者移除元素時,若是隊列爲空,須要等待,直到隊列不爲空;同時若是向隊列中添加元素時,此時若是隊列無可用空間,也須要等待。安全
BlockingQueue 類不接收Null值,若是你試圖向隊列中存入Null值將拋出NullPointerException.架構
BlockingQueue的實現是線程安全的。全部隊列方法自己都是原子操做,使用併發控制的內部鎖或者其它形式。併發
BlockingQueue這個接口是Java集合架構的一部分,它主要用於解決生產者/消費者問題。在BlockingQueue中,咱們不用擔憂生產者操做時是否有可用空間或者消費者操做時是否有可用的對像而等待這樣的問題,這些都會在它的實現類中進行處理。ide
Java中提供了幾個對BlockingQueue的實現類,如: ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue 等this
在處理生產者/消費者問題上 咱們將會使用ArrayBlockingQueue來實現,以下是咱們需知道的重要方法:spa
如今我們看看用BlockingQueue來解決生產者/消費者問題。線程
Message翻譯
Producer產生的普通Java對象,並添加到隊列中。
Message.java
1 package com.journaldev.concurrency; 2 3 public class Message { 4 private String msg; 5 6 public Message(String str){ 7 this.msg=str; 8 } 9 10 public String getMsg() { 11 return msg; 12 } 13 14 }
Producer
Producer這個類會產生消息並將其放入隊列中。
Producer.java
package com.journaldev.concurrency; import java.util.concurrent.BlockingQueue; public class Producer implements Runnable { private BlockingQueue<Message> queue; public Producer(BlockingQueue<Message> q){ this.queue=q; } @Override public void run() { //生產消息 for(int i=0; i<100; i++){ Message msg = new Message(""+i); try { Thread.sleep(i); queue.put(msg); System.out.println("Produced "+msg.getMsg()); } catch (InterruptedException e) { e.printStackTrace(); } } //添加退出消息 Message msg = new Message("exit"); try { queue.put(msg); } catch (InterruptedException e) { e.printStackTrace(); } } }
Consumer
Consumer類會從隊列獲取消息進行處理。若是獲取的是退出消息則結束。
Consumer.java
package com.journaldev.concurrency; import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable{ private BlockingQueue<Message> queue; public Consumer(BlockingQueue<Message> q){ this.queue=q; } @Override public void run() { try{ Message msg; //獲取並處理消息直到接收到「exit」消息 while((msg = queue.take()).getMsg() !="exit"){ Thread.sleep(10); System.out.println("Consumed "+msg.getMsg()); } }catch(InterruptedException e) { e.printStackTrace(); } } }
生產者/消費者的服務類將會產生固定大小的BlockingQueue,生產者和消費者同時共享該BlockingQueue,該服務類會起啓動生產者和消費者線程。
ProducerConsumerService.java
1 package com.journaldev.concurrency; 2 3 4 import java.util.concurrent.ArrayBlockingQueue; 5 import java.util.concurrent.BlockingQueue; 6 7 8 public class ProducerConsumerService { 9 10 public static void main(String[] args) { 11 //建立大小爲10的 BlockingQueue 12 BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10); 13 Producer producer = new Producer(queue); 14 Consumer consumer = new Consumer(queue); 15 //開啓 producer線程向隊列中生產消息 16 new Thread(producer).start(); 17 //開啓 consumer線程 中隊列中消費消息 18 new Thread(consumer).start(); 19 System.out.println("Producer and Consumer has been started"); 20 } 21 22 }
上面程序的運行結果:
1 Producer and Consumer has been started 2 Produced 0 3 Produced 1 4 Produced 2 5 Produced 3 6 Produced 4 7 Consumed 0 8 Produced 5 9 Consumed 1 10 Produced 6 11 Produced 7 12 Consumed 2 13 Produced 8 14 ...
Thread sleep 使得生產者/消費者 生產、消費這此消息有必定的延遲。
原文連接: Javarevisited 翻譯: TonySpark
譯文連接: http://www.cnblogs.com/tonyspark/p/3722013.html
[ 轉載請保留原文出處、譯者和譯文連接。]