multiprocessing:html
multiprocessing
模塊容許程序員在給定機器上充分利用多個處理器。它能夠在Unix和Windows上運行。python
在中multiprocessing
,經過建立Process
對象而後調用其start()
方法來生成進程。 Process
遵循的API threading.Thread
。多進程程序的一個簡單示例:程序員
class Process(object): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): self.name = '' self.daemon = False self.authkey = None self.exitcode = None self.ident = 0 self.pid = 0 self.sentinel = None def run(self): pass def start(self): pass def terminate(self): pass def join(self, timeout=None): pass def is_alive(self): return False
from multiprocessing import Process import time,os def func(arg1,arg2): #子進程 print(arg1+'\t'+arg2) time.sleep(5) print('編號1:我是func,個人進程號:%s'%os.getpid()) print('編號2:我是func,個人父進程號:%s'%os.getppid()) if __name__ == '__main__': arg1 = 'aike' arg2 = 'cool' p = Process(target=func,args=(arg1,arg2)) #建立一個Process對象 p.start() #啓動子進程 time.sleep(3) print(p.pid) # 子進程進程號 print('編號3:start後,子進程未結束,依然鄉下執行') print('編號4:我是func的父進程,個人進程號:%s\t個人父進程號:%s'%(os.getpid(),os.getppid()))
結論:數據庫
os,getpid 獲取當前進程的進程號
os.getppid 獲取當前進程的父進程號
Process.pid 能夠獲取這個對象的進程號
Process.name 獲取這個進程對象的名字
Process能夠建立一個進程對象,target指定一個函數的內存地址,該函數去實現子進程的操做,args爲須要傳的參數(元祖類型)
當主進程開啓了子進程:
主進程本身的代碼若是執行時間長,等待本身的代碼執行結束
子進程的執行時間長,主進程會在主進程代碼執行完畢以後等待子進程執行完畢以後 主進程才結束
Process當中的幾個方法:
run:建立一個類去建立Process對象時,這個類必須繼承process,切必須實現run方法,run方法中是子進程中執行的代碼
is_alive:檢驗一個進程是否活着的狀態,當一個子進程結束,立刻進行檢驗,由於操做系統反應須要時間,在這段時間進行檢驗,可能會返回True
start:啓動子進程
join:感知一個子進程的結束,將異步的程序改成同步。只有當子進程執行結束時,主進程才繼續向下執行
terminate:結束子進程
Process當中的幾個重要屬性:
name:查看進程名字
daemon:設置是否爲守護進程
pid:查看進程號
from multiprocessing import Process import time def func(args): time.sleep(2) print(args) print('子進程執行完畢') if __name__ == '__main__': p = Process(target=func,args=('aike',)) p.start()#啓動子進程 print('我是主進程,我開始等待子進程執行完畢') p.join()#感知一個子進程的結束,將異步的程序改成同步。只有當子進程執行結束時,主進程才繼續向下執行 p.terminate()#結束子進程 time.sleep(2) print(p.is_alive())#檢驗一個進程是否活着的狀態,當一個子進程結束,立刻進行檢驗,由於操做系統反應須要時間,在這段時間檢驗,可能會返回True print('我是主進程,子進程執行結束')
建立多個子進程:
第一種方法:直接Process建立
from multiprocessing import Process import time def func(args): print('%s正在上傳'%args) time.sleep(1) print('%s上傳完畢'%args) if __name__ == '__main__': lis_p = [] for i in range(10):#建立十個子進程 p = Process(target=func,args=('aike%s'%i,)) lis_p.append(p) p.start() [p.join() for p in lis_p]#等待十個子進程結束操做後,主進程才進行下步操做,使用場景爲寫文件等 print('全部對象上傳完畢,你能夠進行訪問了')
第二種方法:經過類繼承建立
from multiprocessing import Process import time class MyProcess(Process): #必須繼承Process def __init__(self,args): super().__init__()#須要傳參時,自定義__init__方法,用super繼承父類的默認參數 self.args = args def run(self):#子進程的實現 time.sleep(1) print('我是%s'%self.args) if __name__ == '__main__': p = MyProcess(1) p.start() p2 = MyProcess(2) print('不須要等p結束,主進程繼續異步執行') p2.start()
多進程之間的數據隔離:主進程與子進程的數據不互通
from multiprocessing import Process def func(): global i i = 0 print(i) if __name__ == '__main__': i = 10 p = Process(target=func) p.start() print('主進程',i) #打印: 主進程 10 0
守護進程:
當這個子進程設置爲守護進程,它會隨着主進程的代碼執行結束而結束
from multiprocessing import Process import time def func(): # while True: time.sleep(5) print('我是func') def func2(): while True: time.sleep(1) print('我是守護進程,我還活着') if __name__ == '__main__': t1 = Process(target=func) t2 = Process(target=func2) t2.daemon = True #設置爲守護進程 t1.start() t2.start() t1.join() print('我是主進程')
進程同步控制:
進程鎖:multiprocessing.Lock
lock = multiprocessing.Lock() 建立一個鎖
lock.acquire() 獲取鎖
lock.release() 釋放鎖
誰先搶到鎖誰先執行,等到該進程執行完成後,其它進程再搶鎖執行
#模擬搶票,若是不設置鎖,因爲程序執行過快,本只有一張餘票,但會被多我的搶到 #設置鎖後,只有等釋放鎖,其餘人才能拿着鎖繼續執行下去,相似socket客戶端創建監聽的listen,只有等一我的結束後,後面人才能進去 from multiprocessing import Process from multiprocessing import Lock import time import json def ticket(args,lock): lock.acquire() with open('ticket') as f: data_ticket = json.load(f) if data_ticket['ticket'] > 0: print('%s搶票成功'%args) data_ticket['ticket'] -= 1 else: print('%s沒有多餘的票'%args) time.sleep(0.5) with open('ticket','w') as f: json.dump(data_ticket,f) lock.release() if __name__ == '__main__': lock = Lock() for i in range(10): p = Process(target=ticket,args=('aike%s'%i,lock)) p.start()
信號量:multiprocessing.Semaphore
特色:某一段代碼,同一時間,只能被n個進程執行,與進程鎖lock相似,進程鎖僅能有一個,而信號量能同時多個,一個進程結束後,另一個進程會補上,直到全部進程都被執行。
#一個洗手間只有2個位置( Semaphore(2)),restroom表明洗手間,最多同時被2個進程執行 from multiprocessing import Semaphore from multiprocessing import Process import time,random def restroom(args,s): s.acquire() print('%s進來了'%args) time.sleep(random.randint(10,30)) print('%s出去了'%args) s.release() if __name__ == '__main__': s = Semaphore(2) for i in range(5): p = Process(target=restroom,args=('aike%s'%i,s)) p.start()
事件:multiprocessing.Event
特色:經過一個信號來控制多個進程,同時執行(異步)或者阻塞(同步)
class Event(object): def is_set(self): #查看事件的狀態,返回True或Flase return False def set(self): #修改事件的狀態爲True pass def clear(self): #設置事件的狀態爲Flase pass def wait(self, timeout=None): #依據事件的狀態來決定本身是否在wait處阻塞 pass
from multiprocessing import Process from multiprocessing import Event import time import random def light(traffic_light): ''' 事件建立出來默認爲Flase,當他爲Flase狀態,改變爲True狀態,則爲綠燈,放行5秒 當他爲True狀態,改變爲Flase狀態,則爲紅燈,等待5秒 ''' while True: if traffic_light.is_set(): print('紅紅紅紅紅紅紅紅紅紅紅紅') traffic_light.clear() #事件狀態變爲Flase # print(traffic_light.is_set()) else: print('綠綠綠綠綠綠綠綠綠綠綠綠綠') traffic_light.set() #事件狀態變爲True # print(traffic_light.is_set()) time.sleep(5) def cars(car,traffic_light): ''' 檢測到事件爲True狀態,即爲綠燈,車輛通行 Flase狀態,即爲紅燈,車輛等待,程序阻塞,等待事件爲True狀態,即爲綠燈後再通行 ''' if traffic_light.is_set(): print('%s過去了'%car) else: print('%s在等待' % car) traffic_light.wait() print('%s過去了'%car) if __name__ == '__main__': traffic_light= Event() traffic = Process(target=light,args=(traffic_light,)) #建立一個紅綠燈進程 traffic.start() for i in range(20): car = Process(target=cars,args=('%s號車'%i,traffic_light)) #建立多個車輛進程 car.start() time.sleep(random.random())
隊列:multiprocessing.Queue編程
建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。 json
Queue([maxsize])
建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
class Queue(object): def __init__(self, maxsize=-1): #maxsize隊列中容許的最大項數。若是省略此參數,則無大小限制。 self._maxsize = maxsize def qsize(self): #返回隊列中目前項目的正確數量。 return 0 def empty(self): #隊列是否爲空,空返回True return False def full(self): #隊列是否滿,滿了返回True return False def put(self, obj, block=True, timeout=None): #添加obj對象至隊列 pass def put_nowait(self, obj): # pass def get(self, block=True, timeout=None): #返回elf中的一個項目 pass def get_nowait(self): pass def close(self): #關閉隊列,防止隊列中加入更多數據。 pass def join_thread(self): #鏈接隊列的後臺線程。 pass def cancel_join_thread(self): #不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。 pass
Queue([maxsize])
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具備如下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。
q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。
q.empty()
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。
q.close()
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
q.cancel_join_thread()
不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。
q.join_thread()
鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。
摘自:老男孩女神Eva_j
q = Queue(3) # 最大存3個對象 q.put('aike') q.put('chenfan') q.put('xiaxiang') print(q.full()) # 隊列是否滿 print(q.empty()) # 隊列是否空 # q.put('dingchao') # 當隊列已經滿了再繼續添加,程序默認會阻塞,直至有數據被取出 # q.put('aike',block=False)#能夠設置爲不阻塞,即block = False,可是隊列滿了會報錯 try: q.put_nowait(3) # 可使用put_nowait,若是隊列滿了不會阻塞,與q.put(block=False)同樣,會由於隊列滿了而報錯。 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。 print('隊列已經滿了')
q = Queue(3) # 最大存3個對象 q.put('aike') q.put('chenfan') q.put('xiaxiang') q.get() q.get() q.get() # q.get() #當隊列的值已經所有被取出後繼續取值,程序默認會阻塞,直至有新的值進入 # q.get(block = False) #能夠設置爲不阻塞,即block = False,可是沒取到值會報錯 try: q.get_nowait(3) # 可使用get_nowait,若是隊列滿了不會阻塞,與q.get(block = False)同樣,會由於沒取到值而報錯。 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。 print('隊列已經空了')
from multiprocessing import Process from multiprocessing import Queue def func1(q): q.put('aike') q.put('chenfan') q.put('xiaxiang') def func2(q): print(q.get()) #aike print(q.get())#chenfan if __name__ == '__main__': q = Queue(3) #最大存3個對象 p1 = Process(target=func1,args=(q,)) p1.start() p2 = Process(target=func2,args=(q,)) p2.start()
生產者消費者模型:數組
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。安全
爲何要使用生產者和消費者模型:多線程
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。併發
什麼是生產者消費者模型:
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
from multiprocessing import Queue from multiprocessing import Process import time def producer(name,food,q): for i in range(20): # time.sleep(1) q.put((food+str(i))) print('%s生產了%s%s' % (name, i, food)) def consumer(name,q): while True: time.sleep(0.2) food = q.get() if food == None: print('沒有了') break print('%s買下了%s'%(name,food)) if __name__ == '__main__': q = Queue(20) p = Process(target=producer,args=('金早綠點','包子',q)) p.start() c = Process(target=consumer,args=('aike',q)) c.start() p.join() q.put(None)
基於隊列實現的生產者與消費者模型的升級版:multiprocessing.JoinableQueue
建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
JoinableQueue的實例p除了與Queue對象相同的方法以外,還具備如下方法:
q.task_done()
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。
q.join()
生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。
下面的例子說明如何創建永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
from multiprocessing import JoinableQueue from multiprocessing import Process import time def producer(name,food,q): for i in range(25): # time.sleep(1) q.put((food+str(i))) print('%s生產了%s%s' % (name, i, food)) q.join() #阻塞,直到隊列中全部項目均被處理 def consumer(name,q): while True: time.sleep(0.1) food = q.get() print('%s買下了%s'%(name,food)) q.task_done() #每次get後須要調用task_done,直到隊列全部數據都task_done,join才取消阻塞,若是不調用,join會認爲項目沒被處理,join會一直阻塞 if __name__ == '__main__': q = JoinableQueue(20)#最多存放20個對象 p = Process(target=producer,args=('金早綠點','包子',q)) p.start() c = Process(target=consumer,args=('aike',q)) c.daemon = True #設置爲守護進程,隨着主進程的代碼結束而結束 c.start() p.join() #阻塞,直到生產者結束 print('代碼結束,守護進程結束')
管道:multiprocessing.Pipe
#建立管道的類: Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道 #參數介紹: dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。 #主要方法: conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。 conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象 #其餘方法: conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法 conn1.fileno():返回鏈接使用的整數文件描述符 conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
from multiprocessing import Pipe,Process def func2(conn,conn2): conn.close() while True: try: ret = conn2.recv() print(ret) except EOFError: #當全部的管口都被關閉,且繼續取值,會拋出異常 print('沒有值了') conn2.close() break if __name__ == '__main__': conn,conn2 = Pipe() p2 = Process(target=func2,args=(conn,conn2)) p2.start() conn2.close() conn.send('aike') conn.close()
注意:管道口的管理問題

管道口的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。
這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。
若是忘記執行這些步驟,程序可能在消費者中的recv()操做上掛起。
管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生成EOFError異常。
所以,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。
from multiprocessing import Pipe,Process,Lock import time def producer(food,conn,conn2): conn2.close() for i in range(50): print('生產了%s%s' % (food, i)) conn.send('%s%s'%(food,i)) conn.close() def consumer(name,conn,conn2): conn.close() while True: time.sleep(0.5) try: ret = conn2.recv() print('%s消費了%s'%(name,ret)) except EOFError: print('沒有包子了') conn2.close() break if __name__ == '__main__': conn,conn2 = Pipe() pro = Process(target=producer,args=('包子',conn,conn2))#生產者 pro.start() sumer_lis = [] for i in range(5): sumer = Process(target=consumer, args=('aike%s'%i, conn, conn2,)) # 消費者 sumer_lis.append(sumer) sumer.start() # sumer = Process(target=consumer,args=('aike',conn,conn2))#消費者 # sumer.start() # sumer1 = Process(target=consumer,args=('chenfan',conn,conn2))#消費者 # sumer1.start() conn.close() conn2.close()
管道存在的數據安全問題: 數據訪問請求太快,未被系統反應過來,同一數據被多個進程請求訪問。解決辦法是加鎖,但加上鎖後,數據雖是輪流訪問,可是管道內的全部數據被訪問完後,程序會被阻塞。
from multiprocessing import Pipe,Process,Lock import time def producer(food,conn,conn2): conn2.close() for i in range(50): print('生產了%s%s' % (food, i)) conn.send('%s%s'%(food,i)) conn.send(None)#用於退出消費者循環,不然程序一直阻塞 conn.send(None) conn.send(None) conn.send(None) conn.send(None) conn.close() def consumer(name,conn,conn2,lock): conn.close() while True: time.sleep(0.5) try: lock.acquire() ret = conn2.recv() lock.release() if ret: print('%s消費了%s'%(name,ret)) else: conn2.close() break except EOFError: print('沒有包子了') conn2.close() break if __name__ == '__main__': lock = Lock() conn,conn2 = Pipe() pro = Process(target=producer,args=('包子',conn,conn2))#生產者 pro.start() sumer_lis = [] for i in range(5): sumer = Process(target=consumer, args=('aike%s'%i, conn, conn2,lock)) # 消費者 sumer_lis.append(sumer) sumer.start() # sumer = Process(target=consumer,args=('aike',conn,conn2))#消費者 # sumer.start() # sumer1 = Process(target=consumer,args=('chenfan',conn,conn2))#消費者 # sumer1.start() conn.close() conn2.close()
數據共享:multiprocessing.Manager
進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的 雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
from multiprocessing import Manager,Process,Lock def func(dic,lock): lock.acquire() dic['int'] -= 1 lock.release() if __name__ == '__main__': lock = Lock() m = Manager() dic = m.dict({'int':100}) p_lis = [] for i in range(20): p = Process(target=func,args=(dic,lock)) p_lis.append(p) p.start() for p in p_lis:p.join() print(dic)
注意:
展望將來,基於消息傳遞的併發編程是大勢所趨
即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合,經過消息隊列交換數據。
這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中。
但進程間應該儘可能避免通訊,即使須要通訊,也應該選擇進程安全的工具來避免加鎖帶來的問題。
之後咱們會嘗試使用數據庫來解決如今進程之間的數據共享問題。
一個陷阱:當共享數據類型數一個字典嵌套列表,或者列表嵌套字典等,在子進程修改數據時直接經過下標修改是無效的
from multiprocessing import Manager,Process,Lock def func(lis): lis[0]['int'] -= 1 if __name__ == '__main__': m = Manager() lis = m.list([{'int':100},50]) #列表嵌套字典 p_lis = [] for i in range(20): p = Process(target=func,args=(lis,)) p_lis.append(p) p.start() for p in p_lis:p.join() print(lis) #打印: [{'int': 100}, 50]
解決辦法: 觸發__setitem__方法
from multiprocessing import Manager,Process,Lock def func(lis): ''' 觸發__setitem__方法 ''' num = lis[0] #{'int': 100} num['int'] -= 1 #99 lis[0] = num #{'int': 100} = {'int': 99} if __name__ == '__main__': m = Manager() lis = m.list([{'int':100},50]) #列表嵌套字典 p_lis = [] for i in range(20): p = Process(target=func,args=(lis,)) p_lis.append(p) p.start() for p in p_lis:p.join() print(lis)
緣由:
以上代碼中讓人困惑的操做的目的是繞過Manager的一個隱祕問題,這個問題是指:Manager對象沒法監測到它引用的可變對象值的修改,須要經過觸發__setitem__方法來讓它得到通知
代碼中lis[0] = num這行代碼就是用來故意觸發proxy對象的__setitem__方法的,關於這個問題Python官方文檔解釋以下:
If standard (non-proxy) list or dict objects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a__setitem__on the proxy object) does propagate through the manager and so to effectively modify such an item, one could re-assign the modified value to the container proxy.
進程池和multiprocess.Pool模塊:
爲何要有進程池,進程池的概念:
在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。那麼咱們要怎麼作呢?
在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。
multiprocess.Pool模塊:
1 numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值 2 initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None 3 initargs:是要傳給initializer的參數組
1 p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 2 '''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()''' 3 4 p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 5 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。''' 6 7 p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 8 9 P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法 2 obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。 3 obj.ready():若是調用完成,返回True 4 obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常 5 obj.wait([timeout]):等待結果變爲可用。 6 obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
代碼實例:
from multiprocessing import Pool,Process import time def func(args): # time.sleep(1) args += 1 return args #進程池擁有返回值 def func2(args): args += 1 if __name__ == '__main__': #建立一個進程池 pool = Pool(5) start = time.time() p = pool.map(func,range(10)) print(time.time() - start)#0.3028266429901123 # print(p) # 建立多個進程 p_lis = [] for i in range(10): start = time.time() p = Process(target=func2,args=(i,)) p_lis.append(p) p.start() for p in p_lis:p.join() print(time.time() - start)#0.8240413665771484
apply方法的同步和異步:
同步:apply
from multiprocessing import Pool,Process import time,os def func(args): print('進程號:%s'%os.getpid()) args += 1 time.sleep(2) return args if __name__ == '__main__': pool = Pool(5) #建立1個進程池,進程池有5個進程 lis = [] for i in range(10): p = pool.apply(func,args=(i,))# 同步調用,直到本次任務執行完畢拿到返回值p,等待任務func執行的過程當中可能有阻塞也可能沒有阻塞, # 但無論該任務是否存在阻塞,同步調用都會在原地等着 lis.append(p) print(lis) pool.close() pool.join() print('進程池任務爲空,關閉進程池')
異步:apply_async
from multiprocessing import Pool,Process import time,os def func(args): print('進程號:%s' % os.getpid()) args += 1 time.sleep(1) return args if __name__ == '__main__': pool = Pool(5) # lis = [] for i in range(10): p = pool.apply_async(func,args=(i,))# 異步運行,根據進程池中有的進程數,每次最多5個子進程在異步執行 # 返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務 # 須要注意的是,進程池中的三個進程不會同時開啓或者同時結束 # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。 # 異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果 # 不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了 # lis.append(p.get()) #get自帶阻塞,等收到返回值纔會進行下個進程任務,# # 使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get # print(lis) pool.close() pool.join() print('進程池任務爲空,關閉進程池')
進程池版本socket併發:
import socket,os from multiprocessing import Pool,Process def chat(conn): while True: ret = conn.recv(1024).decode('utf-8') print('進程號%s發送消息:'%os.getpid(),ret) # info = input('請輸入消息:').encode('utf-8')#子進程當中的IO操做,不會在pycharm當中反映。 conn.send('收到'.encode('utf-8')) if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1', 8080)) sk.listen() socket_pool = Pool(5) while True: conn,addr = sk.accept() socket_pool.apply_async(chat,args=(conn,))
import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) while True: msg = input('請輸入消息:').encode('utf-8') sk.send(msg) if msg == b'q': break ret = sk.recv(1024).decode('utf-8') print(ret) sk.close()
發現:併發開啓多個客戶端,服務端同一時間只有4個不一樣的pid,只能結束一個客戶端,另一個客戶端纔會進來.
進程池能夠有返回值:
from multiprocessing import Pool def func(args): args += 1 print(args) return args if __name__ == '__main__': pool = Pool(5) ret = pool.map(func,range(20))#map是異步的,根據進程池大小同時運行限定的進程 # 結果是func的返回值,返回一個存放計算結果的列表 print(ret) #[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
from multiprocessing import Pool import time def func(args): time.sleep(1) args += 1 return args if __name__ == '__main__': pool = Pool(5) for i in range(20): ret = pool.apply(func,args=(i,))#map是同步的,自帶join和close方法,等待一個進程結束才進行下個進程任務 # 結果是func的返回值 print(ret) #1
from multiprocessing import Pool import time def func(args): time.sleep(1) args += 1 return args if __name__ == '__main__': pool = Pool(5) for i in range(20): ret = pool.apply_async(func,args=(i,))#apply_async是同步的,須要主動使用join和close方法,才能關閉進程池。 # 結果是一個存放func返回值的對象,須要調用get方法才能獲取值, #而get自帶阻塞,至關於join,當你一條一條獲取值時,會變成同步,由於須要等待計算完畢的返回值 print(ret.get()) pool.close()
進程池的回調函數:
須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
from multiprocessing import Pool import os def func(args): args += 1 return args def func2(args2): #接收子進程func的返回值 args2 += 2 print('個人父進程號%s'%os.getppid()) print('個人進程號號%s'%os.getpid())#916,回調函數屬於主進程 print(args2) if __name__ == '__main__': pool = Pool(5) for i in range(10): p = pool.apply_async(func,args=(i,),callback=func2)#callback指定回調函數,將子進程的返回值返回給回調函數 pool.close() pool.join() print('主進程號%s'%os.getpid())#916
爬蟲示例:暫時跳過