Contents:
- multiprocessing.Queue()
- JoinableQueue
- Passing signals between processes: Event
- Controlling access to a resource: Lock
- Synchronizing operations: Condition
- Controlling concurrent access to a resource: Semaphore
- Managing shared state: Manager
- Shared namespaces: mgr.Namespace()
- Process pools: multiprocessing.Pool
As with threads, a common use pattern for multiple processes is to divide a job among several workers so it can run in parallel. Effective use of multiple processes usually requires some communication between them, so that work can be divided and results aggregated. A simple way to do this is to pass messages back and forth with multiprocessing.Queue(). Any object that can be serialized with pickle can pass through a Queue.
```python
import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in {} for {}!'.format(
            proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()
```
Output: when q is empty, q.get() blocks until an item is available.

```
Doing something fancy in Process-1 for Fancy Dan!
```
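A plain get() on an empty queue blocks indefinitely; passing a timeout turns the wait into a queue.Empty exception instead. A minimal sketch (not part of the original example):

```python
import multiprocessing
import queue

q = multiprocessing.Queue()
try:
    # the queue is empty, so this blocks for 0.1s and then raises
    q.get(timeout=0.1)
except queue.Empty:
    print('queue was empty')
```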
In addition to the same methods as a Queue object, a JoinableQueue instance also has task_done() and join().
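A minimal sketch of the accounting these two methods add (my own illustration, not from the original): join() blocks until every item fetched with get() has been marked finished with task_done().

```python
import multiprocessing


def worker(q):
    while True:
        item = q.get()
        if item is None:      # sentinel: stop the worker
            q.task_done()
            break
        # ... process item here ...
        q.task_done()         # mark this item as finished


if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    for i in range(3):
        q.put(i)
    q.put(None)
    q.join()   # blocks until task_done() has been called for every item
    p.join()
    print('all items processed')
```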
The special value None is used as a sentinel to tell the workers when to exit.
```python
import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            # next_task is a Task() instance; printing it calls __str__
            print('{}: {}'.format(proc_name, next_task))
            # calling next_task() invokes __call__
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take time to do the work
        return '{self.a} * {self.b} = {product}'.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1
```
Output:

```
Creating 8 consumers
Consumer-4: 0 * 0
Consumer-1: 1 * 1
Consumer-2: 2 * 2
Consumer-4: 3 * 3
Consumer-1: 4 * 4
Consumer-2: 5 * 5
Consumer-1: 6 * 6
Consumer-6: 7 * 7
Consumer-4: 8 * 8
Consumer-2: 9 * 9
Consumer-1: Exiting
Consumer-4: Exiting
Consumer-6: Exiting
Consumer-2: Exiting
Consumer-5: Exiting
Consumer-8: Exiting
Consumer-3: Exiting
Consumer-7: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
Result: 4 * 4 = 16
Result: 3 * 3 = 9
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 8 * 8 = 64
Result: 7 * 7 = 49
Result: 9 * 9 = 81
```
The Event class provides a simple way to communicate state information between processes. When wait() times out, it returns without raising an error; the caller is responsible for checking the state of the event with is_set().
```python
import multiprocessing
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()->', e.is_set())


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()->', e.is_set())


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(
        name='block',
        target=wait_for_event,
        args=(e,),
    )
    w1.start()

    w2 = multiprocessing.Process(
        name='nonblock',
        target=wait_for_event_timeout,
        args=(e, 2),
    )
    w2.start()

    print('main: waiting before calling Event.set()')
    time.sleep(3)
    e.set()
    print('main: event is set')
```
Output:

```
main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
main: event is set
wait_for_event: e.is_set()-> True
```
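wait() also returns the state of the flag (True if it was set, False if the timeout expired), so the return value can be checked directly instead of a separate is_set() call. A minimal sketch, separate from the example above:

```python
import multiprocessing

e = multiprocessing.Event()
print(e.wait(0.1))   # not set yet: times out quietly and returns False
e.set()
print(e.wait(0.1))   # already set: returns True immediately
```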
In situations where a single resource needs to be shared between multiple processes, a Lock can be used to avoid conflicting accesses.
```python
import multiprocessing
import sys


def worker_with(lock):
    with lock:
        sys.stdout.write('Lock acquired via with\n')


def worker_no_with(lock):
    lock.acquire()
    try:
        sys.stdout.write('Lock acquired directly\n')
    finally:
        lock.release()


if __name__ == '__main__':
    lock = multiprocessing.Lock()

    w = multiprocessing.Process(
        target=worker_with,
        args=(lock,),
    )
    nw = multiprocessing.Process(
        target=worker_no_with,
        args=(lock,),
    )

    w.start()
    nw.start()

    w.join()
    nw.join()
```
Output:

```
Lock acquired via with
Lock acquired directly
```
cond.wait() blocks until cond.notify_all() signals that it may proceed.
```python
import multiprocessing
import time


def stage_1(cond):
    """perform first stage of work,
    then notify stage_2 to continue
    """
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        print('{} done and ready for stage 2'.format(name))
        cond.notify_all()


def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        cond.wait()
        print('{} running'.format(name))


if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1',
                                 target=stage_1,
                                 args=(condition,))
    s2_clients = [
        multiprocessing.Process(
            name='stage_2[{}]'.format(i),
            target=stage_2,
            args=(condition,),
        )
        for i in range(1, 3)
    ]

    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()
```
Output: in this example, two processes run the second stage of a job in parallel, but only after the first stage is done.

```
Starting stage_2[1]
Starting stage_2[2]
Starting s1
s1 done and ready for stage 2
stage_2[1] running
stage_2[2] running
```
Sometimes it is useful to allow more than one worker access to a resource at a time, while still limiting the overall number. A Semaphore does exactly this.
```python
import multiprocessing
import time


def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire")
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n")
    s.release()


if __name__ == "__main__":
    # at most two workers may hold the semaphore at once
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(s, i * 2))
        p.start()
```
Output:

```
Process-2acquire
Process-3acquire
Process-2release
Process-4acquire
Process-3release
Process-1acquire
Process-1release
Process-5acquire
Process-4release
Process-5release
```
Information shared through a Manager is visible to all of the processes.
```python
import multiprocessing


def worker(d, key, value):
    d[key] = value


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [
        multiprocessing.Process(
            target=worker,
            args=(d, i, i * 2),
        )
        for i in range(10)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print('Results:', d)
```
Output: the dictionary created via the Manager is shared, and updates to it are seen in all processes. Lists are supported in the same way.
Results: {0: 0, 2: 4, 3: 6, 1: 2, 4: 8, 6: 12, 5: 10, 7: 14, 8: 16, 9: 18}
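The same pattern works with a manager list. The sketch below is my own variant of the dictionary example above, not part of the original:

```python
import multiprocessing


def worker(shared, value):
    shared.append(value * 2)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    shared = mgr.list()   # proxy to a list living in the manager process
    jobs = [
        multiprocessing.Process(target=worker, args=(shared, i))
        for i in range(5)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    # the order of appends is not deterministic, so sort for display
    print('Results:', sorted(shared))
```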
Besides dictionaries and lists, a Manager can also create a shared Namespace.
```python
import multiprocessing


def producer(ns, event):
    ns.value = 'This is the value'
    event.set()


def consumer(ns, event):
    try:
        print('Before event: {}'.format(ns.value))
    except Exception as err:
        print('Before event, error:', str(err))
    event.wait()
    print('After event:', ns.value)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()
```
Output: a value added to the mgr.Namespace() in one process is visible to the other processes that hold the Namespace.

```
Before event, error: 'Namespace' object has no attribute 'value'
After event: This is the value
```
It is important to know that updates to the contents of mutable values held in a mgr.Namespace() are not propagated automatically.
```python
import multiprocessing


def producer(ns, event):
    # DOES NOT UPDATE GLOBAL VALUE!
    ns.my_list.append('This is the value')
    event.set()


def consumer(ns, event):
    print('Before event:', ns.my_list)
    event.wait()
    print('After event :', ns.my_list)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []

    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()
```
Output:

```
Before event: []
After event : []
```
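A common workaround (my own addition, not shown in the original) is to mutate a local copy and then rebind the attribute, so the assignment goes back through the manager:

```python
import multiprocessing


def producer(ns, event):
    my_list = ns.my_list          # fetch a copy from the manager
    my_list.append('This is the value')
    ns.my_list = my_list          # rebinding the attribute IS propagated
    event.set()


def consumer(ns, event):
    event.wait()
    print('After event :', ns.my_list)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []

    event = multiprocessing.Event()
    p = multiprocessing.Process(target=producer, args=(namespace, event))
    c = multiprocessing.Process(target=consumer, args=(namespace, event))

    c.start()
    p.start()
    c.join()
    p.join()
```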
The Pool class can be used to manage a fixed number of workers for simple cases where the work to be done can be broken up and distributed among the workers independently.
```python
import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input :', inputs)

    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()   # wrap up current tasks

    print('Pool :', pool_outputs)
```
Output: the return values from the jobs are collected and returned as a list. (The "Built-in" line prints a map object because map() is lazy in Python 3; it would need list() to materialize the results.)

```
Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x000000000256A080>
Starting SpawnPoolWorker-2
Starting SpawnPoolWorker-3
Starting SpawnPoolWorker-4
Starting SpawnPoolWorker-1
Starting SpawnPoolWorker-6
Starting SpawnPoolWorker-5
Starting SpawnPoolWorker-7
Starting SpawnPoolWorker-8
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```
By default, a Pool creates a fixed number of worker processes and passes jobs to them until there are no more jobs. Setting the maxtasksperchild parameter tells the pool to restart a worker process after it has finished a few tasks, preventing long-running workers from consuming ever more system resources.
```python
import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input :', inputs)

    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
        maxtasksperchild=2,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()   # wrap up current tasks

    print('Pool :', pool_outputs)
```
Output: the pool restarts workers when they complete their allotted tasks, even if there is no more work. In this output, nine workers are created even though there are only ten tasks, because some workers complete two tasks before being restarted.

```
Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x00000000025CA080>
Starting SpawnPoolWorker-4
Starting SpawnPoolWorker-2
Starting SpawnPoolWorker-1
Starting SpawnPoolWorker-5
Starting SpawnPoolWorker-3
Starting SpawnPoolWorker-8
Starting SpawnPoolWorker-6
Starting SpawnPoolWorker-7
Starting SpawnPoolWorker-9
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```