快速瞭解Python併發編程的工程實現(下)

關於我
編程界的一名小程序猿,目前在一個創業團隊任team lead,技術棧涉及Android、Python、Java和Go,這個也是咱們團隊的主要技術棧。 聯繫:hylinux1024@gmail.comhtml

0x00 使用進程實現併發

上一篇文章介紹了線程的使用。然而Python中因爲Global Interpreter Lock(全局解釋鎖GIL)的存在,每一個線程在在執行時須要獲取到這個GIL,在同一時刻中只有一個線程獲得解釋鎖的執行,Python中的線程並無真正意義上的併發執行,多線程的執行效率也不必定比單線程的效率更高。 若是要充分利用現代多核CPU的併發能力,就要使用multipleprocessing模塊了。python

0x01 multipleprocessing

與使用線程的threading模塊相似,multipleprocessing模塊提供許多高級API。最多見的是Pool對象了,使用它的接口能很方便地寫出併發執行的代碼。linux

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        # map方法的做用是將f()方法併發地映射到列表中的每一個元素
        print(p.map(f, [1, 2, 3]))

# 執行結果
# [1, 4, 9]
複製代碼

關於Pool下文中還會提到,這裏咱們先來看Process編程

Process

要建立一個進程可使用Process類,使用start()方法啓動進程。小程序

from multiprocessing import Process
import os

def echo(text):
    # 父進程ID
    print("Process Parent ID : ", os.getppid())
    # 進程ID
    print("Process PID : ", os.getpid())
    print('echo : ', text)

if __name__ == '__main__':
    p = Process(target=echo, args=('hello process',))
    p.start()
    p.join()
    
# 執行結果
# Process Parent ID : 27382
# Process PID : 27383
# echo : hello process
複製代碼
進程池

正如開篇提到的multiprocessing模塊提供了Pool類能夠很方便地實現一些簡單多進程場景。 它主要有如下接口安全

  • apply(func[, args[, kwds]])
    執行func(args,kwds)方法,在方法結束返回前會阻塞。
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]])
    異步執行func(args,kwds),會當即返回一個result對象,若是指定了callback參數,結果會經過回調方法返回,還能夠指定執行出錯的回調方法error_callback()
  • map(func, iterable[, chunksize])
    相似內置函數map(),能夠併發執行func,是同步方法
  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])
    異步版本的map
  • close()
    關閉進程池。當池中的全部工做進程都執行完畢時,進程會退出。
  • terminate()
    終止進程池
  • join()
    等待工做進程執行完,必需先調用close()或者terminate()
from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        # map方法的做用是將f()方法併發地映射到列表中的每一個元素
        a = p.map(f, [1, 2, 3])
        print(a)
        # 異步執行map
        b = p.map_async(f, [3, 5, 7, 11])
        # b 是一個result對象,表明方法的執行結果
        print(b)
        # 爲了拿到結果,使用join方法等待池中工做進程退出
        p.close()
        # 調用join方法前,需先執行close或terminate方法
        p.join()
        # 獲取執行結果
        print(b.get())

# 執行結果
# [1, 4, 9]
# <multiprocessing.pool.MapResult object at 0x10631b710>
# [9, 25, 49, 121]
複製代碼

map_async()apply_async()執行後會返回一個class multiprocessing.pool.AsyncResult對象,經過它的get()能夠獲取到執行結果,ready()能夠判斷AsyncResult的結果是否準備好。多線程

進程間數據的傳輸

multiprocessing模塊提供了兩種方式用於進程間的數據共享:隊列(Queue)和管道(Pipe)併發

Queue是線程安全,也是進程安全的。使用Queue能夠實現進程間的數據共享,例以下面的demo中子進程put一個對象,在主進程中就能get到這個對象。 任何能夠序列化的對象均可以經過Queue來傳輸。app

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    # 使用Queue進行數據通訊
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    # 主進程取得子進程中的數據
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()

# 執行結果
# [42, None, 'hello']
複製代碼

Pipe()返回一對經過管道鏈接的Connection對象。這兩個對象能夠理解爲管道的兩端,它們經過send()recv()發送和接收數據。異步

from multiprocessing import Process, Pipe

def write(conn):
    # 子進程中發送一個對象
    conn.send([42, None, 'hello'])
    conn.close()

def read(conn):
    # 在讀的進程中經過recv接收對象
    data = conn.recv()
    print(data)

if __name__ == '__main__':
    # Pipe()方法返回一對鏈接對象
    w_conn, r_conn = Pipe()

    wp = Process(target=write, args=(w_conn,))
    rp = Process(target=read, args=(r_conn,))

    wp.start()
    rp.start()

# 執行結果
# [42, None, 'hello']

複製代碼

須要注意的是,兩個進程不能同時對一個鏈接對象進行sendrecv操做。

同步

咱們知道線程間的同步是經過鎖機制來實現的,進程也同樣。

from multiprocessing import Process, Lock
import time

def print_with_lock(l, i):
    l.acquire()
    try:
        time.sleep(1)
        print('hello world', i)
    finally:
        l.release()

def print_without_lock(i):
    time.sleep(1)
    print('hello world', i)

if __name__ == '__main__':
    lock = Lock()

    # 先執行有鎖的
    for num in range(5):
        Process(target=print_with_lock, args=(lock, num)).start()
    # 再執行無鎖的
    # for num in range(5):
    # Process(target=print_without_lock, args=(num,)).start()

複製代碼

有鎖的代碼將每秒依次打印

hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
複製代碼

若是執行無鎖的代碼,則在個人電腦上執行結果是這樣的

hello worldhello world  0
1
hello world 2
hello world 3
hello world 4
複製代碼

除了Lock,還包括RLockConditionSemaphoreEvent等進程間的同步原語。其用法也與線程間的同步原語很相似。API使用能夠參考文末中引用的文檔連接。
在工程中實現進程間的數據共享應當優先使用隊列或管道。

0x02 總結

本文對multiprocessing模塊中常見的API做了簡單的介紹。講述了ProcessPool的常見用法,同時介紹了進程間的數據方式:隊列和管道。最後簡單瞭解了進程間的同步原語。
經過與上篇的對比學習,本文的內容應該是更加容易掌握的。

0x03 引用

相關文章
相關標籤/搜索