RabbitMQ中RPC的實現:客戶端發送請求消息,服務端回覆響應消息,爲了接受響應response,客戶端須要發送一個回調隊列的地址來接受響應,每條消息在發送的時候會帶上一個惟一的correlation_id,相應的服務端處理計算後會將結果返回到對應的correlation_id。
服務器
RPC調用流程:fetch
當生產者啓動時,它會建立一個匿名的獨佔回調隊列,對於一個RPC請求,生產者發送一條具備兩個屬性的消息:reply_to(回調隊列),correlation_id(每一個請求的惟一值),請求被髮送到rpc_queue隊列,消費者等待該隊列上的請求。當一個請求出現時,它會執行該任務,將帶有結果的消息發送回生產者。生產者等待回調隊列上的數據,當消息出現時,它檢查相關ID屬性,若是它與請求中的值匹配,則返回對應用程序的響應。優化
RabbitMQ斐波拉契計算的RPC,消費者實現:ui
""" 基於RabbitMQ實現RPC通訊機制 --> 服務端 """ import pika import uuid from functools import lru_cache class RabbitServer(object): def __init__(self): self.conn = pika.BlockingConnection( pika.ConnectionParameters(host='localhost', port=5672) ) self.channel = self.conn.channel() # 聲明一個隊列,並進行持久化,exclusive設置爲false self.channel.queue_declare( exclusive=False, durable=True, queue='task_queue' ) # 聲明一個exhange交換機,類型爲topic self.channel.exchange_declare( exchange='logs_rpc', exchange_type='topic', durable=True ) # 將隊列與交換機進行綁定 routing_keys = ['#'] # 接受全部的消息 for routing_key in routing_keys: self.channel.queue_bind( exchange='logs_rpc', queue='task_queue', routing_key=routing_key ) @lru_cache() def fib(self, n): """ 斐波那契數列.===>程序的處理邏輯 使用lru_cache 優化遞歸 :param n: :return: """ if n == 0: return 0 elif n == 1: return 1 else: return self.fib(n - 1) + self.fib(n - 2) def call_back(self, channel, method, properties, body): print('------------------------------------------') print('接收到的消息爲(斐波那契數列的入參項爲):{}'.format(str(body))) print('消息的相關屬性爲:') print(properties) value = self.fib(int(body)) print('斐波那契數列的運行結果爲:{}'.format(str(value))) # 交換機將消息發送到隊列 self.channel.basic_publish( exchange='', routing_key=properties.reply_to, body=str(value), properties=pika.BasicProperties( delivery_mode=2, correlation_id=properties.correlation_id, )) # 消費者對消息進行確認 self.channel.basic_ack(delivery_tag=method.delivery_tag) def receive_msg(self): print('開始接受消息...') self.channel.basic_qos(prefetch_count=1) self.channel.basic_consume( consumer_callback=self.call_back, queue='task_queue', no_ack=False, # 消費者對消息進行確認 consumer_tag=str(uuid.uuid4()) ) def consume(self): self.receive_msg() self.channel.start_consuming() if __name__ == '__main__': rabbit_consumer = RabbitServer() rabbit_consumer.consume()
生產者實現:spa
""" 基於RabbitMQ實現RPC通訊機制 --> 客戶端 """ import pika import uuid import time class RabbitClient(object): def __init__(self): # 與RabbitMq服務器創建鏈接 self.conn = pika.BlockingConnection( pika.ConnectionParameters(host='localhost', port=5672) ) self.channel = self.conn.channel() # 聲明一個exchange交換機,交換機的類型爲topic self.channel.exchange_declare( exchange='logs_rpc', exchange_type='topic', durable=True ) # 聲明一個回調隊列,用於接受RPC回調結果的運行結果 result = self.channel.queue_declare(durable=True, exclusive=False) self.call_queue = result.method.queue # 從回調隊列當中獲取運行結果. self.channel.basic_consume( consumer_callback=self.on_response, queue=self.call_queue, no_ack=False ) def on_response(self, channel, method, properties, body): """ 對收到的消息進行確認 找到correlation_id與服務端的消息標識匹配的消息結果 :param channel: :param method: :param properties: :param body: :return: """ if self.corr_id == properties.correlation_id: self.response = body print('斐波那契數列的RPC返回結果是:{}'.format(body)) print('相關屬性信息:') print(properties) self.channel.basic_ack(delivery_tag=method.delivery_tag) def send_msg(self, routing_key, message): """ exchange交換機將根據消息的路由鍵將消息路由到對應的queue當中 :param routing_key: 消息的路由鍵 :param message: 生成者發送的消息 :return: """ self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange='logs_rpc', routing_key=routing_key, body=message, properties=pika.BasicProperties( delivery_mode=2, correlation_id=self.corr_id, reply_to=self.call_queue, )) while self.response is None: print('等待遠程服務端的返回結果...') self.conn.process_data_events() # 非阻塞式的不斷獲取消息. return self.response def close(self): self.conn.close() if __name__ == "__main__": rabbit_producer = RabbitClient() routing_key = 'hello every one' start_time = int(time.time()) for item in range(2000): num = str(item) print('生產者發送的消息爲:{}'.format(num)) rabbit_producer.send_msg(routing_key, num) end_time = int(time.time()) print("耗時{}s".format(str(end_time - start_time)))
計算2000之內的斐波拉契數列,執行結果以下:code