When using multiple processes, put the calls inside `main()` (i.e. under the `if __name__ == '__main__':` guard), otherwise exceptions or warnings will occur.
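A minimal sketch of that guard (the function name here is illustrative):

```python
from multiprocessing import Process


def work():
    print('in child process')


if __name__ == '__main__':
    # without this guard, start methods that re-import the module
    # (spawn, the default on Windows/macOS) raise a RuntimeError
    p = Process(target=work)
    p.start()
    p.join()
```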
`Process()` — basic usage: similar to `Thread()`.
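A minimal sketch (my own, not from the original) showing how closely the two constructors mirror each other:

```python
from multiprocessing import Process
from threading import Thread


def say(msg):
    print(msg)


if __name__ == '__main__':
    # same target/args signature for both
    t = Thread(target=say, args=('from thread',))
    p = Process(target=say, args=('from process',))
    t.start()
    p.start()
    t.join()
    p.join()
```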
`Pool()` — basic usage: its `map` method is used just like the built-in `map` function, but with multiprocessing support.
```python
from multiprocessing import Pool

pool = Pool(2)
pool.map(fib, [35] * 2)
```
The `multiprocessing.dummy` module: "multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module."
I have not used all of the points above in practice; I only read about them and wrote demos as exercises, so my understanding is not very thorough.
```python
# -*- coding: utf-8 -*-
from multiprocessing import Process, Pool
from multiprocessing.dummy import Pool as DummyPool
import time


def log_time(method_name):
    def decorator(f):
        def wrapper(*args, **kwargs):
            start_time = time.time()
            res = f(*args, **kwargs)
            end_time = time.time()
            print('%s cost %ss' % (method_name, end_time - start_time))
            return res
        return wrapper
    return decorator


def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)


@log_time('single_process')
def single_process():
    fib(33)
    fib(33)


@log_time('multi_process')
def multi_process():
    jobs = []
    for _ in range(2):
        p = Process(target=fib, args=(33,))
        p.start()
        jobs.append(p)
    for j in jobs:
        j.join()


@log_time('pool_process')
def pool_process():
    pool = Pool(2)
    pool.map(fib, [33] * 2)


@log_time('dummy_pool')
def dummy_pool():
    pool = DummyPool(2)
    pool.map(fib, [33] * 2)


if __name__ == '__main__':
    single_process()
    multi_process()
    pool_process()
    dummy_pool()
```
Some of it took a little effort to understand.
Implementing the producer-consumer model: one queue holds the tasks, another holds the results. The `multiprocessing` module also has a `Queue`, but it provides no `task_done()` or `join()` methods. So a `Queue` is used to hold the results, and a `JoinableQueue()` to hold the tasks.
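To make the difference concrete, here is a minimal sketch (my own illustration): `join()` on a `JoinableQueue` blocks until `task_done()` has been called once for every item that was `put()`.

```python
from multiprocessing import Process, JoinableQueue


def worker(q):
    while True:
        item = q.get()
        print('processed', item)
        q.task_done()  # mark this task as finished


if __name__ == '__main__':
    q = JoinableQueue()
    w = Process(target=worker, args=(q,), daemon=True)
    w.start()
    for i in range(3):
        q.put(i)
    q.join()  # returns once task_done() has been called for all 3 items
```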
The demo below is written by imitation, with one producer process and one consumer process:
```python
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue, JoinableQueue
import time
import random


def double(n):
    return n * 2


def producer(name, task_q):
    while 1:
        n = random.random()
        if n > 0.8:  # break once n > 0.8
            task_q.put(None)
            print('%s break.' % name)
            break
        print('%s produce %s.' % (name, n))
        task_q.put((double, n))


def consumer(name, task_q, result_q):
    while 1:
        task = task_q.get()
        if task is None:
            print('%s break.' % name)
            break
        func, arg = task
        res = func(arg)
        time.sleep(0.5)  # simulate a blocking operation
        task_q.task_done()
        result_q.put(res)
        print('%s consume %s, result %s' % (name, arg, res))


def run():
    task_q = JoinableQueue()
    result_q = Queue()
    processes = []
    p1 = Process(name='p1', target=producer, args=('p1', task_q))
    c1 = Process(name='c1', target=consumer, args=('c1', task_q, result_q))
    p1.start()
    c1.start()
    processes.append(p1)
    processes.append(c1)

    # join() blocks the main process
    for p in processes:
        p.join()

    # after the children exit, print the values in the result queue
    while 1:
        if result_q.empty():
            break
        result = result_q.get()
        print('result is: %s' % result)


if __name__ == '__main__':
    run()
```
If there are multiple `consumer()` processes, only one of them can take the `None` sentinel and break; the rest will hang forever on `task_q.get()`. Try adding a timeout-based exit to `consumer()`:
```python
import queue


def consumer(name, task_q, result_q):
    while 1:
        try:
            # the first positional argument of get() is `block`,
            # so the timeout must be passed by keyword
            task = task_q.get(timeout=1)  # wait at most 1s
        except queue.Empty:
            print('%s time out, break.' % name)
            break
        if task is None:
            print('%s break.' % name)
            break
        func, arg = task
        res = func(arg)
        time.sleep(0.5)  # simulate a blocking operation
        task_q.task_done()
        result_q.put(res)
        print('%s consume %s, result %s' % (name, arg, res))
```
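An alternative to the timeout (my own sketch, reusing `double` and the imports from the demo above): have the producer put one `None` sentinel per consumer, so each consumer receives its own sentinel and exits cleanly.

```python
NUM_CONSUMERS = 2  # assumed number of consumer processes


def producer(name, task_q):
    for _ in range(5):
        n = random.random()
        print('%s produce %s.' % (name, n))
        task_q.put((double, n))
    for _ in range(NUM_CONSUMERS):  # one sentinel per consumer
        task_q.put(None)
    print('%s break.' % name)
```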
Shared memory: use `Array` and `Value` from `sharedctypes`. The example below is modeled on the one in the official documentation.
```python
# -*- coding: utf-8 -*-
from pprint import pprint
# shared memory
from multiprocessing import sharedctypes, Process, Lock
from ctypes import Structure, c_bool, c_double

pprint(sharedctypes.typecode_to_type)

lock = Lock()


class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]


def modify(n, b, s, arr, A):
    n.value **= 2
    b.value = True
    s.value = s.value.upper()
    arr[0] = 10
    for a in A:
        a.x **= 2
        a.y **= 2


if __name__ == '__main__':
    n = sharedctypes.Value('i', 7)
    b = sharedctypes.Value(c_bool, False, lock=False)
    s = sharedctypes.Array('c', b'hello world', lock=lock)  # bytes
    arr = sharedctypes.Array('i', range(5), lock=True)
    A = sharedctypes.Array(Point, [(1.875, -6.25), (-5.75, 2.0)], lock=lock)

    p = Process(target=modify, args=(n, b, s, arr, A))
    p.start()
    p.join()

    print(n.value)
    print(b.value)
    print(s.value)
    print(arr[:])
    print([(a.x, a.y) for a in A])
```
In an actual project, `Value` was used to monitor the task status of a child process, with memcached used to store, update, and delete that status.
```python
# -*- coding: utf-8 -*-
from multiprocessing import Process, Value
import time
import datetime

FINISHED = 3
FAILED = 4
INPROCESS = 2
WAITING = 1


def execute_method(status, process):
    time.sleep(1)
    status.value = INPROCESS  # test
    time.sleep(1)
    status.value = FINISHED  # test
    time.sleep(0.5)


def run(execute_code):
    status = Value('i', WAITING)
    process = Value('f', 0.0)
    # mem_cache.set('%s_status' % execute_code, status.value, 0)
    # mem_cache.set('%s_process' % execute_code, process.value, 0)
    p = Process(target=execute_method, args=(status, process))
    p.start()
    start_time = datetime.datetime.now()
    while True:
        print(status.value)
        now_time = datetime.datetime.now()
        if (now_time - start_time).seconds > 30:  # give up after 30s
            # mem_cache.delete('%s_status' % execute_code)
            # mem_cache.delete('%s_process' % execute_code)
            print('execute failed')
            p.terminate()
            break
        if status.value == FINISHED:
            # mem_cache.delete('%s_status' % execute_code)
            # mem_cache.delete('%s_process' % execute_code)
            print('end execute')
            break
        else:
            # mem_cache.set('%s_status' % execute_code, status.value, 0)
            # mem_cache.set('%s_process' % execute_code, process.value, 0)
            print('waiting or executing')
        time.sleep(0.5)
    p.join()
```
The example below, modeled on a server-process example from a blog post, briefly demonstrates the common ways of sharing through `Manager`.

A `multiprocessing.Manager` object controls a server process, and other processes can access that server process through proxies. The common sharing options are:

1. `Namespace`: creates a shareable namespace.
2. `Value`/`Array`: same as the ctypes-object sharing above.
3. `dict`/`list`: creates a shareable dict/list that supports the methods of the corresponding data structure.
4. `Condition`/`Event`/`Lock`/`Queue`/`Semaphore`: creates a shareable object for the corresponding synchronization primitive.
```python
# -*- coding: utf-8 -*-
from multiprocessing import Manager, Process


def modify(ns, lproxy, dproxy):
    ns.name = 'new_name'
    lproxy.append('new_value')
    dproxy['new'] = 'new_value'


def run():
    # prepare the shared data
    manager = Manager()
    ns = manager.Namespace()
    ns.name = 'origin_name'
    lproxy = manager.list()
    lproxy.append('origin_value')
    dproxy = manager.dict()
    dproxy['origin'] = 'origin_value'

    # child process
    p = Process(target=modify, args=(ns, lproxy, dproxy))
    p.start()
    print(p.pid)
    p.join()

    print('ns.name: %s' % ns.name)
    print('lproxy: %s' % lproxy)
    print('dproxy: %s' % dproxy)


if __name__ == '__main__':
    run()
```
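One caveat worth adding (my note, not from the original): in-place mutation of a plain mutable object stored inside a managed container does not propagate, because only the outer proxy's methods go through the manager; reassigning the item does.

```python
from multiprocessing import Manager

if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    d['inner'] = {}
    d['inner']['a'] = 1   # mutates a local copy; not written back
    print(d['inner'])     # {}

    inner = d['inner']
    inner['a'] = 1
    d['inner'] = inner    # reassignment pushes the change to the manager
    print(d['inner'])     # {'a': 1}
```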
上例主要是展現了Manager
中的共享對象類型和代理,查看源碼知是經過register()
方法。
multiprocessing/managers.py:
```python
#
# Definition of SyncManager
#

class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
```
Besides use with child processes, `Manager()` can also be used for communication between otherwise unrelated processes, as in the simple distributed-process implementation below. The main difference from the previous example is that the communicating processes are not parent and child.

manager_server.py:
```python
# -*- coding: utf-8 -*-
from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 8080
authkey = b'python'

shared_list = []


class ServerManager(BaseManager):
    pass


ServerManager.register('get_list', callable=lambda: shared_list)
server_manager = ServerManager(address=(host, port), authkey=authkey)
server = server_manager.get_server()
server.serve_forever()
```
manager_client.py:
```python
# -*- coding: utf-8 -*-
from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 8080
authkey = b'python'


class ClientManager(BaseManager):
    pass


ClientManager.register('get_list')
client_manager = ClientManager(address=(host, port), authkey=authkey)
client_manager.connect()

l = client_manager.get_list()
print(l)
l.append('new_value')
print(l)
```
After running the client several times, `new_value` keeps getting appended to `shared_list`.
The following imitates the distributed-process example from Liao Xuefeng's tutorial, with some modifications.
manager_server.py:
```python
# -*- coding: utf-8 -*-
from multiprocessing.managers import BaseManager
from multiprocessing import Condition, Value
import queue

host = '127.0.0.1'
port = 8080
authkey = b'python'

task_q = queue.Queue(10)
result_q = queue.Queue(20)
cond = Condition()
done = Value('i', 0)


def double(n):
    return n * 2


class ServerManager(BaseManager):
    pass


ServerManager.register('get_task_queue', callable=lambda: task_q)
ServerManager.register('get_result_queue', callable=lambda: result_q)
ServerManager.register('get_cond', callable=lambda: cond)
ServerManager.register('get_done', callable=lambda: done)
ServerManager.register('get_double', callable=double)

server_manager = ServerManager(address=(host, port), authkey=authkey)
server = server_manager.get_server()
print('start server')
server.serve_forever()
```
manager_producer.py:
```python
# -*- coding: utf-8 -*-
from multiprocessing.managers import BaseManager
import random
import time

host = '127.0.0.1'
port = 8080
authkey = b'python'


class ProducerManager(BaseManager):
    pass


ProducerManager.register('get_task_queue')
ProducerManager.register('get_cond')
ProducerManager.register('get_done')

producer_manager = ProducerManager(address=(host, port), authkey=authkey)
producer_manager.connect()
task_q = producer_manager.get_task_queue()
cond = producer_manager.get_cond()
# done = producer_manager.get_done()

count = 20  # produce at most 20 tasks
while count > 0:
    if cond.acquire():
        if not task_q.full():
            n = random.randint(0, 10)
            task_q.put(n)
            print("Producer:deliver one, now tasks:%s" % task_q.qsize())
            cond.notify()
            count -= 1
            time.sleep(0.5)
        else:
            print("Producer:already full, stop deliver, now tasks:%s"
                  % task_q.qsize())
            cond.wait()
        cond.release()
# done.value = 1
print('Producer break')
```
manager_consumer.py:
```python
# -*- coding: utf-8 -*-
from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 8080
authkey = b'python'


class ConsumerManager(BaseManager):
    pass


ConsumerManager.register('get_task_queue')
ConsumerManager.register('get_result_queue')
ConsumerManager.register('get_cond')
# ConsumerManager.register('get_done')
ConsumerManager.register('get_double')

consumer_manager = ConsumerManager(address=(host, port), authkey=authkey)
consumer_manager.connect()
task_q = consumer_manager.get_task_queue()
result_q = consumer_manager.get_result_queue()
cond = consumer_manager.get_cond()
# done = consumer_manager.get_done()

while 1:
    if result_q.full():
        print('result queue is full')
        break
    if cond.acquire():
        if not task_q.empty():
            arg = task_q.get()
            res = consumer_manager.get_double(arg)
            print("Consumer:consume one, now tasks:%s" % task_q.qsize())
            result_q.put(res)
            cond.notify()
        else:
            print("Consumer:only 0, stop consume, wait for products")
            cond.wait()
        cond.release()

while 1:
    if result_q.empty():
        break
    result = result_q.get()
    print('result is: %s' % result)
```