python的multiprocessing模塊是用來建立多進程的,下面對multiprocessing總結一下使用記錄。html
multiprocessing建立多進程在windows和linux系統下的對比python
import ospid = os.fork() # 建立一個子進程if pid == 0: print('這是子進程') print(os.getpid(),os.getppid())else: print('這是父進程') print(os.getpid())os.wait() # 等待子進程結束釋放資源
fork函數被調用後會返回兩次,pid爲0的表明子進程,其餘返回子進程的id號表示父進程。linux
getpid和getppid函數能夠獲取本進程和父進程的id號;windows
fork方式的缺點:安全
兼容性差,只能在類linux系統下使用,windows系統不可以使用;app
擴展性差,當須要多條進程的時候,進程管理變得很複雜;異步
會產生「孤兒」進程和「殭屍」進程,須要手動回收資源。async
優勢:函數
是系統自帶的接近低層的建立方式,運行效率高。ui
建立方式一:
from multiprocessing import Queue, Processimport osdef test(): time.sleep(2) print('this is process {}'.format(os.getpid()))if __name__ == '__main__': p = Process(target=test) p.start() # 子進程 開始執行 p.join() # 等待子進程結束 print('ths peocess is ended')
建立方式二:
from multiprocessing import Queue, Processimport osclass MyProcess(Process): def run(self): time.sleep(2) print('this is process {}'.format(os.getpid())) def __del__(self): print('del the process {}'.format(os.getpid()))if __name__ == '__main__': p = MyProcess() p.start() print('ths process is ended')# 結果:ths process is ended this is process 7600del the process 7600del the process 12304
說明:
Process對象能夠建立進程,但Process對象不是進程,其刪除與否與系統資源是否被回收沒有直接的關係。
上例看到del方法被調用了兩次,Process進程建立時,子進程會將主進程的Process對象徹底複製一份,這樣在主進程和子進程各有一個Process對象,可是p1.start()啓動的是子進程,主進程中的Process對象做爲一個靜態對象存在。
主進程執行完畢後會默認等待子進程結束後回收資源,不須要手動回收資源;
join()函數用來控制子進程結束的順序,主進程會阻塞等待子進程結束,其內部也有一個清除殭屍進程的函數,能夠回收資源;
當子進程執行完畢後,會產生一個殭屍進程,其會被join函數回收,或者再有一條進程開啓,start函數也會回收殭屍進程,因此不必定須要寫join函數。
windows系統在子進程結束後會當即自動清除子進程的Process對象,而linux系統子進程的Process對象若是沒有join函數和start函數的話會在主進程結束後統一清除。
class Process(object): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): pass # Process對象是python用來建立進程的類 group:擴展保留字段; target:目標代碼,通常是咱們須要建立進程執行的目標函數。 name:進程的名字,若是不指定會自動分配一個; args:目標函數的普通參數; kwargs:目標函數的鍵值對參數; # 方法 start():建立一個子進程並執行,該方法一個Process實例只能執行一次,其會建立一個進程執行該類的run方法。 run():子進程須要執行的代碼; join():主進程阻塞等待子進程直到子進程結束才繼續執行,能夠設置等待超時時間timeout. terminate():使活着的進程終止; is_alive():判斷子進程是否還活着。
若是須要建立大量的進程,就須要使用Pool了。
from multiprocessing import Queue, Process, Poolimport osdef test(): time.sleep(2) print('this is process {}'.format(os.getpid()))def get_pool(n=5): p = Pool(n) # 設置進程池的大小 for i in range(10): p.apply_async(test) p.close() # 關閉進程池 p.join()if __name__ == '__main__': get_pool() print('ths process is ended')
分析:廈門廈工叉車怎麼樣?
如上,進程池Pool被建立出來後,即便實際須要建立的進程數遠遠大於進程池的最大上限,p1.apply_async(test)代碼依舊會不停的執行,並不會停下等待;至關於向進程池提交了10個請求,會被放到一個隊列中;
當執行完p1 = Pool(5)這條代碼後,5條進程已經被建立出來了,只是尚未爲他們各自分配任務,也就是說,不管有多少任務,實際的進程數只有5條,計算機每次最多5條進程並行。
當Pool中有進程任務執行完畢後,這條進程資源會被釋放,pool會按先進先出的原則取出一個新的請求給空閒的進程繼續執行;
當Pool全部的進程任務完成後,會產生5個殭屍進程,若是主線程不結束,系統不會自動回收資源,須要調用join函數去回收。
join函數是主進程等待子進程結束回收系統資源的,若是沒有join,主程序退出後無論子進程有沒有結束都會被強制殺死;
建立Pool池時,若是不指定進程最大數量,默認建立的進程數爲系統的內核數量.
class Pool(object): def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None): pass # 初始化參數 processes:進程池的大小,默認cpu內核的數量 initializer:建立進程執行的目標函數,其會按照進程池的大小建立相應個數的進程; initargs:目標函數的參數 context:代碼的上下文 # 方法 apply():使用阻塞方式調用func; apply_async():使用非阻塞方式條用func; close():關閉Pool,使其再也不接受新的任務; terminate():無論任務是否完成,當即終止; join():主進程阻塞,等待子進程的退出,必須在close()後面使用; map(self, func, iterable, chunksize=None):多進程執行一個函數,傳入不一樣的參數; starmap(self, func, iterable, chunksize=None):和map相似,但iterable參數可解壓縮; starmap_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):使用異步的方式的starmap,callback爲返回後的處理函數 map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):異步方式的map
實例
from multiprocessing import Poolimport osdef test(n): time.sleep(1) print('this is process {}'.format(os.getpid())) return ndef test1(n, m): print(n, m) print('this is process {}'.format(os.getpid()))def back_func(values): # 多進程執行完畢會返回全部的結果的列表 print(values)def back_func_err(values): # 多進程執行完畢會返回全部錯誤的列表 print(values)def get_pool(n=5): p = Pool(n) # p.map(test, (i for i in range(10))) # 阻塞式多進程執行 # p.starmap(test1, zip([1,2,3],[3,4,5])) # 阻塞式多進程執行多參數函數 # 異步多進程執行函數 p.map_async(test, (i for i in range(5)), callback=back_func, error_callback=back_func_err) # 異步多進程執行多參數函數 p.starmap_async(test1, zip([1,2,3],[3,4,5]), callback=back_func, error_callback=back_func_err) print('-----') p.close() p.join()if __name__ == '__main__': get_pool() print('ths process is ended')
進程雖然不像線程那樣共享內存的數據,而是每一個進程有單獨的內存,但多進程也是共享文件系統的,即硬盤系統;當多進程同時寫入文件操做時,可能形成數據的破壞,所以進程也存在同步鎖。
from multiprocessing import Pool, Lockmuex = Lock()def test(): if muex.acquire(): f = open('./test_pro.txt', 'r+', encoding='utf-8') x = f.read() if not x: f.write('0') else: f.seek(0) f.write(str(int(x)+1)) f.close() muex.release()if __name__ == '__main__': p = Pool(5) for i in range(10): p.apply_async(test) p.close() p.join() with open('./test_pro.txt', 'r+', encoding='utf-8') as f: print(f.read())
進程鎖能夠保證文件系統的安全,可是它使得並行變成了串行,效率降低了,也可能形成死鎖問題,通常避免用鎖機制。