Distributed processes in Python still rely on the multiprocessing module. Besides supporting multiple processes, its managers submodule can spread those processes across several machines: a service process acts as the scheduler, distributing tasks to the other processes and coordinating them over the network.

Example: when writing a crawler that grabs every image on a site, a multiprocess design typically has one process scrape the image URLs and put them on a queue, while other processes take URLs off the queue, download the images, and save them locally. To make this distributed, a process on one machine scrapes the URLs while processes on other machines handle the downloading and saving. The main problem is then exposing the queue on the network so that processes on other machines can all reach it. Distributed processes encapsulate exactly that step; you can think of it as networking a local queue.
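Before the two full scripts, the core mechanism can be sketched in one self-contained snippet: register a getter for a local queue on a BaseManager subclass, and remote clients that connect receive a proxy to that same queue. This is only an illustration, not the two-script setup that follows: it serves the manager from a daemon thread inside one process, binds an ephemeral port, and uses a made-up b'demo' auth key.

```python
import queue
import threading
from multiprocessing.managers import BaseManager

local_queue = queue.Queue()  # an ordinary in-process queue

class QueueManager(BaseManager):
    pass

# Tie a network-visible name to a getter for the local queue
QueueManager.register('get_queue', callable=lambda: local_queue)

# "Server" side: bind an ephemeral port and serve from a daemon thread
server = QueueManager(address=('127.0.0.1', 0), authkey=b'demo').get_server()
threading.Thread(target=server.serve_forever, daemon=True).start()

# "Client" side: a second manager connects through the socket
client = QueueManager(address=server.address, authkey=b'demo')
client.connect()
proxy = client.get_queue()  # a proxy object, not the queue itself

proxy.put('hello over the network')
print(local_queue.get())  # the item arrived in the server-side queue
```

The real scripts below split this same pattern across two processes (and potentially two machines): the service process serves the queues, and the workers only register the names and connect.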
The service process, taskManager.py:

#coding:utf-8
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support, Queue

# number of tasks
task_number = 10
# task and result queues
task_queue = Queue(task_number)
result_queue = Queue(task_number)

def get_task():
    return task_queue

def get_result():
    return result_queue

# create a QueueManager subclass
class QueueManager(BaseManager):
    pass

def win_run():
    # register on the network; `callable` ties each name to a Queue object,
    # which is what exposes the Queue over the network.
    # On Windows the bound callable cannot be a lambda, so plain functions
    # are defined above and registered here.
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)
    # bind the port and set the auth key
    manager = QueueManager(address=('127.0.0.1', 8001), authkey='qiye'.encode())
    # start the manager and begin listening
    manager.start()
    try:
        # obtain the task and result queues over the network
        task = manager.get_task_queue()
        result = manager.get_result_queue()
        # add tasks
        for url in ["ImageUrl_" + str(i) for i in range(10)]:
            print('url is %s' % url)
            task.put(url)
        print('try get result')
        for i in range(10):
            print('result is %s' % result.get(timeout=10))
    except Exception:
        print('Manager error')
    finally:
        manager.shutdown()

if __name__ == '__main__':
    # multiprocessing can misbehave on Windows; freeze_support() mitigates this
    freeze_support()
    win_run()
The task worker, taskWorker.py (it can run on any machine that can reach the server; here it connects to 127.0.0.1):

#coding:utf-8
import time
from multiprocessing.managers import BaseManager

# create a matching QueueManager subclass
class QueueManager(BaseManager):
    pass

# step 1: register the names of the Queue getters; the worker only looks
# them up on the server, so no callable is supplied
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# step 2: connect to the server
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# the port and auth key must match the service process exactly
m = QueueManager(address=(server_addr, 8001), authkey='qiye'.encode())
# connect over the network
m.connect()

# step 3: obtain the Queue objects
task = m.get_task_queue()
result = m.get_result_queue()

# step 4: take tasks from the task queue and write results to the result queue
while not task.empty():
    image_url = task.get(True, timeout=5)
    print('run task download %s...' % image_url)
    time.sleep(1)
    result.put('%s--->success' % image_url)

# finished
print('worker exit.')
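One caveat about the worker's loop: checking `task.empty()` and then calling `task.get()` is a race when several workers share the queue, because it can drain between the two calls and `get` then raises `queue.Empty` after the timeout. A more robust loop uses that exception itself as the exit condition. The `drain` helper below is a hypothetical name, sketched against a plain local queue for brevity; the same pattern applies to the network proxy queue, which raises the same `queue.Empty`:

```python
import queue

def drain(task_q, result_q, timeout=1):
    """Pull tasks until the queue stays empty for `timeout` seconds."""
    done = 0
    while True:
        try:
            image_url = task_q.get(timeout=timeout)
        except queue.Empty:
            break  # no task arrived within the window: assume we are done
        result_q.put('%s--->success' % image_url)
        done += 1
    return done

tasks, results = queue.Queue(), queue.Queue()
for i in range(3):
    tasks.put('ImageUrl_%d' % i)

print(drain(tasks, results))  # -> 3
print(results.get())          # -> ImageUrl_0--->success
```

With real downloads, the timeout should be long enough to ride out gaps while the scraper is still producing URLs.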
Output of the service process:

C:\Python27\python.exe F:/python_scrapy/python_study/taskManager.py
url is ImageUrl_0
url is ImageUrl_1
url is ImageUrl_2
url is ImageUrl_3
url is ImageUrl_4
url is ImageUrl_5
url is ImageUrl_6
url is ImageUrl_7
url is ImageUrl_8
url is ImageUrl_9
try get result
result is ImageUrl_0--->success
result is ImageUrl_1--->success
result is ImageUrl_2--->success
result is ImageUrl_3--->success
result is ImageUrl_4--->success
result is ImageUrl_5--->success
result is ImageUrl_6--->success
result is ImageUrl_7--->success
result is ImageUrl_8--->success
result is ImageUrl_9--->success

Process finished with exit code 0
Output of the worker:

C:\Python27\python.exe F:/python_scrapy/python_study/taskWorker.py
Connect to server 127.0.0.1...
run task download ImageUrl_0...
run task download ImageUrl_1...
run task download ImageUrl_2...
run task download ImageUrl_3...
run task download ImageUrl_4...
run task download ImageUrl_5...
run task download ImageUrl_6...
run task download ImageUrl_7...
run task download ImageUrl_8...
run task download ImageUrl_9...
worker exit.

Process finished with exit code 0
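For readers without two terminals handy, the whole task-to-result round trip can also be exercised in a single script: the "service process" is served from a daemon thread, and the "worker" is simply a second manager connection in the same process. This is a test sketch, not the deployment layout above; it assumes an ephemeral port and uses lambdas as getters (acceptable here because no child process is spawned, unlike the Windows case noted earlier):

```python
import queue
import threading
from multiprocessing.managers import BaseManager

task_queue = queue.Queue()
result_queue = queue.Queue()

class QueueManager(BaseManager):
    pass

QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)

# "service process": serve both queues from a daemon thread
server = QueueManager(address=('127.0.0.1', 0), authkey=b'qiye').get_server()
threading.Thread(target=server.serve_forever, daemon=True).start()

# "worker": connect, drain the tasks, push results
worker = QueueManager(address=server.address, authkey=b'qiye')
worker.connect()
task = worker.get_task_queue()
result = worker.get_result_queue()

for i in range(3):
    task.put('ImageUrl_%d' % i)

while True:
    try:
        url = task.get(timeout=1)
    except queue.Empty:
        break
    result.put('%s--->success' % url)

for _ in range(3):
    print(result.get())  # ImageUrl_0--->success, ImageUrl_1--->success, ...
```

Splitting this back into two scripts on two machines only requires replacing 127.0.0.1 with the server's reachable address and keeping the port and auth key identical on both sides.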