Python 分佈式進程

#-*-coding:utf-8-*-
'''分佈式進程指的是將Process進程分不到多臺機器上,充分利用多臺機器的性能完成複雜的任務'''
#服務器端
#---------------------------------------Linux版----------------------------------------------
# import random,time,Queue
# from multiprocessing.managers import BaseManager
# #1、創建task_queue和result_queue用來存聽任務和結果
# task_queue=Queue.Queue()
# result_queue=Queue.Queue()
# class Queuemanger(BaseManager):
#     pass
# #2、把建立的兩個隊列註冊在網絡上,利用reister方法,clallble參數關聯了Queue對象,將Queue對象在網絡中暴露
# Queuemanger.register('get_task_queue',callable=lambda:task_queue)
# Queuemanger.register('get_result_queue',callable=lambda:result_queue)
# #3、綁定端口8001,設置端口口令;admin,至關於對象的初始化
# manager=Queuemanger(address=('',8001),authkey='admin')
# #4、啓動管理監聽信息通道
# manager.start()
# #5、經過管理實例的方法得到經過網絡訪問的Queue對象
# task=manager.get_task_queue()
# result=manager.get_result_queue()
# #6、添加任務
# for url in ["ImageUrl_"+str(i) for i in range(10)]:
#     print("put task %s..."%url)
#     task.put(url)
# #獲取返回結果
# print("try get result...")
# for i in range(10):
#     print("result is %s"%result.get(timeout=10))
# #關閉管理
# manager.shutdown()
#--------------------------------------------------Windows版-----------------------------------
#taskManager for Windows
import Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
#任務個數
task_number=20
#定義收發隊列
task_queue=Queue.Queue(task_number)
result_queue=Queue.Queue(task_number)
def get_task():
    return task_queue
def get_result():
    return task_queue()
#建立相似的QueueManager
class QueueManager(BaseManager):
    pass
def win_run():
    #windows下綁定調用接口不能使用lambda因此只能先定義函數再綁定
    QueueManager.register('get_task_queue',callable=get_task)
    QueueManager.register('get_result_queue',callable=get_result)
    #綁定端口並設置驗證口令,Windows下須要填寫IP地址,linux下不填寫默認使用本地IP地址
    manager=QueueManager(address=('127.0.0.1',8001),authkey='admin')
    #啓動
    manager.start()
    try:
        #經過網絡獲取任務隊列和結果隊列
        task=manager.get_task_queue()
        result=manager.get_result_queue()
        #添加任務
        for url in["ImageUrl_"+str(i) for i in range(10)]:
            print('put  task %s... '%url)
            task.put(url)
        print('try get result...')
        for i in range(10):
            print('result is %s '%result.get(timeout=10))
    except Exception as e:
        print('Manager error:%s'%e)
    finally:
        #不論程序執行成功或是失敗finally都會執行,即必定要將管道關閉,不然彙報錯誤
        manager.shutdown()
if __name__=="__name__":
    #windows下多進程可能會有問題,添加如下代碼能夠緩解
    freeze_support()
    win_run()

 #客戶端python

#-*-coding:utf-8-*-

#任務進程TaskWorker.py
import time
from multiprocessing.managers import BaseManager
#建立相似的QueueManager:
class QueueManager(BaseManager):
    pass
#1、使用QueueManger註冊用於獲取Queue的方法名稱
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
#2、連接到服務器
server_addr='127.0.0.1'
print('Connect to server %s...'%server_addr)
#端口和驗證口令須要與服務器保持一致
m=QueueManager(address=(server_addr,8001),authkey='admin')
#從網絡連接
m.connect()
#3、獲取Queue的對象
task=m.get_task_queue()
result=m.get_result_queue()
#4、從 task隊列獲取任務,並把結果寫入result
while(not task.empty()):
    image_url=task.get(True,timeout=5)
    print('run task download %s...'%image_url)
    time.sleep(1)
    result.put('%s--->sucess'%image_url)
print('worker exit.')
相關文章
相關標籤/搜索