Multiprocessing是一個相似於threading模塊的生成多進程的包,這個包提供了本地和遠程的進程併發。使用multiprocessing可以有效的解決python由於在GIL(全局解釋鎖)下在CPU密集型任務中的瓶頸問題,容許使用多核處理器來運行python腳本程序。官方介紹https://docs.python.org/2/library/multiprocessing.html。
html
multiprocessing生成進程大體是一下的流程:python
1.經過 multiprocessing.Process生成進程對象linux
2.調用start()方法啓動進程安全
3.調用join()方法,阻塞主進程知道 子進程執行完畢 數據結構
下面實例代碼展現如何經過multiprocessing建立子進程執行任務併發
# -*- coding: utf-8 -*- import multiprocessing import os def func(m): print("called function in process : %s , process id is %s" % (str(m), str(os.getpid()))) if __name__ == '__main__': ProcessJob = [] for i in range(5): # 依次開啓5個進程 p = multiprocessing.Process(target=func, args=(i,)) ProcessJob.append(p) p.start() # 啓動進程 p.join() # 阻塞進程直至 當前進程中的任務完成
結果: app
進程的命名dom
對與每一個進程在建立的時候,能夠經過name參數對進程名稱進行命名 例如異步
# -*- coding: utf-8 -*- import multiprocessing import os def func(m): name = multiprocessing.current_process().name print("current process name is " + name) print("called function in process : %s , process id is %s" % (str(m), str(os.getpid()))) if __name__ == '__main__': ProcessJob = [] for i in range(5): # 依次開啓5個進程 p = multiprocessing.Process(target=func,name='PROCESS_NAME_' + str(i), args=(i,)) ProcessJob.append(p) p.start() # 啓動進程 p.join() # 阻塞進程直至 當前進程中的任務完成
在子類中使用進程 async
在子類使用進程,可使類繼承自multiprocessing.Process達到效果
# -*- coding: utf-8 -*- import multiprocessing import os class MyProcess(multiprocessing.Process): def run(self): # 重寫 run函數 print("called rub method in process: %s, process id is : %s" % (multiprocessing.current_process().name, os.getpid())) if __name__ == '__main__': jobs = [] for i in range(5): p = MyProcess() # 建立對象 jobs.append(p) p.start() # 啓動進程對象 p.join() # 阻塞進程對象
結果:
called rub method in process: MyProcess-1, process id is : 1524
called rub method in process: MyProcess-2, process id is : 1525
called rub method in process: MyProcess-3, process id is : 1526
called rub method in process: MyProcess-4, process id is : 1527
called rub method in process: MyProcess-5, process id is : 1528
進程間如何交換數據對象
多進程間進行交換數據, multiprocessing提供了兩個數據通道,一個是隊列,一個是通道。
1. 經過隊列進行數據交換
建立一個進程共享的隊列,這個共享隊列是線程安全與進程安全的 在操做該共享隊列的時候不須要經過鎖來保持訪問的安全性
經過multiprocessing.Queue()來建立共享隊列。
2.經過管道進行數據交換
管道建立以後會返回一對鏈接對象, 每一個對象都有send/receive 方法 , 實現了進程間的通訊
以下例子:
# -*- coding: utf-8 -*- import multiprocessing import os def create_items(pipe): output_pipe , _ = pipe for item in range(10): output_pipe.send(item) output_pipe.close() def multiply_items(pipe_1, pipe_2): close, input_pipe = pipe_1 close.close() output_pipe, _ = pipe_2 try: while True: item = input_pipe.recv() output_pipe.send(item*item) except EOFError as err: output_pipe.close() if __name__ == '__main__': pipe_1 = multiprocessing.Pipe(True) # 建立 通道 process_pipe_1 = multiprocessing.Process(target= create_items, args=(pipe_1, )) #建立進程1 給通道1裏載入數據 process_pipe_1.start() pipe_2 = multiprocessing.Pipe(True) # 建立通道2 process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2, )) #將通道1裏的數據接收以後 放入 通道2 process_pipe_2.start()
pipe_1[0].close()
pipe_2[0].close()
try:
while True:
print(pipe_2[1].recv())
except EOFError as err:
print("end")
進程的同步
當多個進程協同來完成一個任務的時候,多個進程須要共享數據, 按照必定的順序來訪問數據完成任務,須要用到同步原語來控制獲取數據的順序,保證任務的正常進行。
1.Lock:使用acquire和release來管理共享資源的訪問
2. 事件:實現了進程間的簡單通訊, 一個進程發出事件 ,其餘一個進程或多個進程響應事件。經過Event對象來實現
3.信號量: 用越共享的公共資源。例如一個資源支持同時訪問的的數量。
4.屏障: 使全部的涉及屏障的進程到達某一條件後,在執行,屏障以前和以後的代碼不可以並行執行
詳細可參見 https://docs.python.org/2/library/multiprocessing.html#multiprocessing.managers.SyncManager
multiprocessing中的managers
這個managers是multiprocessing提供的一種管理器,用來協調多進程之間的共享信息,能夠用來實現分佈式進程之間的數據共享,協做完成任務。
例如,咱們在 一臺機器上的任務隊列中發佈任務, 遠程進程獲取 任務,執行完任務以後再將輸入回放到結果隊列, 不單單是隊列 ,還能夠是字典,list等其餘的數據結構
下面是一個 隊列的例子:
server端:
# -*- coding: utf-8 -*- import multiprocessing from multiprocessing.managers import BaseManager import random, time import queue # 繼承自BaseManager 用於註冊 下面的兩個隊列 class QueueManager(BaseManager): pass task_queue = queue.Queue() # 申明兩個隊列 一個任務隊列 一個結果隊列 result_queue = queue.Queue() def return_task_queue(): global task_queue return task_queue def return_result_queue(): global result_queue return result_queue if __name__ == '__main__': QueueManager.register('get_task_from_queue', callable=return_task_queue)# 註冊兩個隊列 QueueManager.register('put_result_to_queue', callable=return_result_queue) # 綁定端口以及驗證碼(window平臺下須要寫127.0.0.1, linux下能夠爲空 或者0.0.0.0, authkey 不能直接填字符串 會提示沒有正確的編碼 能夠用b'abc' 的方式 或者'abc'.encode('utf-8')) mng = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc') # 啓動 mng.start() task = mng.get_task_from_queue() result = mng.put_result_to_queue() # 在任務隊列中放入任務 for i in range(10): n = random.randint(0, 1000) print("had put task into task_queue %s" % str(n)) task.put(n) # 等待 任務完成以後放入 結果隊列 取出打印 while True: if result.empty(): print("i am wait for result") time.sleep(1) else: rep = result.get() print("had get result from result_queue %s" % str(rep)) # mng.shutdown()
client端:
# -*- coding: utf-8 -*- import time, sys, queue import math from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass task_queue = queue.Queue() result_queue = queue.Queue() def return_task_queue(): global task_queue return task_queue def return_result_queue(): global result_queue return result_queue if __name__ == "__main__": server_addr = '127.0.0.1' QueueManager.register('get_task_from_queue', callable=return_task_queue) QueueManager.register('put_result_to_queue', callable=return_result_queue) mng = QueueManager(address=(server_addr, 5000), authkey=b'abc') mng.connect() task_get = mng.get_task_from_queue() result_put = mng.put_result_to_queue() while True: if task_get.empty(): print("wait for task") time.sleep(1) else: m = task_get.get(timeout = 1) rep = math.sqrt(m) time.sleep(1) result_put.put(str(rep))
multiprocessing中的進程池
進程的頻繁 建立和銷燬時很是耗費資源的 , multiprocessing.Pool提供給用戶一個常駐的進程池,當有任務來臨時,有空閒的進程則執行任務,沒有空閒的任務的時候,等待進程池中有空閒進程後,分配空閒進程給該任務執行
# -*- coding: utf-8 -*- import multiprocessing import time import random def func_square(x): print("process is exc %s", multiprocessing.current_process().name) time.sleep(2) return x*x if __name__ == "__main__": res = [] pool = multiprocessing.Pool(4) for i in range(10): res.append(pool.apply_async(func=func_square, args=(random.randint(0, 100), ))) # 異步執行 pool.close() pool.join() print("result is:") for r in res: print(r.get())
代碼中的 Pool.apply_async是並行執行 Pool.apply是阻塞的同步執行, 相似的還有Pool.map, Pool.map_async.