程序要求:
1. 用Rabbit MQ實現RPC多線程
1. 能夠異步地執行多條命令
2. 能夠一次性對多個機器執行命令
程序效果:
---》run dir host1 host2 。。。。異步
---》get task_id
---》taskId:xxxx host: xxxxxx
---》check task_id
---》打印結果
程序分析:
爲了達到異步的效果,可以使用多線程或協程,即每執行一條命令就啓動一條線程或協程。客戶端「發送命令到隊列」與「從返回隊列接收結果」這兩個步驟必須分離,不能寫在同一個函數裏。
業務邏輯:
代碼實現:
README
#author:Wu zhiHao
#博客地址:https://www.cnblogs.com/BUPT-MrWu/p/10364619.html
#程序目錄框架:
|--RPC
    |--RPC_server #服務端
        |--bin
            |--start.py #程序入口
        |--core
            |--RpcServer.py #服務端主要邏輯
    |--RPC_client #客戶端
        |--bin
            |--start.py #程序入口
        |--core
            |--main.py #程序主要邏輯
        |--modules
            |--RpcClient.py #客戶端主要邏輯
        |--conf
            |--settings.py #配置文件
    |--READ_ME
#命令格式:
1. run command host1 host2..... #執行命令
2. all_task #獲取所有task_id
3. check task_id #獲取命令結果
RPC_server\\bin\\start.py
"""Entry point of the RPC server: build an RpcServer and start consuming."""
import os
import sys

# Make the path absolute *before* walking up two directories.  With a
# relative ``__file__`` (e.g. ``python start.py`` run from inside ``bin``)
# the original dirname(dirname(...)) collapsed to "" and abspath("") then
# yielded the current directory instead of the directory that holds ``core``.
BASE_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_dir)

from core import RpcServer

if __name__ == '__main__':
    obj = RpcServer.RpcServer()
    # Blocks here, dispatching incoming messages to the server's callback.
    obj.channel.start_consuming()
RPC_server\\core\\RpcServer.py
import os
import socket

import pika

from conf import settings


class RpcServer(object):
    """RPC worker that runs shell commands received through RabbitMQ.

    The server declares an exclusive, broker-named queue, binds it to the
    direct exchange "Rpc" using its own IP address as routing key, and
    registers ``on_response`` as the consumer callback for that queue.
    """

    def __init__(self):
        """Connect to the broker and set up the exchange, queue and consumer."""
        # RabbitMQ user authentication.
        self.credentials = pika.PlainCredentials(
            settings.RabbitMq_name, settings.RabbitMq_password,
        )
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                settings.RabbitMq_ip, settings.RabbitMq_port, "/", self.credentials,
            )
        )
        # IP address of this server; used as the routing key below.
        self.My_Ip = self.get_ip()
        self.channel = self.connection.channel()
        self.result = self.channel.queue_declare(exclusive=True)
        self.queue_name = self.result.method.queue
        self.channel.exchange_declare(
            exchange="Rpc",
            exchange_type="direct",
        )
        self.channel.queue_bind(
            exchange="Rpc",
            queue=self.queue_name,
            routing_key=self.My_Ip,
        )
        self.channel.basic_consume(
            self.on_response,
            queue=self.queue_name,
        )

    def on_response(self, ch, method, properties, body):
        """Consumer callback: execute the received command and publish its output.

        Despite its name, this method handles an incoming *request*.  The
        output is published through the default exchange to the queue named
        in ``properties.reply_to``, tagged with the request's correlation id.

        Args:
            ch: channel the message was delivered on.
            method: delivery metadata (provides ``delivery_tag``).
            properties: message properties (``reply_to``, ``correlation_id``).
            body: the command to execute, as bytes.
        """
        command = body.decode()
        command_result = self.on_request(command)
        ch.basic_publish(
            exchange="",
            routing_key=properties.reply_to,
            properties=pika.BasicProperties(
                correlation_id=properties.correlation_id,
            ),
            body=command_result,
        )
        # The consumer is registered without no_ack, so every delivery has to
        # be acknowledged; otherwise it stays unacknowledged on the broker.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def on_request(self, command):
        """Run ``command`` in a shell and return everything it wrote to stdout.

        NOTE(security): the command text comes straight from the message
        body, so anyone able to publish to the "Rpc" exchange can execute
        arbitrary commands on this machine.  That is the purpose of this
        tool, but the broker credentials must be treated accordingly.
        """
        # Close the pipe explicitly instead of relying on garbage collection.
        with os.popen(command) as pipe:
            return pipe.read()

    def get_ip(self):
        """Return the IP address that this machine's host name resolves to."""
        computer_name = socket.getfqdn(socket.gethostname())
        computer_Ip = socket.gethostbyname(computer_name)
        return computer_Ip
RPC_client\\bin\\start.py
"""Entry point of the RPC client: start the interactive command loop."""
import os
import sys

# Make the path absolute *before* walking up two directories.  With a
# relative ``__file__`` (e.g. ``python start.py`` run from inside ``bin``)
# the original dirname(dirname(...)) collapsed to "" and abspath("") then
# yielded the current directory instead of the directory that holds ``core``.
BASE_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_dir)

from core import main

if __name__ == '__main__':
    obj = main.run()
    obj.start()
RPC_client\\core\\main.py
import random
import threading

from modules import RpcClient


class run(object):
    """Interactive console of the RPC client.

    Every line typed at the prompt is handled in its own thread.  Supported
    commands:

        run command host1 host2 ...   send ``command`` to each host
        all_task                      list pending task ids and their hosts
        check task_id                 print the results of a task, then drop it

    ``self.information`` maps task_id -> {host: [corr_id, callback_queue]}.
    """

    # Only these names may be dispatched from user input.
    _COMMANDS = ("run", "check", "all_task")

    def __init__(self):
        self.client = RpcClient.RpcClient()
        self.information = {}

    def start(self):
        """Read commands from stdin forever, handling each one in a new thread."""
        while True:
            try:
                command = input("-->")
            except EOFError:
                # stdin was closed; without this the generic handler below
                # would print the error and loop forever.
                break
            try:
                # Blank or whitespace-only lines carry no keyword to dispatch.
                if not command.strip():
                    continue
                t = threading.Thread(target=self.select, args=(command,))
                t.start()
            except Exception as e:
                print(e)

    def select(self, command):
        """Parse the command and dispatch to the matching handler."""
        try:
            keyword = command.split()[0]
            # Restrict dispatch to real commands: plain getattr() would also
            # reach ``start``, ``client`` or ``select`` itself (which recurses
            # until RecursionError).
            if keyword not in self._COMMANDS:
                print("unknown command: %s" % keyword)
                return
            func = getattr(self, keyword)
            func(command)
        except Exception as e:
            print(e)

    def run(self, command):
        """Execute a command: send ``command.split()[1]`` to every listed host.

        The (corr_id, callback_queue) pair returned by the client for each
        host is stored under a new task id so ``check`` can fetch the reply.
        """
        try:
            parts = command.split()
            # Parse before registering the task so that a malformed command
            # does not leave an empty task behind.
            keyword = parts[1]
            hosts = parts[2:]
            # Re-draw on collision so a pending task is never overwritten.
            while True:
                task_id = str(random.randint(100, 1000))
                if task_id not in self.information:
                    break
            self.information[task_id] = {}
            for host in hosts:
                result = self.client.on_request(host, keyword)
                self.information[task_id][host] = [result[0], result[1]]
        except Exception as e:
            print(e)

    def check(self, command):
        """Fetch and print the command result of every host of a task."""
        try:
            task_id = command.split()[1]
            if task_id not in self.information:
                print("task_id %s does not exist" % task_id)
                return
            # Iterate over a snapshot: other command threads may be adding
            # hosts to this task at the same time.
            for host, (corr_id, callback_queue) in list(self.information[task_id].items()):
                command_result = self.client.get_response(corr_id, callback_queue)
                print("%s:\n%s" % (host, command_result))
            self.information.pop(task_id)  # delete the task_id
        except Exception as e:
            print(e)

    def all_task(self, command):
        """Print every pending task_id together with its hosts."""
        try:
            # Snapshot for the same reason as in ``check``.
            for task_id, hosts in list(self.information.items()):
                all_host = list(hosts)
                print("task_id: %s host: %s\n" % (task_id, all_host))
        except Exception as e:
            print(e)
RPC_client\\conf\\settings.py
"""RabbitMQ connection settings.

NOTE(review): the "XXX" values look like placeholders to be replaced
before use -- confirm.
"""

# RabbitMQ user name
RabbitMq_name = "XXX"

# RabbitMQ user password
RabbitMq_password = "XXX"

# IP address of the RabbitMQ side
RabbitMq_ip = "XXX"

# Port number of the RabbitMQ side
RabbitMq_port = 5672
RPC_client\\modules\\RpcClient.py
import uuid

import pika

from conf import settings


class RpcClient(object):
    """Client side of the RPC: publishes commands and collects replies.

    Sending (``on_request``) and receiving (``get_response``) are separate
    calls, so a caller can send many commands first and fetch replies later.

    NOTE(review): ``get_response`` keeps its state in ``self.corr_id`` and
    ``self.response``, so concurrent calls on one instance would interfere
    with each other -- confirm how callers use this from multiple threads.
    """

    def __init__(self):
        """Open the broker connection and a channel."""
        # RabbitMQ user authentication.
        self.credentials = pika.PlainCredentials(
            settings.RabbitMq_name, settings.RabbitMq_password,
        )
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                settings.RabbitMq_ip, settings.RabbitMq_port, "/", self.credentials,
            )
        )
        self.channel = self.connection.channel()

    def get_response(self, corr_id, callback_queue):
        """Wait for the reply tagged ``corr_id`` on ``callback_queue``.

        Args:
            corr_id: correlation id returned by ``on_request``.
            callback_queue: reply queue name returned by ``on_request``.

        Returns:
            The decoded reply body.
        """
        self.corr_id = corr_id
        self.response = None
        consumer_tag = self.channel.basic_consume(
            self.on_response,
            queue=callback_queue,
        )
        while self.response is None:
            # Non-blocking counterpart of start_consuming().
            self.connection.process_data_events()
        # The reply has arrived: remove the consumer and the reply queue.
        # The queue was declared non-exclusive, so nothing else deletes it
        # and one queue per request would otherwise pile up on the broker.
        self.channel.basic_cancel(consumer_tag)
        self.channel.queue_delete(queue=callback_queue)
        return self.response

    def on_response(self, ch, method, properties, body):
        """Consumer callback: keep the body when the correlation id matches."""
        if self.corr_id == properties.correlation_id:
            self.response = body.decode()
            # The consumer is registered without no_ack, so acknowledge the
            # delivery that was accepted.
            ch.basic_ack(delivery_tag=method.delivery_tag)

    def on_request(self, host, command):
        """Send ``command`` to ``host`` and return the handles for its reply.

        Args:
            host: routing key on the "Rpc" direct exchange.
            command: message body to publish.

        Returns:
            Tuple ``(corr_id, callback_queue)`` to pass to ``get_response``.
        """
        # Non-exclusive so the queue does not disappear when another queue
        # is created later (original author's note).
        result = self.channel.queue_declare(exclusive=False)
        callback_queue = result.method.queue  # reply queue
        corr_id = str(uuid.uuid4())  # token used to match the reply
        self.channel.exchange_declare(
            exchange="Rpc",
            exchange_type="direct",
        )
        self.channel.basic_publish(
            exchange="Rpc",
            routing_key=host,
            properties=pika.BasicProperties(
                correlation_id=corr_id,
                reply_to=callback_queue,
            ),
            body=command,
        )
        return corr_id, callback_queue
程序執行實例: