生產者消費者模式是一個典型的併發編程須要考慮的問題, 經過一箇中間容器來解決生產者和消費者的強耦合特性。 生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力java
其特色大體以下:編程
碼農的世界裏面向來是Talk is cheap, show me the code.數組
筆者寫了一個小demo,緩衝區用的是BlockingQueue 接口的實現類,java.util.concurrent包下具備ArrayBlockingQueue, DelayQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue.併發
示例中使用的是ArrayBlockingQueue, 一個數組實現的有界阻塞隊列。有界也就意味着,它不可以存儲無限多數量的元素。它有一個同一時間可以存儲元素數量的上限。你能夠在對其初始化的時候設定這個上限,但以後就沒法對這個上限進行修改了。一旦初始化,大小就沒法修改。dom
package demo; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; public class Producer implements Runnable { private ArrayBlockingQueue<Integer> myQueue; private Random random = new Random(); public ArrayBlockingQueue<Integer> getMyQueue() { return myQueue; } public void setMyQueue(ArrayBlockingQueue<Integer> myQueue) { this.myQueue = myQueue; } @Override public void run() { int tmp =-1; try { while (true) { long millis = (long) (Math.random() * 6000); // 模擬耗時計算 Thread.sleep(millis); int element = random.nextInt(2000); myQueue.put(element); tmp = myQueue.size(); String msg = Thread.currentThread().getName() + " is producing data : " + element; msg += "\r\n"; msg += "Current queue size : " + tmp; System.out.println(msg); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
package demo; import java.util.concurrent.ArrayBlockingQueue; public class Consumer implements Runnable { private ArrayBlockingQueue<Integer> myQueue; public ArrayBlockingQueue<Integer> getMyQueue() { return myQueue; } public void setMyQueue(ArrayBlockingQueue<Integer> myQueue) { this.myQueue = myQueue; } @Override public void run() { int element = -1; int tmp = -1; try { while (true) { long millis = (long) (Math.random() * 5000); // 模擬複雜計算 Thread.sleep(millis); element = myQueue.take(); tmp = myQueue.size(); // 延遲一毫秒寫日誌 Thread.sleep(1); String msg = Thread.currentThread().getName() + " is consuming data : " + element; msg += "\r\n"; msg += "Current queue size : " + tmp; System.out.println(msg); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
package demo; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { private static final int SIZE = 10; private static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(SIZE); public static void main(String[] args) { // TODO Auto-generated method stub ExecutorService producerPool = Executors.newFixedThreadPool(3); ExecutorService consumerPool = Executors.newFixedThreadPool(2); Producer p1 = new Producer(); p1.setMyQueue(queue); Producer p2 = new Producer(); p2.setMyQueue(queue); Producer p3 = new Producer(); p3.setMyQueue(queue); producerPool.execute(p1); producerPool.execute(p2); producerPool.execute(p3); Consumer c1 = new Consumer(); c1.setMyQueue(queue); Consumer c2 = new Consumer(); c2.setMyQueue(queue); consumerPool.execute(c1); consumerPool.execute(c2); //consumerPool.shutdown(); //producerPool.shutdown(); } }