python之路-day31-守護進程、鎖、隊列、生產者消費者模型

 

1、守護進程python

    以前咱們講的子進程是不會隨着主進程而結束的,子進程所有執行完以後,程序才結束,那麼若是有linux

  一天咱們的需求是個人主進程結束了,由主進程建立的子進程必須跟着結束,怎麼辦?守護進程就來了!編程

    主進程建立守護進程json

      其一:守護進程會在主進程代碼執行結束後就終止windows

      其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children安全

    注意:進程之間是互相獨立的,主程序代碼運行結束,守護進程隨即終止網絡

 1 import os  2 import time  3 from multiprocessing import Process  4 
 5 class Myprocess(Process):  6     def __init__(self,person):  7         super().__init__()  8         self.person = person  9     def run(self): 10         print(os.getpid(),self.name) 11         print('%s正在和女主播聊天' %self.person) 12         time.sleep(3) 13 if __name__ == '__main__': 14     p=Myprocess('太白') 15     p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行
16  p.start() 17     # time.sleep(1) # 在sleep時linux下查看進程id對應的進程ps -ef|grep id
18     print('')
守護進程

 

2、進程同步(鎖)多線程

    咱們以前實現了進程的異步,讓多個任務能夠同時在幾個進程中併發處理,他們之間的運行時沒有順序的,一旦併發

  開啓也不受咱們控制。儘管併發編程讓咱們能更加充分的利用IO資源,可是也給咱們帶來了新的問題:進程之間數據dom

  不共享。雖然能夠共享同一臺文件系統,因此訪問同一個文件,或者同一個打印紅緞,是沒有問題的,而共享帶來的是

  競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。

 1 import os  2 import time  3 import random  4 from multiprocessing import Process  5 
 6 def work(n):  7     print('%s: %s is running' %(n,os.getpid()))  8  time.sleep(random.random())  9     print('%s:%s is done' %(n,os.getpid())) 10 
11 if __name__ == '__main__': 12     for i in range(5): 13         p=Process(target=work,args=(i,)) 14  p.start() 15 
16 # 看結果:經過結果能夠看出兩個問題:問題一:每一個進程中work函數的第一個打印就不是按照咱們for循環的0-4的順序來打印的
17 #問題二:咱們發現,每一個work進程中有兩個打印,可是咱們看到全部進程中第一個打印的順序爲0-2-1-4-3,可是第二個打印沒有按照這個順序,變成了2-1-0-3-4,說明咱們一個進程中的程序的執行順序都混亂了。
18 #問題的解決方法,第二個問題加鎖來解決,第一個問題是沒有辦法解決的,由於進程開到了內核,有操做系統來決定進程的調度,咱們本身控制不了
19 # 0: 9560 is running
20 # 2: 13824 is running
21 # 1: 7476 is running
22 # 4: 11296 is running
23 # 3: 14364 is running
24 
25 # 2:13824 is done
26 # 1:7476 is done
27 # 0:9560 is done
28 # 3:14364 is done
29 # 4:11296 is done
多進程搶佔輸出資源,致使打印混亂
 1 #由併發變成了串行,犧牲了運行效率,但避免了競爭
 2 from multiprocessing import Process,Lock  3 import os,time  4 def work(n,lock):  5     #加鎖,保證每次只有一個進程在執行鎖裏面的程序,這一段程序對於全部寫上這個鎖的進程,你們都變成了串行
 6  lock.acquire()  7     print('%s: %s is running' %(n,os.getpid()))  8     time.sleep(1)  9     print('%s:%s is done' %(n,os.getpid())) 10     #解鎖,解鎖以後其餘進程才能去執行本身的程序
11  lock.release() 12 if __name__ == '__main__': 13     lock=Lock() 14     for i in range(5): 15         p=Process(target=work,args=(i,lock)) 16  p.start() 17 
18 #打印結果:
19 # 2: 10968 is running
20 # 2:10968 is done
21 # 0: 7932 is running
22 # 0:7932 is done
23 # 4: 4404 is running
24 # 4:4404 is done
25 # 1: 12852 is running
26 # 1:12852 is done
27 # 3: 980 is running
28 # 3:980 is done
29 
30 #結果分析:(本身去屢次運行一下,看看結果,我拿出其中一個結果來看)經過結果咱們能夠看出,多進程剛開始去執行的時候,每次運行,首先打印出來哪一個進程的程序是不固定的,可是咱們解決了上面打印混亂示例代碼的第二個問題,那就是同一個進程中的兩次打印都是先完成的,而後才切換到下一個進程去,打印下一個進程中的兩個打印結果,說明咱們控制住了同一進程中的代碼執行順序,若是涉及到多個進程去操做同一個數據或者文件的時候,就不擔憂數據算錯或者文件中的內容寫入混亂了。
加鎖:由併發改爲了串行,犧牲了運行效率,但避免了競爭

    上面這種狀況雖然用加鎖的形式是險惡熟悉怒的執行,可是程序又從新變成串行了,這樣確實會浪費了時間

  可是卻保證了數據安全

    接下來,模擬搶票爲例,來看看數據安全的重要性。

 1 #注意:首先在當前文件目錄下建立一個名爲db的文件
 2 #文件db的內容爲:{"count":1},只有這一行數據,而且注意,每次運行完了以後,文件中的1變成了0,你須要手動將0改成1,而後在去運行代碼。
 3 #注意必定要用雙引號,否則json沒法識別
 4 #併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
 5 from multiprocessing import Process,Lock  6 import time,json,random  7 
 8 #查看剩餘票數
 9 def search(): 10     dic=json.load(open('db')) #打開文件,直接load文件中的內容,拿到文件中的包含剩餘票數的字典
11     print('\033[43m剩餘票數%s\033[0m' %dic['count']) 12 
13 #搶票
14 def get(): 15     dic=json.load(open('db')) 16     time.sleep(0.1)       #模擬讀數據的網絡延遲,那麼進程之間的切換,致使全部人拿到的字典都是{"count": 1},也就是每一個人都拿到了這一票。
17     if dic['count'] >0: 18         dic['count']-=1
19         time.sleep(0.2)   #模擬寫數據的網絡延遲
20         json.dump(dic,open('db','w')) 21         #最終結果致使,每一個人顯示都搶到了票,這就出現了問題~
22         print('\033[43m購票成功\033[0m') 23 
24 def task(): 25  search() 26  get() 27 
28 if __name__ == '__main__': 29     for i in range(3): #模擬併發100個客戶端搶票
30         p=Process(target=task) 31  p.start() 32 
33 #看結果分析:因爲網絡延遲等緣由使得進程切換,致使每一個人都搶到了這最後一張票
34 # 剩餘票數1
35 # 剩餘票數1
36 # 剩餘票數1
37 # 購票成功
38 # 購票成功
39 # 購票成功
併發運行,效率高,可是競爭同一個文件,致使數據混亂
 1 #注意:首先在當前文件目錄下建立一個名爲db的文件
 2 #文件db的內容爲:{"count":1},只有這一行數據,而且注意,每次運行完了以後,文件中的1變成了0,你須要手動將0改成1,而後在去運行代碼。
 3 #注意必定要用雙引號,否則json沒法識別
 4 #加鎖保證數據安全,不出現混亂
 5 from multiprocessing import Process,Lock  6 import time,json,random  7 
 8 #查看剩餘票數
 9 def search(): 10     dic=json.load(open('db')) #打開文件,直接load文件中的內容,拿到文件中的包含剩餘票數的字典
11     print('\033[43m剩餘票數%s\033[0m' %dic['count']) 12 
13 #搶票
14 def get(): 15     dic=json.load(open('db')) 16     time.sleep(0.1)       #模擬讀數據的網絡延遲,那麼進程之間的切換,致使全部人拿到的字典都是{"count": 1},也就是每一個人都拿到了這一票。
17     if dic['count'] >0: 18         dic['count']-=1
19         time.sleep(0.2)   #模擬寫數據的網絡延遲
20         json.dump(dic,open('db','w')) 21         #最終結果致使,每一個人顯示都搶到了票,這就出現了問題~
22         print('\033[43m購票成功\033[0m') 23     else: 24         print('sorry,沒票了親!') 25 def task(lock): 26  search() 27     #由於搶票的時候是發生數據變化的時候,全部咱們將鎖加加到這裏
28  lock.acquire() 29  get() 30  lock.release() 31 if __name__ == '__main__': 32     lock = Lock() #建立一個鎖
33     for i in range(3): #模擬併發100個客戶端搶票
34         p=Process(target=task,args=(lock,)) #將鎖做爲參數傳給task函數
35  p.start() 36 
37 #看結果分析:只有一我的搶到了票
38 # 剩餘票數1
39 # 剩餘票數1
40 # 剩餘票數1
41 # 購票成功 #幸運的人兒
42 # sorry,沒票了親!
43 # sorry,沒票了親!
加鎖:購票行爲由併發變成了串行,犧牲了效率,可是保證了數據安全

 

  進程鎖的總結:

#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.須要本身加鎖處理

#所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,
咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

IPC通訊機制(瞭解):IPC是intent-Process Communication的縮寫,含義爲進程間通訊或者跨進程通訊,是指兩個進程之間進行數據交換的過程。IPC不是某個系統所獨有的,任何一個操做系統都須要有相應的IPC機制,
好比Windows上能夠經過剪貼板、管道和郵槽等來進行進程間通訊,而Linux上能夠經過命名共享內容、信號量等來進行進程間通訊。Android它也有本身的進程間通訊方式,Android建構在Linux基礎上,繼承了一
部分Linux的通訊方式。

  

3、隊列(推薦使用)  

    進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:對列和管道,這兩種方式都是使用消息傳遞的。

  隊列就像一個特殊的列表,可是能夠設置固定長度,而且從前面插入數據,從後面取出數據,先進先出。

Queue([maxsize])建立共享的進程隊列
參數:maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖實現。

  queue的方法介紹  

q = Queue([maxsize])
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具備如下方法:

q.get([block[,timeout]])
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在指定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常

q.put(item [,block[,timeout]])
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block用於控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)
timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。

q.get_nowait()
同q.get(False)方法。

q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。

q.empty() 
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。

  queue的其餘方法(瞭解)

q.close() 
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。

q.cancel_join_thread() 
不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。

q.join_thread() 
鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。

  queue的簡單用法

 1 from multiprocessing import Queue  2 q=Queue(3) #建立一個隊列對象,隊列長度爲3
 3 
 4 #put ,get ,put_nowait,get_nowait,full,empty
 5 q.put(3)   #往隊列中添加數據
 6 q.put(2)  7 q.put(1)  8 # q.put(4) # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。
 9            # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。
10 try: 11     q.put_nowait(4) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。
12 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。
13     print('隊列已經滿了') 14 
15 # 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。
16 print(q.full()) #查看是否滿了,滿了返回True,不滿返回False
17 
18 print(q.get())  #取出數據
19 print(q.get()) 20 print(q.get()) 21 # print(q.get()) # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。
22 try: 23     q.get_nowait(3) # 可使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。
24 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
25     print('隊列已經空了') 26 
27 print(q.empty()) #空了
queue的簡單用法

  子進程與父進程經過隊列進行通訊

 1 #看下面的隊列的時候,按照編號看註釋
 2 import time  3 from multiprocessing import Process, Queue  4 
 5 # 8. q = Queue(2) #建立一個Queue對象,若是寫在這裏,那麼在windows還子進程去執行的時候,咱們知道子進程中還會執行這個代碼,可是子進程中不可以再次建立了,也就是這個q就是你主進程中建立的那個q,經過咱們下面在主進程中先添加了一個字符串以後,在去開啓子進程,你會發現,小鬼這個字符串還在隊列中,也就是說,咱們使用的仍是主進程中建立的這個隊列。
 6 def f(q):  7     # q = Queue() #9. 咱們在主進程中開啓了一個q,若是咱們在子進程中的函數裏面再開一個q,那麼你下面q.put('姑娘,多少錢~')添加到了新建立的這q裏裏面了
 8     q.put('姑娘,多少錢~')  #4.調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。
 9     # print(q.qsize()) #6.查看隊列中有多少條數據了
10 
11 def f2(q): 12     print('》》》》》》》》') 13     print(q.get())  #5.取數據
14 if __name__ == '__main__': 15     q = Queue() #1.建立一個Queue對象
16     q.put('小鬼') 17 
18     p = Process(target=f, args=(q,)) #2.建立一個進程
19     p2 = Process(target=f2, args=(q,)) #3.建立一個進程
20  p.start() 21  p2.start() 22     time.sleep(1) #7.若是阻塞一點時間,就會出現主進程運行太快,致使咱們在子進程中查看qsize爲1個。
23     # print(q.get()) #結果:小鬼
24     print(q.get()) #結果:姑娘,多少錢~
25     p.join()
子進程與父進程經過隊列進行通訊

 

4、生產者和消費者模型

 

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

    爲何要使用生產者和消費者模式

      在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

    什麼是生產者消費者模式

      生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力,而且我能夠根據生產速度和消費速度來均衡一下多少個生產者能夠爲多少個消費者提供足夠的服務,就能夠開多進程等等,而這些進程都是到阻塞隊列或者說是緩衝區中去獲取或者添加數據。

    通俗的解釋:看圖說話。。背景有點亂,等我更新~~

 

 

 1 from multiprocessing import Process,Queue  2 import time,random,os  3 def consumer(q):  4     while True:  5         res=q.get()  6         time.sleep(random.randint(1,3))  7         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))  8 
 9 def producer(q): 10     for i in range(10): 11         time.sleep(random.randint(1,3)) 12         res='包子%s' %i 13  q.put(res) 14         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) 15 
16 if __name__ == '__main__': 17     q=Queue() 18     #生產者們:即廚師們
19     p1=Process(target=producer,args=(q,)) 20 
21     #消費者們:即吃貨們
22     c1=Process(target=consumer,args=(q,)) 23 
24     #開始
25  p1.start() 26  c1.start() 27     print('')
基於隊列的生產者消費模型
 1 #生產者消費者模型總結
 2 
 3     #程序中有兩類角色
 4  一類負責生產數據(生產者)  5  一類負責處理數據(消費者)  6         
 7     #引入生產者消費者模型爲了解決的問題是:
 8  平衡生產者與消費者之間的工做能力,從而提升程序總體處理數據的速度  9         
10     #如何實現:
11         生產者<-->隊列<——>消費者 12     #生產者消費者模型實現類程序的解耦和
生產者消費者模型總結

    經過上面基於隊列的生產者消費者代碼示例,咱們發現一個問題:主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。

    解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環

 1 from multiprocessing import Process,Queue  2 import time,random,os  3 def consumer(q):  4     while True:  5         res=q.get()  6         if res is None:break #收到結束信號則結束
 7         time.sleep(random.randint(1,3))  8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))  9 
10 def producer(q): 11     for i in range(5): 12         time.sleep(random.randint(1,3)) 13         res='包子%s' %i 14  q.put(res) 15         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) 16     q.put(None) #在本身的子進程的最後加入一個結束信號
17 if __name__ == '__main__': 18     q=Queue() 19     #生產者們:即廚師們
20     p1=Process(target=producer,args=(q,)) 21 
22     #消費者們:即吃貨們
23     c1=Process(target=consumer,args=(q,)) 24 
25     #開始
26  p1.start() 27  c1.start() 28 
29     print('')
子進程生產者在生產完畢後發送結束信號None

注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號

 1 from multiprocessing import Process,Queue  2 import time,random,os  3 def consumer(q):  4     while True:  5         res=q.get()  6         if res is None:break #收到結束信號則結束
 7         time.sleep(random.randint(1,3))  8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))  9 
10 def producer(q): 11     for i in range(2): 12         time.sleep(random.randint(1,3)) 13         res='包子%s' %i 14  q.put(res) 15         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) 16 
17 if __name__ == '__main__': 18     q=Queue() 19     #生產者們:即廚師們
20     p1=Process(target=producer,args=(q,)) 21 
22     #消費者們:即吃貨們
23     c1=Process(target=consumer,args=(q,)) 24 
25     #開始
26  p1.start() 27  c1.start() 28 
29     p1.join() #等待生產者進程結束
30     q.put(None) #發送結束信號
31     print('')
主進程在生產者生產完後發送結束信號

  

   但上述解決方式,在有多個生產者和多個消費者時,因爲隊列咱們說了是進程安全的,我一個進程拿走告終束信號,另一個進程就拿不到了,還須要多發送一個結束信號,有幾個取數據的進程就要發送幾個結束信號,咱們則須要用一個很low的方式去解決

 1 from multiprocessing import Process,Queue  2 import time,random,os  3 def consumer(q):  4     while True:  5         res=q.get()  6         if res is None:break #收到結束信號則結束
 7         time.sleep(random.randint(1,3))  8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))  9 
10 def producer(name,q): 11     for i in range(2): 12         time.sleep(random.randint(1,3)) 13         res='%s%s' %(name,i) 14  q.put(res) 15         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) 16 
17 
18 
19 if __name__ == '__main__': 20     q=Queue() 21     #生產者們:即廚師們
22     p1=Process(target=producer,args=('包子',q)) 23     p2=Process(target=producer,args=('骨頭',q)) 24     p3=Process(target=producer,args=('泔水',q)) 25 
26     #消費者們:即吃貨們
27     c1=Process(target=consumer,args=(q,)) 28     c2=Process(target=consumer,args=(q,)) 29 
30     #開始
31  p1.start() 32  p2.start() 33  p3.start() 34  c1.start() 35 
36     p1.join() #必須保證生產者所有生產完畢,才應該發送結束信號
37  p2.join() 38  p3.join() 39     q.put(None) #有幾個消費者就應該發送幾回結束信號None
40     q.put(None) #發送結束信號
41     print('')
有多個消費者和生產者的時候須要發送屢次結束信號

 

 

其實咱們的思路無非是發送結束信號而已,有另一種隊列提供了這種機制

 JoinableQueue([maxsize]) 

 

#JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

   #參數介紹:
    maxsize是隊列中容許最大項數,省略則無大小限制。    
  #方法介紹:
    JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
    q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
    q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止,也就是隊列中的數據所有被get拿走了。

 

 1 from multiprocessing import Process,JoinableQueue  2 import time,random,os  3 def consumer(q):  4     while True:  5         res=q.get()  6         # time.sleep(random.randint(1,3))
 7  time.sleep(random.random())  8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))  9         q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走並執行完了
10 
11 def producer(name,q): 12     for i in range(10): 13         # time.sleep(random.randint(1,3))
14  time.sleep(random.random()) 15         res='%s%s' %(name,i) 16  q.put(res) 17         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) 18     print('%s生產結束'%name) 19     q.join() #生產完畢,使用此方法進行阻塞,直到隊列中全部項目均被處理。
20     print('%s生產結束~~~~~~'%name) 21 
22 if __name__ == '__main__': 23     q=JoinableQueue() 24     #生產者們:即廚師們
25     p1=Process(target=producer,args=('包子',q)) 26     p2=Process(target=producer,args=('骨頭',q)) 27     p3=Process(target=producer,args=('泔水',q)) 28 
29     #消費者們:即吃貨們
30     c1=Process(target=consumer,args=(q,)) 31     c2=Process(target=consumer,args=(q,)) 32     c1.daemon=True #若是不加守護,那麼主進程結束不了,可是加了守護以後,必須確保生產者的內容生產完而且被處理完了,全部必須還要在主進程給生產者設置join,才能確保生產者生產的任務被執行完了,而且可以確保守護進程在全部任務執行完成以後才隨着主進程的結束而結束。
33     c2.daemon=True 34 
35     #開始
36     p_l=[p1,p2,p3,c1,c2] 37     for p in p_l: 38  p.start() 39 
40     p1.join() #我要確保你的生產者進程結束了,生產者進程的結束標誌着你生產的全部的人任務都已經被處理完了
41  p2.join() 42  p3.join() 43     print('') 44     
45     # 主進程等--->p1,p2,p3等---->c1,c2
46     # p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據
47     # 於是c1,c2也沒有存在的價值了,不須要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,因此設置成守護進程就能夠了。
JoinableQueue隊列實現生產者消費者模型

 

 

 

 

 

 

 

 

 

 

 

 

 

同一個

相關文章
相關標籤/搜索