RabbitMQ中RPC的實現及其通訊機制

RabbitMQ中RPC的實現:客戶端發送請求消息,服務端回覆響應消息,爲了接受響應response,客戶端須要發送一個回調隊列的地址來接受響應,每條消息在發送的時候會帶上一個惟一的correlation_id,相應的服務端處理計算後會將結果返回到對應的correlation_id。
服務器

RPC調用流程:fetch

當生產者啓動時,它會建立一個匿名的獨佔回調隊列,對於一個RPC請求,生產者發送一條具備兩個屬性的消息:reply_to(回調隊列),correlation_id(每一個請求的惟一值),請求被髮送到rpc_queue隊列,消費者等待該隊列上的請求。當一個請求出現時,它會執行該任務,將帶有結果的消息發送回生產者。生產者等待回調隊列上的數據,當消息出現時,它檢查相關ID屬性,若是它與請求中的值匹配,則返回對應用程序的響應。優化

 RabbitMQ斐波拉契計算的RPC,消費者實現:ui

"""
基於RabbitMQ實現RPC通訊機制 --> 服務端
"""

import pika
import uuid
from functools import lru_cache


class RabbitServer(object):
    def __init__(self):
        self.conn = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost', port=5672)
        )
        self.channel = self.conn.channel()

        # 聲明一個隊列,並進行持久化,exclusive設置爲false
        self.channel.queue_declare(
            exclusive=False, durable=True, queue='task_queue'
        )

        # 聲明一個exhange交換機,類型爲topic
        self.channel.exchange_declare(
            exchange='logs_rpc', exchange_type='topic', durable=True
        )

        # 將隊列與交換機進行綁定
        routing_keys = ['#']  # 接受全部的消息
        for routing_key in routing_keys:
            self.channel.queue_bind(
                exchange='logs_rpc', queue='task_queue', routing_key=routing_key
            )

    @lru_cache()
    def fib(self, n):
        """
        斐波那契數列.===>程序的處理邏輯
        使用lru_cache 優化遞歸
        :param n:
        :return:
        """
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return self.fib(n - 1) + self.fib(n - 2)

    def call_back(self, channel, method, properties, body):
        print('------------------------------------------')
        print('接收到的消息爲(斐波那契數列的入參項爲):{}'.format(str(body)))
        print('消息的相關屬性爲:')
        print(properties)
        value = self.fib(int(body))
        print('斐波那契數列的運行結果爲:{}'.format(str(value)))

        # 交換機將消息發送到隊列
        self.channel.basic_publish(
            exchange='',
            routing_key=properties.reply_to,
            body=str(value),
            properties=pika.BasicProperties(
                delivery_mode=2,
                correlation_id=properties.correlation_id,
            ))

        # 消費者對消息進行確認
        self.channel.basic_ack(delivery_tag=method.delivery_tag)

    def receive_msg(self):
        print('開始接受消息...')
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            consumer_callback=self.call_back,
            queue='task_queue',
            no_ack=False,  # 消費者對消息進行確認
            consumer_tag=str(uuid.uuid4())
        )

    def consume(self):
        self.receive_msg()
        self.channel.start_consuming()


if __name__ == '__main__':
    rabbit_consumer = RabbitServer()
    rabbit_consumer.consume()

 生產者實現:spa

"""
基於RabbitMQ實現RPC通訊機制 --> 客戶端
"""

import pika
import uuid
import time


class RabbitClient(object):
    def __init__(self):
        # 與RabbitMq服務器創建鏈接
        self.conn = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost', port=5672)
        )
        self.channel = self.conn.channel()

        # 聲明一個exchange交換機,交換機的類型爲topic
        self.channel.exchange_declare(
            exchange='logs_rpc', exchange_type='topic', durable=True
        )

        # 聲明一個回調隊列,用於接受RPC回調結果的運行結果
        result = self.channel.queue_declare(durable=True, exclusive=False)
        self.call_queue = result.method.queue

        # 從回調隊列當中獲取運行結果.
        self.channel.basic_consume(
            consumer_callback=self.on_response,
            queue=self.call_queue,
            no_ack=False
        )

    def on_response(self, channel, method, properties, body):
        """
        對收到的消息進行確認
        找到correlation_id與服務端的消息標識匹配的消息結果
        :param channel:
        :param method:
        :param properties:
        :param body:
        :return:
        """
        if self.corr_id == properties.correlation_id:
            self.response = body
            print('斐波那契數列的RPC返回結果是:{}'.format(body))
            print('相關屬性信息:')
            print(properties)
        self.channel.basic_ack(delivery_tag=method.delivery_tag)

    def send_msg(self, routing_key, message):
        """
        exchange交換機將根據消息的路由鍵將消息路由到對應的queue當中
        :param routing_key: 消息的路由鍵
        :param message: 生成者發送的消息
        :return:
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='logs_rpc',
            routing_key=routing_key,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,
                correlation_id=self.corr_id,
                reply_to=self.call_queue,
            ))

        while self.response is None:
            print('等待遠程服務端的返回結果...')
            self.conn.process_data_events()  # 非阻塞式的不斷獲取消息.

        return self.response

    def close(self):
        self.conn.close()


if __name__ == "__main__":
    rabbit_producer = RabbitClient()
    routing_key = 'hello every one'
    start_time = int(time.time())
    for item in range(2000):
        num = str(item)
        print('生產者發送的消息爲:{}'.format(num))
        rabbit_producer.send_msg(routing_key, num)
    end_time = int(time.time())
    print("耗時{}s".format(str(end_time - start_time)))

計算2000之內的斐波拉契數列,執行結果以下:code

 

相關文章
相關標籤/搜索