multiprocessing包—Process模塊開啓多進程的兩種方式,Process的方法,守護進程
html
進程同步控制—multiprocessing.Lock multiprocessing.Semaphore multiprocessing.Eventpython
進程間通訊(IPC)— multiprocessing.Pipe multiprocessing.Queuelinux
進程間的數據共享 — multiprocessing.Manager編程
1.是運行中的程序; 2.是系統最小的資源分配單位 3.爲多個任務之間的數據安全和內存隔離作約束
multiprocessing:提到多進程就要想到它json
multiprocess不是一個模塊而是python中一個操做、管理進程的包。 之因此叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的全部子模塊。windows
1、multiprocessing.Process開啓多進程:安全
#process方法: start,join,terminate,isalive 屬性 name,pid,daemon(True爲守護進程)服務器
#除了子進程代碼,其餘都是主進程 #注意進程的三態轉換 就緒態-(時間片)-執行態,阻塞態 #process方法: start,join,terminate,isalive 屬性 name,pid,daemon(True爲守護進程) import time from multiprocessing import Process def func(): #在子進程中執行的代碼,由於被註冊了 print('開啓一個子進程') time.sleep(1) print('證實,print1和print*是一塊兒啓動的') #其中p是一個主進程,實例化一個進程對象, # p.start()這句話爲主進程代碼。(可是它告訴系統啓動子進程,操做系統建立新進程,執行進程中的代碼) if __name__ == '__main__':#·只有在windows下開啓子進程,才須要這句話,其餘系統不用 p = Process(target=func) #註冊函數。 能夠理解爲,把func這個函數,放到一個新的py文件裏執行 #p是一個進程對象,告訴系統開啓一個子進程 p.start() #啓動子進程,子進程啓動起來後,進入就緒排隊狀態,延遲0.01仍是0.001秒看排隊長度 print('*'*5) #其中 這裏的print和p.start是一塊兒啓動的,無論有多少個print,都同步啓動 #假設若是咱們直接調用func()函數,那麼print('*'*5)則與函數此時是同步的,要等函數運行完才能執行print('*'*5) #可是此時咱們用子進程p.start告訴系統,要開啓一個子進程去執行func,而後系統就會給func一個進程id,開始運行func.同時主程序會繼續運行本身的代碼print('*'*5) # 此時程序變爲了異步了,不影響p.start後面的代碼運行,也就是說,p.start告訴系統開啓子進程調用func後,立刻運行 #print('*'*5),而後進程開啓後,就緒狀態進入了執行狀態,再運行 func。 # 此時運行func和執行print('*'*5),變爲了異步,不是同步了。這兩段代碼已是在兩條馬路上跑的兩輛車了 def f(name): #這裏是子進程代碼 print('hello', name) print('我是子進程') if __name__ == '__main__': #如下所有都是主進程代碼,包括import模塊那 p = Process(target=f, args=('bob',)) #主進程 p.start() #開啓了一個子進程,理解爲把f放到新的py裏執行,可是實際上他還有後續的主進程代碼【print('執行主進程的內容了')】要執行 time.sleep(1) #主進程的代碼 print('執行主進程的內容了') #主進程的代碼 #搭配上面的例子食用更佳。若是上面的sleep是0.1秒,就會先執行上面的print2。若是是1秒,則確定是先執行,print('執行主進程的內容了') #此時主進程的代碼,和子進程要啓動的代碼。就是異步執行
#pid processid進程id ppid :parent process id import os,time from multiprocessing import Process # print('py中的進程',os.getpid())#執行這整段代碼,這句話會運行兩次第一次指向父進程,第二次指向子進程 def func(name): print('參數是',name) print('子進程',os.getpid()) print('子進程的父進程',os.getppid()) #就是下面這一段的pid time.sleep(1) # name = input('111111111') #EOFError: EOF when reading a line print('子進程,調皮一下') if __name__ == '__main__': p = Process(target=func,args=('gkx',)) #注意傳參數時候 args=('gkx',) 逗號必定要加,參數能夠是*args,不能是**kwargs p.start() print('父進程',os.getpid()) print('父進程的父進程',os.getppid()) #這個就是pycharm的進程id
# 進程的生命週期 # 主進程 # 子進程 # 開啓了子進程的主進程 : # 主進程本身的代碼若是長,等待本身的代碼執行結束, # 子進程的執行時間長,主進程會在主進程代碼執行完畢以後等待子進程執行完畢以後 主進程才結束 #···子進程不必定要依賴運行着的父進程 #至於父進程若是關閉了 可是子進程沒運行完, #好比 控制檯 python XXX.py 運行了py文件,此時若是關閉了控制檯,py文件是否會跟着關閉? #在linux中 若是 python XXX.py & 後面跟上&則 py文件會一直在後臺運行 #因此,關閉父進程,子進程是否關閉要看是怎麼規定的
import os from multiprocessing import Process class MyProcess(Process): def __init__(self,arg1,arg2): #若是屬性都有默認值,那麼在實例化的時候能夠不傳屬性,類型函數的傳值方式 super().__init__()#父類的屬性都有默認值因此不用列出 self.arg1 = arg1 self.arg2 = arg2 def run(self): print(self.name) print(self.pid) print('arg1,arg2',self.arg1,self.arg2) # print('run參數',name) print(os.getpid()) if __name__ == '__main__': print('主進程',os.getpid()) p1 = MyProcess('1','2') # p1.run('gkx') p1.start() p2 = MyProcess('3','4') p2.start() #~~~第二種方法的條件: #1.自定義類,繼承Process類 #2.定義個名爲 run的方法,run方法中是在子進程中執行的代碼 #3.子進程的參數,經過 __init__的方法初始化,並調用父類的__init__
#注意進程間是數據隔離的,因此纔會有進程間的通訊需求,在博客的後面會提到 多線程
from multiprocessing import Process def func(): num = input('>>>') #EOFError: EOF when reading a line print(num) if __name__ == '__main__': Process(target=func).start() #子進程中不能有input #由於在pycharm中開啓多個進程,軟件幫你優化後,顯示在同一個控制檯 #可是實際上它仍是兩個進程的,感覺不到子進程的input,因此報錯 #所以,子進程是不能input 的 #不一樣於聊天,QQ用的是UDP,是客戶端和客戶端之間的通訊
(1)Process模塊的幾個方法:併發
import time,os from multiprocessing import Process def func(arg1,arg2): print('*'*arg1) # time.sleep(2) #有種效果:讓時間片輪轉一下 print('='*arg2) if __name__ == '__main__': p = Process(target=func ,args=(10,20,)) p.start() print('哈哈哈哈哈') p.join() #感知一個子進程的結束,從異步變成了同步。有點像把子進程代碼拼進主進程 print('====== 運行完了')
# 多進程代碼
# from multiprocessing import Process ,必定要在 if __name__ == '__main__':下運行
# 方法
# 進程對象.start() 開啓一個子進程
# 進程對象.join() 感知一個子進程的結束
# 進程對象.terminate() 結束一個子進程
# 進程對象.is_alive() 查看某個子進程是否還在運行
# 屬性
# 進程對象.name 進程名
# 進程對象.pid 進程號
# 進程對象.daemon 值爲True的時候,表示新的子進程是一個守護進程
# 守護進程 隨着主進程代碼的執行結束而結束
# 必定在start以前設置
(2)開啓多個子進程
使用 空列表 [p.join() for p in p_lst] 保證多個子進程運行完後,才運行主進程
import time,os from multiprocessing import Process def func(arg1,arg2): print('*'*arg1) print('!'*arg1) # time.sleep(2) #因爲是異步執行,這裏sleep2秒,有點像幾條馬路上的一個紅路燈,把全部子進程攔住,紅綠燈一消失,全部進程就開始執行 #因此開啓多個子進程,也只會等待2秒而已 print('='*arg2) if __name__ == '__main__': p_lst = [] for i in range(10): #按順序告訴系統要開啓多個子進程,可是系統並不會按順序執行!有些是執行態,有些會是就緒態 p = Process(target=func ,args=(10*i,20*i,)) p_lst.append(p) p.start() #p.join()若是在for循環里加了p.join則相似變爲了同步 [p.join() for p in p_lst] #保證print('====== 運行完了')確定在最後運行 # p.join() #因爲系統執行順序不肯定,print('====== 運行完了') 會出如今不肯定的地方 print('+++++++ 運行完了')
from multiprocessing import Process import os def func(filename,content): with open(filename,'w') as f: f.write(content*10*'*') if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=func,args=('info%s'%i,i)) #這裏進程是同時啓動的,可是有些進入執行態,有些進入就緒態 p_lst.append(p) p.start() # p.join() i=0等待文件寫入,i=1 等待文件寫入........ [p.join() for p in p_lst] #等待你們跑步結束,以前的全部進程必須在這裏都執行完才能繼續往 下執行 #這裏, i=1,i=2,i=3,i=4,i=5是同時寫入的,所有寫入完,纔打印文件名 print([i for i in os.walk(os.getcwd())]) #在使用for生成進程的時候,進程是幾乎同時進行的,浪費的時間只是for i in range(5) 這極短的時間 #這個例子適合場景: 全部的子進程須要異步執行,可是須要全部子進程作完了的結果,返回給主進程
在用for循環的時候,確定是按順序的。可是for循環只是極短的時間,生成的子進程,此時有些進程進入就緒態,有些進入執行態,因此順序不一
(3)守護進程:
#守護進程---> 主進程結束的時候,子進程也主動結束 import time from multiprocessing import Process def func(): # 守護進程 會 隨着 主進程的代碼執行完畢 而 結束 while True: time.sleep(0.2) print('我還活着') def fun2(): print('in the func22222222222') time.sleep(8) print('func2 finished') if __name__ == '__main__': p = Process(target=func) p.daemon=True #設置子進程爲守護進程,必定要在 start方法前 設置好 p.start() p.terminate()#強制結束一個子進程 time.sleep(0.0000000000000000000001) print(p.is_alive()) #檢驗子進程是否或者。雖然上一個語句結束了,可是這裏操做系統正在回收,可是此時響應時間很是短,因此會返回True。sleep一下就是false了 p2 = Process(target=fun2) p2.start() # p2.join() i = 0 while i<5: print('我是socket server.') time.sleep(1) i +=1 # 守護進程 會 隨着 主進程的代碼執行完畢 而 結束 # 在主進程內結束一個子進程 p.terminate() # 結束一個進程不是在執行方法以後當即生效,須要一個操做系統響應的過程 # 檢驗一個進程是否活着的狀態 p.is_alive() # p.name p.pid 這個進程的名字和進程號
(4)使用多進程實現socket服務端的併發
import socket from multiprocessing import Process def server(conn): conn.send('你好'.encode('utf-8')) msg = conn.recv(1024).decode('utf-8') print(msg) conn.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() while True: conn,addr = sk.accept() p = Process(target=server,args=(conn,)) p.start() sk.close()
import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) msg = sk.recv(1024).decode('utf-8') print(msg) ret = input('>>>> ') sk.send(ret.encode('utf8')) sk.close()
當多個進程使用同一份數據資源的時候,就會引起數據安全或順序混亂問題。
(1)multiprocessing.Lock 同步鎖/互斥鎖。使用鎖來保證數據安全:
#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 雖然能夠用文件共享數據實現進程間通訊,但問題是: 1.效率低(共享數據基於文件,而文件是硬盤上的數據) 2.須要本身加鎖處理 #所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。 隊列和管道都是將數據存放於內存中 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來, 咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。
#會形成數據不安全的地方 要加鎖 lock.acquire lock.release from multiprocessing import Process,Lock import json,time def tick(): with open('ticket_num',) as f: ret = json.load(f) time.sleep(0.1) print('餘票: %s'%ret['ticket']) def buy_tick(i,lock): #with lock lock.acquire() #至關於拿鑰匙才能進門。一旦有進程拿了鑰匙,其餘進程在這裏拿不到,就會阻塞 with open('ticket_num',) as f: #阻塞到有人還鑰匙爲止 dic = json.load(f) time.sleep(0.1) if dic['ticket'] > 0: dic['ticket'] -= 1 print('\033[32m%s買到票啦\033[0m'%i) else: print('\033[31m沒有買到票%s\033[0m'%i) time.sleep(0.1) with open('ticket_num','w') as f: json.dump(dic,f) lock.release() #還鑰匙 if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=tick) p_lst.append(p) p.start() [p.join() for p in p_lst] print('\033[34;42m開始買票了\033[0m') lock = Lock() for i in range(10): p2 = Process(target=buy_tick,args=(i,lock)) p2.start() # class MyTicket(Process): # def __init__(self,i,lock): # super().__init__() # self.i = i # self.lock = lock # # def run(self): # self.lock.acquire() # 至關於拿鑰匙才能進門。一旦有進程拿了鑰匙,其餘進程在這裏拿不到,就會阻塞 # with open('ticket_num', ) as f: # 阻塞到有人還鑰匙爲止 # dic = json.load(f) # time.sleep(0.1) # if dic['ticket'] > 0: # dic['ticket'] -= 1 # print('\033[32m%s買到票啦\033[0m' % self.i) # else: # print('\033[31m沒有買到票%s\033[0m' % self.i) # time.sleep(0.1) # with open('ticket_num', 'w') as f: # json.dump(dic, f) # self.lock.release() # 還鑰匙 # # if __name__ == '__main__': # lock = Lock() # for i in 'abcdefghijk': # p = MyTicket(i,lock) # p.start()
當不加鎖的時候,會出現多我的同時搶到1張票,這明顯是不合理的,因此須要加鎖
所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖
用遞歸鎖解決:from multiprocessing import RLock (具體參見線程中的RLock)
(2)multiprocessing.Semaphore 信號量,也就是加了計數器的鎖,即同一時間只能讓指定數目的程序執行任務。如下以移動KTV舉例:
互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。
假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念
#信號量 semaphore sem = Semaphore(int) ,也是鎖的概念,只是內部加了個計數器 import time,random from multiprocessing import Semaphore from multiprocessing import Process # sem = Semaphore(2) # sem.acquire() # print('第一把鑰匙') # sem.acquire() # print('第二把鑰匙') # sem.acquire() # print('第三把鑰匙') def ktv(i,sem): sem.acquire() print('%s走進去KTV了'%i) time.sleep(random.randint(1,5)) print('%s走出來了'%i) sem.release() if __name__ == '__main__': sem = Semaphore(5) for i in range(20): p = Process(target=ktv,args=(i,sem)) p.start()
(3)multiprocessing.Event 事件:事件的阻塞與否是要看 event.wait()是True仍是False
其中 wait(time) 能夠傳參數,wait(2)表示等待2秒後,就不阻塞了
from multiprocessing import Event #代碼要執行,要獲得一個信號通知wait(event),同一時間只能由一個進程執行這段代碼lock,同一時間只能有指定數量的進程執行這段代碼semaphore #一個信號可使全部的進程都進入阻塞狀態 #也能夠控制全部的進程解除阻塞 #一個事件被建立以後,默認是阻塞狀態 e = Event() #建立了一個事件對象 print('事件默認狀態',e.is_set()) #查看一個事件的狀態,默認設置爲阻塞 e.set() #將Flag改成True,解除阻塞 print('set以後',e.is_set()) e.clear() #將Flag改成False,繼續阻塞 print('clear以後',e.is_set()) e.wait() #是依據 e.is_set()的值來決定是否阻塞 #這裏的wait是指,依據事件的狀態來決定是否在wait處阻塞 print(123456) #is_set()爲 False 那麼阻塞,默認阻塞。 # set 將Flag改成True,解除阻塞 #clear 將Flag改成False,繼續阻塞
# 事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞, # 若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。 #e.wait set clear #wait要配套 set和clear使用。單單隻有 set和clear 是不阻塞的 #set和clear是告訴 wait,要不要阻塞,真正的阻塞機制仍是在 wait這
import time import random from multiprocessing import Event,Process def cars(e,i): if not e.is_set(): print('car%i在等待'%i) e.wait() # 阻塞 直到獲得一個 事件狀態變成 True 的信號 print('\033[0;32;40mcar%i經過\033[0m' % i) def light(e): while True: if e.is_set(): #若是e.is_set()真才走下面這句,可是此時 e.is_set默認是False,因此先綠燈亮了 e.clear() print('\033[31m紅燈亮了\033[0m') else: #默認false,因此先走這裏 e.set() #設置 e.is_set() 爲True ,此時有車子正在生成 print('\033[32m綠燈亮了\033[0m') time.sleep(2) if __name__ == '__main__': e = Event() traffic = Process(target=light,args=(e,)) traffic.start() for i in range(20): car = Process(target=cars, args=(e,i)) car.start() time.sleep(random.random()) #思路整理: #當事件傳入的時候,首先判斷了e.is_set()是false,而後改變 e.is_set()的狀態。此時car正在生成,等待事件爲 0-1的隨機小數 #大部分狀況這個小數是足夠大,讓e.is_set()狀態改變的,可是有時候,car隨機的小數很是小,小到e.set還在就緒態,因此有很小的概率,會默認 e.is_set()是 false #讓車輛等待和同行顯示正常的關鍵點,在 等待時間要明確
三.進程間的通訊 — IPC(Inter-Process Communication)
多進程之間,若是不經過特殊的手段,不會共享數據。即數據隔離:
#多進程之間,若是不經過特殊的手段,不會共享數據。即數據隔離 import os from multiprocessing import Process # n = 50 #由於在這個py裏,這兩行會被執行兩次 # print(n) def func(): global n n = 0 print('子pid %s'%os.getpid(),n) # n = 100 # func() # print(n) if __name__ == '__main__': n = 100 p = Process(target=func) p.start() p.join() print('父pid %s' % os.getpid(), n) #n 仍是100 說明,子進程和主進程,全局不同,即內存是隔離的
#之後工做中,通常不會直接用Queue,會使用更強大的幾個模塊: kafka:大數據的 消息中間鍵 可保留數據 rebbitmq memcache:不可保留
(1) mulitiprocessing.Queue 隊列。
隊列能夠理解爲:基於管道和鎖實現的一個數據在線程/進程之間互相通訊的容器
#最主要的四個方法 put get full(滿了返回True) empty (爲空返回True) 還有q.get_nowait() 具體見最下方 from multiprocessing import Queue,Process def produce(q): q.put('hello') print('放進去了') def consume(q): ret = q.get() print('收到了',ret) if __name__ == '__main__': q = Queue() #不設置最大值 Queue(5)最大存儲5 p = Process(target=produce,args=(q,)) p.start() c = Process(target=consume,args=(q,)) c.start()
Queue([maxsize])
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具備如下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。
q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。
q.empty()
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。
方法介紹
''' multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列 都是基於消息傳遞實現的,可是隊列接口 ''' from multiprocessing import Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) # q.put(3) # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。 # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。 try: q.put_nowait(3) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。 print('隊列已經滿了') # 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。 print(q.full()) #滿了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。 try: q.get_nowait(3) # 可使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。 print('隊列已經空了') print(q.empty()) #空了 單看隊列用法
隊列中最出名的模型:生產者消費者模型
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
爲何要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
#首先判斷生產者是否生產完畢,不能用empty和get_nowait,由於有可能消費者消費得快,隊列被取光了,此時隊列爲空,可是生產者隨時可能再put進來 #其次,多個進程取隊列裏取值,好比此時put一個None讓進程break,可是其中一個進程拿到了None退出了,另外一個卻繼續阻塞 #因此須要put多個None
# 隊列 # 生產者消費者模型 #首先判斷生產者是否生產完畢,不能用empty和get_nowait,由於有可能消費者消費得快,隊列被取光了,此時隊列爲空,可是生產者隨時可能再put進來 #其次,多個進程取隊列裏取值,好比此時put一個None讓進程break,可是其中一個進程拿到了None退出了,另外一個卻繼續阻塞 #因此須要put多個None import time import random from multiprocessing import Process,Queue def producer(name,food,q): for i in range(10): time.sleep(random.randint(1, 3)) f = '%s生產了%s%s'%(name,food,i) q.put(f) print(f) def consumer(name,q): while True: food = q.get() if food == None: print('沒了~~~~~~') break print('\033[31m%s消費了%s\033[0m'%(name,food)) time.sleep(random.randint(1,3)) if __name__ == '__main__': q = Queue(20) p1 = Process(target=producer,args=('egg','包子',q)) p2 = Process(target=producer,args=('wusir','Cake',q)) p1.start() p2.start() c1 = Process(target=consumer,args=('alex',q)) c2 = Process(target=consumer,args=('jin',q)) c1.start() c2.start() p1.join() p2.join() q.put(None) q.put(None) # def consumer(q,name): # while True: # food = q.get() # if food is None: # print('%s獲取到了一個空'%name) # break # print('\033[31m%s消費了%s\033[0m' % (name,food)) # time.sleep(random.randint(1,3)) # # def producer(name,food,q): # for i in range(4): # time.sleep(random.randint(1,3)) # f = '%s生產了%s%s'%(name,food,i) # print(f) # q.put(f) # # if __name__ == '__main__': # q = Queue(20) # p1 = Process(target=producer,args=('Egon','包子',q)) # p2 = Process(target=producer, args=('wusir','泔水', q)) # c1 = Process(target=consumer, args=(q,'alex')) # c2 = Process(target=consumer, args=(q,'jinboss')) # p1.start() # p2.start() # c1.start() # c2.start() # p1.join() # p2.join() # q.put(None) # q.put(None)
使用multiprocessing.JoinableQueue的生產者消費者模型詳解:
JoinableQueue的實例p除了與Queue對象相同的方法以外,還具備如下方法:
q.task_done()
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。
q.join()
生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。
下面的例子說明如何創建永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
import time import random from multiprocessing import Process,JoinableQueue def consumer(q,name): while True: food = q.get() print('\033[31m%s消費了%s\033[0m' % (name,food)) time.sleep(random.randint(1,3)) q.task_done() # count - 1 每處理一個數據,就得到一個task_done記錄,最後獲取一個join信號,進行判斷,是否所有都已經task_done #能夠理解爲,join信號發送過來講我生產了20個東西,task_done對比如下記錄,若是也有20個記錄則表示處理完畢 def producer(name,food,q): for i in range(4): time.sleep(random.randint(1,3)) f = '%s生產了%s%s'%(name,food,i) print(f) q.put(f) q.join() # 阻塞 直到一個隊列中的全部數據 所有被處理完畢。本來只要生產完就行,如今要等到包子都吃完才行 #當所有生產完畢,會發送一個join信號 #隊列生產過程被拉長了,從生產完就結束,變爲了要等 q.task_done() 完才能結束 #也就是說,執行q.task_done() 都會記錄下來,直到隊列中全部的數據,都有task_done記錄 if __name__ == '__main__': q = JoinableQueue(20) p1 = Process(target=producer,args=('Egon','包子',q)) p2 = Process(target=producer, args=('wusir','泔水', q)) c1 = Process(target=consumer, args=(q,'alex')) c2 = Process(target=consumer, args=(q,'jinboss')) p1.start() p2.start() c1.daemon = True # 設置爲守護進程 主進程中的代碼執行完畢以後,子進程自動結束 c2.daemon = True c1.start() c2.start() p1.join() p2.join() # 感知一個子進程的結束 #首先經過 q.join感知consumer的結束,而後經過 p1.join感知 producer的結束。再而後經過結束主進程,來結束consumer的子進程(設置了守護進程) # 在消費者這一端: # 每次獲取一個數據 # 處理一個數據 # 發送一個記號 : 標誌一個數據被處理成功 # 在生產者這一端: # 每一次生產一個數據, # 且每一次生產的數據都放在隊列中 # 在隊列中刻上一個記號 # 當生產者所有生產完畢以後, # join信號 : 已經中止生產數據了 # 且要等待以前被刻上的記號都被消費完 # 當數據都被處理完時,join阻塞結束 # consumer 中把全部的任務消耗完 # producer 端 的 join感知到,中止阻塞 # 全部的producer進程結束 # 主進程中的p.join結束 # 主進程中代碼結束 # 守護進程(消費者的進程)結束
(2) mulitiprocessing.Pipe 管道。
Pipe:進程數據的不安全性
from multiprocessing import Process,Pipe def func(conn1,conn2): conn2.close() while True: # msg = conn1.recv() # print(msg) try: msg = conn1.recv() print(msg) except EOFError: #只有當主進程2個口都關閉,子進程關閉了其中一個口,且管道內沒數據了,纔會報錯EOFError conn1.close() #由於程序要斷定,任何一個口都不會給我發消息了 break
#隊列 管道+鎖,因此之後會比較多使用隊列 #管道 更底層的 #IPC(Inter-Process Communication)進程間通訊 #Pipe:進程數據的不安全性,在只有一個生產者和只有一個消費者時,不會有問題 #可是存在這種問題:同時有幾個進程往管道要數據,有一條數據被準備取走了,可是另外一個進程,此時又來取,就會出現問題。 #多個消費者取同一個數據 #因此要經過枷鎖解決 from multiprocessing import Process,Pipe,Lock import time import random def producer(con,pro,name,food): con.close() for i in range(6): f = '%s生產了%s%s'%(name,food,i) print(f) pro.send(f) time.sleep(0.5) pro.send(None) pro.send(None) pro.close() def consumer(con,pro,name,lock): pro.close() while True: lock.acquire() msg = con.recv() lock.release() time.sleep(0.5) if msg: print('\033[32m%s消費了%s\033[0m' % (name, msg)) else: con.close() break # try: # lock.acquire() # msg = con.recv() # lock.release() # print('\033[32m%s消費了%s\033[0m'%(name,msg)) # time.sleep(0.5) # except EOFError: #子進程,主進程的通道關閉,且管道里沒值了,就會引起EOFError # con.close # print('meile') # lock.release() # break if __name__ == '__main__': con,pro = Pipe() # lock = Lock() p1 = Process(target=producer,args=(con,pro,'egg','baozi',)) c1 = Process(target=consumer,args=(con,pro,'aaaa',lock)) c2 = Process(target=consumer,args=(con,pro,'bbbb',lock)) p1.start() c1.start() c2.start() con.close() #注意要引起 EFOError ,主進程得con和pro也要關閉 pro.close()
#Manager # 進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的 # 雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此 # A manager returned by Manager() will support types # list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. #可是manager的數據共享是不安全性的,同管道,可能兩個進程取了同一個數據 #加鎖解決 from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂 d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)