關於我
編程界的一名小程序猿,目前在一個創業團隊任team lead,技術棧涉及Android、Python、Java和Go,這個也是咱們團隊的主要技術棧。 聯繫:hylinux1024@gmail.comhtml
上一篇文章介紹了線程的使用。然而Python
中因爲Global Interpreter Lock
(全局解釋鎖GIL
)的存在,每一個線程在在執行時須要獲取到這個GIL
,在同一時刻中只有一個線程獲得解釋鎖的執行,Python
中的線程並無真正意義上的併發執行,多線程的執行效率也不必定比單線程的效率更高。 若是要充分利用現代多核CPU
的併發能力,就要使用multipleprocessing
模塊了。python
與使用線程的threading
模塊相似,multipleprocessing
模塊提供許多高級API
。最多見的是Pool
對象了,使用它的接口能很方便地寫出併發執行的代碼。linux
from multiprocessing import Pool
def f(x):
return x * x
if __name__ == '__main__':
with Pool(5) as p:
# map方法的做用是將f()方法併發地映射到列表中的每一個元素
print(p.map(f, [1, 2, 3]))
# 執行結果
# [1, 4, 9]
複製代碼
關於Pool
下文中還會提到,這裏咱們先來看Process
。編程
要建立一個進程可使用Process
類,使用start()
方法啓動進程。小程序
from multiprocessing import Process
import os
def echo(text):
# 父進程ID
print("Process Parent ID : ", os.getppid())
# 進程ID
print("Process PID : ", os.getpid())
print('echo : ', text)
if __name__ == '__main__':
p = Process(target=echo, args=('hello process',))
p.start()
p.join()
# 執行結果
# Process Parent ID : 27382
# Process PID : 27383
# echo : hello process
複製代碼
正如開篇提到的multiprocessing
模塊提供了Pool
類能夠很方便地實現一些簡單多進程場景。 它主要有如下接口安全
apply(func[, args[, kwds]])
func(args,kwds)
方法,在方法結束返回前會阻塞。apply_async(func[, args[, kwds[, callback[, error_callback]]]])
func(args,kwds)
,會當即返回一個result
對象,若是指定了callback
參數,結果會經過回調方法返回,還能夠指定執行出錯的回調方法error_callback()
map(func, iterable[, chunksize])
map()
,能夠併發執行func
,是同步方法map_async(func, iterable[, chunksize[, callback[, error_callback]]])
map
close()
terminate()
join()
close()
或者terminate()
from multiprocessing import Pool
def f(x):
return x * x
if __name__ == '__main__':
with Pool(5) as p:
# map方法的做用是將f()方法併發地映射到列表中的每一個元素
a = p.map(f, [1, 2, 3])
print(a)
# 異步執行map
b = p.map_async(f, [3, 5, 7, 11])
# b 是一個result對象,表明方法的執行結果
print(b)
# 爲了拿到結果,使用join方法等待池中工做進程退出
p.close()
# 調用join方法前,需先執行close或terminate方法
p.join()
# 獲取執行結果
print(b.get())
# 執行結果
# [1, 4, 9]
# <multiprocessing.pool.MapResult object at 0x10631b710>
# [9, 25, 49, 121]
複製代碼
map_async()
和apply_async()
執行後會返回一個class multiprocessing.pool.AsyncResult
對象,經過它的get()
能夠獲取到執行結果,ready()
能夠判斷AsyncResult
的結果是否準備好。多線程
multiprocessing
模塊提供了兩種方式用於進程間的數據共享:隊列(Queue
)和管道(Pipe
)併發
Queue
是線程安全,也是進程安全的。使用Queue
能夠實現進程間的數據共享,例以下面的demo
中子進程put
一個對象,在主進程中就能get
到這個對象。 任何能夠序列化的對象均可以經過Queue
來傳輸。app
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
# 使用Queue進行數據通訊
q = Queue()
p = Process(target=f, args=(q,))
p.start()
# 主進程取得子進程中的數據
print(q.get()) # prints "[42, None, 'hello']"
p.join()
# 執行結果
# [42, None, 'hello']
複製代碼
Pipe()
返回一對經過管道鏈接的Connection
對象。這兩個對象能夠理解爲管道的兩端,它們經過send()
和recv()
發送和接收數據。異步
from multiprocessing import Process, Pipe
def write(conn):
# 子進程中發送一個對象
conn.send([42, None, 'hello'])
conn.close()
def read(conn):
# 在讀的進程中經過recv接收對象
data = conn.recv()
print(data)
if __name__ == '__main__':
# Pipe()方法返回一對鏈接對象
w_conn, r_conn = Pipe()
wp = Process(target=write, args=(w_conn,))
rp = Process(target=read, args=(r_conn,))
wp.start()
rp.start()
# 執行結果
# [42, None, 'hello']
複製代碼
須要注意的是,兩個進程不能同時對一個鏈接對象進行send
或recv
操做。
咱們知道線程間的同步是經過鎖機制來實現的,進程也同樣。
from multiprocessing import Process, Lock
import time
def print_with_lock(l, i):
l.acquire()
try:
time.sleep(1)
print('hello world', i)
finally:
l.release()
def print_without_lock(i):
time.sleep(1)
print('hello world', i)
if __name__ == '__main__':
lock = Lock()
# 先執行有鎖的
for num in range(5):
Process(target=print_with_lock, args=(lock, num)).start()
# 再執行無鎖的
# for num in range(5):
# Process(target=print_without_lock, args=(num,)).start()
複製代碼
有鎖的代碼將每秒依次打印
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
複製代碼
若是執行無鎖的代碼,則在個人電腦上執行結果是這樣的
hello worldhello world 0
1
hello world 2
hello world 3
hello world 4
複製代碼
除了Lock
,還包括RLock
、Condition
、Semaphore
和Event
等進程間的同步原語。其用法也與線程間的同步原語很相似。API
使用能夠參考文末中引用的文檔連接。
在工程中實現進程間的數據共享應當優先使用隊列或管道。
本文對multiprocessing
模塊中常見的API
做了簡單的介紹。講述了Process
和Pool
的常見用法,同時介紹了進程間的數據方式:隊列和管道。最後簡單瞭解了進程間的同步原語。
經過與上篇的對比學習,本文的內容應該是更加容易掌握的。