Python標準庫10 多進程初步 (multiprocessing包)

做者:Vamei 出處:http://www.cnblogs.com/vamei 歡迎轉載,也請保留這段聲明。謝謝!html

 

咱們已經見過了使用subprocess包來建立子進程,但這個包有兩個很大的侷限性:1) 咱們老是讓subprocess運行外部的程序,而不是運行一個Python腳本內部編寫的函數。2) 進程間只經過管道進行文本交流。以上限制了咱們將subprocess包應用到更普遍的多進程任務。(這樣的比較實際是不公平的,由於subprocessing自己就是設計成爲一個shell,而不是一個多進程管理包)python

 

threading和multiprocessing

(請儘可能先閱讀Python多線程與同步)shell

multiprocessing包是Python中的多進程管理包。與threading.Thread相似,它能夠利用multiprocessing.Process對象來建立一個進程。該進程能夠運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象能夠像多線程那樣,經過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。因此,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。多線程

但在使用這些共享API的時候,咱們要注意如下幾點:app

  • 在UNIX平臺上,當某個進程終結以後,該進程須要被其父進程調用wait,不然進程成爲殭屍進程(Zombie)。因此,有必要對每一個Process對象調用join()方法 (實際上等同於wait)。對於多線程來講,因爲只有一個進程,因此不存在此必要性。函數

  • multiprocessing提供了threading包中沒有的IPC(好比Pipe和Queue),效率上更高。應優先考慮Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (由於它們佔據的不是用戶進程的資源)。ui

  • 多進程應該避免共享資源。在多線程中,咱們能夠比較容易地共享資源,好比使用全局變量或者傳遞參數。在多進程狀況下,因爲每一個進程有本身獨立的內存空間,以上方法並不合適。此時咱們能夠經過共享內存Manager的方法來共享資源。但這樣作提升了程序的複雜度,並由於同步的須要而下降了程序的效率。
    spa

Process.PID中保存有PID,若是進程尚未start(),則PID爲None。線程

 

咱們能夠從下面的程序中看到Thread對象和Process對象在使用上的類似性與結果上的不一樣。各個線程和進程都作一件事:打印PID。但問題是,全部的任務在打印的時候都會向同一個標準輸出(stdout)輸出。這樣輸出的字符會混合在一塊兒,沒法閱讀。使用Lock同步,在一個任務輸出完成以後,再容許另外一個任務輸出,能夠避免多個任務同時向終端輸出。設計

# Similarity and difference of multi thread vs. multi process
# Written by Vamei

import os
import threading
import multiprocessing

# worker function
def worker(sign, lock):
    lock.acquire()
    print(sign, os.getpid())
    lock.release()

# Main
print('Main:',os.getpid())

# Multi-thread
record = []
lock  = threading.Lock()
for i in range(5):
    thread = threading.Thread(target=worker,args=('thread',lock))
    thread.start()
    record.append(thread)

for thread in record:
    thread.join()

# Multi-process
record = []
lock = multiprocessing.Lock()
for i in range(5):
    process = multiprocessing.Process(target=worker,args=('process',lock))
    process.start()
    record.append(process)

for process in record:
    process.join()

全部Thread的PID都與主程序相同,而每一個Process都有一個不一樣的PID。

(練習: 使用mutiprocessing包將Python多線程與同步中的多線程程序更改成多進程程序)

 

Pipe和Queue

正如咱們在Linux多線程中介紹的管道PIPE和消息隊列message queue,multiprocessing包中有PipeQueue類來分別支持這兩種IPC機制。Pipe和Queue能夠用來傳送常見的對象。

 

1) Pipe能夠是單向(half-duplex),也能夠是雙向(duplex)。咱們經過mutiprocessing.Pipe(duplex=False)建立單向管道 (默認爲雙向)。一個進程從PIPE一端輸入對象,而後被PIPE另外一端的進程接收,單向管道只容許管道一端的進程輸入,而雙向管道則容許從兩端輸入。

下面的程序展現了Pipe的使用:

# Multiprocessing with Pipe
# Written by Vamei

import multiprocessing as mul

def proc1(pipe):
    pipe.send('hello')
    print('proc1 rec:',pipe.recv())

def proc2(pipe):
    print('proc2 rec:',pipe.recv())
    pipe.send('hello, too')

# Build a pipe
pipe = mul.Pipe()

# Pass an end of the pipe to process 1
p1   = mul.Process(target=proc1, args=(pipe[0],))
# Pass the other end of the pipe to process 2
p2   = mul.Process(target=proc2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()

這裏的Pipe是雙向的。

Pipe對象創建的時候,返回一個含有兩個元素的表,每一個元素表明Pipe的一端(Connection對象)。咱們對Pipe的某一端調用send()方法來傳送對象,在另外一端使用recv()來接收。

 

2) Queue與Pipe相相似,都是先進先出的結構。但Queue容許多個進程放入,多個進程從隊列取出對象。Queue使用mutiprocessing.Queue(maxsize)建立,maxsize表示隊列中能夠存放對象的最大數量。

下面的程序展現了Queue的使用:

# Written by Vamei
import os
import multiprocessing
import time
#==================
# input worker
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.time())
    queue.put(info)

# output worker
def outputQ(queue,lock):
    info = queue.get()
    lock.acquire()
    print (str(os.getpid()) + '(get):' + info)
    lock.release()
#===================
# Main
record1 = []   # store input processes
record2 = []   # store output processes
lock  = multiprocessing.Lock()    # To prevent messy print
queue = multiprocessing.Queue(3)

# input processes
for i in range(10):
    process = multiprocessing.Process(target=inputQ,args=(queue,))
    process.start()
    record1.append(process)

# output processes
for i in range(10):
    process = multiprocessing.Process(target=outputQ,args=(queue,lock))
    process.start()
    record2.append(process)

for p in record1:
    p.join()

queue.close()  # No more object will come, close the queue

for p in record2:
    p.join()

一些進程使用put()在Queue中放入字符串,這個字符串中包含PID和時間。另外一些進程從Queue中取出,並打印本身的PID以及get()的字符串。

 

總結

Process, Lock, Event, Semaphore, Condition

Pipe, Queue

相關文章
相關標籤/搜索