To write a multi-process (multiprocessing) program in Python, we first need a bit of operating-system background.
Unix/Linux操做系統提供了一個fork()
系統調用,它很是特殊。普通的函數調用,調用一次,返回一次,可是fork()
調用一次,返回兩次,由於操做系統自動把當前進程(稱爲父進程)複製了一份(稱爲子進程),而後,分別在父進程和子進程內返回。windows
子進程永遠返回0
,而父進程返回子進程的ID。這樣作的理由是,一個父進程能夠fork出不少子進程,因此,父進程要記下每一個子進程的ID,而子進程只須要調用getppid()
就能夠拿到父進程的ID。多線程
Python's os module wraps the common system calls, including fork, so creating a child process from a Python program is easy:
import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
The code above fails on Windows, because Windows has no fork call.
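One way to keep such a script from crashing on Windows is to check whether os.fork exists before calling it. The following is a minimal sketch of that idea (the guard is my own suggestion, not part of the original example):

import os, sys

if hasattr(os, 'fork'):          # fork() exists only on Unix/Linux/Mac
    pid = os.fork()
    if pid == 0:
        print('Child %s, parent %s.' % (os.getpid(), os.getppid()))
        os._exit(0)              # leave the child without running the rest of the script
    else:
        print('Parent %s created child %s.' % (os.getpid(), pid))
else:
    print('os.fork() is not available on %s.' % sys.platform)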
For portable multi-process development Python provides the multiprocessing package. It exposes a Process class that represents a process object and is used in much the same way as threading.Thread:
from multiprocessing import Process
import os

# Code executed by the child process
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

# Output:
# Parent process 8768.
# Child process will start.
# Run child process test (8572)...
# Child process end.
To create a child process, just pass the target function and its arguments when constructing a Process instance, then call start() to launch it.

join() waits for the child process to finish before the parent continues; it is commonly used to synchronize processes.
If you need to launch a large number of child processes, you can create them in batches with a process pool:
from multiprocessing import Pool, current_process
import os, time, random

def long_time_task(name):
    print('Run task %s %s (%s)...' % (name, current_process().name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s %s runs %0.2f seconds.' % (name, current_process().name, (end - start)))
    return name

def done(name):
    print("Task %s %s is done" % (name, current_process().name))

if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,), callback=done)
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

# Output:
# Parent process 10040.
# Run task 0 PoolWorker-4 (5528)...
# Waiting for all subprocesses done...
# Run task 1 PoolWorker-1 (4844)...
# Run task 2 PoolWorker-2 (4892)...
# Run task 3 PoolWorker-3 (7492)...
# Task 0 PoolWorker-4 runs 1.70 seconds.
# Run task 4 PoolWorker-4 (5528)...
# Task 0 MainProcess is done
# Task 2 PoolWorker-2 runs 1.94 seconds.
# Task 2 MainProcess is done
# Task 1 PoolWorker-1 runs 2.26 seconds.
# Task 1 MainProcess is done
# Task 3 PoolWorker-3 runs 2.27 seconds.
# Task 3 MainProcess is done
# Task 4 PoolWorker-4 runs 1.83 seconds.
# Task 4 MainProcess is done
# All subprocesses done.

# With the blocking p.apply(long_time_task, args=(i,)) instead
# (apply() takes no callback argument), the output is:
# Parent process 7624.
# Run task 0 PoolWorker-3 (2128)...
# Task 0 PoolWorker-3 runs 2.98 seconds.
# Run task 1 PoolWorker-4 (5460)...
# Task 1 PoolWorker-4 runs 1.51 seconds.
# Run task 2 PoolWorker-2 (8780)...
# Task 2 PoolWorker-2 runs 0.66 seconds.
# Run task 3 PoolWorker-1 (7044)...
# Task 3 PoolWorker-1 runs 1.13 seconds.
# Run task 4 PoolWorker-3 (2128)...
# Task 4 PoolWorker-3 runs 2.94 seconds.
# Waiting for all subprocesses done...
# All subprocesses done.
Calling join() on a Pool object waits for all the worker processes to finish. close() must be called before join(), and once close() has been called no new tasks can be submitted to the pool.

Pool(4) here means at most four processes run at the same time; if the argument is omitted, it defaults to the number of CPU cores.
The p.apply_async() used above is the non-blocking variant of apply(): apply_async() submits a task and returns immediately, so tasks run in parallel, while apply() blocks the main process until the worker has finished the task. (Pool.apply() is named after the apply() built-in of Python 2, which simply calls a function with a tuple of arguments; the Pool method performs the same call, but inside a worker process.)
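The difference is easy to see in a tiny sketch (the square task here is hypothetical, not from the original code): apply() hands back the result directly, while apply_async() returns an AsyncResult whose get() only blocks when you actually ask for the value:

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(2) as pool:
        # apply() blocks the main process until the worker returns
        print(pool.apply(square, args=(3,)))    # -> 9
        # apply_async() returns immediately; get() fetches the result later
        res = pool.apply_async(square, args=(4,))
        print(res.get(timeout=5))               # -> 16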
apply_async() can take a callback function, and the callback receives the return value of the task function (this lets you build all kinds of logic on top of it, such as passing back a task id and updating the task's status when it finishes).

apply() has no callback: it is blocking anyway, so it simply waits for and returns the task function's return value.
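As a small illustration of that idea, here is a minimal sketch (the task_status dict and the mark_done/mark_failed helpers are hypothetical names, not from the original article) that records task status via callback and, in Python 3, uses error_callback to catch failures:

from multiprocessing import Pool

# Hypothetical bookkeeping: task id -> status
task_status = {}

def work(task_id):
    # Pretend to do some work, then hand the task id back to the parent
    return task_id

def mark_done(task_id):
    # Runs in the parent process when a task finishes successfully
    task_status[task_id] = 'done'

def mark_failed(exc):
    # Runs in the parent process if a task raised an exception
    print('Task failed: %r' % exc)

if __name__ == '__main__':
    for i in range(5):
        task_status[i] = 'pending'
    pool = Pool(2)
    for i in range(5):
        pool.apply_async(work, args=(i,), callback=mark_done,
                         error_callback=mark_failed)
    pool.close()
    pool.join()
    print(task_status)   # all five tasks should be marked 'done'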
For communication between processes, multiprocessing provides several ways to exchange data, such as Queue, Pipe and Value.

Taking Queue as an example, create two child processes in the parent: one writes data into the Queue and the other reads data from it:
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue
import os, time, random

# Code executed by the writer process:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# Code executed by the reader process:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__ == '__main__':
    # The parent creates the Queue and passes it to both children:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # Start the writer process pw:
    pw.start()
    # Start the reader process pr:
    pr.start()
    # Wait for pw to finish:
    pw.join()
    # pr loops forever and cannot be joined; terminate it instead:
    pr.terminate()

# Output:
# Process to read: 8868
# Process to write: 6832
# Put A to queue...
# Get A from queue.
# Put B to queue...
# Get B from queue.
# Put C to queue...
# Get C from queue.
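Pipe is mentioned above but not demonstrated; the following is a minimal sketch of how it could be used (my own example, assuming one child sends over one end of the pipe and the parent receives on the other):

from multiprocessing import Process, Pipe
import os

# Code executed by the sending process:
def sender(conn):
    print('Process to send: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        conn.send(value)
    conn.close()

if __name__ == '__main__':
    # Pipe() returns two connected Connection objects
    parent_conn, child_conn = Pipe()
    p = Process(target=sender, args=(child_conn,))
    p.start()
    # The parent receives the three messages on its end
    for _ in range(3):
        print('Recv %s from pipe.' % parent_conn.recv())
    p.join()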
Under Unix/Linux the multiprocessing module wraps the fork() call, so we do not need to care about the details of fork(). Because Windows has no fork call, multiprocessing has to "simulate" its effect: every Python object the child needs from the parent must be serialized with pickle and handed over to the child process. Therefore, if multiprocessing fails on Windows, first consider whether pickling is what failed.
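A minimal sketch of that kind of failure (my own example; it forces the "spawn" start method that Windows uses, so it also fails on Linux): a lambda cannot be pickled, so the child process cannot be started:

import multiprocessing as mp

if __name__ == '__main__':
    # Use the Windows-style start method so the example behaves the same everywhere
    mp.set_start_method('spawn')
    try:
        # A lambda cannot be pickled, so it cannot be sent to the child process
        p = mp.Process(target=lambda: print('hello'))
        p.start()
        p.join()
    except Exception as e:
        print('Failed to start child process: %r' % e)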
當多個子進程操做同一個東西的時候,就可能會出現混亂的狀況,好比咱們啓動2個進程,對其中一個變量+1 和 +3
# -*- coding: utf-8 -*-
import time
from multiprocessing import Process, Value

# Add n to the shared value ten times
def write(v, n):
    for i in range(10):
        time.sleep(0.1)
        v.value += n
        print(v.value)

if __name__ == '__main__':
    """
    typecode_to_type = {
        'c': ctypes.c_char,
        'b': ctypes.c_byte,     'B': ctypes.c_ubyte,
        'h': ctypes.c_short,    'H': ctypes.c_ushort,
        'i': ctypes.c_int,      'I': ctypes.c_uint,
        'l': ctypes.c_long,     'L': ctypes.c_ulong,
        'f': ctypes.c_float,    'd': ctypes.c_double
    }
    """
    # An int value initialized to 0; see the typecodes above for other types
    v = Value("i", 0)
    p1 = Process(target=write, args=(v, 1))
    p2 = Process(target=write, args=(v, 3))
    # Start child process p1, which adds 1 to v:
    p1.start()
    # Start child process p2, which adds 3 to v:
    p2.start()
    # Wait for both to finish:
    p1.join()
    p2.join()

# Output:
# 1 4 5 8 9 12 13 16 17 20 21 24 25 28 29 32 33 36 37 40
As you can see, the output is interleaved rather than what one might expect (process 1 printing 1, 2, 3, 4, 5... and process 2 printing 3, 6, 9...). This is where an inter-process lock comes in:
# -*- coding: utf-8 -*-
import time
from multiprocessing import Process, Value, Lock

# Add n to the shared value ten times while holding the lock
def write(v, n, lock):
    with lock:
        for i in range(10):
            time.sleep(0.1)
            v.value += n
            print(v.value)

if __name__ == '__main__':
    """
    typecode_to_type = {
        'c': ctypes.c_char,
        'b': ctypes.c_byte,     'B': ctypes.c_ubyte,
        'h': ctypes.c_short,    'H': ctypes.c_ushort,
        'i': ctypes.c_int,      'I': ctypes.c_uint,
        'l': ctypes.c_long,     'L': ctypes.c_ulong,
        'f': ctypes.c_float,    'd': ctypes.c_double
    }
    """
    lock = Lock()
    # An int value initialized to 0; see the typecodes above for other types
    v = Value("i", 0)
    p1 = Process(target=write, args=(v, 1, lock))
    p2 = Process(target=write, args=(v, 3, lock))
    # Start child process p1, which adds 1 to v:
    p1.start()
    # Start child process p2, which adds 3 to v:
    p2.start()
    # Wait for both to finish:
    p1.join()
    p2.join()

# Output:
# 1 2 3 4 5 6 7 8 9 10 13 16 19 22 25 28 31 34 37 40
Now the result comes out as expected, but because of the lock the two processes run one after the other, which costs some efficiency.
Under Unix/Linux you can use the fork() call to implement multiple processes.

For cross-platform multiprocessing, use the multiprocessing module.

Inter-process communication is done with Queue, Pipes, and the like.