Python: 多進程的分佈式進程multiprocessing.managers

multiprocessing.managers

 

在Thread和Process中,應當優選Process,由於Process更穩定,並且,Process能夠分佈到多臺機器上,而Thread最多隻能分佈到同一臺機器的多個CPU上。html

Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分佈到多臺機器上。一個服務進程能夠做爲調度者,將任務分佈到其餘多個進程中,依靠網絡通訊。因爲managers模塊封裝很好,沒必要了解網絡通訊的細節,就能夠很容易地編寫分佈式多進程程序。python

 

 

Server process

Manager()返回一個manager對象。它控制一個服務器進程,這個進程會管理Python對象並容許其餘進程經過代理的方式來操做這些對象。git

manager對象支持多種類型。例子見下:github

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = "1"
    d["2"] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()       #產生一個代理對象d
        l = manager.list(range(10))

        p = Process(target=f, args=(d,l))
        p.start()
        p.join()

        print(d)
        print(l)

解釋:bootstrap

with語句:見這篇文章

with 語句是從 Python 2.5 開始引入的一種與異常處理相關的功能(2.5 版本中要經過 from __future__ import with_statement 導入後纔可使用),從 2.6 版本開始缺省可用(參考 What's new in Python 2.6? 中 with 語句相關部分介紹)。windows

with 語句適用於對資源進行訪問的場合,確保無論使用過程當中是否發生異常都會執行必要的「清理」操做,釋放資源,好比文件使用後自動關閉、線程中鎖的自動獲取acquire和release等。服務器

⚠️,with語句的實現相似try..finally。網絡

 

代理對象:

  • 指向其餘共享對象的對象。
  • 共享對象也能夠說是代理 指涉 的對象。
  • 多個代理對象可能指向同一個指涉對象。

代理對象代理了指涉對象的一系列方法調用(雖然並非指涉對象的每一個方法都有必要被代理)。經過這種方式,代理的使用方法能夠和它的指涉對象同樣:app

>>> from multiprocessing import Manager >>> manager = Manager() >>> l = manager.list([i*i for i in range(10)]) >>> print(l) [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] >>> print(repr(l)) <ListProxy object, typeid 'list' at 0x...> >>> l[4] 16 >>> l[2:5] [4, 9, 16]

上面使用了list(), dict()方法dom

 

管理器的特色:

服務器進程管理器比使用共享內存對象更靈活,它們支持二進制對象類型。

同時,一個單獨的manager能夠被網絡上的不一樣計算機的進程共享。

缺點是比使用shared memory慢。

 

使用manager對象能夠建立一個共享queue。具體見下一章節:


 

 

Managers

 

Managers提供了建立一種數據的方法,這個數據能夠被不一樣的進程共享。這種共享也包括經過網絡在不一樣計算機的進程上共享。

 

multiprocessing.Manager()

返回一個已啓動的SyncManager對象(BaseManager的子類的實例對象),用於在進程之間分享數據。

SyncManager對象(點擊查看方法)對應一個已經啓動的子進程,它擁有一系列方法,能夠爲大部分經常使用數據類型建立並返回 代理對象 代理,用於進程間同步。甚至包括共享列表和字典。(👆的代碼例子)

 

當管理器被垃圾回收或者父進程退出時,管理器進程會當即退出。

 

class multiprocessing.managers.BaseManager([address[, authkey]])

建立一個BaseManager對象。建立後,須要調用start()或get_server().server_forever()確保對象對於的管理器進程已經啓動。

  • address參數,管理器服務進程監聽的地址。若是值是None,則任意主機的請求都能創建鏈接。
  • authkey參數,byte類的字符串。認真標識(驗證碼)

 start(), 爲管理器開啓子進程。

 get_server(),返回一個Server對象。

 connect(), 鏈接本地管理器對象到一個遠程管理器進程

 shutdown() 中止管理器的進程。配合start()。

 register(typid, callable)⚠️最重要的類方法,凡是註冊到管理器的類型/對象,就能夠被網絡上的不一樣進程共享了。

  


 

例子

下面是一個簡單的Master/Worker模型,實現一個簡單的分佈計算。若是要啓動多個worker,就能夠把任務分配到多臺機器上了,

好比把計算n*n的代碼替換成發送郵件,就實現了郵件隊列的異步發送。

 

經過manager模塊的支持,多進程分佈到多臺機器上。一個服務進程能夠做爲調度者,將任務分佈到其餘多個進程中。

 

⚠️

注意Queue的做用是用來傳遞任務和接收結果,每一個任務的描述數據量要儘可能小。

好比發送一個處理日誌文件的任務,就不要發送幾百兆的日誌文件自己,而是發送日誌文件存放的完整路徑,由Worker進程再去共享的磁盤上讀取文件。

 

案例代碼 參考了https://www.liaoxuefeng.com/wiki/1016959663602400/1017631559645600#0,但這個代碼不適合python3.8版本的了。

會報告2個錯誤。

第一個❌

#_pickle.PicklingError: Can't pickle <function <lambda> at 0x107ef8670>: attribute lookup <lambda> on __main__ failed

網上查了一下,https://github.com/scikit-learn/scikit-learn/issues/9467這篇文章指出pickle模塊不指出lambda函數。

看文檔,https://docs.python.org/zh-cn/3/library/pickle.html ,被封存對象不能是lambda函數返回的對象。只能是def定義返回的對象。

第二個❌

RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

看文檔https://docs.python.org/zh-cn/3/library/multiprocessing.html, 3.8版本增長了freeze_support()函數。主要是爲了支持windows可執行文件。畢竟multiprocessing可用於分佈式進程。

因此必須引入freeze_support:

看代碼:

服務器上的代碼:

import random, time, queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

# 創建2個隊列,一個發送,一個接收
task_queue = queue.Queue()
result_queue = queue.Queue()

def get_task():
    return task_queue

def get_result():
    return result_queue

class QueueManager(BaseManager): pass
# 服務器的管理器上註冊2個共享隊列
QueueManager.register('get_task', callable=get_task)
QueueManager.register('get_result', callable=get_result)
# 設置端口,地址默認爲空。驗證碼authkey須要設定。
manager = QueueManager(address=('', 5000), authkey=b'abc')

def manager_run():
    manager.start()
    # 經過管理器訪問共享隊列。
    task = manager.get_task()
    result = manager.get_result()

    #對隊列進行操做, 往task隊列放進任務。
    for value in range(10):
        n = random.randint(0,100)
        print('Put task %d' % n)
        task.put(n)
    # 從result隊列取出結果
    print('Try get result...')
    try:
        for value in range(10):
            r = result.get(timeout=10)
            print('Result: %s' % r)
    except queue.Empty:
        print('result is empty')
    # 關閉管理器。
    manager.shutdown()
    print('master exit.')

if __name__ == '__main__':
    freeze_support()
    manager_run()

 

另外一臺機器(或本機啓動也能夠):

import time, sys, queue
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager): pass

# 從網絡上的服務器上獲取Queue,因此註冊時只提供服務器上管理器註冊的隊列的名字:
QueueManager.register('get_task')
QueueManager.register('get_result')

server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# b'abc'至關於'abc'.encode('ascii'),類型是bytes
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 鏈接服務器
m.connect()
# 得到服務器上的隊列對象
task = m.get_task()
result = m.get_result()

for value in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n , n, n*n)
        time.sleep(1)
        result.put(r)
    except queue.Empty:
        print('task queue is empty')

print('worker exit.')
相關文章
相關標籤/搜索