python分佈式進程

分佈式進程指的是將Process進程分佈到多臺機器上,充分利用多態機器的性能完成複雜的任務

  • 分佈式進程在python 中依然要用到multiprocessing 模塊。multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分佈到多臺機器上。能夠寫一個服務進程做爲調度者,將任務分佈到其餘多
    個進程中,依靠網絡通訊進行管理。例子:在作爬蟲程序時,抓取某個網站的全部圖片,若是使用多進程的話,通常是一個進程負責抓取圖片的連接地址,將連接地址放到queue中,另外的進程負責從queue中取連接地址進行下載和存儲到本地。如今把這個過程作成分佈式,一臺機器上的進程負責抓取連接地址,其餘機器上的進程負責系在存儲。那麼遇到的主要問題是將queue 暴露到網絡中,讓其餘機器進程均可以訪問,分佈式進程就是將這個過程進行了封裝,咱們能夠將這個過程稱爲本地隊列的網絡化python

要實現上面例子的功能,建立分佈式進程須要分爲 六個步驟

  • 創建隊列Queue ,用來進行進程間通訊。服務進程建立任務隊列task_queue 用來做爲傳遞任務給任務進程的通道;服務進程建立結果隊列result_queue ,做爲任務進程完成任務後回覆服務進程的通道。在分佈式多進程環境下,必須由Queuemanager得到Queue 接口來添加任務
  • 把第一步中創建的隊列在網絡上註冊,暴露給其餘進程(主機),註冊後得到網絡隊列,至關於本地隊列的映像
  • 創建一個對象(Queuemanager(BaseManager))實例manager,綁定端口和驗證口令
  • 啓動第三步中創建的實例,即啓動管理manager,監管信息通道
  • 經過管理實例的方法得到經過網絡訪問的Queue對象,即再把網絡隊列實體化成可使用的本地隊列
  • 建立任務到 「本地」隊列中,自動上傳任務到網絡隊列中,分配給任務進程進行處理

接下來經過程序實現上面的列子(window版),首先編寫的是服務進程(taskManager.py)

#!coding:utf-8
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support, Queue

# 任務個數
task_number = 10

# 收發隊列
task_quue = Queue(task_number)
result_queue = Queue(task_number)


def get_task():
    return task_quue


def get_result():
    return result_queue


# 建立相似的queueManager
class QueueManager(BaseManager):
    pass


def win_run():
    # 註冊在網絡上,callable 關聯了Queue 對象
    # 將Queue對象在網絡中暴露
    #window下綁定調用接口不能直接使用lambda,因此只能先定義函數再綁定
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)
    # 綁定端口和設置驗證口令
    manager = QueueManager(address=('127.0.0.1', 8001), authkey='qiye'.encode())

    # 啓動管理,監聽信息通道
    manager.start()

    try:

        # 經過網絡獲取任務隊列和結果隊列
        task = manager.get_task_queue()
        result = manager.get_result_queue()

        # 添加任務
        for url in ["ImageUrl_" + str(i) for i in range(10)]:
            print('url is %s' % url)
            task.put(url)

        print('try get result')
        for i in range(10):
            print('result is %s' % result.get(timeout=10))

    except:
        print 'Manager error'
    finally:
        manager.shutdown()


if __name__ == '__main__':
    # window下多進程可能有問題,添加這句話緩解
    freeze_support()
    win_run()

服務進程已經編寫好,接下來任務進程(taskWorker.py)建立四步驟:

  • 使用QueueManager註冊用於獲取Queue的方法名稱,任務進程只能經過名稱來網絡獲取Queue
  • 鏈接服務器,端口和驗證口令注意保持與服務器進程中徹底一致
  • 從網絡獲取Queue,進行本地化
  • 從task隊列獲取任務,而且把結果寫入result隊列
#coding:utf-8
import time
from multiprocessing.managers import BaseManager
# 建立相似的QueueManager:
class QueueManager(BaseManager):
    pass
# 實現第一步:使用QueueManager註冊獲取Queue的方法名稱
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 實現第二步:鏈接到服務器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證口令注意保持與服務進程設置的徹底一致:
m = QueueManager(address=(server_addr, 8001), authkey='qiye')
# 從網絡鏈接:
m.connect()
# 實現第三步:獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 實現第四步:從task隊列取任務,並把結果寫入result隊列:
while(not task.empty()):
        image_url = task.get(True,timeout=5)
        print('run task download %s...' % image_url)
        time.sleep(1)
        result.put('%s--->success'%image_url)
# 處理結束:
print('worker exit.')

運行結果:

  • taskManager.py
C:\Python27\python.exe F:/python_scrapy/python_study/taskManager.py
url is ImageUrl_0
url is ImageUrl_1
url is ImageUrl_2
url is ImageUrl_3
url is ImageUrl_4
url is ImageUrl_5
url is ImageUrl_6
url is ImageUrl_7
url is ImageUrl_8
url is ImageUrl_9
try get result
result is ImageUrl_0--->success
result is ImageUrl_1--->success
result is ImageUrl_2--->success
result is ImageUrl_3--->success
result is ImageUrl_4--->success
result is ImageUrl_5--->success
result is ImageUrl_6--->success
result is ImageUrl_7--->success
result is ImageUrl_8--->success
result is ImageUrl_9--->success

Process finished with exit code 0
  • 任務進程(taskWorker.py)
C:\Python27\python.exe F:/python_scrapy/python_study/taskWorker.py
Connect to server 127.0.0.1...
run task download ImageUrl_0...
run task download ImageUrl_1...
run task download ImageUrl_2...
run task download ImageUrl_3...
run task download ImageUrl_4...
run task download ImageUrl_5...
run task download ImageUrl_6...
run task download ImageUrl_7...
run task download ImageUrl_8...
run task download ImageUrl_9...
worker exit.

Process finished with exit code 0
相關文章
相關標籤/搜索