Python多進程編程詳解

本文代碼在 Python 3.6 環境下測試經過。python

簡介

多進程(multiprocessing)模塊是在 Python 2.6 版本中加入的,和多線程(threading)模塊相似,都是用來作並行運算的。不過Python既然有了threading,爲何還要搞一個multiprocessing呢?這是由於Python內部有一個全局解釋鎖(GIL),任何一個進程任什麼時候候只容許一個線程進行CPU運算,若是一個進程中的某個線程在進行CPU運算時得到GIL,其餘線程將沒法進行CPU運算只能等待,使得多線程沒法利用CPU多核的特性。多進程處理實際上對每一個任務都會生成一個操做系統的進程,而且每個進程都被單獨賦予了Python的解釋器和GIL,因此程序在運行中有多個GIL存在,每一個運行者的線程都會拿到一個GIL,在不一樣的環境下運行,天然也能夠被分配到不一樣的處理器上。編程

建立進程(Process)

multiprocessing模塊提供了一個Process類能夠建立進程對象。建立進程有兩種方式,第一種經過Process類直接建立,參數target指定子進程要執行的程序。第二種經過繼承Process類來實現。安全

咱們先用第一種方式建立子進程,子進程會將傳遞給它的參數擴大一倍,代碼以下:多線程

#-*- coding:utf8 -*-
import os
from multiprocessing import Process, current_process

def doubler(number):
    result = number * 2
    # 獲取子進程ID
    proc_id = os.getpid()
    # 獲取子進程名稱
    proc_name = current_process().name
    print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []	
    # 父進程ID和名稱
    print('parent_proc_id:{0} parent_proc_name:{1}'.format(os.getpid(), current_process().name))
    
    for num in numbers:
    	# 建立子進程
    	proc = Process(target=doubler, args=(num,))
    	procs.append(proc)
    	# 啓動子進程
    	proc.start()
    
    # join方法會讓父進程等待子進程結束後再執行
    for proc in procs:
    	proc.join()
    
    print("Done.")
複製代碼

第二種方式經過繼承Process類,並重寫run方法:app

class MyProcess(Process):
    def __init__(self, number):
    	# 必須調用父類的init方法
    	super(MyProcess, self).__init__()
    	self.number = number
    
    def run(self):
    	result = self.number * 2
    	# 獲取子進程ID
    	# self.pid
    	proc_id = os.getpid()
    	# 獲取子進程名稱
    	# self.name
    	proc_name = current_process().name
    	print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
    # 父進程的ID和名稱
    print('parent_proc_id:{0} parent_proc_name:{1}'.format(os.getpid(), current_process().name))
    
    for num in numbers:
    	# 建立子進程
    	proc = MyProcess(num)
    	procs.append(proc)
    	# 啓動子進程,啓動一個新進程實際就是執行本進程對應的run方法
    	proc.start()
    
    # join方法會讓父進程等待子進程結束後再執行
    for proc in procs:
    	proc.join()
    
    print("Done.")
複製代碼

進程鎖(Lock)

multiprocessing模塊和threading模塊同樣也支持鎖。經過acquire獲取鎖,執行操做後經過release釋放鎖。dom

#-*- coding:utf8 -*-
from multiprocessing import Process, Lock

def printer(item, lock):
    # 獲取鎖
    lock.acquire()
    try:
    	print(item)
    except Exception as e:
    	print(e)
    else:
    	print('no exception.')
    finally:
        # 釋放鎖
    	lock.release()

if __name__ == '__main__':
    # 實例化全局鎖
    lock = Lock()
    items = ['PHP', 'Python', 'Java']
    procs = []
    
    for item in items:
    	proc = Process(target=printer, args=(item, lock))
    	procs.append(proc)
    	proc.start()
    
    for proc in procs:
    	proc.join()
    
    print('Done.')
複製代碼

進程池(Pool)

Pool類表示工做進程的池子,它能夠提供指定數量的進程供用戶調用,當有請求提交到進程池時,若是進程池有空閒進程或進程數還沒到達指定上限,就會分配一個進程響應請求,不然請求只能等待。Pool類主要在執行目標多且須要控制進程數量的狀況下使用,若是目標少且不用控制進程數量可使用Process類。async

進程池能夠經過map和apply_async方法來調用執行代碼,首先咱們來看map方法:函數

#-*- coding:utf8 -*-
import os
from multiprocessing import Pool, current_process

def doubler(number):
    result = number * 2
    proc_id = os.getpid()
    proc_name = current_process().name
    print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    pool = Pool(processes=3)
    pool.map(doubler, numbers)
    
    # 關閉pool使其再也不接受新的任務
    pool.close()
    
    # 關閉pool,結束工做進程,不在處理未完成的任務
    # pool.terminate()
    
    # 主進程阻塞,結束工做進程,再也不處理未完成的任務,join方法要在close或terminate以後使用
    pool.join()
    
    print('Done')
複製代碼

map只能向處理函數傳遞一個參數測試

下面來看一下apply/apply_async函數,apply函數是阻塞的,apply_async函數是阻塞的,這裏咱們以apply_async函數爲例:ui

#-*- coding:utf8 -*-
import os, time
from multiprocessing import Pool, current_process

def doubler(number, parent_proc_id, parent_proc_name):
    result = number * 2
    proc_id = os.getpid()
    proc_name = current_process().name
    # 設置等待時間,能夠驗證apply和apply_async的阻塞和非阻塞
    time.sleep(2)
    print('parent_proc_id:{0} parent_proc_name:{1} proc_id:{2} proc_name:{3} number:{4} result:{5}'.format(parent_proc_id, parent_proc_name, proc_id, proc_name, number, result))

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    parent_proc_id = os.getpid()
    parent_proc_name = current_process().name
    pool = Pool(processes=3)
    for num in numbers:
    	# 非阻塞
    	pool.apply_async(doubler, (num, parent_proc_id, parent_proc_name))
    	# 阻塞其它進程
    	# pool.apply_async(doubler, (num, parent_proc_id, parent_proc_name))
    	
    # 關閉pool使其再也不接受新的任務
    pool.close()
    
    # 關閉pool,結束工做進程,不在處理未完成的任務
    # pool.terminate()
    
    # 主進程阻塞,結束工做進程,再也不處理未完成的任務,join方法要在close或terminate以後使用
    pool.join()
    
    print('Done')
複製代碼

進程間通訊

進程間通訊的方式通常有管道(Pipe)、信號(Signal)、消息隊列(Message)、信號量(Semaphore)、共享內存(Shared Memory)、套接字(Socket)等。這裏咱們着重講一下在Python多進程編程中經常使用的進程方式multiprocessing.Pipe函數和multiprocessing.Queue類。

Pipe

multiprocessing.Pipe()即管道模式,調用Pipe()方法返回管道的兩端的Connection。Pipe方法返回(conn1, conn2)表明一個管道的兩個端。Pipe方法有duplex參數,若是duplex參數爲True(默認值),那麼這個管道是全雙工模式,也就是說conn1和conn2都可收發;duplex爲False,conn1只負責接受消息,conn2只負責發送消息。send()和recv()方法分別是發送和接受消息的方法。一個進程從Pipe某一端輸入對象,而後被Pipe另外一端的進程接收,單向管道只容許管道一端的進程輸入另外一端的進程接收,不能夠反向通訊;而雙向管道則容許從兩端輸入和從兩端接收。

#-*- coding:utf8 -*-
import os, time
from multiprocessing import Process, Pipe, current_process

def proc1(pipe, data):
    for msg in range(1, 6):
        print('{0} 發送 {1}'.format(current_process().name, msg))
        pipe.send(msg)
        time.sleep(1)
    pipe.close()

def proc2(pipe, length):
    count = 0
    while True:
        count += 1
        if count == length:
            pipe.close()
        try:
            # 若是沒有接收到數據recv會一直阻塞,若是管道被關閉,recv方法會拋出EOFError
            msg = pipe.recv()
            print('{0} 接收到 {1}'.format(current_process().name, msg))
        except Exception as e:
            print(e)
            break

if __name__ == '__main__':
    conn1, conn2 = Pipe(True)
    data = range(0, 6)
    length = len(data)
    proc1 = Process(target=proc1, args=(conn1, data))
    proc2 = Process(target=proc2, args=(conn2, length))
    
    proc1.start()
    proc2.start()

    proc1.join()
    proc2.join()
    
    print('Done.')
複製代碼

Queue

Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。Queue的使用主要是一邊put(),一邊get(),可是Queue能夠是多個Process進行put()操做,也能夠是多個Process進行get()操做。

  • put方法用來插入數據到隊列中,put方法還有兩個可選參數:block和timeout。若是block爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是block爲False,但該Queue已滿,會當即拋出Queue.Full異常。
  • get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:block和timeout。若是block爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是block爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值;不然,若是隊列爲空,則當即拋出Queue.Empty異常。

在父進程中建立兩個子進程,一個往Queue裏寫數據,一個從Queue裏讀數據:

#-*- coding:utf8 -*-
import os, time, random
from multiprocessing import Process, Queue

def write(q):
    print('Process to write: %s' % os.getpid())
    for val in range(0, 6):
        print('Put %s to queue...' % val)
        q.put(val)
        time.sleep(random.random())

def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        try:
            val = q.get(block=True, timeout=5)
            print('Get %s from queue.' % val)
        except Exception as e:
            if q.empty():
                print('隊列消費完畢.')
                break
                
if __name__ == '__main__':
    q = Queue()
    
    proc1 = Process(target=write, args=(q,))
    proc2 = Process(target=read, args=(q,))

    proc1.start()
    proc2.start()

    proc1.join()
    proc2.join()
    
    # 若是proc2不break的話會一直阻塞,不調用join調用terminate方法能夠終止進程
    # proc2.terminate()
    
    print('Done.')
複製代碼

Pipe的讀寫效率要高於Queue。那麼咱們如何的選擇它們呢?

  • 若是你的環境是多生產者和消費者,那麼你只能是選擇queue隊列。
  • 若是兩個進程間處理的邏輯簡單,可是就是要求絕對的速度,那麼pipe是個好選擇。
相關文章
相關標籤/搜索