#-*-coding:utf-8-*- '''分佈式進程指的是將Process進程分不到多臺機器上,充分利用多臺機器的性能完成複雜的任務''' #服務器端 #---------------------------------------Linux版---------------------------------------------- # import random,time,Queue # from multiprocessing.managers import BaseManager # #1、創建task_queue和result_queue用來存聽任務和結果 # task_queue=Queue.Queue() # result_queue=Queue.Queue() # class Queuemanger(BaseManager): # pass # #2、把建立的兩個隊列註冊在網絡上,利用reister方法,clallble參數關聯了Queue對象,將Queue對象在網絡中暴露 # Queuemanger.register('get_task_queue',callable=lambda:task_queue) # Queuemanger.register('get_result_queue',callable=lambda:result_queue) # #3、綁定端口8001,設置端口口令;admin,至關於對象的初始化 # manager=Queuemanger(address=('',8001),authkey='admin') # #4、啓動管理監聽信息通道 # manager.start() # #5、經過管理實例的方法得到經過網絡訪問的Queue對象 # task=manager.get_task_queue() # result=manager.get_result_queue() # #6、添加任務 # for url in ["ImageUrl_"+str(i) for i in range(10)]: # print("put task %s..."%url) # task.put(url) # #獲取返回結果 # print("try get result...") # for i in range(10): # print("result is %s"%result.get(timeout=10)) # #關閉管理 # manager.shutdown() #--------------------------------------------------Windows版----------------------------------- #taskManager for Windows import Queue from multiprocessing.managers import BaseManager from multiprocessing import freeze_support #任務個數 task_number=20 #定義收發隊列 task_queue=Queue.Queue(task_number) result_queue=Queue.Queue(task_number) def get_task(): return task_queue def get_result(): return task_queue() #建立相似的QueueManager class QueueManager(BaseManager): pass def win_run(): #windows下綁定調用接口不能使用lambda因此只能先定義函數再綁定 QueueManager.register('get_task_queue',callable=get_task) QueueManager.register('get_result_queue',callable=get_result) #綁定端口並設置驗證口令,Windows下須要填寫IP地址,linux下不填寫默認使用本地IP地址 manager=QueueManager(address=('127.0.0.1',8001),authkey='admin') #啓動 manager.start() try: #經過網絡獲取任務隊列和結果隊列 task=manager.get_task_queue() result=manager.get_result_queue() #添加任務 for url in["ImageUrl_"+str(i) for i in range(10)]: print('put task %s... '%url) task.put(url) print('try get result...') for i in range(10): print('result is %s '%result.get(timeout=10)) except Exception as e: print('Manager error:%s'%e) finally: #不論程序執行成功或是失敗finally都會執行,即必定要將管道關閉,不然彙報錯誤 manager.shutdown() if __name__=="__name__": #windows下多進程可能會有問題,添加如下代碼能夠緩解 freeze_support() win_run()
#客戶端python
#-*-coding:utf-8-*- #任務進程TaskWorker.py import time from multiprocessing.managers import BaseManager #建立相似的QueueManager: class QueueManager(BaseManager): pass #1、使用QueueManger註冊用於獲取Queue的方法名稱 QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') #2、連接到服務器 server_addr='127.0.0.1' print('Connect to server %s...'%server_addr) #端口和驗證口令須要與服務器保持一致 m=QueueManager(address=(server_addr,8001),authkey='admin') #從網絡連接 m.connect() #3、獲取Queue的對象 task=m.get_task_queue() result=m.get_result_queue() #4、從 task隊列獲取任務,並把結果寫入result while(not task.empty()): image_url=task.get(True,timeout=5) print('run task download %s...'%image_url) time.sleep(1) result.put('%s--->sucess'%image_url) print('worker exit.')