package com.thread.blockingqueue.test2;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Entry point that wires one {@link Producer} and one {@link Consumer} to a shared bounded
 * queue and runs each of them on its own thread.
 */
public class ProducerConsumerService {

    /**
     * Creates the shared queue, starts the producer thread and then the consumer thread, and
     * prints a confirmation line. The method does not wait for either thread to finish.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Bounded queue with room for 10 messages, shared by both workers.
        BlockingQueue<Message> sharedQueue = new ArrayBlockingQueue<>(10);

        // The producer pauses longer between messages than the consumer does,
        // so the consumer spends most of its time waiting on the queue.
        Thread producerThread = new Thread(new Producer(sharedQueue));
        Thread consumerThread = new Thread(new Consumer(sharedQueue));

        // Start the thread that puts messages onto the queue.
        producerThread.start();
        // Start the thread that takes messages off the queue.
        consumerThread.start();

        System.out.println("Producer and Consumer has been started");
    }
}
package com.thread.blockingqueue.test2;

import java.util.concurrent.BlockingQueue;

/**
 * Puts a fixed series of numbered messages onto a shared queue, followed by a final
 * {@code "exit"} message that marks the end of the series.
 */
public class Producer implements Runnable {

    /** Number of numbered messages produced before the exit marker. */
    private static final int MESSAGE_COUNT = 50;

    /** Pause taken before each numbered message is enqueued, in milliseconds. */
    private static final long PRODUCE_DELAY_MILLIS = 500L;

    /** Text of the final message that marks the end of the series. */
    private static final String EXIT_MESSAGE = "exit";

    private final BlockingQueue<Message> queue;

    /**
     * @param q queue that produced messages are put onto
     */
    public Producer(BlockingQueue<Message> q) {
        this.queue = q;
    }

    /**
     * Produces messages "0" through "49", sleeping before each one, then enqueues the exit
     * marker. If the thread is interrupted, production stops immediately instead of silently
     * skipping a message and carrying on; the interrupt status is restored and the exit marker
     * is offered without blocking.
     */
    @Override
    public void run() {
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                Message msg = new Message(String.valueOf(i));
                Thread.sleep(PRODUCE_DELAY_MILLIS);
                queue.put(msg); // blocks while the queue is full
                System.out.println("Produced " + msg.getMsg());
            }
            // Append the exit marker after the last numbered message.
            queue.put(new Message(EXIT_MESSAGE));
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can see the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Producer interrupted; stopping early");
            // put() would throw again while the flag is set, so use the non-blocking offer()
            // as a best-effort way to still deliver the exit marker.
            if (!queue.offer(new Message(EXIT_MESSAGE))) {
                System.err.println("Producer could not enqueue the exit message: queue is full");
            }
        }
    }
}
package com.thread.blockingqueue.test2;

import java.util.concurrent.BlockingQueue;

/**
 * Takes messages from a shared queue and prints them until a message whose text is
 * {@code "exit"} is received.
 */
public class Consumer implements Runnable {

    /** Text of the message that tells the consumer to stop. */
    private static final String EXIT_MESSAGE = "exit";

    /** Pause taken before each consumed message is printed, in milliseconds. */
    private static final long CONSUME_DELAY_MILLIS = 10L;

    private final BlockingQueue<Message> queue;

    /**
     * @param q queue that messages are taken from
     */
    public Consumer(BlockingQueue<Message> q) {
        this.queue = q;
    }

    /**
     * Repeatedly takes a message (blocking while the queue is empty) and prints it, returning
     * once the exit message arrives. If the thread is interrupted, the interrupt status is
     * restored and the method returns.
     */
    @Override
    public void run() {
        try {
            Message msg;
            // Compare text with equals(): reference comparison (!=) only matches when both
            // sides happen to be the same interned String instance.
            while (!EXIT_MESSAGE.equals((msg = queue.take()).getMsg())) {
                Thread.sleep(CONSUME_DELAY_MILLIS);
                System.out.println("Consumed " + msg.getMsg());
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can see the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Consumer interrupted; stopping");
        }
    }
}
package com.thread.blockingqueue.test2; public class Message { private String msg; public Message(String str){ this.msg=str; } public String getMsg() { return msg; } }