生產者消費者模型

內容:html

1.什麼是生產者消費者模型python

2.python實現生產者消費者模型安全

3.Java實現生產者消費者模型服務器

 

參考:https://www.cnblogs.com/Eva-J/articles/8253549.html多線程

 

 

 

1.什麼是生產者消費者模型架構

(1)前言併發

隨着軟件業的發展,互聯網用戶的日漸增多,併發這門藝術的興起彷佛是那麼合情合理。每日PV十多億的淘寶,處理併發的手段可謂是業界一流。用戶訪問淘寶首頁的平均等待時間只有區區幾秒,可是服務器所處理的流程十分複雜框架

首先負責首頁的服務器就有好幾千臺,經過計算把與用戶路由最近的服務器處理首頁的返回。其次是網頁上的資源,就JS和CSS文件就有上百個,還有圖片資源等。它能在幾秒內加載出來可見阿里幾千名頂尖工程師的智慧是如何登峯造極dom

在大型電商網站中,他們的服務或者應用解耦以後,是經過消息隊列在彼此間通訊的。消息隊列和應用之間的架構關係就是生產者消費者模型網站

 

(2)生產者消費者模型

生產者消費者模型具體來說,就是在一個系統中,存在生產者和消費者兩種角色,他們經過內存緩衝區進行通訊,生產者生產消費者須要的資料,消費者把資料作成產品。生產消費者模式以下圖:

              

在日益發展的服務類型中,譬如註冊用戶這種服務,它可能解耦成好幾種獨立的服務(帳號驗證,郵箱驗證碼,手機短信碼等)。它們做爲消費者,等待用戶輸入數據,在前臺數據提交以後會通過分解併發送到各個服務所在的url,分發的那個角色就至關於生產者。消費者在獲取數據時候有可能一次不能處理完,那麼它們各自有一個請求隊列,那就是內存緩衝區了。作這項工做的框架叫作消息隊列

 

爲何要使用生產者和消費者模式:

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。

在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。

一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式

 

(3)生產者消費者模型的簡單僞代碼

 1 semaphore mutex=1;  // 臨界區互斥信號量
 2 semaphore empty=n;  // 空閒緩衝區
 3 semaphore full=0;       // 緩衝區初始化爲空
 4 producer ()                 // 生產者進程 
 5 {
 6     while(1)
 7     {
 8         produce an item in nextp;      // 生產數據
 9         P(empty);                             // 獲取空緩衝區單元
10         P(mutex);                             // 進入臨界區.
11         add nextp to buffer;              // 將數據放入緩衝區
12         V(mutex);                            // 離開臨界區,釋放互斥信號量
13         V(full);                                 // 滿緩衝區數加1
14     }    
15 }
16 
17 consumer ()                // 消費者進程
18 {
19     while(1)
20     {
21         P(full);                                  // 獲取滿緩衝區單元
22         P(mutex);                             // 進入臨界區
23         remove an item from buffer;   // 從緩衝區中取出數據
24         V (mutex);                            // 離開臨界區,釋放互斥信號量
25         V (empty) ;                           // 空緩衝區數加1
26         consume the item;                 // 消費數據
27     }
28 }

 

 

2.python實現生產者消費者模型

(1)用基本的隊列實現

 1 import time, random
 2 from multiprocessing import Process, Queue
 3 
 4 def producer(name, food, q):
 5     for i in range(10):
 6         time.sleep(random.randint(1, 3))
 7         f = "%s生產了%s%s" % (name, food, i+1)
 8         print(f)
 9         q.put(f)
10 
11 def consumer(name, q):
12     while True:
13         food = q.get()
14         if food is None:
15             print("%s獲取到了一個空" % name)
16             break
17         print("\033[31m%s消費: %s\033[0m" % (name, food))
18         time.sleep(random.randint(1, 3))
19 
20 if __name__ == '__main__':
21     que = Queue(20)
22     p1 = Process(target=producer, args=('wyb', '包子', que))
23     p1.start()
24     c1 = Process(target=consumer, args=('xxx', que))
25     c1.start()
26     p1.join()
27     que.put(None)

 

(2)JoinableQueue實現實現生產者消費者模型

JoinableQueue([maxsize]) :建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的

1 JoinableQueue的實例除了與Queue對象相同的方法以外,還具備如下方法:
2 
3 q.task_done(): 
4 使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常
5 
6 q.join():
7 生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止
 1 # JoinableQueue實現生產者消費者模型
 2 import time
 3 import random
 4 from multiprocessing import Process, JoinableQueue
 5 
 6 def producer(name, food, q):
 7     for i in range(10):
 8         time.sleep(random.randint(1, 3))
 9         f = "%s%s produced by %s" % (food, i+1, name)
10         print(("%s生產: " % name) + f)
11         q.put(f)
12     q.join()        # 阻塞 直到一個隊列中的數據 所有被處理完畢
13 
14 def consumer(name, q):
15     while True:
16         food = q.get()
17         print("\033[31m%s消費: %s\033[0m" % (name, food))
18         time.sleep(random.randint(1, 3))
19         q.task_done()       # 向q.join()發送一次信號,證實一個數據已經被取走了
20 
21 
22 if __name__ == '__main__':
23     que = JoinableQueue(20)
24     p1 = Process(target=producer, args=('wyb', '包子', que))
25 
26     c1 = Process(target=consumer, args=('xxx', que))
27     c1.daemon = True        # 設置消費者爲守護進程 當主進程的代碼執行完畢以後 子進程自動結束
28 
29     p1.start()
30     c1.start()
31     p1.join()
 1 # 上面代碼的過程:
 2 #  在消費者這一端:
 3     # 每次獲取一個數據
 4     # 處理一個數據
 5     # 發送一個記號 : 標誌一個數據被處理成功
 6 
 7 # 在生產者這一端:
 8     # 每一次生產一個數據,
 9     # 且每一次生產的數據都放在隊列中
10     # 在隊列中刻上一個記號
11     # 當生產者所有生產完畢以後,
12     # join信號 : 已經中止生產數據了
13                 # 且要等待以前被刻上的記號都被消費完
14                 # 當數據都被處理完時,join阻塞結束
15 
16 # consumer 中把全部的任務消耗完
17 # producer 端 的 join感知到,中止阻塞
18 # 全部的producer進程結束
19 # 主進程中的p.join結束
20 # 主進程中代碼結束
21 # 守護進程(消費者的進程)結束

 

(3)用管道實現生產者消費者模型

 1 from multiprocessing import Process, Pipe, Lock
 2 
 3 def consumer(con, pro, name, lk):
 4     pro.close()
 5     while True:
 6         lk.acquire()
 7         food = con.recv()
 8         lk.release()
 9         if food is not None:
10             print("%s消費: %s" % (name, food))
11         else:
12             con.close()
13             break
14 
15 def producer(con, pro, name, food, lk):
16     con.close()
17     for i in range(20):
18         f = "%s生產: %s%s" % (name, food, i+1)
19         print(f)
20         pro.send(f)
21     pro.send(None)
22     pro.send(None)
23     pro.close()
24 
25 if __name__ == '__main__':
26     lock = Lock()
27     con_pipe, pro_pipe = Pipe()
28     p = Process(target=producer, args=(con_pipe, pro_pipe, 'wyb', '包子', lock))
29     c1 = Process(target=consumer, args=(con_pipe, pro_pipe, 'xxx', lock))
30     c2 = Process(target=consumer, args=(con_pipe, pro_pipe, 'abc', lock))
31     p.start()
32     c1.start()
33     c2.start()
34     con_pipe.close()
35     pro_pipe.close()

注意:

使用管道會帶來數據不安全性的問題,能夠加鎖來控制操做管道的行爲 來避免進程之間爭搶數據形成的數據不安全現象

可是隊列不會帶來數據不安全性的問題,這是由於隊列是基於管道和鎖實現的

 

 

3.Java實現生產者消費者模型

相關文章
相關標籤/搜索