Python在2.6引入了多進程的機制,並提供了豐富的組件及api以方便編寫併發應用。multiprocessing包的組件Process, Queue, Pipe, Lock等組件提供了與多線程相似的功能。使用這些組件,能夠方便地編寫多進程併發程序。
windows
多進程實例:api
import os
from multiprocessing import Process
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
print("\n\n")
def f(name):
info('\033[31;1mfunction f\033[0m')
print('hello', name)
if __name__ == '__main__':
info('\033[32;1mmain process line\033[0m')
p = Process(target=info, args=('bob',))
p.start()
p.join()
實例化一個Process必需要指定target和args。target是新的進程的入口方法,能夠認爲是main方法。args是該方法的參數列表。啓動進程相似於啓動Thread,必需要調用start方法。也能夠繼承Process,覆蓋run方法,在run方法中實現該進程的邏輯。調用join方法會阻塞當前調用進程,直到被調用進程運行結束。安全
手工終止一個進程能夠調用terminate方法,在UNIX系統中,該方法會發送SIGTERM信號量,而在windows系統中,會藉助TerminateProcess方法。須要注意的是,exit處理邏輯並不會被執行,該進程的子進程不會被終止,他們只會變成孤兒進程。多線程
Queue是多進程安全的隊列,能夠使用Queue實現多進程之間的數據傳遞。put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。併發
get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常。Queue的一段示例代碼:app
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p2 = Process(target=f, args=(q,))
p.start()
p2.start()
print('data1:',q.get()) # prints "[42, None, 'hello']"
print('data2:',q.get())
p.join()
Pipes
異步
Pipe方法返回(conn1, conn2)表明一個管道的兩個端。Pipe方法有duplex參數,若是duplex參數爲True(默認值),那麼這個管道是全雙工模式,也就是說conn1和conn2都可收發。duplex爲False,conn1只負責接受消息,conn2只負責發送消息。async
send和recv方法分別是發送和接受消息的方法。例如,在全雙工模式下,能夠調用conn1.send發送消息,conn1.recv接收消息。若是沒有消息可接收,recv方法會一直阻塞。若是管道已經被關閉,那麼recv方法會拋出EOFError。ide
from multiprocessing import Process, Pipe
def send(conn):
conn.send("Hello World")
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=send, args=(child_conn,))
p.start()
print(parent_conn.recv())
Managersui
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.append('a')
print(l)
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(5))
p_list = []
for i in range(10):
p = Process(target=f, args=(d, l))
p.start()
p_list.append(p)
for res in p_list:
res.join()
print(d)
print(l)
進程同步
multiprocessing包提供了Condition, Event, Lock, RLock, Semaphore等組件可用於同步。下面是使用Lock的一個示例:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。
進程池中有兩個方法:
apply
apply_async
from multiprocessing import Process,Pool,freeze_support
import time
def Foo(i):
time.sleep(2)
return i+100
def Bar(arg):
print('-->exec done:',arg)
if __name__ == '__main__':
freeze_support()
pool = Pool(3)
for i in range(10):
pool.apply_async(func=Foo, args=(i,),callback=Bar) # 異步
#pool.apply(func=Foo, args=(i,)) # 同步無回調機制
print('end')
pool.close()
pool.join() #進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。