Python 多進程並行編程實踐: multiprocessing 模塊

前言

並行計算是使用並行計算機來減小單個計算問題所須要的時間,咱們能夠經過利用編程語言顯式的說明計算中的不一樣部分如何再不一樣的處理器上同時執行來設計咱們的並行程序,最終達到大幅度提高程序效率的目的。html

衆所周知,Python中的GIL限制了Python多線程並行對多核CPU的利用,可是咱們仍然能夠經過各類其餘的方式來讓Python真正利用多核資源, 例如經過C/C++擴展來實現多線程/多進程, 以及直接利用Python的多進程模塊multiprocessing來進行多進程編程。python

本文主要嘗試僅僅經過python內置的multiprocessing模塊對本身的動力學計算程序來進行優化和效率提高,其中:編程

  • 實現了單機利用多核資源來實現並行並進行加速對比
  • 使用manager模塊實現了簡單的多機的分佈式計算

本文並非對Python的multiprocessing模塊的接口進行翻譯介紹,須要熟悉multiprocessing的童鞋能夠參考官方文檔https://docs.python.org/2/library/multiprocessing.html。網絡

正文

最近想用本身的微觀動力學程序進行一系列的求解並將結果繪製成二維Map圖進行可視化,這樣就須要對二維圖上的多個點進行計算並將結果收集起來並進行繪製,因爲每一個點都須要進行一次ODE積分以及牛頓法求解方程組,所以要串行地繪製整張圖可能會遇到極低的效率問題尤爲是對參數進行測試的時候,每畫一張圖都須要等好久的時間。其中繪製的二維圖中每一個點都是獨立計算的,因而很天然而然的想到了進行並行化處理。多線程

串行的原始版本

因爲腳本比較長,並且實現均爲本身的程序,腳本的大體結構以下, 本質是一個二重循環,循環的變量分別爲反應物氣體(O2 和 CO)的分壓的值:app

1編程語言

2分佈式

3函數

4工具

5

6

7

8

9

10

11

12

13

14

15

16

17

import time

import numpy as np

# 省略若干...

pCOs = np.linspace(1e-5, 0.5, 10)

pO2s = np.linspace(1e-5, 0.5, 10)

if "__main__" == __name__:

    try:

        start = time.time()

        for i, pO2 in enumerate(pO2s):

            # ...

            for j, pCO in enumerate(pCOs):

                # 針對當前的分壓值 pCO, pO2進行動力學求解

                # 具體代碼略...

        end = time.time()

        t = end - start

    finally:

        # 收集計算的結果並進行處理繪圖

總體過程就這麼簡單,我須要作的就是使用multiprocessing的接口來對這個二重循環進行並行化。

使用單核串行繪製100個點所須要的時間以下, 總共花了240.76秒:

python學習交流羣:923414804,羣內天天分享乾貨,包括最新的企業級案例學習資料和零基礎入門教程,歡迎小夥伴入羣學習。

二維map圖繪製的效果以下:

進行多進程並行處理

multiprocessing模塊

multiprocessing模塊提供了相似threading模塊的接口,並對進程的各類操做進行了良好的封裝,提供了各類進程間通訊的接口例如PipeQueue等等,能夠幫助咱們實現進程間的通訊,同步等操做。

使用Process類來動態建立進程實現並行

multiprocessing模塊提供了Process能讓咱們經過建立進程對象並執行該進程對象的start方法來建立一個真正的進程來執行任務,該接口相似threading模塊中的線程類Thread.

可是當被操做對象數目不大的時候可使用Process動態生成多個進程,可是若是須要的進程數一旦不少的時候,手動限制進程的數量以及處理不一樣進程返回值會變得異常的繁瑣,所以這個時候咱們須要使用進程池來簡化操做。

使用進程池來管理進程

multiprocessing模塊提供了一個進程池Pool類,負責建立進程池對象,並提供了一些方法來說運算任務offload到不一樣的子進程中執行,並很方便的獲取返回值。例如咱們如今要進行的循環並行便很容易的將其實現。

對於這裏的單指令多數據流的並行,咱們能夠直接使用Pool.map()來將函數映射到參數列表中。Pool.map實際上是map函數的並行版本,此函數將會阻塞直到全部進程所有結束,並且此函數返回的結果順序仍然不變。

首先,我先把針對每對分壓數據的處理過程封裝成一個函數,這樣能夠將函數對象傳遞給子進程執行。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

import time

from multiprocessing import Pool

import numpy as np

# 省略若干...

pCOs = np.linspace(1e-5, 0.5, 10)

pO2s = np.linspace(1e-5, 0.5, 10)

def task(pO2):

    '''接受一個O2分壓,根據當前的CO分壓進行動力學求解'''

    # 代碼細節省略...

if "__main__" == __name__:

    try:

        start = time.time()

        pool = Pool()                # 建立進程池對象,進程數與multiprocessing.cpu_count()相同

        tofs = pool.map(task, pCOs)  # 並行執行函數

        end = time.time()

        t = end - start

    finally:

        # 收集計算的結果並進行處理繪圖

使用兩個核心進行計算,計算時間從240.76s降到了148.61秒, 加速比爲1.62

對不一樣核心的加速效果進行測試

爲了查看使用不一樣核心數對程序效率的改善,我對不一樣的核心數和加速比進行了測試繪圖,效果以下:

運行核心數與程序運行時間:

運行核心數與加速比:

可見,因爲我外層循環只循環了10次所以使用的核心數超過10之後核心數的增長並不能對程序進行加速,也就是多餘的核心都浪費掉了。

使用manager實現簡單的分佈式計算

前面使用了multiprocessing包提供的接口咱們使用了再一臺機器上進行多核心計算的並行處理,可是multiprocessing的用處還有更多,經過multiprocessing.managers模塊,咱們能夠實現簡單的多機分佈式並行計算,將計算任務分佈到不一樣的計算機中運行。

Managers提供了另外的多進程通訊工具,他提供了在多臺計算機之間共享數據的接口和數據對象,這些數據對象所有都是經過代理類實現的,好比ListProxyDictProxy等等,他們都實現了與原生listdict相同的接口,可是他們能夠經過網絡在不一樣計算機中的進程中進行共享。

關於managers模塊的接口的詳細使用能夠參考官方文檔:https://docs.python.org/2/library/multiprocessing.html#managers

好了如今咱們開始嘗試將繪圖程序改形成能夠在多臺計算機中分佈式並行的程序。改造的主要思想是:

  1. 使用一臺計算機做爲服務端(server),此臺計算機經過一個Manager對象來管理共享對象,任務分配以及結果的接收,並再收集結果之後進行後處理(繪製二維map圖)。
  2. 其餘多臺計算機能夠做爲客戶端來接收server的數據進行計算,並將結果傳到共享數據中,讓server能夠收集。同時再client端能夠同時進行上文所實現的多進程並行來充分利用計算機的多核優點。

大體可總結爲下圖:

服務進程

首先服務端須要一個manager對象來管理共享對象

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

def get_manager():

    '''建立服務端manager對象.

    '''

 

    # 自定義manager類

    class JobManager(BaseManager):

        pass

 

    # 建立任務隊列,並將此數據對象共享在網絡中

    jobid_queue = Queue()

    JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)

 

    # 建立列表代理類,並將其共享再網絡中

    tofs = [None]*N

    JobManager.register('get_tofs_list', callable=lambda: tofs, proxytype=ListProxy)

 

    # 將分壓參數共享到網絡中

    JobManager.register('get_pCOs', callable=lambda: pCOs, proxytype=ListProxy)

    JobManager.register('get_pO2s', callable=lambda: pCOs, proxytype=ListProxy)

 

    # 建立manager對象並返回

    manager = JobManager(address=(ADDR, PORT), authkey=AUTHKEY)

 

    return manager

  1. BaseManager.register是一個類方法,它能夠將某種類型或者可調用的對象綁定到manager對象並共享到網絡中,使得其餘在網絡中的計算機可以獲取相應的對象。
    例如,

1

JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)

我就將一個返回任務隊列的函數對象同manager對象綁定並共享到網絡中,這樣在網絡中的進程就能夠經過本身的manager對象的get_jobid_queue方法獲得相同的隊列,這樣便實現了數據的共享.

2. 建立manager對象的時候須要兩個參數,

  • address, 即是manager所在的ip以及用於監聽與服務端鏈接的端口號,例如我若是是在內網中的192.168.0.1地址的5000端口進行監聽,那麼此參數能夠是('192.169.0.1, 5000)`
  • authkey, 顧名思義,就是一個認證碼,用於驗證客戶端時候能夠鏈接到服務端,此參數必須是一個字符串對象.

進行任務分配

上面咱們將一個任務隊列綁定到了manager對象中,如今我須要將隊列進行填充,這樣才能將任務發放到不一樣的客戶端來進行並行執行。

1

2

3

4

5

6

7

8

9

10

def fill_jobid_queue(manager, nclient):

    indices = range(N)

    interval = N/nclient

    jobid_queue = manager.get_jobid_queue()

    start = 0

    for i in range(nclient):

        jobid_queue.put(indices[start: start+interval])

        start += interval

    if N % nclient > 0:

        jobid_queue.put(indices[start:])

這裏所謂的任務其實就是相應參數在list中的index值,這樣不一樣計算機中獲得的結果能夠按照相應的index將結果填入到結果列表中,這樣服務端就能在共享的網絡中收集各個計算機計算的結果。

啓動服務端進行監聽

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

def run_server():

    # 獲取manager

    manager = get_manager()

    print "Start manager at {}:{}...".format(ADDR, PORT)

    # 建立一個子進程來啓動manager

    manager.start()

    # 填充任務隊列

    fill_jobid_queue(manager, NNODE)

    shared_job_queue = manager.get_jobid_queue()

    shared_tofs_list = manager.get_tofs_list()

 

    queue_size = shared_job_queue.qsize()

 

    # 循環進行監聽,直到結果列表被填滿

    while None in shared_tofs_list:

        if shared_job_queue.qsize() < queue_size:

            queue_size = shared_job_queue.qsize()

            print "Job picked..."

 

    return manager

任務進程

服務進程負責進行簡單的任務分配和調度,任務進程則只負責獲取任務並進行計算處理。

在任務進程(客戶端)中基本代碼與咱們上面單機中的多核運行的腳本基本相同(由於都是同一個函數處理不一樣的數據),可是咱們也須要爲客戶端建立一個manager來進行任務的獲取和返回。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

def get_manager():

    class WorkManager(BaseManager):

        pass

 

    # 因爲只是從共享網絡中獲取,所以只須要註冊名字便可

    WorkManager.register('get_jobid_queue')

    WorkManager.register('get_tofs_list')

    WorkManager.register('get_pCOs')

    WorkManager.register('get_pO2s')

 

    # 這裏的地址和驗證碼要與服務端相同才能夠進行數據共享

    manager = WorkManager(address=(ADDR, PORT), authkey=AUTHKEY)

 

    return manager

在客戶端咱們仍然能夠多進程利用多核資源來加速計算。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

if "__main__" == __name__:

 

    manager = get_manager()

    print "work manager connect to {}:{}...".format(ADDR, PORT)

 

    # 將客戶端本地的manager鏈接到相應的服務端manager

    manager.connect()

 

    # 獲取共享的結果收集列表

    shared_tofs_list = manager.get_tofs_list()

 

    # 獲取共享的任務隊列

    shared_jobid_queue = manager.get_jobid_queue()

 

    # 從服務端獲取計算參數

    pCOs = manager.get_pCOs()

    shared_pO2s = manager.get_pO2s()

 

    # 建立進程池在本地計算機進行多核並行

    pool = Pool()

 

    while 1:

        try:

            indices = shared_jobid_queue.get_nowait()

            pO2s = [shared_pO2s[i] for i in indices]

            print "Run {}".format(str(pO2s))

            tofs_2d = pool.map(task, pO2s)

 

            # Update shared tofs list.

            for idx, tofs_1d in zip(indices, tofs_2d):

                shared_tofs_list[idx] = tofs_1d

        # 直到將任務隊列中的任務所有取完,結束任務進程

        except Queue.Empty:

            break

下面我將在3臺在同一局域網中的電腦來進行簡單的分佈式計算測試,

  • 其中一臺是實驗室器羣中的管理節點, 內網ip爲10.10.10.245
  • 另外一臺爲集羣中的一個節點, 共有12個核心
  • 最後一臺爲本身的本本,4個核心
  1.  先在服務端運行服務腳本進行任務分配和監聽:

1

python server.py

2. 在兩個客戶端運行任務腳原本獲取任務隊列中的任務並執行

1

python worker.py

當任務隊列爲空且任務完成時,任務進程終止; 當結果列表中的結果收集完畢時,服務進程也會終止。

執行過程如圖:

執行結果以下圖:

上面的panel爲服務端監聽,左下爲本身的筆記本運行結果,右下panel爲集羣中的其中一個節點。

可見運行時間爲56.86s,無奈,是個人本子脫了後腿(-_-!)

總結

本文經過python內置模塊multiprocessing實現了單機內多核並行以及簡單的多臺計算機的分佈式並行計算,multiprocessing爲咱們提供了封裝良好而且友好的接口來使咱們的Python程序更方面利用多核資源加速本身的計算程序,但願能對使用python實現並行話的童鞋有所幫助。

參考

相關文章
相關標籤/搜索