管道:能夠互相通訊、數據共享,但容易出現數據搶佔問題,能夠加鎖解決。python
進程池:每開啓進程,開啓屬於這個進程的內存空間;能提高計算機的效率,進程過多 操做系統的調度;安全
一、初識管道,能夠互相通訊。app
# 一、初識管道,能夠互相通訊。 from multiprocessing import Pipe conn1, conn2 = Pipe() conn1.send('123') print(conn2.recv()) conn2.send('321') print(conn1.recv())
二、管道實現:生產者/消費者模型。dom
可是這裏可能會有個問題,消費者可能同時拿同一個數據,那怎樣纔好呢?(接收的時候上鎖:lock)異步
因此管道 + 鎖來控制操做管道的行爲 來避免進程之間爭搶數據形成的數據不安全現象。async
# 二、管道實現:生產者/消費者模型 from multiprocessing import Pipe,Process import time,random def consumer(con,pro,name): pro.close() while 1: try: m=con.recv() print('%s使用了 %s' % (name,m)) time.sleep(random.random()) except EOFError: con.close() break def producer(con,pro,name,mask): con.close() for i in range(1,6): time.sleep(random.random()) m='%s生產了%s %s'%(name,mask,i) print(m) pro.send(m) pro.close() if __name__ == '__main__': con,pro=Pipe() p=Process(target=producer,args=(con,pro,'大廠','N95 ')) p.start() c=Process(target=consumer,args=(con,pro,'A企業')) c.start() c1=Process(target=consumer,args=(con,pro,'B企業')) c1.start() con.close() pro.close()
三、Manager,數據共享,但不安全的,進程之間也會搶佔資源。函數
但能夠加鎖進行約束解決。學習
# 三、Manager,數據共享不安全的,但會進程之間搶佔資源。 # 但能夠加鎖約束解決 from multiprocessing import Manager,Process,Lock def func(dic,lock): # lock.acquire() dic['count']+=1 # lock.release() if __name__ == '__main__': lock=Lock() m=Manager() dic=m.dict({'count':0}) p_lst=[] for i in range(50): p=Process(target=func,args=(dic,lock)) p.start() p_lst.append(p) for i in p_lst:i.join() print('主進程:%s'%dic)
加鎖後:數值一直是準確的。ui
一、進程池與多進程效率對比:spa
一樣是執行100個進程,進程池每次處理5個,而多進程for循環處理。結果進程池效率賽過多進程。
from multiprocessing import Pool,Process import time def func(i): print(i) if __name__ == '__main__': # 進程池的效果 st=time.time() pool=Pool(5) # 池子可放5個(通常CPU的個數+1) pool.map(func,range(100)) # 100個進程任務 t1=time.time()-st st=time.time() print('進程池時間:',t1) # 原來多進程的效果 p_lst=[] for i in range(100): p=Process(target=func,args=(i,)) p_lst.append(p) p.start() for p in p_lst:p.join() t2=time.time()-st print('多進程時間:',t2)
二、進程池傳多個參數:
# 二、進程池傳多個參數: from multiprocessing import Pool import time def func(i): print(i) if __name__ == '__main__': st=time.time() pool=Pool(5) pool.map(func,[(10,'name','age'),100])
三、apply:同步
# 三、apply:同步 from multiprocessing import Pool import time def func(): print('--開始~') time.sleep(0.1) print('==結束!'+'\n') if __name__ == '__main__': pool=Pool() for i in range(5): pool.apply(func) # 同步了
四、apply_async:異步
配合close()、join()進行使用。有沒有發現進程池中的pid有重複的?那是由於進程池有固定的N個進程,因此不會變。
# 四、apply_async:異步 from multiprocessing import Pool import time,os def func(i): pid=os.getpid() print('%s--開始~:%s'%(i,pid)) time.sleep(1) if __name__ == '__main__': pool=Pool(2) for i in range(5): pool.apply_async(func,(i,)) pool.close() # 結束進程池接收任務 pool.join() # 感知進程池中的任務執行結束
五、進程池的返回值:
# 五、進程池的返回值 # apply:直接接收返回值 # apply_async:須要get(),會堵塞因等待返回值,解決可先放列表get()。 from multiprocessing import Pool import time def func(i): time.sleep(0.5) return i+1 if __name__ == '__main__': p=Pool(3) p_lst=[] # for i in range(10): # res=p.apply(func,args=(i,)) # 直接接收返回值 # print(res) # res=p.apply_async(func,args=(i,)) # apply_async # p_lst.append(res) # for i in p_lst:print(i.get()) res=p.map(func,range(10)) # map一次性返回 print(res)
①apply同步返回值:
②apply_async返回值:
因3個線程,因此每次打印3個信息。
③map返回值:
六、進程池回調函數:
# 六、進程池回調函數 # 先執行異步函數func,將func返回值傳入func1函數中的ii參數。 # 回調函數是在主進程中執行,而不是子進程中。 from multiprocessing import Pool def func(i): return i def func1(ii): print('i+1=',ii+1) if __name__ == '__main__': p=Pool() for i in range(5): p.apply_async(func,args=(i,),callback=func1) p.close() p.join()
小結:
歡迎來你們QQ交流羣一塊兒學習:482713805