---恢復內容開始---json
·什麼是守護進程:併發
守護進程其實就是一個‘子進程’,守護即伴隨,守護進程會伴隨主進程的代碼運行完畢後而死掉app
·爲什麼用守護進程:dom
當該子進程內的代碼在父進程代碼運行完畢後就沒有存在的意義了,就應該將進程設置爲守護進程,會在父進程代碼結束後死掉ide
1 from multiprocessing import Process 2 3 import time,os 4 5 def task(name): 6 print('%s is running'%name) 7 time.sleep(3) 8 9 if __name__ == '__main__': 10 p1=Process(target=task,args=('守護進程',)) 11 p2=Process(target=task,args=('正常的子進程',)) 12 p1.daemon=True # 必定要放到p.start()以前 13 p1.start() 14 p2.start() 15 print('主') 16
如下是守護進程會迷惑人的範例:ui
1 #主進程代碼運行完畢,守護進程就會結束 2 from multiprocessing import Process 3 import time 4 def foo(): 5 print(123) 6 time.sleep(1) 7 print("end123") 8 9 def bar(): 10 print(456) 11 time.sleep(3) 12 print("end456") 13 14 if __name__ == '__main__': 15 p1=Process(target=foo) 16 p2=Process(target=bar) 17 18 p1.daemon=True 19 p1.start() 20 p2.start() 21 print("main-------") 22 23 ''' 24 main------- 25 456 26 enn456 27 ''' 28 29 30 ''' 31 main------- 32 123 33 456 34 enn456 35 ''' 36 37 ''' 38 123 39 main------- 40 456 41 end456 42 '''
互斥鎖:能夠將要執行任務的部分代碼(只涉及到修改共享數據的代碼)變成串行spa
join:是要執行任務的全部代碼總體串行code
強調:必須是lock.acquire()一次,而後 lock.release()釋放一次,才能繼續lock.acquire(),不能連續的lock.acquire()。否者程序停在原地。
互斥鎖vs join:
大前提:兩者的原理都是同樣,都是將併發變成串行,從而保證有序(在多個程序共享一個資源時,爲保證有序不亂,需將併發變成串行)
區別一:join是按照人爲指定的順序執行,而互斥鎖是因此進程平等地競爭,誰先搶到誰執行
區別二:互斥鎖可讓一部分代碼(修改共享數據的代碼)串行,而join只能將代碼總體串行(詳見搶票系統)
1 from multiprocessing import Process,Lock 2 import json 3 import os 4 import time 5 import random 6 7 def check(): 8 time.sleep(1) # 模擬網路延遲 9 with open('db.txt','rt',encoding='utf-8') as f: 10 dic=json.load(f) 11 print('%s 查看到剩餘票數 [%s]' %(os.getpid(),dic['count'])) 12 13 def get(): 14 with open('db.txt','rt',encoding='utf-8') as f: 15 dic=json.load(f) 16 time.sleep(2) 17 if dic['count'] > 0: 18 # 有票 19 dic['count']-=1 20 time.sleep(random.randint(1,3)) 21 with open('db.txt','wt',encoding='utf-8') as f: 22 json.dump(dic,f) 23 print('%s 購票成功' %os.getpid()) 24 else: 25 print('%s 沒有餘票' %os.getpid()) 26 27 28 def task(mutex): 29 # 查票 30 check() 31 32 #購票 33 mutex.acquire() # 互斥鎖不能連續的acquire,必須是release之後才能從新acquire 34 get() 35 mutex.release() 36 37 38 39 # with mutex: 40 # get() 41 42 if __name__ == '__main__': 43 mutex=Lock() 44 for i in range(10): 45 p=Process(target=task,args=(mutex,)) 46 p.start() 47 # p.join()
進程之間通訊必須找到一種介質,該介質必須知足
一、是全部進程共享的
二、必須是內存空間
附加:幫咱們自動處理好鎖的問題
a、 from multiprocessing import Manager(共享內存,但要本身解決鎖的問題)
b、 IPC中的隊列(Queue) 共享,內存,自動處理鎖的問題(最經常使用)
c、 IPC中的管道(Pipe),共享,內存,需本身解決鎖的問題
a、用Manager(瞭解知識點)blog
1 from multiprocessing import Process,Manager,Lock 2 import time 3 4 mutex=Lock() 5 6 def task(dic,lock): 7 lock.acquire() 8 temp=dic['num'] 9 time.sleep(0.1) 10 dic['num']=temp-1 11 lock.release() 12 13 if __name__ == '__main__': 14 m=Manager() 15 dic=m.dict({'num':10}) 16 17 l=[] 18 for i in range(10): 19 p=Process(target=task,args=(dic,mutex)) 20 l.append(p) 21 p.start() 22 for p in l: 23 p.join() 24 print(dic)
b、用隊列Queue隊列
1)共享的空間
2)是內存空間
3)自動幫咱們處理好鎖定問題
1 from multiprocessing import Queue 2 q=Queue(3) #設置隊列中maxsize個數爲三 3 q.put('first') 4 q.put({'second':None}) 5 q.put('三') 6 # q.put(4) #阻塞。不報錯,程序卡在原地等待隊列中清出一個值。默認blok=True 7 print(q.get()) 8 print(q.get()) 9 print(q.get()) 10 11 強調: 12 1、隊列用來存成進程之間溝通的消息,數據量不該該過大 13 2、maxsize的值超過的內存限制就變得毫無心義 14
1 瞭解: 2 q=Queue(3) 3 q.put('first',block=False) 4 q.put('second',block=False) 5 q.put('third',block=False) 6 q.put('fourth',block=False) #報錯 queue.Full 7 8 q.put('first',block=True) 9 q.put('second',block=True) 10 q.put('third',block=True) 11 q.put('fourth',block=True,timeout=3) #等待3秒後若還進不去報錯。注意timeout不能和block=False連用 12 13 q.get(block=False) 14 q.get(block=False) 15 q.get(block=False) 16 q.get(block=False) #報錯 queue.Empty 17 18 q.get(block=True) 19 q.get(block=True) 20 q.get(block=True) 21 q.get(block=True,timeout=2) #等待2秒後還取不出東西則報錯。注意timeout不能和block=False連用
該模型中包含兩類重要的角色:
一、生產者:將負責造數據的任務比喻爲生產者
二、消費者:接收生產者造出的數據來作進一步的處理,該類人物被比喻成消費者
實現生產者消費者模型三要素
一、生產者
二、消費者
三、隊列
何時用該模型:
程序中出現明顯的兩類任何,一類任務是負責生產,另一類任務是負責處理生產的數據的
該模型的好處:
一、實現了生產者與消費者解耦和
二、平衡了生產者的生產力與消費者的處理數據的能力
注意:生產者消費者模型是解決問題的思路不是技術。能夠用進程和隊列來實現,也能夠用其餘的來實現。
1 from multiprocessing import JoinableQueue,Process 2 import time 3 import os 4 import random 5 6 def producer(name,food,q): 7 for i in range(3): 8 res='%s%s' %(food,i) 9 time.sleep(random.randint(1,3)) 10 # 往隊列裏丟 11 q.put(res) 12 print('\033[45m%s 生產了 %s\033[0m' %(name,res)) 13 # q.put(None) 14 15 def consumer(name,q): 16 while True: 17 #從隊列裏取走 18 res=q.get() 19 if res is None:break 20 time.sleep(random.randint(1,3)) 21 print('\033[46m%s 吃了 %s\033[0m' %(name,res)) 22 q.task_done() 23 24 if __name__ == '__main__': 25 q=JoinableQueue() 26 # 生產者們 27 p1=Process(target=producer,args=('egon','包子',q,)) 28 p2=Process(target=producer,args=('楊軍','泔水',q,)) 29 p3=Process(target=producer,args=('猴老師','翔',q,)) 30 # 消費者們 31 c1=Process(target=consumer,args=('Alex',q,)) 32 c2=Process(target=consumer,args=('wupeiqidsb',q,)) 33 c1.daemon=True 34 c2.daemon=True 35 36 p1.start() 37 p2.start() 38 p3.start() 39 c1.start() 40 c2.start() 41 42 p1.join() 43 p2.join() 44 p3.join() 45 q.join() #等待隊列被取乾淨 46 # q.join() 結束意味着 47 # 主進程的代碼運行完畢--->(生產者運行完畢)+隊列中的數據也被取乾淨了->消費者沒有存在的意義 48 49 # print('主')