- 一、multiprocess模塊詳解
Python的os模塊封裝了常見的系統調用,其中就包含 「fork函數」,經過這個函數能夠輕鬆的建立子進程,可是要注意一點,在Windows系統上是沒法使用fork函數的,Python爲咱們提供了可跨平臺的multiprocess模塊。該模塊提供了一個Process類來表明一個進程對象,用法和Thread很是類似。php
建立一個進程的代碼示例以下:css
from multiprocessing import Process
import os
def show_msg(name):
print("子進程運行中:name = %s , pid = %d " % (name, os.getpid()))
if __name__ == '__main__':
print("父進程 %d" % os.getpid())
p = Process(target=show_msg, args=('測試',))
print("開始執行子進程~")
p.start()
p.join()
print("子進程執行完畢!")
複製代碼
運行結果以下:python
父進程 26539
開始執行子進程~
子進程運行中:name = 測試 , pid = 26540
子進程執行完畢!
複製代碼
Process構造函數:數組
Process(group=None, target=None, name=None, args=(), kwargs={})
複製代碼
參數詳解:安全
- group:分組,不多用到
- target:調用對象,傳入任務執行函數做爲參數
- name:進程別名
- args:給調用對象以元組的形式提供參數,好比有兩個參數args=(a,b),若是隻有一個參數,要這樣寫args=(a,),不能把逗號漏掉,否則會被當作括號運算符使用!
- kwargs:調用對象的關鍵字參數字典
Process的經常使用函數:bash
- is_alive():判斷進程實例是否還在執行;
- join([timeout]):是否等待進程實例執行結束,或等待多少秒;
- start():啓動進程實例(建立子進程);
- run():若是沒有給定target參數,對這個對象調用start()方法時,
就將執行對象中的run()方法;- terminate():無論任務是否完成,當即終止;
除了使用fork函數和上述操做建立進程的方式外,還能夠自定義一個Process類,重寫__init__
和run函數
便可,代碼示例以下:多線程
from multiprocessing import Process
import os
class MyProcess(Process):
def __init__(self, name):
Process.__init__(self)
self.msg = name
def run(self):
print("子進程運行中:name = %s , pid = %d " % (self.msg, os.getpid()))
if __name__ == '__main__':
print("父進程 %d" % os.getpid())
p = MyProcess('測試')
print("開始執行子進程~")
p.start()
p.join()
print("子進程執行完畢!")
複製代碼
運行結果以下:併發
父進程 26794
開始執行子進程~
子進程運行中:name = 測試 , pid = 26795
子進程執行完畢!
複製代碼
知道了如何建立進程,那麼實現多進程有不是什麼難事了,一個循環建立多個便可,可是有個問題,進程但是重量級別的程序,重複進程建立和銷燬會形成必定的性能開銷! Python爲咱們提供了一個進程池對象Pool用來緩解進程重複關啓帶來的性能消耗問題。在建立進程池的時候指定一個容量,若是接收到一個新任務,而池沒滿的話,會建立一個新的進程來執行這個任務,若是池滿的話,任務則會進入等待狀態,直到池中有進程結束,纔會建立新的進程來執行這個任務。app
Pool的構造函數:async
Pool(processes=None, initializer=None, initargs=(),maxtasksperchild=None, context=None)
複製代碼
通常只用到第一個參數,processes用於設置進程池的容量,即最多併發的進程數量,若是不寫默認使用os.cpu_count()
返回的值。
Pool經常使用函數詳解:
- apply(func, args=(), kwds={}):使用堵塞方式調用func,堵塞的意思是一個進程結束,
釋放回進程池,下一個進程才能夠開始,args爲傳遞給func的參數列表,kwds爲傳遞給
func的關鍵字參數列表,該方法在Python 2.3後就不建議使用了。- apply_async(func, args=(), kwds={}, callback=None,error_callback=None) :使用非阻塞方式調用func,進程池進程最大數能夠同時執行,還支持返回結果後進行回調。
- close():關閉進程池,使其再也不接受新的任務;
- terminate():結束工做進程,再也不處理未處理的任務,無論任務是否完成,當即終止;
- join():主進程阻塞,等待⼦進程的退出,必須在close或terminate以後使用;
- map(func, iterable, chunksize=None):這裏的map函數和Python內置的高階函數map相似,只是這裏的map方法是在進程池多進程併發進行的,接收一個函數和可迭代對象,把函數做用到每一個元素,獲得一個新的list返回。
最簡單的進程池代碼示例以下:
import multiprocessing as mp
import time
def func(msg):
time.sleep(1)
print(mp.current_process().name + " : " + msg)
if __name__ == '__main__':
pool = mp.Pool()
for i in range(20):
msg = "Do Something %d" % (i)
pool.apply_async(func, (msg,))
pool.close()
pool.join()
print("子進程執行任務完畢!")
複製代碼
運行結果以下:
ForkPoolWorker-4 : Do Something 3
ForkPoolWorker-2 : Do Something 1
ForkPoolWorker-1 : Do Something 0
ForkPoolWorker-3 : Do Something 2
ForkPoolWorker-5 : Do Something 4
ForkPoolWorker-6 : Do Something 5
ForkPoolWorker-7 : Do Something 6
ForkPoolWorker-8 : Do Something 7
ForkPoolWorker-2 : Do Something 9
ForkPoolWorker-4 : Do Something 8
ForkPoolWorker-1 : Do Something 11
ForkPoolWorker-7 : Do Something 12
ForkPoolWorker-5 : Do Something 13
ForkPoolWorker-6 : Do Something 14
ForkPoolWorker-3 : Do Something 10
ForkPoolWorker-8 : Do Something 15
ForkPoolWorker-6 : Do Something 19
ForkPoolWorker-1 : Do Something 17
ForkPoolWorker-5 : Do Something 18
ForkPoolWorker-7 : Do Something 16
子進程執行任務完畢!
複製代碼
上面的輸出結果順序並無按照循環中的順序輸出,能夠利用apply_async
的返回值是:被進程調用的函數的返回值,來規避,修改後的代碼以下:
import multiprocessing as mp
import time
def func(msg):
time.sleep(1)
return mp.current_process().name + " : " + msg
if __name__ == '__main__':
pool = mp.Pool()
results = []
for i in range(20):
msg = "Do Something %d" % i
results.append(pool.apply_async(func, (msg,)))
pool.close()
pool.join()
for result in results:
print(result.get())
print("子進程執行任務完畢!")
複製代碼
運行結果以下:
ForkPoolWorker-1 : Do Something 0
ForkPoolWorker-2 : Do Something 1
ForkPoolWorker-3 : Do Something 2
ForkPoolWorker-4 : Do Something 3
ForkPoolWorker-5 : Do Something 4
ForkPoolWorker-7 : Do Something 6
ForkPoolWorker-6 : Do Something 5
ForkPoolWorker-8 : Do Something 7
ForkPoolWorker-1 : Do Something 8
ForkPoolWorker-2 : Do Something 9
ForkPoolWorker-4 : Do Something 11
ForkPoolWorker-3 : Do Something 10
ForkPoolWorker-7 : Do Something 12
ForkPoolWorker-8 : Do Something 13
ForkPoolWorker-5 : Do Something 14
ForkPoolWorker-6 : Do Something 15
ForkPoolWorker-1 : Do Something 16
ForkPoolWorker-2 : Do Something 17
ForkPoolWorker-4 : Do Something 18
ForkPoolWorker-3 : Do Something 19
子進程執行任務完畢!
複製代碼
感受仍是有點模糊,經過一個多進程統計目錄下文件的行數和字符個數的腳原本鞏固,代碼示例以下:
import multiprocessing as mp
import time
import os
result_file = 'result.txt' # 統計結果寫入文件名
# 得到路徑下的文件列表
def get_files(path):
file_list = []
for file in os.listdir(path):
if file.endswith('py'):
file_list.append(os.path.join(path, file))
return file_list
# 統計每一個文件中函數與字符數
def get_msg(path):
with open(path, 'r', encoding='utf-8') as f:
content = f.readlines()
f.close()
lines = len(content)
char_count = 0
for i in content:
char_count += len(i.strip("\n"))
return lines, char_count, path
# 將數據寫入到文件中
def write_result(result_list):
with open(result_file, 'a', encoding='utf-8') as f:
for result in result_list:
f.write(result[2] + " 行數:" + str(result[0]) + " 字符數:" + str(result[1]) + "\n")
f.close()
if __name__ == '__main__':
start_time = time.time()
file_list = get_files(os.getcwd())
pool = mp.Pool()
result_list = pool.map(get_msg, file_list)
pool.close()
pool.join()
write_result(result_list)
print("處理完畢,用時:", time.time() - start_time)
複製代碼
運行結果以下:
# 控制檯輸出
處理完畢,用時: 0.13662314414978027
# result.txt文件內容
/Users/jay/Project/Python/Book/Chapter 11/11_4.py 行數:33 字符數:621
/Users/jay/Project/Python/Book/Chapter 11/11_1.py 行數:32 字符數:578
/Users/jay/Project/Python/Book/Chapter 11/11_5.py 行數:52 字符數:1148
/Users/jay/Project/Python/Book/Chapter 11/11_13.py 行數:20 字符數:333
/Users/jay/Project/Python/Book/Chapter 11/11_16.py 行數:62 字符數:1320
/Users/jay/Project/Python/Book/Chapter 11/11_12.py 行數:23 字符數:410
/Users/jay/Project/Python/Book/Chapter 11/11_15.py 行數:48 字符數:1087
/Users/jay/Project/Python/Book/Chapter 11/11_8.py 行數:17 字符數:259
/Users/jay/Project/Python/Book/Chapter 11/11_11.py 行數:18 字符數:314
/Users/jay/Project/Python/Book/Chapter 11/11_10.py 行數:46 字符數:919
/Users/jay/Project/Python/Book/Chapter 11/11_14.py 行數:20 字符數:401
/Users/jay/Project/Python/Book/Chapter 11/11_9.py 行數:31 字符數:623
/Users/jay/Project/Python/Book/Chapter 11/11_2.py 行數:32 字符數:565
/Users/jay/Project/Python/Book/Chapter 11/11_6.py 行數:23 字符數:453
/Users/jay/Project/Python/Book/Chapter 11/11_7.py 行數:37 字符數:745
/Users/jay/Project/Python/Book/Chapter 11/11_3.py 行數:29 字符數:518
複製代碼
涉及到了多個進程,不可避免的要處理進程間數據交換問題,多進程不像多線程,不一樣進程之間內存是不共享的,multiprocessing模塊提供了四種進程間共享數據的方式:Queue,Value和Array,Manager.dict和pipe。下面一一介紹這四種方式的具體用法。
多進程安全的隊列,put方法用以插入數據到隊列中,put方法有兩個可選參數:blocked和timeout。若blocked爲True(默認)且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常,而get方法則從隊列讀取而且刪除一個元素,參數規則同拋出的一場是Queue.Empty。另外Queue不止適用於進程通訊,也適用於線程,順道寫一個比較單線程,多線程
和多進程的運行效率對比示例,具體代碼以下:
import threading as td
import multiprocessing as mp
import time
def do_something(queue):
result = 0
for i in range(100000):
result += i ** 2
queue.put(result)
# 單線程
def normal():
result = 0
for _ in range(3):
for i in range(100000):
result += i ** 2
print("單線程處理結果:", result)
# 多線程
def multi_threading():
q = mp.Queue()
t1 = td.Thread(target=do_something, args=(q,))
t2 = td.Thread(target=do_something, args=(q,))
t3 = td.Thread(target=do_something, args=(q,))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print("多線程處理結果:", (q.get() + q.get() + q.get()))
# 多進程
def multi_process():
q = mp.Queue()
p1 = mp.Process(target=do_something, args=(q,))
p2 = mp.Process(target=do_something, args=(q,))
p3 = mp.Process(target=do_something, args=(q,))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print("多進程處理結果:", (q.get() + q.get() + q.get()))
if __name__ == '__main__':
start_time_1 = time.time()
normal()
start_time_2 = time.time()
print("單線程處理耗時:", start_time_2 - start_time_1)
multi_threading()
start_time_3 = time.time()
print("多線程處理耗時:", start_time_3 - start_time_2)
multi_process()
start_time_4 = time.time()
print("多進程處理耗時:", start_time_4 - start_time_3)
複製代碼
運行結果以下:
單線程處理結果: 999985000050000
單線程處理耗時: 0.10726284980773926
多線程處理結果: 999985000050000
多線程處理耗時: 0.13849401473999023
多進程處理結果: 999985000050000
多進程處理耗時: 0.041596174240112305
複製代碼
從上面的結果能夠明顯看出在處理CPU密集型任什麼時候,多進程更優。
二者是經過「共享內存」的方式來共享數據的,前者用於須要共享單個值,後者用於
共享多個值(數組),構造函數的第一個元素爲數據類型,第二個元素爲值。數據類型對照如表所示。
標記 | 數據類型 | 標記 | 數據類型 |
---|---|---|---|
'c' | ctypes.c_char | 'u' | ctypes.c_wchar |
'b' | ctypes.c_byte | 'B' | ctypes.c_ubyte |
'h' | ctypes.c_short | 'H' | ctypes.c_ushort |
'i' | ctypes.c_int | 'I' | ctypes.c_uint |
'l' | ctypes.c_long | 'L' | ctypes.c_ulong |
'f' | ctypes.c_float | 'd' | ctypes.c_double |
使用代碼示例以下:
import multiprocessing as mp
def do_something(num, arr):
num.value += 1
for i in range(len(arr)):
arr[i] = arr[i] * 2
if __name__ == '__main__':
value = mp.Value('i', 1)
array = mp.Array('i', range(5))
print("剛開始的值:", value.value, array[:])
# 建立進程1
p1 = mp.Process(target=do_something, args=(value, array))
p1.start()
p1.join()
print("進程1操做後的值:", value.value, array[:])
# 建立進程2
p2 = mp.Process(target=do_something, args=(value, array))
p2.start()
p2.join()
print("進程2操做後的值:", value.value, array[:])
複製代碼
運行結果以下:
剛開始的值: 1 [0, 1, 2, 3, 4]
進程1操做後的值: 2 [0, 2, 4, 6, 8]
進程2操做後的值: 3 [0, 4, 8, 12, 16]
複製代碼
Python還爲咱們提供更增強大的數據共享類,支持更豐富的數據類型,好比Value、Array、dict、list、Lock、Semaphore等等,另外Manager還能夠共享類的實例對象。有一點要注意:進程間通訊應該儘可能避免使用共享數據的方式!
使用代碼示例以下:
import multiprocessing as mp
import os
import time
def do_something(dt):
dt[os.getpid()] = int(time.time())
print(data_dict)
if __name__ == '__main__':
manager = mp.Manager()
data_dict = manager.dict()
for i in range(3):
p=mp.Process(target=do_something,args=(data_dict,))
p.start()
p.join()
複製代碼
運行結果以下:
{5432: 1533200189}
{5432: 1533200189, 5433: 1533200189}
{5432: 1533200189, 5433: 1533200189, 5434: 1533200189}
複製代碼
管道,簡化版的Queue,經過Pipe()構造函數能夠建立一個進程通訊用的管道對象,默認雙向,意味着使用管道只能同時開啓兩個進程!若是想設置單向,能夠添加參數「duplex=False」,雙向便可發送也可接受,可是隻容許前面的端口用於接收,後面的端口用於發送。管道對象發送和接收信息的函數依次爲send()和recv()。使用代碼示例以下:
import multiprocessing as mp
def p_1(p):
p.send("你好啊!")
print("P1-收到信息:", p.recv())
def p_2(p):
print("P2-收到信息:", p.recv())
p.send("你也好啊!")
if __name__ == '__main__':
pipe = mp.Pipe()
p1 = mp.Process(target=p_1, args=(pipe[0],))
p2 = mp.Process(target=p_2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()
複製代碼
運行結果以下:
P2-收到信息: 你好啊!
P1-收到信息: 你也好啊!
複製代碼
關於多進程加鎖的,能夠參見前面多進程加鎖部份內容,這裏就不重複講解了,只是導包換成了multiprocessing,好比threading.Lock換成了multiprocessing.Lock。
若是本文對你有所幫助,歡迎
留言,點贊,轉發
素質三連,謝謝😘~