Python學習筆記18:標準庫之多進程(multiprocessing包)

咱們可以使用subprocess包來建立子進程。但這個包有兩個很是大的侷限性:
1) 咱們老是讓subprocess執行外部的程序,而不是執行一個Python腳本內部編寫的函數。
2) 進程間僅僅經過管道進行文本交流。
以上限制了咱們將subprocess包應用到更普遍的多進程任務。
這種比較實際是不公平的,因爲subprocessing自己就是設計成爲一個shell,而不是一個多進程管理包。



一 threading和multiprocessing

multiprocessing包是Python中的多進程管理包。
與threading.Thread相似。它可以利用multiprocessing.Process對象來建立一個進程。
該進程可以執行在Python程序內部編寫的函數。
該Process對象與Thread對象的使用方法一樣,也有start(), run(), join()的方法。
此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象可以像多線程那樣,經過參數傳遞給各個進程),用以同步進程。其使用方法與threading包中的同名類一致。
因此。multiprocessing的很是大一部份與threading使用同一套API,僅僅只是換到了多進程的情境。




但在使用這些共享API的時候,咱們要注意下面幾點:
1)在UNIX平臺上,當某個進程終結以後。該進程需要被其父進程調用wait,不然進程成爲殭屍進程(Zombie)。python


因此。有必要對每個Process對象調用join()方法 (實際上等同於wait)。對於多線程來講,由於僅僅有一個進程。因此不存在此必要性。
2)multiprocessing提供了threading包中沒有的IPC(比方Pipe和Queue),效率上更高。shell


應優先考慮Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因爲它們佔領的不是用戶進程的資源)。
3)多進程應該避免共享資源。數組

在多線程中,咱們可以比較easy地共享資源,比方使用全局變量或者傳遞參數。
在多進程狀況下。由於每個進程有本身獨立的內存空間。以上方法並不合適。此時咱們可以經過共享內存和Manager的方法來共享資源。
但這樣作提升了程序的複雜度,並因爲同步的需要而減小了程序的效率。Process.PID中保存有PID,假設進程尚未start()。則PID爲None。服務器




咱們可以從如下的程序中看到Thread對象和Process對象在使用上的類似性與結果上的不一樣。各個線程和進程都作一件事:打印PID。
但問題是,所有的任務在打印的時候都會向同一個標準輸出(stdout)輸出。這樣輸出的字符會混合在一塊兒,沒法閱讀。
使用Lock同步,在一個任務輸出完畢以後,再贊成還有一個任務輸出,可以避免多個任務同一時候向終端輸出。網絡




# Similarity and difference of multi thread vs. multi process
 
import os
import threading
import multiprocessing
 
# worker function
def worker(sign, lock):
    lock.acquire()
    print(sign, os.getpid())
    lock.release()
 
# Main
print('Main:',os.getpid())
 
# Multi-thread
record = []
lock  = threading.Lock()
for i in range(5):
    thread = threading.Thread(target=worker,args=('thread',lock))
    thread.start()
    record.append(thread)
 
for thread in record:
    thread.join()
 
# Multi-process
record = []
lock = multiprocessing.Lock()
for i in range(5):
    process = multiprocessing.Process(target=worker,args=('process',lock))
    process.start()
    record.append(process)
 
for process in record:
    process.join()

所有Thread的PID都與主程序一樣,而每個Process都有一個不一樣的PID。

二 Pipe和Queue

管道PIPE和消息隊列message queue,multiprocessing包中有Pipe類和Queue類來分別支持這兩種IPC機制。Pipe和Queue可以用來傳送常見的對象。


1) Pipe可以是單向(half-duplex)。也可以是雙向(duplex)。


咱們經過mutiprocessing.Pipe(duplex=False)建立單向管道 (默以爲雙向)。
一個進程從PIPE一端輸入對象,而後被PIPE還有一端的進程接收,單向管道僅僅贊成管道一端的進程輸入,而雙向管道則贊成從兩端輸入。多線程


如下的程序展現了Pipe的使用:
app

# Multiprocessing with Pipe
 
import multiprocessing as mul
 
def proc1(pipe):
    pipe.send('hello')
    print('proc1 rec:',pipe.recv())
 
def proc2(pipe):
    print('proc2 rec:',pipe.recv())
    pipe.send('hello, too')
 
# Build a pipe
pipe = mul.Pipe()
 
# Pass an end of the pipe to process 1
p1   = mul.Process(target=proc1, args=(pipe[0],))


# Pass the other end of the pipe to process 2
p2   = mul.Process(target=proc2, args=(pipe[1],))


p1.start()
p2.start()
p1.join()
p2.join()

這裏的Pipe是雙向的。
Pipe對象創建的時候,返回一個含有兩個元素的表。每個元素表明Pipe的一端(Connection對象)。
咱們對Pipe的某一端調用send()方法來傳送對象,在還有一端使用recv()來接收。


2) Queue與Pipe相相似,都是先進先出的結構。但Queue贊成多個進程放入,多個進程從隊列取出對象。
Queue使用mutiprocessing.Queue(maxsize)建立。maxsize表示隊列中可以存放對象的最大數量。
如下的程序展現了Queue的使用:
import os
import multiprocessing
import time
#==================
# input worker
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.time())
    queue.put(info)
 
# output worker
def outputQ(queue,lock):
    info = queue.get()
    lock.acquire()
    print (str(os.getpid()) + '(get):' + info)
    lock.release()
#===================
# Main
record1 = []   # store input processes
record2 = []   # store output processes
lock  = multiprocessing.Lock()    # To prevent messy print
queue = multiprocessing.Queue(3)
 
# input processes
for i in range(10):
    process = multiprocessing.Process(target=inputQ,args=(queue,))
    process.start()
    record1.append(process)
 
# output processes
for i in range(10):
    process = multiprocessing.Process(target=outputQ,args=(queue,lock))
    process.start()
    record2.append(process)
 
for p in record1:
    p.join()
 
queue.close()  # No more object will come, close the queue
 
for p in record2:
    p.join()

一些進程使用put()在Queue中放入字符串,這個字符串中包括PID和時間。
還有一些進程從Queue中取出,並打印本身的PID以及get()的字符串。


三 進程池

進程池 (Process Pool)可以建立多個進程。這些進程就像是隨時待命的士兵,準備運行任務(程序)。

一個進程池中可以容納多個待命的士兵。async


比方如下的程序:
函數

import multiprocessing as mul
 
def f(x):
    return x**2
 
pool = mul.Pool(5)
rel  = pool.map(f,[1,2,3,4,5,6,7,8,9,10])
print(rel)

咱們建立了一個允許5個進程的進程池 (Process Pool) 。Pool執行的每個進程都執行f()函數。
咱們利用map()方法,將f()函數做用到表的每個元素上。這與built-in的map()函數相似。僅僅是這裏用5個進程並行處理。
假設進程執行結束後,還有需要處理的元素。那麼的進程會被用於又一次執行f()函數。除了map()方法外。Pool還有如下的常常用法。


1)apply_async(func,args)從進程池中取出一個進程運行func。args爲func的參數。
它將返回一個AsyncResult的對象。你可以對該對象調用get()方法以得到結果。ui


2)close()進程池再也不建立新的進程
3)join()wait進程池中的全部進程。

必須對Pool先調用close()方法才幹join。


四 共享資源

多進程共享資源一定會帶來進程間相互競爭。而這樣的競爭又會形成race condition,咱們的結果有可能被競爭的不肯定性所影響。
但假設需要,咱們依舊可以經過共享內存和Manager對象這麼作。


1 共享內存

依據共享內存(shared memory)的原理,這裏給出用Python實現的樣例:
import multiprocessing
 
def f(n, a):
    n.value   = 3.14
    a[0]      = 5
 
num   = multiprocessing.Value('d', 0.0)
arr   = multiprocessing.Array('i', range(10))
 
p = multiprocessing.Process(target=f, args=(num, arr))
p.start()
p.join()
 
print num.value
print arr[:]

這裏咱們實際上僅僅有主進程和Process對象表明的進程。
咱們在主進程的內存空間中建立共享的內存,也就是Value和Array兩個對象。對象Value被設置成爲雙精度數(d), 並初始化爲0.0。
而Array則相似於C中的數組,有固定的類型(i, 也就是整數)。在Process進程中,咱們改動了Value和Array對象。
回到主程序。打印出結果,主程序也看到了兩個對象的改變,說明資源確實在兩個進程之間共享。


 

2 Manager

Manager對象相似於服務器與客戶之間的通訊 (server-client),與咱們在Internet上的活動很是相似。


咱們用一個進程做爲server,創建Manager來真正存放資源。其餘的進程可以經過參數傳遞或者依據地址來訪問Manager,創建鏈接後。操做server上的資源。


在防火牆贊成的狀況下,咱們全然可以將Manager運用於多計算機。從而模仿了一個真實的網絡情境。
如下的樣例中,咱們對Manager的使用相似於shared memory。但可以共享更豐富的對象類型。

import multiprocessing
 
def f(x, arr, l):
    x.value = 3.14
    arr[0] = 5
    l.append('Hello')
 
server = multiprocessing.Manager()
x    = server.Value('d', 0.0)
arr  = server.Array('i', range(10))
l    = server.list()
 
proc = multiprocessing.Process(target=f, args=(x, arr, l))
proc.start()
proc.join()
 
print(x.value)
print(arr)
print(l)

Manager利用list()方法提供了表的共享方式。
實際上你可以利用dict()來共享詞典,Lock()來共享threading.Lock(注意,咱們共享的是threading.Lock。而不是進程的mutiprocessing.Lock。

後者自己已經實現了進程共享)等。 這樣Manager就贊成咱們共享不少其它樣的對象。

相關文章
相關標籤/搜索