multiprocessing
模塊就是跨平臺版本的多進程模塊,提供了一個Process
類來表明一個進程對象,這個對象能夠理解爲是一個獨立的進程,能夠執行另外的事情算法
from multiprocessing import Process import os # 子進程要執行的代碼 def run_proc(name): print('啓動子線程{}{}'.format(name, os.getpid())) if __name__ == '__main__': print('父進程{}'.format(os.getpid())) p = Process(target=run_proc, args=('test',)) print('子進程將要啓動') p.start() p.join() print('子線程結束')
from multiprocessing import Process import os import time def run_proc(): """子進程要執行的代碼""" print('子進程運行中,pid=%d...' % os.getpid()) # os.getpid獲取當前進程的進程號 print('子進程將要結束...') if __name__ == '__main__': print('父進程pid: %d' % os.getpid()) # os.getpid獲取當前進程的進程號 p = Process(target=run_proc) p.start() >>> 父進程pid: 3580 >>> 子進程運行中,pid=3581... >>> 子進程將要結束...
Process([group [, target [, name [, args [, kwargs]]]]])多線程
from multiprocessing import Process import os from time import sleep def run_proc(name, age, **kwargs): for i in range(10): print('子進程運行中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid())) print(kwargs) sleep(0.2) if __name__=='__main__': p = Process(target=run_proc, args=('test',18), kwargs={"m":20}) p.start() sleep(1) # 1秒中以後,當即結束子進程 p.terminate() p.join() >>> 子進程運行中,name= test,age=18 ,pid=3593... >>> {'m': 20} >>> 子進程運行中,name= test,age=18 ,pid=3593... >>> {'m': 20} >>> 子進程運行中,name= test,age=18 ,pid=3593... >>> {'m': 20} >>> 子進程運行中,name= test,age=18 ,pid=3593... >>> {'m': 20} >>> 子進程運行中,name= test,age=18 ,pid=3593... >>> {'m': 20}
from multiprocessing import Process, Queue import os, time, random # 寫數據進程執行的代碼: def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) # 讀數據進程執行的代碼: def read(q): print('Process to read: %s' % os.getpid()) while True: value = q.get(True) print('Get %s from queue.' % value) if __name__ == '__main__': # 父進程建立Queue,並傳給各個子進程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 啓動子進程pw,寫入: pw.start() # 啓動子進程pr,讀取: pr.start() # 等待pw結束: pw.join() # pr進程裏是死循環,沒法等待其結束,只能強行終止: pr.terminate()
初始化Queue()對象時(例如:q=Queue()
),若括號中沒有指定最大可接收的消息數量,或數量爲負值,那麼就表明可接受的消息數量沒有上限(直到內存的盡頭);併發
""" 若是要啓動大量的子進程,能夠用進程池的方式批量建立子進程: """ from multiprocessing import Pool import os, time, random def long_time_task(name): print('運行任務 %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('任務 %s 運行 %0.2f 秒' % (name, (end - start))) if __name__ == '__main__': print('父進程 %s.' % os.getpid()) p = Pool(4) # 建立進程池中最多存4個子進程 for i in range(5): # Pool().apply_async(要調用的目標,(傳遞給目標的參數元祖,)) # 每次循環將會用空閒出來的子進程去調用目標 p.apply_async(long_time_task, args=(i,)) print('等待全部子進程完成...') p.close() p.join() print('全部子進程完成.') >>> 等待全部子進程完成... >>> 運行任務 0 (3722)... >>> 運行任務 1 (3723)... >>> 運行任務 2 (3724)... >>> 運行任務 3 (3725)... >>> 任務 3 運行 0.67 秒 >>> 運行任務 4 (3725)... >>> 任務 2 運行 1.29 秒 >>> 任務 0 運行 2.00 秒 >>> 任務 1 運行 2.77 秒 >>> 任務 4 運行 2.31 秒 >>> 全部子進程完成.
對Pool
對象調用join()
方法會等待全部子進程執行完畢,調用join()
以前必須先調用close()
,調用close()
以後就不能繼續添加新的Process
了。
請注意輸出的結果,task 0,1,2,3是馬上執行的,而task 4要等待前面某個task完成後才執行,這是由於Pool的默認大小在個人電腦上是4,所以,最多同時執行4個進程。這是Pool有意設計的限制,並非操做系統的限制。若是改爲:app
p = Pool(5)
就能夠同時跑5個進程。
因爲Pool
的默認大小是CPU的核數,若是你不幸擁有8核CPU,你要提交至少9個子進程才能看到上面的等待效果。
dom
# 修改import中的Queue爲Manager from multiprocessing import Manager,Pool import os,time,random def reader(q): print("reader啓動(%s),父進程爲(%s)" % (os.getpid(), os.getppid())) for i in range(q.qsize()): print("reader從Queue獲取到消息:%s" % q.get(True)) def writer(q): print("writer啓動(%s),父進程爲(%s)" % (os.getpid(), os.getppid())) for i in "itcast": q.put(i) if __name__=="__main__": print("(%s) start" % os.getpid()) q = Manager().Queue() # 使用Manager中的Queue po = Pool() po.apply_async(writer, (q,)) time.sleep(1) # 先讓上面的任務向Queue存入數據,而後再讓下面的任務開始從中取數據 po.apply_async(reader, (q,)) po.close() po.join() print("(%s) End" % os.getpid()) >>> (4157) start >>> writer啓動(4159),父進程爲(4157) >>> reader啓動(4160),父進程爲(4157) >>> reader從Queue獲取到消息:i >>> reader從Queue獲取到消息:t >>> reader從Queue獲取到消息:c >>> reader從Queue獲取到消息:a >>> reader從Queue獲取到消息:s >>> reader從Queue獲取到消息:t >>> (4157) End