環境:windows或者Linux,python3.6,rabbitmq3.5
要求:
能夠對指定機器異步的執行多個命令
例子:
>>:run "df -h" --hosts 192.168.3.55 10.4.3.4
task id: 45334
>>: check_task 45334
>>:
注意,每執行一條命令,即馬上生成一個任務ID,不需等待結果返回,經過命令check_task TASK_ID來獲得任務結果
項目結構:
rpc_client ---|
bin ---|
start_client.py ......啓動生成者
core---|
main.py ......生產者主程序
rpc_server ---|
bin ---|
start_server.py ......啓動消費者
core---|
ftp_server.py ......消費者主程序
用法:
啓動start_client.py,輸入命令格式爲 run "shell指令或者dos指令" --hosts ip ip
啓動start_server.py
若是消費者本機ip爲輸入Ip之一,則回收到指令並返回指令結果
返回後打印task id
而後經過指令check_task id 則能夠查詢返回結果
producer:python
#!/usr/bin/env python # -*-coding:utf-8-*- # Author:zh import pika import random class RpcClient(object): # 這個類做爲生成者,用來發消息 def __init__(self, command, ip, corr_id): self.response = None # 消息 self.command = command # 命令 self.queue_name = ip # 管道名稱爲IP self.corr_id = str(corr_id) # 隨機生成的task id self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) self.channel = self.connection.channel() self.channel.queue_declare(queue=self.queue_name, durable=True) result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): # 收到消息後根據corr_id判斷消息是由誰返回 if self.corr_id == props.correlation_id: self.response = body.decode() def call(self): # 發送命令 self.channel.basic_publish(exchange='', routing_key=self.queue_name, properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, delivery_mode=2, ), body=self.command) while self.response is None: self.connection.process_data_events() # 非阻塞版的start_consume return self.response class Command(object): # 這個類用來解析命令並傳給生產者 def __init__(self): self.used_id = [] # 隨機生成ID,避免重複,已生成的ID放入此列表中 self.id_data = {} # 將返回結果放入字典中存放 def run(self, *args): # 這個方法用來解析命令並傳給生產者 cmd = args[0] cmd_str = cmd.split("\"") try: command = cmd_str[1] ip_list = cmd_str[2].split() ip_list.remove('--hosts') corr_id = random.randint(10000, 99999) while corr_id in self.used_id: # 防止corr_id重複 corr_id = random.randint(10000, 99999) else: self.used_id.append(corr_id) return_body = {} for i in ip_list: rpc_obj = RpcClient(command, i, corr_id) return_body[i] = rpc_obj.call() self.id_data[corr_id] = return_body print("task id:", corr_id) except IndexError: print("輸入格式錯誤") self.help() except ValueError: print("no --hosts") self.help() def check_task(self, *args): # 這個方法用來獲取ID結果 cmd = args[0] cmd_str = cmd.split() if len(cmd_str) == 2: corr_id = int(cmd_str[1]) if corr_id in list(self.id_data.keys()): result = self.id_data.pop(corr_id) for key in result: print("\033[32;1m%s:\033[0m" % key) for i in eval(result[key]): print(i) else: print("no this task id") @staticmethod def help(): print('''請輸入如下命令格式:\033[32;1m start command: run "command" --hosts ip ip get answer: check_task:id\033[0m ''') def start(): cmd_obj = Command() while True: cmd = input("-->:") if len(cmd) == 0: continue cmd_str = cmd.split() cmd_title = cmd_str[0] if hasattr(cmd_obj, cmd_title): func = getattr(cmd_obj, cmd_title) func(cmd) else: cmd_obj.help()
consumer:shell
#!/usr/bin/env python # -*-coding:utf-8-*- # Author:zh import pika import subprocess import locale import codecs import platform import socket import os class RpcServer(object): # 這個類用來接收發來本機的消息,管道名爲本機IP,將結果發往生成者 def __init__(self, queue_name): self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) self.channel = self.connection.channel() self.channel.queue_declare(queue=queue_name, durable=True) self.channel.basic_consume(self.on_request, queue=queue_name) def start(self): # 開始等待消息 self.channel.start_consuming() def on_request(self, ch, method, props, body): # 收到消息後返回消息結果 response = self.run_order(body.decode()) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id, delivery_mode=2,), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) @staticmethod def run_order(cmd): # 這個方法用來執行命令並返回結果 cmd_result = subprocess.Popen(args=cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) return [ cmd_result.stdout.read().decode(codecs.lookup(locale.getpreferredencoding()).name), cmd_result.stderr.read().decode(codecs.lookup(locale.getpreferredencoding()).name) ] if __name__ == "__main__": hostname = socket.gethostname() system_type = platform.system() if system_type == 'Windows': ip = socket.gethostbyname(hostname) elif system_type == "Linux": ip = str(os.popen("LANG=C ifconfig | grep \"inet addr\" | grep -v \"127.0.0.1\" | awk -F " "\":\" '{print $2}' | awk '{print $1}'").readlines()[0]).strip() # 從Linux獲取ip prc_obj = RpcServer(ip) prc_obj.start()