先囉嗦一點:java
因爲最近工做中,涉及到生產者消費者設計模式,對此有一些體會,因此總結一下,與你們分享。設計模式
在工做中,你們可能會碰到這樣一種狀況:某個模塊負責產生數據,這些數據由另外一個模塊來負責處理(此處的模塊是廣義的,能夠是類、函數、線程、進程等)。產生數據的模塊,就形象地稱爲生產者;而處理數據的模塊,就稱爲消費者。在生產者與消費者之間在加個緩衝區,咱們形象的稱之爲倉庫,生產者負責往倉庫了進商品,而消費者負責從倉庫裏拿商品,這就構成了生產者消費者模式。結構圖以下:服務器
一、先上代碼,簡要說明後再作分析:併發
a、首先創建一個隊列類(WorkQueue):函數
package cuu.com.workqueue;
import java.util.LinkedList;
/**
* 隊列
* @author DS
*
*/
public class WorkQueue {
/**
* 隊列名稱
*/
private String name;
/**
* 隊列大小
*/
private int size ;
/**
* 隊列最大容量
*/
private int upperSize = 100;
/**
* 當前隊列容量
*/
private int currentSize;
/**
* 存放對象的集合:存儲快、查詢慢
*/
private LinkedList queue = new LinkedList();
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getCurrentSize() {
return currentSize;
}
public void setCurrentSize(int currentSize) {
this.currentSize = currentSize;
}
public LinkedList getQueue() {
return queue;
}
public void setQueue(LinkedList queue) {
this.queue = queue;
}
public int getSize() {
return size;
}
public void setSize(int size) {
this.size = size;
}
public WorkQueue(String name){
super();
this.name = name;
}
public WorkQueue(String name ,int size){
super();
this.name = name;
if(size > upperSize){
upperSize = size;
}
}
private int notifySize = 0 ;//生產者工做次數
private int responseSize = 0 ;//消費者工做次數
/**
* 生產者
* @param object
*/
public synchronized void addWork(Object object){
if(currentSize <= upperSize ){
//放進集合
queue.add(object);
currentSize = queue.size();
//喚醒消費者
notifySize++;
System.out.println(">>>>> 生產者 工做完成,開始喚醒消費者,notifySize="+notifySize);
notify();
}
}
private int waitSize = 0 ;
/**
* 消費者
* @param object
* @return
*/
public synchronized Object getWork() throws InterruptedException{
while(queue.isEmpty()){
waitSize ++;
System.out.println("倉庫 沒東西, 消費者 waiting..... waitSize= "+ waitSize );
//消費者釋放對象鎖,由生產者獲取這個對象鎖
wait();
}
//移除並返回第一個元素
Object object = queue.removeFirst();
currentSize = queue.size();
responseSize ++;
System.out.println("**消費者 工做完成,開始喚醒生產者,responseSize="+responseSize);
return object;
}
}
測試
b、上面的隊列類負責處理數據的讀寫。那麼,咱們還須要一個隊列管理者來管理這些隊列this
package cuu.com.workqueue;spa
/**
* 隊列管理者
* @author DS
*
*/
public class EventQueueManager {
/**
* 接收NETBAR數據隊列
*/
public static WorkQueue receiveDataQueue = new WorkQueue("receiveDataQueue");
/**
* 接收LOCATOIN數據隊列
*/
public static WorkQueue receiveFjDataQueue = new WorkQueue("receiveDataQueue");
}
.net
c、上面的隊列管理類中會有不少的隊列類的實例,每一個實例負責一種數據的處理;接下來 就是客戶端的代碼:線程
package cuu.com.workqueue;
public class TestQueue {
private Integer id;
private String serviceCode;
private String serviceName;
private String address;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getServiceCode() {
return serviceCode;
}
public void setServiceCode(String serviceCode) {
this.serviceCode = serviceCode;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String toString() {
System.out.println("id="+getId()+",serviceCode="+getServiceCode()+",serviceName="+getServiceName()+",address="+getAddress()+"");
return "id="+getId()+",serviceCode="+getServiceCode()+",serviceName="+getServiceName()+",address="+getAddress()+"";
}
public TestQueue () {
new PentagonThread().start();//固然也能夠多個線程來處理數據
}
private boolean running = false;
public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
TestQueue queue = new TestQueue();
queue.setId(new Integer(i));
queue.setAddress("美國五角大樓_00"+i+"號");
queue.setServiceCode("00012_"+i+"");
queue.setServiceName("聽風者00"+i+"碼");
EventQueueManager.receiveDataQueue.addWork(queue);//生產數據
}
}
class PentagonThread extends Thread {
public void run() {
if (running == true){
return;
}
running = true;
process();
}
}
/**
* 真正執行
*/
public void process() {
try {
TestQueue queue = (TestQueue) EventQueueManager.receiveDataQueue.getWork();//消費數據
if(queue != null){
//queue.toString();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
測試結果:
第一次測試:
倉庫 沒東西, 消費者 waiting..... waitSize= 1
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=1
*消費者 工做完成,開始喚醒生產者,responseSize=1
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=2
*消費者 工做完成,開始喚醒生產者,responseSize=2
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=3
*消費者 工做完成,開始喚醒生產者,responseSize=3
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=4
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=5
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=6
*消費者 工做完成,開始喚醒生產者,responseSize=4
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=7
*消費者 工做完成,開始喚醒生產者,responseSize=5
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=8
*消費者 工做完成,開始喚醒生產者,responseSize=6
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=9
*消費者 工做完成,開始喚醒生產者,responseSize=7
*消費者 工做完成,開始喚醒生產者,responseSize=8
*消費者 工做完成,開始喚醒生產者,responseSize=9
摯?沒東西, 消費者 waiting..... waitSize= 2
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=10
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=11
*消費者 工做完成,開始喚醒生產者,responseSize=10
*消費者 工做完成,開始喚醒生產者,responseSize=11
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=12
*消費者 工做完成,開始喚醒生產者,responseSize=12
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=13
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=14
*消費者 工做完成,開始喚醒生產者,responseSize=13
*消費者 工做完成,開始喚醒生產者,responseSize=14
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=15
*消費者 工做完成,開始喚醒生產者,responseSize=15
摯?沒東西, 消費者 waiting..... waitSize= 3
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=16
*消費者 工做完成,開始喚醒生產者,responseSize=16
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=17
*消費者 工做完成,開始喚醒生產者,responseSize=17
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=18
*消費者 工做完成,開始喚醒生產者,responseSize=18
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=19
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=20
*消費者 工做完成,開始喚醒生產者,responseSize=19
*消費者 工做完成,開始喚醒生產者,responseSize=20
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=21
*消費者 工做完成,開始喚醒生產者,responseSize=21
摯?沒東西, 消費者 waiting..... waitSize= 4
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=22
*消費者 工做完成,開始喚醒生產者,responseSize=22
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=23
*消費者 工做完成,開始喚醒生產者,responseSize=23
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=24
*消費者 工做完成,開始喚醒生產者,responseSize=24
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=25
*消費者 工做完成,開始喚醒生產者,responseSize=25
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=26
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=27
*消費者 工做完成,開始喚醒生產者,responseSize=26
*消費者 工做完成,開始喚醒生產者,responseSize=27
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=28
*消費者 工做完成,開始喚醒生產者,responseSize=28
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=29
*消費者 工做完成,開始喚醒生產者,responseSize=29
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=30
*消費者 工做完成,開始喚醒生產者,responseSize=30
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=31
*消費者 工做完成,開始喚醒生產者,responseSize=31
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=32
*消費者 工做完成,開始喚醒生產者,responseSize=32
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=33
*消費者 工做完成,開始喚醒生產者,responseSize=33
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=34
*消費者 工做完成,開始喚醒生產者,responseSize=34
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=35
*消費者 工做完成,開始喚醒生產者,responseSize=35
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=36
*消費者 工做完成,開始喚醒生產者,responseSize=36
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=37
*消費者 工做完成,開始喚醒生產者,responseSize=37
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=38
*消費者 工做完成,開始喚醒生產者,responseSize=38
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=39
*消費者 工做完成,開始喚醒生產者,responseSize=39
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=40
*消費者 工做完成,開始喚醒生產者,responseSize=40
摯?沒東西, 消費者 waiting..... waitSize= 5
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=41
*消費者 工做完成,開始喚醒生產者,responseSize=41
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=42
*消費者 工做完成,開始喚醒生產者,responseSize=42
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=43
*消費者 工做完成,開始喚醒生產者,responseSize=43
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=44
*消費者 工做完成,開始喚醒生產者,responseSize=44
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=45
*消費者 工做完成,開始喚醒生產者,responseSize=45
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=46
*消費者 工做完成,開始喚醒生產者,responseSize=46
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=47
*消費者 工做完成,開始喚醒生產者,responseSize=47
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=48
*消費者 工做完成,開始喚醒生產者,responseSize=48
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=49
*消費者 工做完成,開始喚醒生產者,responseSize=49
>>>> 生產者 工做完成,開始喚醒消費者,notifySize=50
*消費者 工做完成,開始喚醒生產者,responseSize=50
第二次測試:
倉庫 沒東西, 消費者 waiting..... waitSize= 1
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=1
**消費者 工做完成,開始喚醒生產者,responseSize=1
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=2
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=3
**消費者 工做完成,開始喚醒生產者,responseSize=2
**消費者 工做完成,開始喚醒生產者,responseSize=3
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=4
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=5
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=6
**消費者 工做完成,開始喚醒生產者,responseSize=4
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=7
**消費者 工做完成,開始喚醒生產者,responseSize=5
**消費者 工做完成,開始喚醒生產者,responseSize=6
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=8
**消費者 工做完成,開始喚醒生產者,responseSize=7
**消費者 工做完成,開始喚醒生產者,responseSize=8
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=9
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=10
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=11
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=12
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=13
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=14
**消費者 工做完成,開始喚醒生產者,responseSize=9
**消費者 工做完成,開始喚醒生產者,responseSize=10
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=15
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=16
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=17
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=18
**消費者 工做完成,開始喚醒生產者,responseSize=11
**消費者 工做完成,開始喚醒生產者,responseSize=12
**消費者 工做完成,開始喚醒生產者,responseSize=13
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=19
**消費者 工做完成,開始喚醒生產者,responseSize=14
**消費者 工做完成,開始喚醒生產者,responseSize=15
**消費者 工做完成,開始喚醒生產者,responseSize=16
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=20
**消費者 工做完成,開始喚醒生產者,responseSize=17
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=21
**消費者 工做完成,開始喚醒生產者,responseSize=18
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=22
**消費者 工做完成,開始喚醒生產者,responseSize=19
**消費者 工做完成,開始喚醒生產者,responseSize=20
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=23
**消費者 工做完成,開始喚醒生產者,responseSize=21
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=24
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=25
**消費者 工做完成,開始喚醒生產者,responseSize=22
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=26
**消費者 工做完成,開始喚醒生產者,responseSize=23
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=27
**消費者 工做完成,開始喚醒生產者,responseSize=24
**消費者 工做完成,開始喚醒生產者,responseSize=25
**消費者 工做完成,開始喚醒生產者,responseSize=26
**消費者 工做完成,開始喚醒生產者,responseSize=27
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=28
**消費者 工做完成,開始喚醒生產者,responseSize=28
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=29
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=30
**消費者 工做完成,開始喚醒生產者,responseSize=29
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=31
**消費者 工做完成,開始喚醒生產者,responseSize=30
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=32
**消費者 工做完成,開始喚醒生產者,responseSize=31
**消費者 工做完成,開始喚醒生產者,responseSize=32
倉庫 沒東西, 消費者 waiting..... waitSize= 2
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=33
**消費者 工做完成,開始喚醒生產者,responseSize=33
倉庫 沒東西, 消費者 waiting..... waitSize= 3
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=34
**消費者 工做完成,開始喚醒生產者,responseSize=34
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=35
**消費者 工做完成,開始喚醒生產者,responseSize=35
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=36
**消費者 工做完成,開始喚醒生產者,responseSize=36
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=37
**消費者 工做完成,開始喚醒生產者,responseSize=37
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=38
**消費者 工做完成,開始喚醒生產者,responseSize=38
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=39
**消費者 工做完成,開始喚醒生產者,responseSize=39
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=40
**消費者 工做完成,開始喚醒生產者,responseSize=40
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=41
**消費者 工做完成,開始喚醒生產者,responseSize=41
倉庫 沒東西, 消費者 waiting..... waitSize= 4
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=42
**消費者 工做完成,開始喚醒生產者,responseSize=42
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=43
**消費者 工做完成,開始喚醒生產者,responseSize=43
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=44
**消費者 工做完成,開始喚醒生產者,responseSize=44
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=45
**消費者 工做完成,開始喚醒生產者,responseSize=45
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=46
**消費者 工做完成,開始喚醒生產者,responseSize=46
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=47
**消費者 工做完成,開始喚醒生產者,responseSize=47
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=48
**消費者 工做完成,開始喚醒生產者,responseSize=48
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=49
**消費者 工做完成,開始喚醒生產者,responseSize=49
>>>>> 生產者 工做完成,開始喚醒消費者,notifySize=50
**消費者 工做完成,開始喚醒生產者,responseSize=50
結合以上的實現,總結以下:
一、解耦
假設生產者和消費者分別是兩個類。若是讓生產者直接調用消費者的某個方法,那 麼生產者對於消費者就會產生依賴(也就是耦合)。未來若是消費者的代碼發生變化, 可能會影響到生產者。而若是二者都依賴於某個緩衝區,二者之間不直接依賴,耦合也 就相應下降了。
舉個例子,咱們去郵局投遞信件,若是不使用郵筒(也就是緩衝區),你必須得把 信直接交給郵遞員。有同窗會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須 得認識誰是郵遞員,才能把信給他(光憑身上穿的制服,萬一有人假冒,就慘了)。這 就產生和你和郵遞員之間的依賴(至關於生產者和消費者的強耦合)。萬一哪天郵遞員 換人了,你還要從新認識一下(至關於消費者變化致使修改生產者代碼)。而郵筒相對 來講比較固定,你依賴它的成本就比較低(至關於和緩衝區之間的弱耦合)。
二、支持併發
因爲生產者與消費者是兩個獨立的併發體,他們之間是用緩衝區做爲橋樑鏈接,生產者只須要往緩衝區裏丟數據,就能夠繼續生產下一個數據,而消費者只須要從緩衝區了拿數據便可,這樣就不會由於彼此的處理速度而發生阻塞。
接上面的例子,若是咱們不使用郵筒,咱們就得在郵局等郵遞員,直到他回來,咱們把信件交給他,這期間咱們啥事兒都不能幹(也就是生產者阻塞),或者郵遞員得挨家挨戶問,誰要寄信(至關於消費者輪詢)。
三、支持忙閒不均
緩衝區還有另外一個好處。若是製造數據的速度時快時慢,緩衝區的好處就體現出來 了。當數據製造快的時候,消費者來不及處理,未處理的數據能夠暫時存在緩衝區中。 等生產者的製造速度慢下來,消費者再慢慢處理掉。
爲了充分複用,咱們再拿寄信的例子來講事。假設郵遞員一次只能帶走1000封信。 萬一某次碰上情人節(也多是聖誕節)送賀卡,須要寄出去的信超過1000封,這時 候郵筒這個緩衝區就派上用場了。郵遞員把來不及帶走的信暫存在郵筒中,等下次過來 時再拿走。
在咱們的數據接收R項目中,數據接收服務器要接受大批量的客戶端請求處理,一開始,天天要處理的數據最多也就100W, 隨着業務點的增多,數據愈來愈多,天天的平均數據達到百萬級別(重要地區的數據量甚至達到千萬);原始的哪一種串行化處理根原本不及,這樣就形成了大量數據的丟失,也形成了嚴重的後果;以後就採用了生產者消費者模式,在業務請求與業務處理間,創建了一個List 類型的緩衝區,服務端接收到業務請求,就往裏扔,而後再去接收下一個業務請求,而 多個業務處理線程,就會去緩衝區裏取業務請求處理。這樣就大大提升了服務器的相 應速度。