rabbitmq之rpc

環境:windows或者Linux,python3.6,rabbitmq3.5
要求:
能夠對指定機器異步的執行多個命令
例子:
>>:run "df -h" --hosts 192.168.3.55 10.4.3.4
task id: 45334
>>: check_task 45334
>>:
注意,每執行一條命令,即馬上生成一個任務ID,不需等待結果返回,經過命令check_task TASK_ID來獲得任務結果

項目結構:
rpc_client ---|
bin ---|
start_client.py ......啓動生成者
core---|
main.py ......生產者主程序
rpc_server ---|
bin ---|
start_server.py ......啓動消費者
core---|
ftp_server.py ......消費者主程序


用法:
啓動start_client.py,輸入命令格式爲 run "shell指令或者dos指令" --hosts ip ip
啓動start_server.py
若是消費者本機ip爲輸入Ip之一,則回收到指令並返回指令結果
返回後打印task id
而後經過指令check_task id 則能夠查詢返回結果

producer:python

#!/usr/bin/env python
# -*-coding:utf-8-*-
# Author:zh
import pika
import random


class RpcClient(object):
    # 這個類做爲生成者,用來發消息
    def __init__(self, command, ip, corr_id):
        self.response = None   # 消息
        self.command = command  # 命令
        self.queue_name = ip  # 管道名稱爲IP
        self.corr_id = str(corr_id)   # 隨機生成的task id
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue_name, durable=True)
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response,
                                   no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        # 收到消息後根據corr_id判斷消息是由誰返回
        if self.corr_id == props.correlation_id:
            self.response = body.decode()

    def call(self):
        # 發送命令
        self.channel.basic_publish(exchange='',
                                   routing_key=self.queue_name,
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                       delivery_mode=2,
                                   ),
                                   body=self.command)
        while self.response is None:
            self.connection.process_data_events()  # 非阻塞版的start_consume
        return self.response


class Command(object):
    # 這個類用來解析命令並傳給生產者
    def __init__(self):
        self.used_id = []   # 隨機生成ID,避免重複,已生成的ID放入此列表中
        self.id_data = {}   # 將返回結果放入字典中存放

    def run(self, *args):
        # 這個方法用來解析命令並傳給生產者
        cmd = args[0]
        cmd_str = cmd.split("\"")
        try:
            command = cmd_str[1]
            ip_list = cmd_str[2].split()
            ip_list.remove('--hosts')
            corr_id = random.randint(10000, 99999)
            while corr_id in self.used_id:  # 防止corr_id重複
                corr_id = random.randint(10000, 99999)
            else:
                self.used_id.append(corr_id)
            return_body = {}
            for i in ip_list:
                rpc_obj = RpcClient(command, i, corr_id)
                return_body[i] = rpc_obj.call()
            self.id_data[corr_id] = return_body
            print("task id:", corr_id)
        except IndexError:
            print("輸入格式錯誤")
            self.help()
        except ValueError:
            print("no --hosts")
            self.help()

    def check_task(self, *args):
        # 這個方法用來獲取ID結果
        cmd = args[0]
        cmd_str = cmd.split()
        if len(cmd_str) == 2:
            corr_id = int(cmd_str[1])
            if corr_id in list(self.id_data.keys()):
                result = self.id_data.pop(corr_id)
                for key in result:
                    print("\033[32;1m%s:\033[0m" % key)
                    for i in eval(result[key]):
                        print(i)
            else:
                print("no this task id")

    @staticmethod
    def help():
        print('''請輸入如下命令格式:\033[32;1m
start command: run "command" --hosts ip ip 

get answer: check_task:id\033[0m
        ''')


def start():
    cmd_obj = Command()
    while True:
        cmd = input("-->:")
        if len(cmd) == 0:
            continue
        cmd_str = cmd.split()
        cmd_title = cmd_str[0]
        if hasattr(cmd_obj, cmd_title):
            func = getattr(cmd_obj, cmd_title)
            func(cmd)
        else:
            cmd_obj.help()
main

consumer:shell

 

#!/usr/bin/env python
# -*-coding:utf-8-*-
# Author:zh
import pika
import subprocess
import locale
import codecs
import platform
import socket
import os


class RpcServer(object):
    #  這個類用來接收發來本機的消息,管道名爲本機IP,將結果發往生成者
    def __init__(self, queue_name):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=queue_name, durable=True)
        self.channel.basic_consume(self.on_request, queue=queue_name)

    def start(self):
        # 開始等待消息
        self.channel.start_consuming()

    def on_request(self, ch, method, props, body):
        # 收到消息後返回消息結果
        response = self.run_order(body.decode())
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id=props.correlation_id,
                                                         delivery_mode=2,),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    @staticmethod
    def run_order(cmd):
        # 這個方法用來執行命令並返回結果
        cmd_result = subprocess.Popen(args=cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        return [
                cmd_result.stdout.read().decode(codecs.lookup(locale.getpreferredencoding()).name),
                cmd_result.stderr.read().decode(codecs.lookup(locale.getpreferredencoding()).name)
                ]

if __name__ == "__main__":
    hostname = socket.gethostname()
    system_type = platform.system()
    if system_type == 'Windows':
        ip = socket.gethostbyname(hostname)
    elif system_type == "Linux":
        ip = str(os.popen("LANG=C ifconfig | grep \"inet addr\" | grep -v \"127.0.0.1\" | awk -F "
                          "\":\" '{print $2}' | awk '{print $1}'").readlines()[0]).strip()  # 從Linux獲取ip
    prc_obj = RpcServer(ip)
    prc_obj.start()
View Code
相關文章
相關標籤/搜索