java:一個生產者消費者模式的簡單實現

先囉嗦一點:java

因爲最近工做中,涉及到生產者消費者設計模式,對此有一些體會,因此總結一下,與你們分享。設計模式


一、什麼是生產者消費者模式


在 工做中,你們可能會碰到這樣一種狀況:某個模塊負責產生數據,這些數據由另外一個模塊來負責處理(此處的模塊是廣義的,能夠是類、函數、線程、進程等)。產 生數據的模塊,就形象地稱爲生產者;而處理數據的模塊,就稱爲消費者。在生產者與消費者之間在加個緩衝區,咱們形象的稱之爲倉庫,生產者負責往倉庫了進商 品,而消費者負責從倉庫裏拿商品,這就構成了生產者消費者模式。結構圖以下:服務器

 

一、先上代碼,簡要說明後再作分析:併發

a、首先創建一個隊列類(WorkQueue):函數

 

package cuu.com.workqueue;
import java.util.LinkedList;
/**
 * 隊列
 * @author DS
 *
 */
public class WorkQueue {
 
 /**
  * 隊列名稱
  */
 private String name;
 
 /**
  * 隊列大小
  */
 private int size ;
 
 /**
  * 隊列最大容量
  */
 private int upperSize = 100;
 
 /**
  * 當前隊列容量
  */
 private int currentSize;
 
 /**
  * 存放對象的集合:存儲快、查詢慢
  */
 private LinkedList queue = new LinkedList();
 
 public String getName() {
  return name;
 }
 public void setName(String name) {
  this.name = name;
 }
 public int getCurrentSize() {
  return currentSize;
 }
 public void setCurrentSize(int currentSize) {
  this.currentSize = currentSize;
 }
 public LinkedList getQueue() {
  return queue;
 }
 public void setQueue(LinkedList queue) {
  this.queue = queue;
 }
 public int getSize() {
  return size;
 }
 public void setSize(int size) {
  this.size = size;
 }
 
 public WorkQueue(String name){
  super();
  this.name = name;
 }
 
 public WorkQueue(String name ,int size){
  super();
  this.name = name;
  if(size > upperSize){
   upperSize = size;
  }
 }
 private int notifySize = 0 ;//生產者工做次數
 private int responseSize = 0 ;//消費者工做次數
 /**
  * 生產者
  * @param object
  */
 public synchronized void addWork(Object object){
  if(currentSize <= upperSize ){
   //放進集合
   queue.add(object);
   currentSize = queue.size();
   //喚醒消費者
   notifySize++;
   System.out.println(">>>>> 生產者 工做完成,開始喚醒消費者,notifySize="+notifySize);
   notify();
  }
 }
 private int waitSize = 0 ;
 /**
  * 消費者
  * @param object
  * @return
  */
 public synchronized Object getWork() throws InterruptedException{
  while(queue.isEmpty()){
   waitSize ++;
   System.out.println("倉庫 沒東西,  消費者 waiting.....  waitSize= "+  waitSize );
   //消費者釋放對象鎖,由生產者獲取這個對象鎖
   wait();
  }
  
  //移除並返回第一個元素
  Object object = queue.removeFirst();
  currentSize = queue.size();
  responseSize ++;
  System.out.println("**消費者 工做完成,開始喚醒生產者,responseSize="+responseSize);
  return object;
 }
 
}
測試

b、上面的隊列類負責處理數據的讀寫。那麼,咱們還須要一個隊列管理者來管理這些隊列this

package cuu.com.workqueue;spa

/**
 * 隊列管理者
 * @author DS
 *
 */
public class EventQueueManager {
 
 /**
  * 接收NETBAR數據隊列
  */
 public static WorkQueue receiveDataQueue = new WorkQueue("receiveDataQueue");
 
 /**
  * 接收LOCATOIN數據隊列
  */
 public static WorkQueue receiveFjDataQueue = new WorkQueue("receiveDataQueue");
 
}
.net

c、上面的隊列管理類中會有不少的隊列類的實例,每一個實例負責一種數據的處理;接下來 就是客戶端的代碼:線程

package cuu.com.workqueue;

public class TestQueue {

 private Integer id;
 
 private String serviceCode;

 private String serviceName;

 private String address;
 
 public Integer getId() {
  return id;
 }

 public void setId(Integer id) {
  this.id = id;
 }

 public String getServiceCode() {
  return serviceCode;
 }

 public void setServiceCode(String serviceCode) {
  this.serviceCode = serviceCode;
 }

 public String getServiceName() {
  return serviceName;
 }

 public void setServiceName(String serviceName) {
  this.serviceName = serviceName;
 }

 public String getAddress() {
  return address;
 }

 public void setAddress(String address) {
  this.address = address;
 }

 public String toString() {
  System.out.println("id="+getId()+",serviceCode="+getServiceCode()+",serviceName="+getServiceName()+",address="+getAddress()+"");
  return "id="+getId()+",serviceCode="+getServiceCode()+",serviceName="+getServiceName()+",address="+getAddress()+"";
 }
 
 public TestQueue () {
  new PentagonThread().start();//固然也能夠多個線程來處理數據
 }
 
 private boolean running = false;
 
 public static void main(String[] args) {
  for (int i = 0; i < 50; i++) {
   TestQueue queue = new TestQueue();
   queue.setId(new Integer(i));
   queue.setAddress("美國五角大樓_00"+i+"號");
   queue.setServiceCode("00012_"+i+"");
   queue.setServiceName("聽風者00"+i+"碼");
   EventQueueManager.receiveDataQueue.addWork(queue);//生產數據
  }
 }
 
 class PentagonThread extends Thread  {

  public void run() {
   if (running == true){
    return;
   }
   running = true;
   process();
  }
  
 }

 /**
  * 真正執行
  */
 public void process() {
    try {
    TestQueue queue =  (TestQueue) EventQueueManager.receiveDataQueue.getWork();//消費數據
    if(queue != null){
     //queue.toString();
    }
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
 }
 
}

 

測試結果:

第一次測試:

倉庫 沒東西,  消費者 waiting.....  waitSize= 1    
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=1  
*消費者 工做完成,開始喚醒生產者,responseSize=1    
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=2  
*消費者 工做完成,開始喚醒生產者,responseSize=2    
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=3  
*消費者 工做完成,開始喚醒生產者,responseSize=3    
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=4  
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=5  
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=6  
*消費者 工做完成,開始喚醒生產者,responseSize=4    
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=7  
*消費者 工做完成,開始喚醒生產者,responseSize=5    
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=8  
*消費者 工做完成,開始喚醒生產者,responseSize=6    
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=9  
*消費者 工做完成,開始喚醒生產者,responseSize=7    
*消費者 工做完成,開始喚醒生產者,responseSize=8    
*消費者 工做完成,開始喚醒生產者,responseSize=9    
摯?沒東西,  消費者 waiting.....  waitSize= 2     
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=10 
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=11 
*消費者 工做完成,開始喚醒生產者,responseSize=10   
*消費者 工做完成,開始喚醒生產者,responseSize=11   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=12 
*消費者 工做完成,開始喚醒生產者,responseSize=12   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=13 
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=14 
*消費者 工做完成,開始喚醒生產者,responseSize=13   
*消費者 工做完成,開始喚醒生產者,responseSize=14   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=15 
*消費者 工做完成,開始喚醒生產者,responseSize=15   
摯?沒東西,  消費者 waiting.....  waitSize= 3     
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=16 
*消費者 工做完成,開始喚醒生產者,responseSize=16   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=17 
*消費者 工做完成,開始喚醒生產者,responseSize=17   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=18 
*消費者 工做完成,開始喚醒生產者,responseSize=18   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=19 
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=20 
*消費者 工做完成,開始喚醒生產者,responseSize=19   
*消費者 工做完成,開始喚醒生產者,responseSize=20   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=21 
*消費者 工做完成,開始喚醒生產者,responseSize=21   
摯?沒東西,  消費者 waiting.....  waitSize= 4     
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=22 
*消費者 工做完成,開始喚醒生產者,responseSize=22   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=23 
*消費者 工做完成,開始喚醒生產者,responseSize=23   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=24 
*消費者 工做完成,開始喚醒生產者,responseSize=24   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=25 
*消費者 工做完成,開始喚醒生產者,responseSize=25   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=26 
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=27 
*消費者 工做完成,開始喚醒生產者,responseSize=26   
*消費者 工做完成,開始喚醒生產者,responseSize=27   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=28 
*消費者 工做完成,開始喚醒生產者,responseSize=28   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=29 
*消費者 工做完成,開始喚醒生產者,responseSize=29   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=30 
*消費者 工做完成,開始喚醒生產者,responseSize=30   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=31 
*消費者 工做完成,開始喚醒生產者,responseSize=31   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=32 
*消費者 工做完成,開始喚醒生產者,responseSize=32   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=33 
*消費者 工做完成,開始喚醒生產者,responseSize=33   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=34 
*消費者 工做完成,開始喚醒生產者,responseSize=34   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=35 
*消費者 工做完成,開始喚醒生產者,responseSize=35   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=36 
*消費者 工做完成,開始喚醒生產者,responseSize=36   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=37 
*消費者 工做完成,開始喚醒生產者,responseSize=37   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=38 
*消費者 工做完成,開始喚醒生產者,responseSize=38   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=39 
*消費者 工做完成,開始喚醒生產者,responseSize=39   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=40 
*消費者 工做完成,開始喚醒生產者,responseSize=40   
摯?沒東西,  消費者 waiting.....  waitSize= 5     
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=41 
*消費者 工做完成,開始喚醒生產者,responseSize=41   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=42 
*消費者 工做完成,開始喚醒生產者,responseSize=42   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=43 
*消費者 工做完成,開始喚醒生產者,responseSize=43   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=44 
*消費者 工做完成,開始喚醒生產者,responseSize=44   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=45 
*消費者 工做完成,開始喚醒生產者,responseSize=45   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=46 
*消費者 工做完成,開始喚醒生產者,responseSize=46   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=47 
*消費者 工做完成,開始喚醒生產者,responseSize=47   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=48 
*消費者 工做完成,開始喚醒生產者,responseSize=48   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=49 
*消費者 工做完成,開始喚醒生產者,responseSize=49   
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=50 
*消費者 工做完成,開始喚醒生產者,responseSize=50   

第二次測試:

  倉庫 沒東西,  消費者 waiting.....  waitSize= 1     
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=1  
  **消費者 工做完成,開始喚醒生產者,responseSize=1    
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=2  
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=3  
  **消費者 工做完成,開始喚醒生產者,responseSize=2    
  **消費者 工做完成,開始喚醒生產者,responseSize=3    
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=4  
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=5  
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=6  
  **消費者 工做完成,開始喚醒生產者,responseSize=4    
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=7  
  **消費者 工做完成,開始喚醒生產者,responseSize=5    
  **消費者 工做完成,開始喚醒生產者,responseSize=6    
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=8  
  **消費者 工做完成,開始喚醒生產者,responseSize=7    
  **消費者 工做完成,開始喚醒生產者,responseSize=8    
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=9  
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=10 
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=11 
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=12 
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=13 
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=14 
  **消費者 工做完成,開始喚醒生產者,responseSize=9    
  **消費者 工做完成,開始喚醒生產者,responseSize=10   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=15 
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=16 
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=17 
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=18 
  **消費者 工做完成,開始喚醒生產者,responseSize=11   
  **消費者 工做完成,開始喚醒生產者,responseSize=12   
  **消費者 工做完成,開始喚醒生產者,responseSize=13   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=19 
  **消費者 工做完成,開始喚醒生產者,responseSize=14   
  **消費者 工做完成,開始喚醒生產者,responseSize=15   
  **消費者 工做完成,開始喚醒生產者,responseSize=16   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=20 
  **消費者 工做完成,開始喚醒生產者,responseSize=17   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=21 
  **消費者 工做完成,開始喚醒生產者,responseSize=18   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=22 
  **消費者 工做完成,開始喚醒生產者,responseSize=19   
  **消費者 工做完成,開始喚醒生產者,responseSize=20   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=23 
  **消費者 工做完成,開始喚醒生產者,responseSize=21   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=24 
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=25 
  **消費者 工做完成,開始喚醒生產者,responseSize=22   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=26 
  **消費者 工做完成,開始喚醒生產者,responseSize=23   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=27 
  **消費者 工做完成,開始喚醒生產者,responseSize=24   
  **消費者 工做完成,開始喚醒生產者,responseSize=25   
  **消費者 工做完成,開始喚醒生產者,responseSize=26   
  **消費者 工做完成,開始喚醒生產者,responseSize=27   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=28 
  **消費者 工做完成,開始喚醒生產者,responseSize=28   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=29 
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=30 
  **消費者 工做完成,開始喚醒生產者,responseSize=29   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=31 
  **消費者 工做完成,開始喚醒生產者,responseSize=30   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=32 
  **消費者 工做完成,開始喚醒生產者,responseSize=31   
  **消費者 工做完成,開始喚醒生產者,responseSize=32   
  倉庫 沒東西,  消費者 waiting.....  waitSize= 2     
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=33 
  **消費者 工做完成,開始喚醒生產者,responseSize=33   
  倉庫 沒東西,  消費者 waiting.....  waitSize= 3     
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=34 
  **消費者 工做完成,開始喚醒生產者,responseSize=34   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=35 
  **消費者 工做完成,開始喚醒生產者,responseSize=35   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=36 
  **消費者 工做完成,開始喚醒生產者,responseSize=36   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=37 
  **消費者 工做完成,開始喚醒生產者,responseSize=37   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=38 
  **消費者 工做完成,開始喚醒生產者,responseSize=38   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=39 
  **消費者 工做完成,開始喚醒生產者,responseSize=39   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=40 
  **消費者 工做完成,開始喚醒生產者,responseSize=40   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=41 
  **消費者 工做完成,開始喚醒生產者,responseSize=41   
  倉庫 沒東西,  消費者 waiting.....  waitSize= 4     
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=42 
  **消費者 工做完成,開始喚醒生產者,responseSize=42   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=43 
  **消費者 工做完成,開始喚醒生產者,responseSize=43   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=44 
  **消費者 工做完成,開始喚醒生產者,responseSize=44   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=45 
  **消費者 工做完成,開始喚醒生產者,responseSize=45   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=46 
  **消費者 工做完成,開始喚醒生產者,responseSize=46   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=47 
  **消費者 工做完成,開始喚醒生產者,responseSize=47   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=48 
  **消費者 工做完成,開始喚醒生產者,responseSize=48   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=49 
  **消費者 工做完成,開始喚醒生產者,responseSize=49   
  >>>>> 生產者 工做完成,開始喚醒消費者,notifySize=50 
  **消費者 工做完成,開始喚醒生產者,responseSize=50   

結合以上的實現,總結以下:

二、生產者消費者模式的優勢

一、解耦

                 假 設生產者和消費者分別是兩個類。若是讓生產者直接調用消費者的某個方法,那 麼生產者對於消費者就會產生依賴(也就是耦合)。未來若是消費者的代碼發生變化, 可能會影響到生產者。而若是二者都依賴於某個緩衝區,二者之間不直接依賴,耦合也 就相應下降了。

舉個例子,咱們去郵局投遞信件,若是不使 用郵筒(也就是緩衝區),你必須得把 信直接交給郵遞員。有同窗會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須 得認識誰是郵遞員,才能把信給他(光憑身上穿的制服,萬一有人假冒,就慘了)。這 就產生和你和郵遞員之間的依賴(至關於生產者和消費者的強耦合)。萬一哪天郵遞員 換人了,你還要從新認識一下(至關於消費者變化致使修改生產者代碼)。而郵筒相對 來講比較固定,你依賴它的成本就比較低(至關於和緩衝區之間的弱耦合)。

二、支持併發

                 因爲生產者與消費者是兩個獨立的併發體,他們之間是用緩衝區做爲橋樑鏈接,生產者只須要往緩衝區裏丟數據,就能夠繼續生產下一個數據,而消費者只須要從緩衝區了拿數據便可,這樣就不會由於彼此的處理速度而發生阻塞。

接上面的例子,若是咱們不使用郵筒,咱們就得在郵局等郵遞員,直到他回來,咱們把信件交給他,這期間咱們啥事兒都不能幹(也就是生產者阻塞),或者郵遞員得挨家挨戶問,誰要寄信(至關於消費者輪詢)。

三、支持忙閒不均

                   緩衝區還有另外一個好處。若是製造數據的速度時快時慢,緩衝區的好處就體現出來 了。當數據製造快的時候,消費者來不及處理,未處理的數據能夠暫時存在緩衝區中。 等生產者的製造速度慢下來,消費者再慢慢處理掉。

爲了充分複用,咱們再拿寄信的例子來講事。假設郵遞員一次只能帶走1000封信。 萬一某次碰上情人節(也多是聖誕節)送賀卡,須要寄出去的信超過1000封,這時 候郵筒這個緩衝區就派上用場了。郵遞員把來不及帶走的信暫存在郵筒中,等下次過來 時再拿走。

三、實際應用

          在咱們的數據接收R項目中,數據接收服務器要接受大批量的客戶端請求處理,一開始,天天要處理的數據最多也就100W, 隨着業務點的增多,數據愈來愈多,天天的平均數據達到百萬級別(重要地區的數據量甚至達到千萬);原始的哪一種串行化處理根原本不及,這樣就形成了大量數據 的丟失,也形成了嚴重的後果;以後就採用了生產者消費者模式,在業務請求與業務處理間,創建了一個List 類型的緩衝區,服務端接收到業務請求,就往裏扔,而後再去接收下一個業務請求,而 多個業務處理線程,就會去緩衝區裏取業務請求處理。這樣就大大提升了服務器的相 應速度。

相關文章
相關標籤/搜索