本文代碼在 Python 3.6 環境下測試經過。python
多進程(multiprocessing)模塊是在 Python 2.6 版本中加入的,和多線程(threading)模塊相似,都是用來作並行運算的。不過Python既然有了threading,爲何還要搞一個multiprocessing呢?這是由於Python內部有一個全局解釋鎖(GIL),任何一個進程任什麼時候候只容許一個線程進行CPU運算,若是一個進程中的某個線程在進行CPU運算時得到GIL,其餘線程將沒法進行CPU運算只能等待,使得多線程沒法利用CPU多核的特性。多進程處理實際上對每一個任務都會生成一個操做系統的進程,而且每個進程都被單獨賦予了Python的解釋器和GIL,因此程序在運行中有多個GIL存在,每一個運行者的線程都會拿到一個GIL,在不一樣的環境下運行,天然也能夠被分配到不一樣的處理器上。編程
multiprocessing模塊提供了一個Process類能夠建立進程對象。建立進程有兩種方式,第一種經過Process類直接建立,參數target指定子進程要執行的程序。第二種經過繼承Process類來實現。安全
咱們先用第一種方式建立子進程,子進程會將傳遞給它的參數擴大一倍,代碼以下:多線程
#-*- coding:utf8 -*-
import os
from multiprocessing import Process, current_process
def doubler(number):
result = number * 2
# 獲取子進程ID
proc_id = os.getpid()
# 獲取子進程名稱
proc_name = current_process().name
print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))
if __name__ == '__main__':
numbers = [5, 10, 15, 20, 25]
procs = []
# 父進程ID和名稱
print('parent_proc_id:{0} parent_proc_name:{1}'.format(os.getpid(), current_process().name))
for num in numbers:
# 建立子進程
proc = Process(target=doubler, args=(num,))
procs.append(proc)
# 啓動子進程
proc.start()
# join方法會讓父進程等待子進程結束後再執行
for proc in procs:
proc.join()
print("Done.")
複製代碼
第二種方式經過繼承Process類,並重寫run方法:app
class MyProcess(Process):
def __init__(self, number):
# 必須調用父類的init方法
super(MyProcess, self).__init__()
self.number = number
def run(self):
result = self.number * 2
# 獲取子進程ID
# self.pid
proc_id = os.getpid()
# 獲取子進程名稱
# self.name
proc_name = current_process().name
print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))
if __name__ == '__main__':
numbers = [5, 10, 15, 20, 25]
procs = []
# 父進程的ID和名稱
print('parent_proc_id:{0} parent_proc_name:{1}'.format(os.getpid(), current_process().name))
for num in numbers:
# 建立子進程
proc = MyProcess(num)
procs.append(proc)
# 啓動子進程,啓動一個新進程實際就是執行本進程對應的run方法
proc.start()
# join方法會讓父進程等待子進程結束後再執行
for proc in procs:
proc.join()
print("Done.")
複製代碼
multiprocessing模塊和threading模塊同樣也支持鎖。經過acquire獲取鎖,執行操做後經過release釋放鎖。dom
#-*- coding:utf8 -*-
from multiprocessing import Process, Lock
def printer(item, lock):
# 獲取鎖
lock.acquire()
try:
print(item)
except Exception as e:
print(e)
else:
print('no exception.')
finally:
# 釋放鎖
lock.release()
if __name__ == '__main__':
# 實例化全局鎖
lock = Lock()
items = ['PHP', 'Python', 'Java']
procs = []
for item in items:
proc = Process(target=printer, args=(item, lock))
procs.append(proc)
proc.start()
for proc in procs:
proc.join()
print('Done.')
複製代碼
Pool類表示工做進程的池子,它能夠提供指定數量的進程供用戶調用,當有請求提交到進程池時,若是進程池有空閒進程或進程數還沒到達指定上限,就會分配一個進程響應請求,不然請求只能等待。Pool類主要在執行目標多且須要控制進程數量的狀況下使用,若是目標少且不用控制進程數量可使用Process類。async
進程池能夠經過map和apply_async方法來調用執行代碼,首先咱們來看map方法:函數
#-*- coding:utf8 -*-
import os
from multiprocessing import Pool, current_process
def doubler(number):
result = number * 2
proc_id = os.getpid()
proc_name = current_process().name
print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))
if __name__ == '__main__':
numbers = [5, 10, 15, 20, 25]
pool = Pool(processes=3)
pool.map(doubler, numbers)
# 關閉pool使其再也不接受新的任務
pool.close()
# 關閉pool,結束工做進程,不在處理未完成的任務
# pool.terminate()
# 主進程阻塞,結束工做進程,再也不處理未完成的任務,join方法要在close或terminate以後使用
pool.join()
print('Done')
複製代碼
map只能向處理函數傳遞一個參數測試
下面來看一下apply/apply_async函數,apply函數是阻塞的,apply_async函數是阻塞的,這裏咱們以apply_async函數爲例:ui
#-*- coding:utf8 -*-
import os, time
from multiprocessing import Pool, current_process
def doubler(number, parent_proc_id, parent_proc_name):
result = number * 2
proc_id = os.getpid()
proc_name = current_process().name
# 設置等待時間,能夠驗證apply和apply_async的阻塞和非阻塞
time.sleep(2)
print('parent_proc_id:{0} parent_proc_name:{1} proc_id:{2} proc_name:{3} number:{4} result:{5}'.format(parent_proc_id, parent_proc_name, proc_id, proc_name, number, result))
if __name__ == '__main__':
numbers = [5, 10, 15, 20, 25]
parent_proc_id = os.getpid()
parent_proc_name = current_process().name
pool = Pool(processes=3)
for num in numbers:
# 非阻塞
pool.apply_async(doubler, (num, parent_proc_id, parent_proc_name))
# 阻塞其它進程
# pool.apply_async(doubler, (num, parent_proc_id, parent_proc_name))
# 關閉pool使其再也不接受新的任務
pool.close()
# 關閉pool,結束工做進程,不在處理未完成的任務
# pool.terminate()
# 主進程阻塞,結束工做進程,再也不處理未完成的任務,join方法要在close或terminate以後使用
pool.join()
print('Done')
複製代碼
進程間通訊的方式通常有管道(Pipe)、信號(Signal)、消息隊列(Message)、信號量(Semaphore)、共享內存(Shared Memory)、套接字(Socket)等。這裏咱們着重講一下在Python多進程編程中經常使用的進程方式multiprocessing.Pipe函數和multiprocessing.Queue類。
multiprocessing.Pipe()即管道模式,調用Pipe()方法返回管道的兩端的Connection。Pipe方法返回(conn1, conn2)表明一個管道的兩個端。Pipe方法有duplex參數,若是duplex參數爲True(默認值),那麼這個管道是全雙工模式,也就是說conn1和conn2都可收發;duplex爲False,conn1只負責接受消息,conn2只負責發送消息。send()和recv()方法分別是發送和接受消息的方法。一個進程從Pipe某一端輸入對象,而後被Pipe另外一端的進程接收,單向管道只容許管道一端的進程輸入另外一端的進程接收,不能夠反向通訊;而雙向管道則容許從兩端輸入和從兩端接收。
#-*- coding:utf8 -*-
import os, time
from multiprocessing import Process, Pipe, current_process
def proc1(pipe, data):
for msg in range(1, 6):
print('{0} 發送 {1}'.format(current_process().name, msg))
pipe.send(msg)
time.sleep(1)
pipe.close()
def proc2(pipe, length):
count = 0
while True:
count += 1
if count == length:
pipe.close()
try:
# 若是沒有接收到數據recv會一直阻塞,若是管道被關閉,recv方法會拋出EOFError
msg = pipe.recv()
print('{0} 接收到 {1}'.format(current_process().name, msg))
except Exception as e:
print(e)
break
if __name__ == '__main__':
conn1, conn2 = Pipe(True)
data = range(0, 6)
length = len(data)
proc1 = Process(target=proc1, args=(conn1, data))
proc2 = Process(target=proc2, args=(conn2, length))
proc1.start()
proc2.start()
proc1.join()
proc2.join()
print('Done.')
複製代碼
Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。Queue的使用主要是一邊put(),一邊get(),可是Queue能夠是多個Process進行put()操做,也能夠是多個Process進行get()操做。
在父進程中建立兩個子進程,一個往Queue裏寫數據,一個從Queue裏讀數據:
#-*- coding:utf8 -*-
import os, time, random
from multiprocessing import Process, Queue
def write(q):
print('Process to write: %s' % os.getpid())
for val in range(0, 6):
print('Put %s to queue...' % val)
q.put(val)
time.sleep(random.random())
def read(q):
print('Process to read: %s' % os.getpid())
while True:
try:
val = q.get(block=True, timeout=5)
print('Get %s from queue.' % val)
except Exception as e:
if q.empty():
print('隊列消費完畢.')
break
if __name__ == '__main__':
q = Queue()
proc1 = Process(target=write, args=(q,))
proc2 = Process(target=read, args=(q,))
proc1.start()
proc2.start()
proc1.join()
proc2.join()
# 若是proc2不break的話會一直阻塞,不調用join調用terminate方法能夠終止進程
# proc2.terminate()
print('Done.')
複製代碼
Pipe的讀寫效率要高於Queue。那麼咱們如何的選擇它們呢?