Python中的生產者消費者模型

---恢復內容開始---json

瞭解知識點:

一、守護進程:

·什麼是守護進程:併發

守護進程其實就是一個‘子進程’,守護即伴隨,守護進程會伴隨主進程的代碼運行完畢後而死掉app

·爲什麼用守護進程:dom

當該子進程內的代碼在父進程代碼運行完畢後就沒有存在的意義了,就應該將進程設置爲守護進程,會在父進程代碼結束後死掉ide

 

 1 from multiprocessing import Process
 2 
 3 import time,os
 4 
 5 def task(name):
 6     print('%s is running'%name)
 7     time.sleep(3)
 8 
 9 if __name__ == '__main__':
10     p1=Process(target=task,args=('守護進程',))
11     p2=Process(target=task,args=('正常的子進程',))
12     p1.daemon=True  # 必定要放到p.start()以前
13     p1.start()
14     p2.start()
15     print('')
16     
守護進程舉例

 

如下是守護進程會迷惑人的範例:ui

 1 #主進程代碼運行完畢,守護進程就會結束
 2 from multiprocessing import Process
 3 import time
 4 def foo():
 5     print(123)
 6     time.sleep(1)
 7     print("end123")
 8 
 9 def bar():
10     print(456)
11     time.sleep(3)
12     print("end456")
13 
14 if __name__ == '__main__':
15     p1=Process(target=foo)
16     p2=Process(target=bar)
17 
18     p1.daemon=True
19     p1.start()
20     p2.start()
21     print("main-------")
22 
23     '''
24     main-------
25     456
26     enn456
27     '''
28 
29 
30     '''
31     main-------
32     123
33     456
34     enn456
35     '''
36 
37     '''
38     123
39     main-------
40     456
41     end456
42     '''
View Code

二、互斥鎖:

互斥鎖:能夠將要執行任務的部分代碼(只涉及到修改共享數據的代碼)變成串行spa

join:是要執行任務的全部代碼總體串行code

強調:必須是lock.acquire()一次,而後 lock.release()釋放一次,才能繼續lock.acquire(),不能連續的lock.acquire()。否者程序停在原地。
互斥鎖vs join: 
大前提:兩者的原理都是同樣,都是將併發變成串行,從而保證有序(在多個程序共享一個資源時,爲保證有序不亂,需將併發變成串行)
區別一:join是按照人爲指定的順序執行,而互斥鎖是因此進程平等地競爭,誰先搶到誰執行
區別二:互斥鎖可讓一部分代碼(修改共享數據的代碼)串行,而join只能將代碼總體串行(詳見搶票系統)

互斥鎖
 1 from multiprocessing import Process,Lock
 2 import json
 3 import os
 4 import time
 5 import random
 6 
 7 def check():
 8     time.sleep(1) # 模擬網路延遲
 9     with open('db.txt','rt',encoding='utf-8') as f:
10         dic=json.load(f)
11     print('%s 查看到剩餘票數 [%s]' %(os.getpid(),dic['count']))
12 
13 def get():
14     with open('db.txt','rt',encoding='utf-8') as f:
15         dic=json.load(f)
16     time.sleep(2)
17     if dic['count'] > 0:
18         # 有票
19         dic['count']-=1
20         time.sleep(random.randint(1,3))
21         with open('db.txt','wt',encoding='utf-8') as f:
22             json.dump(dic,f)
23         print('%s 購票成功' %os.getpid())
24     else:
25         print('%s 沒有餘票' %os.getpid())
26 
27 
28 def task(mutex):
29     # 查票
30     check()
31 
32     #購票
33     mutex.acquire() # 互斥鎖不能連續的acquire,必須是release之後才能從新acquire
34     get()
35     mutex.release()
36 
37 
38 
39     # with mutex:
40     #     get()
41 
42 if __name__ == '__main__':
43     mutex=Lock()
44     for i in  range(10):
45         p=Process(target=task,args=(mutex,))
46         p.start()
47         # p.join()
模擬搶票

三、IPC通訊機制

進程之間通訊必須找到一種介質,該介質必須知足
一、是全部進程共享的
二、必須是內存空間
附加:幫咱們自動處理好鎖的問題
 
   
   
   
   
   a、   
   
   
   
   
   from multiprocessing import Manager(共享內存,但要本身解決鎖的問題)
   
   
   
   
   b、   
   
   
   
   
   IPC中的隊列(Queue) 共享,內存,自動處理鎖的問題(最經常使用)
   
   
   
   
   c、   
   
   
   
   
   IPC中的管道(Pipe),共享,內存,需本身解決鎖的問題

a、用Manager(瞭解知識點)blog

 1 from multiprocessing import Process,Manager,Lock
 2 import time
 3 
 4 mutex=Lock()
 5 
 6 def task(dic,lock):
 7     lock.acquire()
 8     temp=dic['num']
 9     time.sleep(0.1)
10     dic['num']=temp-1
11     lock.release()
12 
13 if __name__ == '__main__':
14     m=Manager()
15     dic=m.dict({'num':10})
16 
17     l=[]
18     for i in range(10):
19         p=Process(target=task,args=(dic,mutex))
20         l.append(p)
21         p.start()
22     for p in l:
23         p.join()
24     print(dic)
View Code

 

b、用隊列Queue隊列

1)共享的空間

2)是內存空間

3)自動幫咱們處理好鎖定問題

 1 from multiprocessing import Queue
 2 q=Queue(3)  #設置隊列中maxsize個數爲三
 3 q.put('first')
 4 q.put({'second':None})
 5 q.put('')
 6 # q.put(4)   #阻塞。不報錯,程序卡在原地等待隊列中清出一個值。默認blok=True
 7 print(q.get())
 8 print(q.get())
 9 print(q.get())
10 
11 強調:
12 1、隊列用來存成進程之間溝通的消息,數據量不該該過大
13 2、maxsize的值超過的內存限制就變得毫無心義
14                                                               
 1 瞭解:
 2 q=Queue(3)
 3 q.put('first',block=False)
 4 q.put('second',block=False)
 5 q.put('third',block=False)
 6 q.put('fourth',block=False)  #報錯 queue.Full
 7 
 8 q.put('first',block=True)
 9 q.put('second',block=True)
10 q.put('third',block=True)
11 q.put('fourth',block=True,timeout=3)  #等待3秒後若還進不去報錯。注意timeout不能和block=False連用
12 
13 q.get(block=False)
14 q.get(block=False)
15 q.get(block=False)
16 q.get(block=False)           #報錯 queue.Empty
17 
18 q.get(block=True)
19 q.get(block=True)
20 q.get(block=True)
21 q.get(block=True,timeout=2)    #等待2秒後還取不出東西則報錯。注意timeout不能和block=False連用
瞭解

四、生產者與消費者模型

該模型中包含兩類重要的角色:
一、生產者:將負責造數據的任務比喻爲生產者
二、消費者:接收生產者造出的數據來作進一步的處理,該類人物被比喻成消費者
 
實現生產者消費者模型三要素
一、生產者
二、消費者
三、隊列
何時用該模型:
程序中出現明顯的兩類任何,一類任務是負責生產,另一類任務是負責處理生產的數據的
 
該模型的好處:
一、實現了生產者與消費者解耦和
二、平衡了生產者的生產力與消費者的處理數據的能力

注意:生產者消費者模型是解決問題的思路不是技術。能夠用進程和隊列來實現,也能夠用其餘的來實現。

 1 from multiprocessing import JoinableQueue,Process
 2 import time
 3 import os
 4 import random
 5 
 6 def producer(name,food,q):
 7     for i in range(3):
 8         res='%s%s' %(food,i)
 9         time.sleep(random.randint(1,3))
10         # 往隊列裏丟
11         q.put(res)
12         print('\033[45m%s 生產了 %s\033[0m' %(name,res))
13     # q.put(None)
14 
15 def consumer(name,q):
16     while True:
17         #從隊列裏取走
18         res=q.get()
19         if res is None:break
20         time.sleep(random.randint(1,3))
21         print('\033[46m%s 吃了 %s\033[0m' %(name,res))
22         q.task_done()
23 
24 if __name__ == '__main__':
25     q=JoinableQueue()
26     # 生產者們
27     p1=Process(target=producer,args=('egon','包子',q,))
28     p2=Process(target=producer,args=('楊軍','泔水',q,))
29     p3=Process(target=producer,args=('猴老師','',q,))
30     # 消費者們
31     c1=Process(target=consumer,args=('Alex',q,))
32     c2=Process(target=consumer,args=('wupeiqidsb',q,))
33     c1.daemon=True
34     c2.daemon=True
35 
36     p1.start()
37     p2.start()
38     p3.start()
39     c1.start()
40     c2.start()
41 
42     p1.join()
43     p2.join()
44     p3.join()
45     q.join() #等待隊列被取乾淨
46     # q.join() 結束意味着
47     # 主進程的代碼運行完畢--->(生產者運行完畢)+隊列中的數據也被取乾淨了->消費者沒有存在的意義
48 
49     # print('主')
相關文章
相關標籤/搜索