Python Multiprocessing

To implement multiprocessing in a Python program, we first need some background on how the operating system works.

Unix/Linux operating systems provide a fork() system call, and it is quite special. An ordinary function is called once and returns once, but fork() is called once and returns twice: the operating system automatically makes a copy of the current process (called the parent process); the copy is the child process, and fork() then returns in both the parent and the child.

fork() always returns 0 in the child process, while in the parent it returns the child's PID. The reason is that a parent process can fork many children, so the parent needs to record each child's PID, whereas a child only needs to call os.getppid() to get its parent's PID.

Python's os module wraps common system calls, including fork, so creating a child process from a Python program is easy:

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

The code above fails on Windows, because Windows has no fork call.

multiprocessing

Python provides the multiprocessing package for multi-process development. It offers a Process class that represents a process object, and it is used in much the same way as threading.Thread:

from multiprocessing import Process
import os

# Code to be executed by the child process
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

# Output:
Parent process 8768.
Child process will start.
Run child process test (8572)...
Child process end.

To create a child process, simply pass the function to execute and its arguments when constructing a Process instance, then launch it with the start() method.

The join() method waits for the child process to finish before the program continues; it is commonly used to synchronize processes.
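
For instance, a common pattern is to start several Process objects and then join() each one, so the parent only continues after all children are done. A minimal sketch (the worker function here is made up for illustration):

from multiprocessing import Process
import os

def worker(name):
    print('Worker %s running in process %s' % (name, os.getpid()))

if __name__ == '__main__':
    # Start several children, then wait for all of them before continuing
    procs = [Process(target=worker, args=(i,)) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print('All children finished.')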

Pool

If you need to launch a large number of child processes, you can create them in batches with a process pool:

from multiprocessing import Pool, current_process
import os, time, random

def long_time_task(name):
    print('Run task %s %s (%s)...' % (name, current_process().name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s %s runs %0.2f seconds.' % (name, current_process().name, (end - start)))
    return name

def done(name):
    print("Task %s %s is done" % (name, current_process().name))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,), callback=done)
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

# Output:
Parent process 10040.
Run task 0 PoolWorker-4 (5528)...
Waiting for all subprocesses done...
Run task 1 PoolWorker-1 (4844)...
Run task 2 PoolWorker-2 (4892)...
Run task 3 PoolWorker-3 (7492)...
Task 0 PoolWorker-4 runs 1.70 seconds.
Run task 4 PoolWorker-4 (5528)...
Task 0 MainProcess is done
Task 2 PoolWorker-2 runs 1.94 seconds.
Task 2 MainProcess is done
Task 1 PoolWorker-1 runs 2.26 seconds.
Task 1 MainProcess is done
Task 3 PoolWorker-3 runs 2.27 seconds.
Task 3 MainProcess is done
Task 4 PoolWorker-4 runs 1.83 seconds.
Task 4 MainProcess is done
All subprocesses done.

# If the blocking version p.apply(long_time_task, args=(i,)) is used instead (apply() takes no callback argument), the output is:
Parent process 7624.
Run task 0 PoolWorker-3 (2128)...
Task 0 PoolWorker-3 runs 2.98 seconds.
Run task 1 PoolWorker-4 (5460)...
Task 1 PoolWorker-4 runs 1.51 seconds.
Run task 2 PoolWorker-2 (8780)...
Task 2 PoolWorker-2 runs 0.66 seconds.
Run task 3 PoolWorker-1 (7044)...
Task 3 PoolWorker-1 runs 1.13 seconds.
Run task 4 PoolWorker-3 (2128)...
Task 4 PoolWorker-3 runs 2.94 seconds.
Waiting for all subprocesses done...
All subprocesses done.

Calling join() on a Pool waits for all worker processes to finish. close() must be called before join(), and after close() no new tasks can be submitted to the Pool.

Here Pool(4) means 4 worker processes run concurrently; if the argument is omitted, it defaults to the number of CPUs on the machine.

In the code above, p.apply_async() is a variant of apply(): apply_async() is the parallel (non-blocking) version, while apply() is the blocking version, which blocks the parent process until the child task has finished. (Note that Pool.apply() is a method of Pool and is unrelated to the old Python 2 built-in apply(), despite the shared name.)

apply_async() can take a callback function, whose argument is the return value of the task function (this allows various workflows, such as passing back a task id or marking a task as finished).

apply() takes no callback, because it is blocking by nature: it waits for the task function and returns its return value directly.
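
Besides the callback, apply_async() also returns an AsyncResult object whose get() method blocks until the result is ready. A minimal sketch of collecting return values this way (the square task below is made up for illustration):

from multiprocessing import Pool

def square(x):
    # Runs in a worker process; the return value travels back to the parent
    return x * x

if __name__ == '__main__':
    p = Pool(4)
    # Submit tasks without blocking and keep the AsyncResult handles
    results = [p.apply_async(square, args=(i,)) for i in range(5)]
    p.close()
    # get() blocks until each individual result is ready
    print([r.get() for r in results])  # [0, 1, 4, 9, 16]
    p.join()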

Inter-process Communication

For communication between processes, multiprocessing provides several mechanisms for exchanging data, such as Queue, Pipe, and Value.

Taking Queue as an example, the parent process creates two child processes: one writes data into the Queue, and the other reads data from it:

# -*- coding: utf-8 -*-

from multiprocessing import Process, Queue
import os, time, random

# Code executed by the writer process:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# Code executed by the reader process:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # The parent process creates the Queue and passes it to each child:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # Start the writer process pw:
    pw.start()
    # Start the reader process pr:
    pr.start()
    # Wait for pw to finish:
    pw.join()
    # pr loops forever and will never exit on its own, so terminate it forcibly:
    pr.terminate()

# Output:
Process to read: 8868
Process to write: 6832
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
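
multiprocessing's Pipe is another option: Pipe() returns a pair of connected endpoints, and send()/recv() exchange picklable objects between two processes. A minimal sketch (the ping/pong messages are just for illustration):

from multiprocessing import Process, Pipe
import os

def worker(conn):
    # Receive a message from the parent and send back a reply
    msg = conn.recv()
    print('Child %s got: %s' % (os.getpid(), msg))
    conn.send('pong')
    conn.close()

if __name__ == '__main__':
    # Pipe() returns two connected endpoints
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.send('ping')
    print('Parent %s got: %s' % (os.getpid(), parent_conn.recv()))
    p.join()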

On Unix/Linux, the multiprocessing module wraps the fork() call, so we do not need to care about fork() details. Since Windows has no fork call, multiprocessing has to "emulate" it there: the Python objects handed to the child process must be serialized with pickle and passed over. So if multiprocessing fails on Windows, first consider whether pickling is what failed.

Process Locks

When multiple child processes operate on the same shared object, things can get messy. For example, start two processes, one adding 1 and the other adding 3 to the same shared variable:

# -*- coding: utf-8 -*-

import time
from multiprocessing import Process, Value


# Modify the shared value
def write(v, n):
    for i in range(10):
        time.sleep(0.1)
        v.value += n
        print(v.value)


if __name__ == '__main__':
    """
    typecode_to_type = {
    'c': ctypes.c_char,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
    }
    """
    # A value of int type ('i'), initialized to 0; see the typecodes above for other types
    v = Value("i", 0)
    p1 = Process(target=write, args=(v, 1))
    p2 = Process(target=write, args=(v, 3))
    # Start child process p1, which adds 1 to v:
    p1.start()
    # Start child process p2, which adds 3 to v:
    p2.start()
    # Wait for both to finish:
    p1.join()
    p2.join()

# Output:
1
4
5
8
9
12
13
16
17
20
21
24
25
28
29
32
33
36
37
40

As you can see, the output is interleaved rather than what we might have hoped for (process 1 printing 1, 2, 3, 4, 5... and process 2 printing 3, 6, 9...). Moreover, since v.value += n is a read-modify-write and not atomic, increments can even be lost. This is where an inter-process lock is needed:

# -*- coding: utf-8 -*-

import time
from multiprocessing import Process, Value, Lock


def write(v, n, lock):
    with lock:
        for i in range(10):
            time.sleep(0.1)
            v.value += n
            print(v.value)


if __name__ == '__main__':
    """
    typecode_to_type = {
    'c': ctypes.c_char,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
    }
    """
    lock = Lock()
    # A value of int type ('i'), initialized to 0; see the typecodes above for other types
    v = Value("i", 0)
    p1 = Process(target=write, args=(v, 1, lock))
    p2 = Process(target=write, args=(v, 3, lock))
    # Start child process p1, which adds 1 to v:
    p1.start()
    # Start child process p2, which adds 3 to v:
    p2.start()
    # Wait for both to finish:
    p1.join()
    p2.join()

# Output:
1
2
3
4
5
6
7
8
9
10
13
16
19
22
25
28
31
34
37
40

Now the result is correct, but because each process holds the lock around its whole loop, the two processes run serially, which costs some efficiency.
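
If only the correctness of each individual increment matters, the lock can instead be acquired per iteration, so the two processes still interleave but no update is lost. A minimal sketch of that variant (same Value/Lock setup as above):

# -*- coding: utf-8 -*-

import time
from multiprocessing import Process, Value, Lock


def write(v, n, lock):
    for i in range(10):
        time.sleep(0.1)
        # Hold the lock only around the read-modify-write itself
        with lock:
            v.value += n
            print(v.value)


if __name__ == '__main__':
    lock = Lock()
    v = Value("i", 0)
    p1 = Process(target=write, args=(v, 1, lock))
    p2 = Process(target=write, args=(v, 3, lock))
    p1.start()
    p2.start()
    p1.join()
    p2.join()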

Summary

On Unix/Linux, multiprocessing can be implemented with the fork() call.

For cross-platform multiprocessing, use the multiprocessing module.

Inter-process communication is implemented with Queue, Pipe, and similar mechanisms.
