介紹
multiprocessing
是一個使用相似於threading
模塊的API支持生成進程的包。該multiprocessing
軟件包提供本地和遠程併發,經過使用子進程而不是線程有效地支持 全局解釋器鎖。multiprocessing模塊充分利用給定機器上的多個處理器
。它能夠在Unix和Windows
上運行。python
該multiprocessing模塊還引入了threading模塊中沒有模擬的API 。一個主要的例子是該 Pool對象提供了一種方便的方法,能夠跨多個輸入值並行化函數的執行,跨過程分配輸入數據(數據並行)。如下示例演示了在模塊中定義此類函數的常見作法,以便子進程能夠成功導入該模塊。這個數據並行的基本例子使用Pool編程
from multiprocessing import Pool def f(x): return x*x if __name__ == '__main__': with Pool(5) as p: print(p.map(f, [1, 2, 3])) >>>[1, 4, 9]
#encoding:utf-8 # __author__ = 'donghao' # __time__ = 2019/4/1 11:27 from multiprocessing import Pool import time import os # 進程池 # 大量進程建立,使用pool的方法 def worker(msg): start = time.time() print('%s開始執行,進程號%d'%(msg,os.getpid())) time.sleep(1) end = time.time() print('耗時%0.2f'%(end-start)) if __name__ == '__main__': po = Pool(3) for i in range(10): po.apply_async(worker, (i,)) print('——tart____') po.close() # 關閉進程池,關閉後再也不接受新的請求 po.join() # 等待全部的子進程執行完成,必須放到close以後
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的
close() 關閉pool,使其不在接受新的任務。
terminate() 結束工做進程,不在處理未完成的任務。
join() 主進程阻塞,等待子進程的退出, join方法要在close或terminate以後使用。安全
Process
在multiprocessing
,經過建立Process
對象而後調用其start()
方法來生成進程。 Process 遵循API的threading.Thread
服務器
from multiprocessing import Process def f(name): print('hello', name) if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() p.join()
顯示所涉及的各個進程ID網絡
from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('父進程:', os.getppid()) print('進程:', os.getpid()) def f(name): info('函數 f') print('我是', name) if __name__ == '__main__': info('main line') p = Process(target=f, args=('魯班七號',)) p.start() p.join() >>> main line module name: __main__ 父進程: 1668 進程: 1368 函數 f module name: __mp_main__ 父進程: 1368 進程: 4644 我是 魯班七號
multiprocessing 支持進程之間的兩種通訊
隊列
併發
這個Queue是近乎克隆的queue.Queue。例如:app
from multiprocessing import Process, Queue def f(q): q.put(['魯班七號', '妲己', '後裔']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints ['魯班七號', '妲己', '後裔'] p.join()
隊列是線程和進程安全的。async
管道
函數
from multiprocessing import Process, Pipe def f(conn): conn.send(['魯班七號', '妲己', '後裔']) conn.close() if __name__ == '__main__': parent_conn,child_conn = Pipe() p = Process(target=f,args=(child_conn,)) p.start() print(parent_conn.recv()) p.join() parent_conn.close()
返回的兩個鏈接對象Pipe()表示管道的兩端。每一個鏈接對象都有send()
和 recv()
方法(以及其餘)。請注意,若是兩個進程(或線程)同時嘗試讀取或寫入管道的同一端,則管道中的數據可能會損壞。固然,同時使用管道的不一樣端的進程不存在損壞的風險spa
進程間的同步
multiprocessing
包含全部同步原語的等價物threading
。例如,可使用鎖來確保一次只有一個進程打印到標準輸出:
from multiprocessing import Process, Lock def f(l, i): print('hello world', i) if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
不使用來自不一樣進程的鎖輸出容易被混淆。
進程間共享狀態
在進行併發編程時,一般最好儘可能避免使用共享狀態。使用多個進程時尤爲如此。
可是,若是您確實須要使用某些共享數據,那麼 multiprocessing提供了幾種方法。
共享內存
可使用Value
或 將數據存儲在共享存儲器映射中Array
。例如,如下代碼
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:]) >>> 3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
服務器進程
Manager()
控制器返回的管理器對象控制一個服務器進程,該進程保存Python對象並容許其餘進程使用代理操做它們
經過返回的經理Manager()將支持類型
list,dict,Namespace,Lock, RLock,Semaphore,BoundedSemaphore, Condition,Event,Barrier, Queue,Value和Array
例如
from multiprocessing import Process, Manager def f(d, l, kills): d['name'] = '程咬金' d['slogan'] = '真男人,必需要有強健的肌肉,身體和精神' d['裝備'] = None l.reverse() kills.append('後裔') if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) kills = manager.list(['達摩','魯班七號']) p = Process(target=f, args=(d, l, kills)) p.start() p.join() print(d) print(l) print(kills) >>> {'name': '程咬金', 'slogan': '真男人,必需要有強健的肌肉,身體和精神', '裝備': None} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] ['達摩', '魯班七號', '後裔']
服務器進程管理器比使用共享內存對象更靈活,由於它們能夠支持任意對象類型。此外,單個管理器能夠經過網絡由不一樣計算機上的進程共享。可是,它們比使用共享內存慢。
daemon程序
# 不加daemon屬性 import multiprocessing import time def worker(interval): print("work start:{0}".format(time.ctime())) time.sleep(interval) print("work end:{0}".format(time.ctime())) if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.start() print("end!") >>> end! work start:Mon Apr 1 16:08:40 2019 work end:Mon Apr 1 16:08:43 2019
#加上daemon屬性 import multiprocessing import time def worker(interval): print("work start:{0}".format(time.ctime())) time.sleep(interval) print("work end:{0}".format(time.ctime())) if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.daemon = True p.start() print("end!") >>> end!
注:因子進程設置了daemon屬性,主進程結束,它們就隨着結束了。
Event用來實現進程間同步通訊。
import multiprocessing import time def wait_for_event(e): print("wait_for_event: starting") e.wait() print("wairt_for_event: e.is_set()->" + str(e.is_set())) def wait_for_event_timeout(e, t): print("wait_for_event_timeout:starting") e.wait(t) print("wait_for_event_timeout:e.is_set->" + str(e.is_set())) if __name__ == "__main__": e = multiprocessing.Event() w1 = multiprocessing.Process(name = "block", target = wait_for_event, args = (e,)) w2 = multiprocessing.Process(name = "non-block", target = wait_for_event_timeout, args = (e, 1)) w1.start() w2.start() time.sleep(5) e.set() print("main: event is set") >>> wait_for_event: starting wait_for_event_timeout:starting wait_for_event_timeout:e.is_set->False main: event is set wairt_for_event: e.is_set()->True
文件拷貝器:
#encoding:utf-8 # __author__ = 'donghao' # __time__ = 2019/4/1 14:14 from multiprocessing import pool,Manager,Queue import os,time def mycopy(old_file_name, new_file_name, filename, queue): f = open(old_file_name+'/' + filename,'rb') content = f.read() f.close() w = open(new_file_name+'/' + filename,'wb') w.write(content) w.close() queue.put(filename) def main(): old_file_name = input('請輸入文件名稱') path = os.listdir(old_file_name) length = len(path) po = pool.Pool(5) queue = Manager().Queue() try: new_file_name = old_file_name+'[副本]' os.mkdir(new_file_name) except: pass for filename in path: po.apply_async(mycopy,args=(old_file_name, new_file_name, filename, queue)) po.close() copy_file_nums = 0 while True: filename = queue.get() copy_file_nums += 1 print('\r 拷貝進度: %0.2f %%'%(copy_file_nums*100/length),end='') if copy_file_nums >= length: break print('\n文件拷貝成功!') po.join() if __name__ == '__main__': main()