Python中主要經過 multiprocess 包來操做和管理進程。html
python 啓動進程方式1:python
import time from multiprocessing import Process def fork(thread_name): time.sleep(2) print("subprocess: " + thread_name) if __name__ == '__main__': p = Process(target=fork, args=('hello_1',)) p.start() # 啓動進程 print("end...") # 結果輸出 end... subprocess: hello_1
Process類參數說明:安全
Process([ target [, name [, args [, kwargs]]]]]) target 表示子進程要執行的任務 args 表示調用對象的位置參數元組,args=(1,2,'hello',) kwargs 表示調用對象的字典,kwargs={'name':'baby','age':18} name 子進程的名稱
python 啓動進程方式2:服務器
import time from multiprocessing import Process class MyProcess(Process): def __init__(self, thread_name): super().__init__() self.thread_name = thread_name def run(self): time.sleep(2) print("subprocess: " + self.thread_name) if __name__ == '__main__': p = MyProcess('hello_1') p.start() print("end...") # 結果輸出 end... subprocess: hello_1
Tip:兩種啓動進程的方式沒有優劣之分~網絡
在主進程中經過 join 方法,可讓主進程等待子進程執行完畢後,再繼續往下執行併發
import time from multiprocessing import Process def fork(thread_name): time.sleep(2) print("subprocess: " + thread_name) if __name__ == '__main__': p = Process(target=fork, args=('hello_1',)) p.start() p.join() # 等待子進程執行完畢 print("end...") # 結果輸出 subprocess: hello_1 end...
多個子進程同時運行app
import time from multiprocessing import Process def fork(thread_name): time.sleep(2) print("subprocess: " + thread_name) if __name__ == '__main__': p_list = [] for i in range(1, 4): p = Process(target=fork, args=('hello_' + str(i),)) p.start() p_list.append(p) [p.join() for p in p_list] # 等待子進程執行完畢 print("end...") # 結果輸出 subprocess: hello_1 subprocess: hello_2 subprocess: hello_3 end...
如上是經過第一種方式啓動子進程,使用繼承 Process 類的形式啓動子進程示例以下:dom
import time from multiprocessing import Process class MyProcess(Process): def __init__(self, thread_name): super().__init__() self.thread_name = thread_name def run(self): time.sleep(2) print("subprocess: " + self.thread_name) if __name__ == '__main__': p_list = [] for i in range(1, 4): p = MyProcess('hello_' + str(i)) p.start() p_list.append(p) [p.join() for p in p_list] print("end...")
import time from multiprocessing import Process def fork(thread_name): time.sleep(2) print("subprocess: " + thread_name) if __name__ == '__main__': p = Process(target=fork, args=('hello',)) p.start() # 進程的名稱 print(p.name) # 輸出:Process-1 # 布爾值,True 表示該進程爲守護進程,默認爲 False,這個值須要在 p.start() 以前設置 print(p.daemon) # 輸出:False # 進程的pid print(p.pid) # 輸出:7980 # 進程的身份驗證鍵,默認是由 os.urandom() 隨機生成的32字符的字符串。 print(p.authkey) # 輸出:b'\xf2M)\xc8\xf6\xae8\x0c\xbet\xbcAT\xad7%ig9zl\xe5|\xb5|\x7f\xa6\xab\x8a\x8a\x94:' # 查看進程是否還在運行,若還在運行,則返回 True print(p.is_alive()) # 輸出:True # 主進程等待子進程 p 執行結束,再繼續往下執行 # p.join() # 強制終止子進程 p p.terminate() print('end...')
import time from multiprocessing import Process def fork(thread_name): for i in range(5): time.sleep(1) print("subprocess: " + thread_name + "..." + str(i)) if __name__ == '__main__': p = Process(target=fork, args=('hello',)) p.start() time.sleep(2) print('end...') # 輸出結果: subprocess: hello...0 subprocess: hello...1 end... subprocess: hello...2 subprocess: hello...3 subprocess: hello...4
能夠看到主進程的代碼先運行完畢,運行完成後,它會等待子進程執行完成後再結束。如果將子進程設置爲守護進程,則子進程會隨着主進程的代碼執行完畢而結束。注意守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children。異步
import time from multiprocessing import Process def fork(thread_name): for i in range(5): time.sleep(1) print("subprocess: " + thread_name + "..." + str(i)) if __name__ == '__main__': p = Process(target=fork, args=('hello',)) p.daemon = True # 設置進程 p 爲守護進程 p.start() time.sleep(2) print('end...') # 輸出結果: subprocess: hello...0 subprocess: hello...1 end...
值得注意的是:守護進程是在主進程代碼執行結束後就終止,即主進程的代碼執行完畢,守護進程就終止。來看以下示例:async
import time from multiprocessing import Process def fork_1(thread_name): for i in range(5): time.sleep(1) print("subprocess: " + thread_name + "..." + str(i), end="\n") def fork_2(thread_name): for i in range(7): time.sleep(1) print("subprocess: " + thread_name + "..." + str(i), end="\n") if __name__ == '__main__': p1 = Process(target=fork_1, args=('hello',)) p2 = Process(target=fork_2, args=('hi',)) p1.daemon = True # 設置進程 p1 爲守護進程 p1.start() p2.start() time.sleep(2) print('end...') # 輸出結果: subprocess: hello...0 subprocess: hi...0 subprocess: hello...1 subprocess: hi...1 end... subprocess: hi...2 subprocess: hi...3 subprocess: hi...4 subprocess: hi...5 subprocess: hi...6
如上示例中,p1 爲守護進程,在主進程輸出 ‘end…’ 後,即主進程的代碼執行完畢後,守護進程 p1 就終止了。可是此時,主進程並無終止,它須要等待 p2 執行完畢以後再終止。
進程與進程之間數據是隔離的
from multiprocessing import Process def fork(thread_name): global n print("subprocess: " + thread_name + "...n=" + str(n)) n = 1 print("subprocess: " + thread_name + "...n=" + str(n)) if __name__ == '__main__': n = 100 p = Process(target=fork, args=('hello',)) p.start() p.join() print("main...n=" + str(n)) # 輸出結果: subprocess: hello...n=100 subprocess: hello...n=1 main...n=100
經過如上示例能夠看出,子進程 p 中的變量 n 和主進程中的變量 n 是兩個獨立的變量,存放在不一樣的內存空間,更改其中一個變量並不會影響另外一個變量的值。
要想在進程間共享數據,可經過 Manager 類實現。Manager 類中提供了不少能夠共享數據的數據類型,包括dict,list,Queue,Pipe 等。注意:Manager 中的數據是不安全的。當多個進程同時訪問共享數據的時候,就會產生數據安全問題。
多進程同時搶購餘票示例:
from multiprocessing import Process, Manager def work(m_dict): if m_dict['count'] > 0: print("%s get ticket %d" % (str(os.getpid()), m_dict['count'])) m_dict['count'] -= 1 if __name__ == '__main__': m = Manager() m_dict = m.dict({'count': 20}) p_list = [] for i in range(20): p = Process(target=work, args=(m_dict, )) p.start() p_list.append(p) for i in p_list: i.join() print("end..." + str(m_dict['count'])) # 輸出結果: 32940 get ticket 20 32941 get ticket 19 32942 get ticket 18 32939 get ticket 17 32943 get ticket 16 32944 get ticket 15 32946 get ticket 14 32945 get ticket 13 32947 get ticket 12 32948 get ticket 11 32953 get ticket 11 32958 get ticket 9 32957 get ticket 8 32955 get ticket 7 32956 get ticket 7 32954 get ticket 6 32950 get ticket 5 32949 get ticket 5 32951 get ticket 3 32952 get ticket 2 end...1
輸出結果中 「ticket 11」 被購買了2次,能夠看到當多個進程對同一份數據進行操做的時候,就會引起數據安全問題。
在如上示例中,增長進程數據還有可能出現以下這樣的報錯:
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
這個報錯的觸發緣由並無深究,極有多是 manager 內部緣由,在 manager 管理進程的同時不能夠進入主進程進行某些交互。能夠經過在子進程中 sleep 一下 來避免這個問題(這並非根本的解決方式)
import time, os from multiprocessing import Process, Manager def work(m_dict): time.sleep(0.5) # sleep 0.5 s,能夠繞過這個問題 if m_dict['count'] > 0: print("%s get ticket %d" % (str(os.getpid()), m_dict['count'])) m_dict['count'] -= 1 ...
如上的數據安全問題,能夠在子進程中加鎖來解決,即在同一時刻,僅容許一個進程執行 lock.acquire() 和 lock.release() 之間的代碼
import os from multiprocessing import Process, Manager, Lock def work(m_dict, lock): lock.acquire() if m_dict['count'] > 0: print("%s get ticket %d" % (str(os.getpid()), m_dict['count'])) m_dict['count'] -= 1 lock.release() if __name__ == '__main__': m = Manager() m_dict = m.dict({'count': 20}) lock = Lock() p_list = [] for i in range(20): p = Process(target=work, args=(m_dict, lock)) p.start() p_list.append(p) for i in p_list: i.join() print("end..." + str(m_dict['count'])) # 輸出結果: 33240 get ticket 20 33242 get ticket 19 33241 get ticket 18 33243 get ticket 17 33244 get ticket 16 33245 get ticket 15 33247 get ticket 14 33246 get ticket 13 33249 get ticket 12 33248 get ticket 11 33250 get ticket 10 33251 get ticket 9 33252 get ticket 8 33257 get ticket 7 33258 get ticket 6 33253 get ticket 5 33254 get ticket 4 33255 get ticket 3 33259 get ticket 2 33256 get ticket 1 end...0
Manager() 是經過共享進程來實現多進程之間數據共享。Manager() 返回的對象控制了一個 server 進程,這個 server 進程容許其餘進程經過 proxies 來訪問。多進程之間數據共享,除了 Manager() 外,還有 Value 、 Array,Value 和 Array 是經過共享內存的方式實現數據共享,一樣爲了保證數據安全,常常和同步互斥鎖配合使用。
關於 Value 、 Array 的具體使用方式可參閱 https://www.cnblogs.com/gengyi/p/8661235.html。
使用 Value 實現上述的搶票示例:
import os from multiprocessing import Process, Value, Lock def work(count, lock): lock.acquire() if count.value > 0: print("%s get ticket %d" % (str(os.getpid()), count.value)) count.value -= 1 lock.release() if __name__ == '__main__': count = Value('l', 50) lock = Lock() p_list = [] for i in range(50): p = Process(target=work, args=(count, lock)) p.start() p_list.append(p) for i in p_list: i.join() print("end..." + str(count.value))
from multiprocessing import Queue queue = Queue(3) # 建立隊列:Queue([maxsize]),maxsize 表示隊列的最大長度 queue.put('a') queue.put('b') queue.put('c') print(queue.full()) # 輸出 True,表示隊列已經滿了 # 若隊列已經滿了,繼續向隊列中插入數據,則程序會阻塞在這裏,直到隊列的另外一端有數據被取出,新的數據才能插入 # put 方法有兩個可選參數:block 和 timeout。 # block 默認爲 True,表示會阻塞 timeout 指定的時間,若是超時,會拋出 Queue.Full 異常。若是 block 爲 False,在 put 時 隊列已滿,則會當即拋出 Queue.Full 異常。 # timeout 默認爲 None,表示會一直阻塞。 # queue.put('d') # queue.put_nowait() # 等同於 queue.put(block = False) print(queue.get()) # 'a' print(queue.get()) # 'b' print(queue.get()) # 'c' print(queue.empty()) # 輸出 True,表示隊列已空 # 若隊列已空,繼續從該隊列中 get 數據,則程序會阻塞在這裏,直到隊列中新插入了數據。 # get 方法也有兩個參數:block 和 timeout,通 put 方法 # block 默認爲 True,表示會阻塞 timeout 指定的時間,若是 timeout 之間以內仍是沒有獲取到數據,會拋出 Queue.Empty 異常。block 爲 False 時,若隊列中有數據,則會當即返回數據,若是隊列爲空,則會當即拋出 Queue.Empty 異常. # timeout 默認爲 None,表示會一直阻塞。 # queue.get(False) # queue.get_nowait() # 等同於 queue.get(block = False) # print(queue.qsize()) # 獲取隊列的長度,某些系統上,此方法可能引起NotImplementedError異常。 # q.close() # 關閉隊列
生產者和消費者示例
from multiprocessing import Process, Queue import time def producer(name, production, queue): for i in range(2): time.sleep(0.5) queue.put(production + '_' + str(i)) print('%s produce %s' % (name, production + '_' + str(i)), end="\n") def consumer(name, queue): while True: data = queue.get() if data is None: break # None 爲結束信號 time.sleep(0.3) print('%s consume %s' % (name, data), end="\n") if __name__ == '__main__': queue = Queue() p_list = [] for index, f in enumerate(['apple', 'pear', 'peach']): p = Process(target=producer, args=('producer_' + str(index), f, queue)) p_list.append(p) p.start() Process(target=consumer, args=('consumer_1', queue)).start() Process(target=consumer, args=('consumer_2', queue)).start() [p.join() for p in p_list] # 有2個消費者,則發送2次 None queue.put(None) queue.put(None) # 輸出結果: producer_1 produce pear_0 producer_2 produce peach_0 producer_0 produce apple_0 consumer_2 consume peach_0 consumer_1 consume pear_0 producer_1 produce pear_1 producer_2 produce peach_1 producer_0 produce apple_1 consumer_2 consume apple_0 consumer_1 consume peach_1 consumer_2 consume pear_1 consumer_1 consume apple_1
經過向隊列中插入 None,來告訴消費者生產已經結束。這是一種比較低端的實現方式。
JoinableQueue 類是 Queue 類的擴展,JoinableQueue 類中的 task_done() 方法爲消費者調用方法,表示從隊列中獲取的項目(queue.get() 獲取的數據)已經被處理;JoinableQueue 類中的 join() 方法爲生產者調用的方法,生產者在調用 join() 方法後會被阻塞,直到隊列中的每一個項目都被調用 queue.task_done() 方法爲止。
以下示例是經過 task_done() 方法 和 join() 方法來實現相似於上述的發送結束信號機制。
from multiprocessing import Process, JoinableQueue import time def producer(name, production, queue): for i in range(2): time.sleep(0.5) queue.put(production + '_' + str(i)) print('%s produce %s' % (name, production + '_' + str(i)), end="\n") queue.join() def consumer(name, queue): while True: data = queue.get() time.sleep(0.3) print('%s consume %s' % (name, data), end="\n") queue.task_done() if __name__ == '__main__': queue = JoinableQueue() p_list = [] for index, f in enumerate(['apple', 'pear', 'peach']): p = Process(target=producer, args=('producer_' + str(index), f, queue)) p_list.append(p) p.start() c1 = Process(target=consumer, args=('consumer_1', queue)) c2 = Process(target=consumer, args=('consumer_2', queue)) c1.daemon = True c2.daemon = True c1.start() c2.start() [p.join() for p in p_list] print('end...')
輸出結果與上一個示例一致。這裏將 2個 consumer 設置爲守護進程,在等待 producer 完成後,也隨主進程的結束而結束。
管道的使用:
from multiprocessing import Process, Pipe def func(pro, con): pro.close() while True: try: print(con.recv()) except EOFError: con.close() break if __name__ == '__main__': pro, con = Pipe() # pro, con 分別表示管道的兩端 Process(target=func, args=(pro, con)).start() con.close() # 這裏也能夠不關閉 for i in range(5): pro.send(i) pro.close() # 輸出結果: 0 1 2 3 4
傳給進程的 conn(管道鏈接)是不會相互影響的,在一個進程中關閉了管道,並不會影響這個管道在另外一個進程中的使用。如果在一個進程中,管道的一端沒有被用到,那麼就應該將這一端關閉。例如在生產者中,應該關閉管道的 con 端(右端),在消費者中應該關閉管道的 pro 端(左端)。
當管道全部的入口都已經關閉(上述示例中,主進程和子進程中管道的入口都爲 pro),消費者繼續接收數據(調用 recv() 方法),當管道中已經沒有數據時,就會拋出 EOFError。
若是管道有入口沒有關閉,且該入口沒有在向管道發送數據,那麼消費者就會阻塞在 recv() 方法上。
如上示例是經過 拋出 EOFError 錯誤來結束管道,還有另外一種方式,就是經過管道中的數據(例如向管道中傳遞None)來結束管道
from multiprocessing import Process, Pipe def func(con): while True: data = con.recv() if data is None: break print(data) if __name__ == '__main__': pro, con = Pipe() # con, pro 分別表示管道的兩端 Process(target=func, args=(con,)).start() for i in range(5): pro.send(i) pro.send(None)
多個消費者消費管道中的數據示例(加鎖):
from multiprocessing import Process, Pipe, Lock import time def producer(pro, con, name, production): con.close() for i in range(4): time.sleep(0.5) pro.send(production + str(i)) print('%s produce %s' % (name, production + '_' + str(i)), end="\n") pro.close() def consumer(pro, con, name, lock): pro.close() while True: lock.acquire() try: data = con.recv() time.sleep(0.3) print('%s consume %s' % (name, data), end="\n") except EOFError: con.close() break finally: lock.release() if __name__ == '__main__': pro, con = Pipe() lock = Lock() Process(target=producer, args=(pro, con, 'producer', 'apple')).start() Process(target=consumer, args=(pro, con, 'c_1', lock)).start() Process(target=consumer, args=(pro, con, 'c_2', lock)).start() pro.close() con.close()
pipe(管道)是進程數據不安全的,隊列進程之間是數據安全的,由於隊列的實現就是基於管道和鎖實現的。因此管道極少被用到,生產環境中 pipe 通常也不多被用到,使用較多的通常會是隊列服務器,例如 rabbitmq,kafka…...
信號量也是一種鎖,信號量與互斥鎖區別在於,互斥鎖的 acquire() 方法和 release() 方法之間,僅容許一個線程(或進程)執行,而信號量可容許多個線程(或進程)執行。信號量的一種應用就是控制併發執行的線程(或進程)數。
from multiprocessing import Process, Semaphore import time def func(semaphore, name): if semaphore.acquire(): print(name) time.sleep(2) semaphore.release() if __name__ == '__main__': semaphore = Semaphore(3) for i in range(9): Process(target=func, args=(semaphore, 'process_' + str(i), )).start()
Python中的事件(Event)主要用於主線程(進程)控制其餘線程(進程)的執行,其主要方法包括 set、wait、clear,is_set。
若事件(Event)的標記取值爲 False,則線程(進程)會阻塞在 event.wait() 方法,event.wait() 還能夠設置一個參數 timeout,在等待 timeout 指定的時間後中止阻塞,繼續運行。
方法說明:
event.set():將 event 的標記設置爲 True,全部 阻塞在 event.wait() 的線程(進程)都會繼續執行 event.clear():將 event 的標記設置爲 False。 event.is_set():判斷 event 的標誌是否爲 True。
以下示例,在主進程中控制子進程在什麼時候繼續向下執行。例如在主進程的 time.sleep(3) 處能夠執行一些檢測工做,確保子進程的運行,若檢測沒有問題則繼續子進程的運行。
from multiprocessing import Process, Event import time def worker(name, event): print('Process_%s is ready' % name) event.wait() print('Process_%s is running' % name) if __name__ == '__main__': event = Event() for i in range(0, 2): Process(target=worker, args=(i, event)).start() time.sleep(3) event.set() # 結果輸出: Process_0 is ready Process_1 is ready Process_0 is running Process_1 is running
如上示例,若主進程一直沒有容許子進程繼續執行(例如檢測工做沒有經過),則子進程會一直阻塞在 event.wait() 這兒,咱們但願在子進程阻塞過程當中會有持續的提示信息,這個能夠經過設置 event.wait 方法的 timeout 參數實現。
from multiprocessing import Process, Event import time def worker(name, event): while not event.is_set(): print('Process_%s is ready' % name) event.wait(1) print('Process_%s is running' % name) if __name__ == '__main__': event = Event() for i in range(0, 2): Process(target=worker, args=(i, event)).start() time.sleep(3) event.set() # 結果輸出: Process_0 is ready Process_1 is ready Process_0 is ready Process_1 is ready Process_0 is ready Process_1 is ready Process_0 is ready Process_1 is ready Process_1 is running Process_0 is running
進程的建立和銷燬都須要消耗系統資源,且每一臺服務器的 cpu 核心數有限,建立過多的進程反而會下降執行效率。這裏就可使用進程池,進程池一啓動就會建立固定數量的進程,有執行須要了,就從進程池中獲取一個進程處理對應的任務,處理完成後,進程不會被銷燬,而是放回進程池中。若是同時須要執行的任務過多,沒有獲取到進程的任務須要等待,等有空閒的進程了才能運行。
進程池節省了操做系統在建立和銷燬進程上所花去的開銷,也限制了同一時間可以運行的進程總數,在必定程度上提高了多進程的執行效率。
以下示例是使用進程池啓動進程和直接啓動進程的效率差距:
from multiprocessing import Process, Pool import time def m_add(a): return a ** a if __name__ == '__main__': # print(os.cpu_count()) # 調試環境的 cpu 核數爲 8 # 建立進程池 pool = Pool(8) start_t1 = time.time() # 使用進程池啓動進程 res = pool.map(m_add, range(500)) print(time.time() - start_t1) p_list = [] start_t2 = time.time() # 直接啓動進程 for i in range(500): p = Process(target=m_add, args=(i, )) p_list.append(p) p.start() for p in p_list: p.join() print(time.time() - start_t2) # 輸出結果: 0.003328084945678711 0.6395020484924316
建立進程池:
Pool([numprocess [,initializer [, initargs]]]): numprocess:進程池中的固定繼承數,默認爲 cpu 核心數(os.cpu_count()) initializer:每次啓動進程須要執行的可調用對象 initargs:傳遞給 initializer 的參數
Pool 的經常使用方法:
map(func, iterable):異步提交任務。iterable 爲一個可迭代對象,這個可迭代對象的長度是多少,就啓動多少個子進程,且可迭代對象的每個元素會做爲參數傳遞給 func。注意,使用 map 方法開啓子進程,只能傳遞一個參數,若子進程須要多個參數,則這個參數可使用 元組;將全部子進程的返回結果以列表的形式返回。 apply(func [, args [, kwargs]]):同步提交任務,返回子進程的執行結果。若是須要併發地執行 func,必須從不一樣線程中調用同一個進程池的 apply() 方法; apply_async(func [, args [, kwargs]]):異步提交任務,返回 AsyncResult 類的實例,從 AsyncResult 實例中獲取執行結果。與 map 方法的區別是,apply_async 方法能夠爲所欲爲地傳遞參數; close():結束進程池接受任務; jion():感知進程池中的任務執行結束。即全部提交進來的任務都已經執行完畢,且沒有新的任務提交進來。
Tip:進程池能夠有返回值,這是進程池特有的,可是直接起進程,是作不到有返回值的。
apply 方法應用:
import time, os from multiprocessing import Pool def worker(i): print('worker_%s running, pid: %s' % (i, os.getpid())) time.sleep(1) return i * i if __name__ == '__main__': pool = Pool(3) res_list = [] for i in range(7): res = pool.apply(worker, args=(i, )) # 返回的 res 便是子進程的返回結果 res_list.append(res) print(res_list) print('...end') # 輸出結果: worker_0 running, pid: 20584 worker_1 running, pid: 20585 worker_2 running, pid: 20586 worker_3 running, pid: 20584 worker_4 running, pid: 20585 worker_5 running, pid: 20586 worker_6 running, pid: 20584 [0, 1, 4, 9, 16, 25, 36] ...end
在同一個線程中使用 pool.apply 方法提交任務,是提交一個,執行一個,執行完成後才能繼續提交下一個任務。如上輸出結果也是逐個輸出。
apply_async 方法應用:
import time, os from multiprocessing import Pool def worker(i): print('worker_%s running, pid: %s' % (i, os.getpid())) time.sleep(1) return i * i if __name__ == '__main__': pool = Pool(3) res_list = [] for i in range(7): res = pool.apply_async(worker, args=(i, )) # res 爲 AsyncResult 類的實例 res_list.append(res) pool.close() pool.join() for i in res_list: print(i.get()) print('...end') # 輸出結果: worker_0 running, pid: 20598 worker_1 running, pid: 20599 worker_2 running, pid: 20600 worker_3 running, pid: 20598 worker_4 running, pid: 20599 worker_5 running, pid: 20600 worker_6 running, pid: 20599 0 1 4 9 16 25 36 ...end
經過 AsyncResult 對象的 get 方法獲取返回值,get 方法會阻塞,即阻塞到子進程執行完畢,而後獲取其返回值。
通常使用 apply_async 方法 異步提交任務,須要在主進程中感知任務結束(join方法),而且在 join 方法前面結束進程池接受任務(close方法)
map 方法應用:
import time, os from multiprocessing import Pool def worker(i): print('worker_%s running, pid: %s' % (i, os.getpid())) time.sleep(1) return i * i if __name__ == '__main__': pool = Pool(3) res_list = pool.map(worker, range(7)) for i in res_list: print(i) print('...end') # 輸出結果: worker_0 running, pid: 20713 worker_1 running, pid: 20714 worker_2 running, pid: 20715 worker_3 running, pid: 20714 worker_4 running, pid: 20713 worker_5 running, pid: 20715 worker_6 running, pid: 20715 0 1 4 9 16 25 36 ...end
map 方法自帶 join 方法和 close 方法,map 方法啓動子進程後,就不容許再提交任務,且 map 方法會阻塞,直到子進程所有執行完畢,且將全部子進程的返回結果以列表的形式返回。
如果不想阻塞在 map 方法,則可使用 map_async,只是用了 map_async 方法,須要本身進行 close 和 join。
import time, os from multiprocessing import Pool def worker(i): print('worker_%s running, pid: %s' % (i, os.getpid())) time.sleep(1) return i * i if __name__ == '__main__': pool = Pool(3) res_list = pool.map_async(worker, range(7)) pool.close() pool.join() for i in res_list.get(): print(i) print('...end')
返回結果與上述一致。
進程池中一個進程處理完任務以後,這進程能夠調用一個函數去處理該進程返回的結果,這個函數就是回調函數。回調函數的主要做用是告訴主進程,這裏已經執行完畢,主進程能夠針對返回結果繼續後續的處理。相對於主進程輪詢等待子進程的返回結果,利用回調函數能夠提升程序的執行效率。
注意回調函數是由主進程執行的,能夠將一些比較耗IO的操做放到進程池中執行,由主進程統一處理它們的返回結果。
回調函數簡單示例:
from multiprocessing import Pool def func(info): print('...' + str(info)) def worker(i): return i * i if __name__ == '__main__': pool = Pool(3) res_list = [] for i in range(7): res = pool.apply_async(worker, args=(i, ), callback=func) res_list.append(res) pool.close() pool.join() print('~end') # 輸出結果: ...0 ...4 ...9 ...1 ...16 ...36 ...25 ~end
以下示例中,能夠將具體的業務放在 worker 方法中,例如從網絡上爬取數據,而後統一由回調函數 func 寫到一個文件中。
from multiprocessing import Pool def func(info): with open('abc.txt', 'a+') as f: f.writelines(str(info) + '\n') def worker(i): return i * i if __name__ == '__main__': pool = Pool() for i in range(10): pool.apply_async(worker, (i,), callback=func) pool.close() pool.join()
.................^_^