管道安全
主進程將管道的兩端都傳送給子進程,子進程和主進程共用管道的兩種報錯狀況,都是在recv接收的時候報錯的:併發
1.主進程和子進程中的管道的相同一端都關閉了,出現EOFError;app
2.若是你管道的一端在主進程和子進程中都關閉了,可是你還用這個關閉的一端去接收消息,那麼就會出現OSError;異步
因此你關閉管道的時候,就容易出現問題,須要將全部只用這個管道的進程中的兩端所有關閉才行。固然也能夠經過異常捕獲(try:except EOFerror)來處理。async
雖然咱們在主進程和子進程中都打印了一下conn1一端的對象,發現兩個再也不同一個地址,可是子進程中的管道和主進程中的管道仍是能夠通訊的,由於管道是同一套,系統可以記錄。 函數
咱們的目的就是關閉全部的管道,那麼主進程和子進程進行通訊的時候,能夠給子進程傳管道的一端就夠了,而且用咱們以前學到的,信息發送完以後,再發送一個結束信號None,那麼你收到的消息爲None的時候直接結束接收或者說結束循環,就不用每次都關閉各個進程中的管道了。spa
數據共享:操作系統
Manager模塊介紹線程
多進程共同去處理共享數據的時候,就和咱們多進程同時去操做一個文件中的數據是同樣的,不加鎖就會出現錯誤的結果,進程不安全的,因此也須要加鎖code
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂 d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) #字典形式 p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
進程池和multiprocess Pool 模塊
建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務(高級一些的進程池能夠根據你的併發量,搞成動態增長或減小進程池中的進程數量的操做),不會開啓其餘進程,提升操做系統效率,減小空間的佔用等。
定義池子將建立定量的進程:去完成任務,3個出去再回到池子中但進程不關閉,在出去執行任務,直到將任務執行完成結束
進程池的主要方法:
from multiprocessing import Process,Pool import time def func(n): for i in range(5): n=n=i print(n) if __name__ == '__main__': pool_start_time=time.time() pool= Pool(4) pool.map(func,range(100)) pool_end_time=time.time() pool_dif_time=pool_end_time-pool_start_time p_s_time=time.time() p_list=[] for i in range(100): p2=Process(target=func,args=(i,)) p2.start() p_list.append(p2) [p.join() for p in p_list] p_e_time=time.time() p_dif_time=p_e_time-p_s_time print("進程池時間",pool_dif_time) print("多進程時間",p_dif_time)
進程池的同步異步:
同步:
from multiprocessing import Process,Pool import time def func(i): time.sleep(1) return i**2 if __name__ == '__main__': p=Pool(4) for i in range(10): res=p.apply(func,args=(i,)) #同步的方法多個進程完成任務 能夠獲得返回值也能夠不提取返回值 print(res)
異步:
from multiprocessing import Process,Pool import time import os def func(i): time.sleep(2) # print(os.getpid()) return i**2 if __name__ == '__main__': p=Pool(4) res_list=[] for i in range(10): res=p.apply_async(func,args=(i,)) #異步的方法 獲得返回值 res_list.append(res) for i in res_list: print(i.get())
回調函數:
進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數,這是進程池特有的,普通進程沒有這個機制,可是咱們也能夠經過進程通訊來拿到返回值,進程池的這個回調也是進程通訊的機制完成的。
import os from multiprocessing import Pool def func1(n): print('func1>>',os.getpid()) print('func1') return n*n def func2(nn): print('func2>>',os.getpid()) print('func2') print(nn) # import time # time.sleep(0.5) if __name__ == '__main__': print('主進程:',os.getpid()) p = Pool(5) #args裏面的10給了func1,func1的返回值做爲回調函數的參數給了callback對應的函數,不能直接給回調函數直接傳參數,他只能是你任務函數func1的函數的返回值 # for i in range(10,20): #若是是多個進程來執行任務,那麼當全部子進程將結果給了回調函數以後,回調函數又是在主進程上執行的,那麼就會出現打印結果是同步的效果。咱們上面func2裏面註銷的時間模塊打開看看 # p.apply_async(func1,args=(i,),callback=func2) p.apply_async(func1,args=(10,),callback=func2) p.close() p.join() #結果 # 主進程: 11852 #發現回調函數是在主進程中完成的,其實若是是在子進程中完成的,那咱們直接將代碼寫在子進程的任務函數func1裏面就好了,對不對,這也是爲何稱爲回調函數的緣由。 # func1>> 17332 # func1 # func2>> 11852 # func2 # 100