1. 爲何使用生產者消費者模式java
(1)解耦合。消費者只關心隊列裏面取出來的數據,不用關心數據的來源。好比,生產者服務的域名,url這些變動。異步
(2)支持異步。生產者生產出來數據,直接放入隊列就行了,接着生產下一個數據,沒必要等待。好比廚師作菜的時候,只須要把作好的菜放到傳送帶就接着作下一道菜。不須要有等有顧客過來把這個菜領走在作下一道;效率更高。ide
(3)流量削峯。雙十一零點那一刻,qps會飆升。若是爲了這一小會的時間,增長機器不划算,由於平時的時候,這些機器足夠用。那我能夠吧這些請求放到一個隊列,服務從隊列中拿出請求,運算後返回給客戶端。this
2. 生產者消費者圖示url
3. 代碼實現spa
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; public class ProducerConsumer<E> { private int queueSize = 5; private int producerNum = 2; private int consumerNum = 2; //建立一個阻塞隊列 private LinkedBlockingQueue<E> blockingQueue = null; //生產者線程池 private ExecutorService producerTheadPool = null; //消費者線程池 private ExecutorService consumerTheadPool = null; public ProducerConsumer(){ blockingQueue = new LinkedBlockingQueue<>(queueSize); producerTheadPool = Executors.newFixedThreadPool(producerNum); consumerTheadPool = Executors.newFixedThreadPool(consumerNum); } public ProducerConsumer(int queueSize, int producerNum, int consumerNum){ blockingQueue = new LinkedBlockingQueue<>(queueSize); producerTheadPool = Executors.newFixedThreadPool(producerNum); consumerTheadPool = Executors.newFixedThreadPool(consumerNum); } public void produceEleAsync(E ele){ if(!checkSuccess()){ return; } Producer<E> producer = new Producer<E>(this.blockingQueue, ele); producerTheadPool.execute(producer); } //執行消費過程 public void consumeEleAsync() { if(!checkSuccess()){ return; } new Thread(new Runnable() { @Override public void run() { while(true){ try { E ele = blockingQueue.take();//阻塞獲取數據 Consumer<E> consumer = new Consumer<E>(ele); consumerTheadPool.execute(consumer); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } //判空檢查 private boolean checkSuccess(){ if(blockingQueue!=null && producerTheadPool!=null && consumerTheadPool!=null){ return true; } return false; } //生產者 private class Producer<E> implements Runnable{ private LinkedBlockingQueue<E> blockingQueue; private E ele; public Producer(LinkedBlockingQueue<E> blockingQueue, E ele){ this.blockingQueue = blockingQueue; this.ele = ele; } @Override public void run() { if(this.blockingQueue!=null && ele!=null){ try { this.blockingQueue.put(ele); } catch (InterruptedException e) { e.getStackTrace(); } } } } //消費者 private class Consumer<E> implements Runnable{ private E ele; public Consumer(E ele){ this.ele = ele; } @Override public void run() { //執行消費過程 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } if(ele!=null){ System.out.println("消費--->" + ele.toString()); } } } }