python之multiprocessing建立進程

python的multiprocessing模塊是用來建立多進程的,下面對multiprocessing總結一下使用記錄。html

multiprocessing建立多進程在windows和linux系統下的對比python

fork()

import ospid = os.fork() # 建立一個子進程if pid == 0:    print('這是子進程')    print(os.getpid(),os.getppid())else:    print('這是父進程')    print(os.getpid())os.wait() # 等待子進程結束釋放資源
  • fork函數被調用後會返回兩次,pid爲0的表明子進程,其餘返回子進程的id號表示父進程。linux

  • getpid和getppid函數能夠獲取本進程和父進程的id號;windows

fork方式的缺點:安全

  1. 兼容性差,只能在類linux系統下使用,windows系統不可以使用;app

  2. 擴展性差,當須要多條進程的時候,進程管理變得很複雜;異步

  3. 會產生「孤兒」進程和「殭屍」進程,須要手動回收資源。async

優勢:函數

是系統自帶的接近低層的建立方式,運行效率高。ui

Process建立進程

  • 建立方式一:

from multiprocessing import Queue, Processimport osdef test():
    time.sleep(2)
    print('this is process {}'.format(os.getpid()))if __name__ == '__main__':
    p = Process(target=test)
    p.start() # 子進程 開始執行
    p.join() # 等待子進程結束
    print('ths peocess is ended')
  • 建立方式二:

from multiprocessing import Queue, Processimport osclass MyProcess(Process):

    def run(self):
        time.sleep(2)
        print('this is process {}'.format(os.getpid()))    def __del__(self):
        print('del the process {}'.format(os.getpid()))if __name__ == '__main__':
    p = MyProcess()
    p.start()
    print('ths process is ended')# 結果:ths process is ended
this is process 7600del the process 7600del the process 12304

說明:

  • Process對象能夠建立進程,但Process對象不是進程,其刪除與否與系統資源是否被回收沒有直接的關係。

  • 上例看到del方法被調用了兩次,Process進程建立時,子進程會將主進程的Process對象徹底複製一份,這樣在主進程和子進程各有一個Process對象,可是p1.start()啓動的是子進程,主進程中的Process對象做爲一個靜態對象存在。

  • 主進程執行完畢後會默認等待子進程結束後回收資源,不須要手動回收資源;

  • join()函數用來控制子進程結束的順序,主進程會阻塞等待子進程結束,其內部也有一個清除殭屍進程的函數,能夠回收資源;

  • 當子進程執行完畢後,會產生一個殭屍進程,其會被join函數回收,或者再有一條進程開啓,start函數也會回收殭屍進程,因此不必定須要寫join函數。

  • windows系統在子進程結束後會當即自動清除子進程的Process對象,而linux系統子進程的Process對象若是沒有join函數和start函數的話會在主進程結束後統一清除。

Process對象分析

class Process(object):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        pass
# Process對象是python用來建立進程的類
group:擴展保留字段;
target:目標代碼,通常是咱們須要建立進程執行的目標函數。
name:進程的名字,若是不指定會自動分配一個;
args:目標函數的普通參數;
kwargs:目標函數的鍵值對參數;

# 方法
start():建立一個子進程並執行,該方法一個Process實例只能執行一次,其會建立一個進程執行該類的run方法。
run():子進程須要執行的代碼;
join():主進程阻塞等待子進程直到子進程結束才繼續執行,能夠設置等待超時時間timeout.
terminate():使活着的進程終止;
is_alive():判斷子進程是否還活着。

進程池Pool

若是須要建立大量的進程,就須要使用Pool了。

from multiprocessing import Queue, Process, Poolimport osdef test():
    time.sleep(2)
    print('this is process {}'.format(os.getpid()))def get_pool(n=5):
    p = Pool(n) # 設置進程池的大小
    for i in range(10):
        p.apply_async(test)
    p.close() # 關閉進程池
    p.join()if __name__ == '__main__':
    get_pool()
    print('ths process is ended')

分析:廈門廈工叉車怎麼樣?

  • 如上,進程池Pool被建立出來後,即便實際須要建立的進程數遠遠大於進程池的最大上限,p1.apply_async(test)代碼依舊會不停的執行,並不會停下等待;至關於向進程池提交了10個請求,會被放到一個隊列中;

  • 當執行完p1 = Pool(5)這條代碼後,5條進程已經被建立出來了,只是尚未爲他們各自分配任務,也就是說,不管有多少任務,實際的進程數只有5條,計算機每次最多5條進程並行。

  • 當Pool中有進程任務執行完畢後,這條進程資源會被釋放,pool會按先進先出的原則取出一個新的請求給空閒的進程繼續執行;

  • 當Pool全部的進程任務完成後,會產生5個殭屍進程,若是主線程不結束,系統不會自動回收資源,須要調用join函數去回收。

  • join函數是主進程等待子進程結束回收系統資源的,若是沒有join,主程序退出後無論子進程有沒有結束都會被強制殺死;

  • 建立Pool池時,若是不指定進程最大數量,默認建立的進程數爲系統的內核數量.

Pool對象分析

class Pool(object):
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
        pass
# 初始化參數
processes:進程池的大小,默認cpu內核的數量
initializer:建立進程執行的目標函數,其會按照進程池的大小建立相應個數的進程;
initargs:目標函數的參數
context:代碼的上下文

# 方法
apply():使用阻塞方式調用func;
apply_async():使用非阻塞方式條用func;
close():關閉Pool,使其再也不接受新的任務;
terminate():無論任務是否完成,當即終止;
join():主進程阻塞,等待子進程的退出,必須在close()後面使用;
map(self, func, iterable, chunksize=None):多進程執行一個函數,傳入不一樣的參數;
starmap(self, func, iterable, chunksize=None):和map相似,但iterable參數可解壓縮;
starmap_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):使用異步的方式的starmap,callback爲返回後的處理函數
map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):異步方式的map
  • 實例

from multiprocessing import Poolimport osdef test(n):
    time.sleep(1)
    print('this is process {}'.format(os.getpid()))    return ndef test1(n, m):
    print(n, m)
    print('this is process {}'.format(os.getpid()))def back_func(values): # 多進程執行完畢會返回全部的結果的列表
    print(values)def back_func_err(values): # 多進程執行完畢會返回全部錯誤的列表
    print(values)def get_pool(n=5):
    p = Pool(n)    # p.map(test, (i for i in range(10))) # 阻塞式多進程執行
    # p.starmap(test1, zip([1,2,3],[3,4,5])) # 阻塞式多進程執行多參數函數
    # 異步多進程執行函數
    p.map_async(test, (i for i in range(5)), callback=back_func, error_callback=back_func_err)    # 異步多進程執行多參數函數
    p.starmap_async(test1, zip([1,2,3],[3,4,5]), callback=back_func, error_callback=back_func_err)
    print('-----')
    p.close()
    p.join()if __name__ == '__main__':
    get_pool()
    print('ths process is ended')

進程鎖

進程雖然不像線程那樣共享內存的數據,而是每一個進程有單獨的內存,但多進程也是共享文件系統的,即硬盤系統;當多進程同時寫入文件操做時,可能形成數據的破壞,所以進程也存在同步鎖。

from multiprocessing import Pool, Lockmuex = Lock()def test():    if muex.acquire():
        f = open('./test_pro.txt', 'r+', encoding='utf-8')
        x = f.read()        if not x:
            f.write('0')        else:
            f.seek(0)
            f.write(str(int(x)+1))
        f.close()
        muex.release()if __name__ == '__main__':
    p = Pool(5)    for i in range(10):
        p.apply_async(test)
    p.close()
    p.join()    with open('./test_pro.txt', 'r+', encoding='utf-8') as f:
        print(f.read())

進程鎖能夠保證文件系統的安全,可是它使得並行變成了串行,效率降低了,也可能形成死鎖問題,通常避免用鎖機制。

相關文章
相關標籤/搜索