rabbitmq direct reply-to 在springAMQP和python之間的使用

背景

公司的一個項目使用rabbitmq做爲broker進行交互,而且數據的查詢方法使用RPC模式,RPC Client端使用java編寫並使用springAMQP包與rabbitmq交互,在RPC Server端使用python的 pika包與rabbitmq交互。兩端都使用標準官方例程,發如今Client端發送的消息能夠被Server端接收並處理而後返回結果,可是Client端只會會收到一個null值。html

問題排查

1 理解傳統的RPC模式運行流程

傳統模式下 Client端向一個指定的隊列裏推送消息,並聲明一個一次性排他隊列,而後將發送消息頭部的reply-to屬性的值設置爲隊列的名字,correlation_id屬性設置爲一個隨機生成的值用於消息鑑定而後發送消息。在發送後Client端監聽聲明的排他隊列,當收到消息後比對correaltiion_id,正確則處理消息斷開監聽鏈接,而後此隊列被系統自動回收。 在Server端收到消息後處理消息而後將消息返回,返回的消息的routing-key設置爲reply-to的值,properties中設置correlation_id爲收到的correlation_id值。這樣就完成一次RPC交互模式。
要解決今天這個問題咱們還要知道幾個知識點:java

  • 1當消息發送到exchange後若是沒有隊列接收此消息,那麼此消息就會丟失。
  • 2 一次性的排他隊列在Client不在監聽此隊列就會自動被rabbitmq刪除。

排查1 Client端收到的Null值從哪裏來?

由於我是使用python寫RPC Server端而且我也不怎麼會java代碼。……
因此這個null值從那裏來我就沒法從Client端下手。那咱們只能從Server端進行排查。(最後我認爲是在java代碼編寫錯誤(是本身的代碼)的狀況下 springAMQP返回的一個默認值)python

排查2 Server端收到消息後是否正確的將消息返回

在Server端打印收到的message並打印此消息的header信息和body信息,看到在reply-to中就是Client端設置的隊列。而且經過rabbitmq也看到了這條消息的返回。spring

排查3 觀察消息有沒有被推送回reply-to隊列

而後我在Server端收到消息後的callback函數的頭部大了斷點,接收到消息後Server端程序掛起。此時我去查看reply-to中的隊列,發現其已經不存在於rabbitmq中了。 由上面的傳統RPC模式我推斷出 多是Client端發送代碼後沒有監聽reply-to隊列形成隊列消失,而後Server端發送的消息由於沒有接收隊列而被丟棄。此時咱們基本已經將問題鎖定在Client端了。可是Client端的代碼是按照rabbitmq官方給的例程書寫,應該是沒有問題的。此時彷佛陷入了僵局。服務器

定位問題:Google大發加官方文檔

這時候我Google一下SpringAMQP框架的是如何寫RPC代碼?在一些帖子中我發現有的代碼會添加一個Listener的類,但有的又不添加。咱們假設他們都是能夠運行的。那麼是什麼緣由會形成這種狀況呢?我第一個就是想到了版本問題。隨着版本的改變可能代碼也會發生變化。以後我就在SpringAMQP的官方文檔裏面進行查找。果真被我找到了,官方文檔裏面有這樣一段描述:框架

Starting with version 3.4.0, the RabbitMQ server now supports Direct reply-to; this eliminates the main reason for a fixed reply queue (to avoid the need to create a temporary queue for each request). Starting with Spring AMQP version 1.4.1 Direct reply-to will be used by default (if supported by the server) instead of creating temporary reply queues. When no replyQueue  is provided (or it is set with the name amq.rabbitmq.reply-to), the RabbitTemplate will automatically detect whether Direct reply-to is supported and either use it or fall back to using a temporary reply queue. When using Direct reply-to, a reply-listener is not required and should not be configured.ide

springAMQP官方地址
翻譯一下大致意思就是在RabbitMQ3.4.0版本之後官方提供一種叫作Direct reply-to的方式來實現RPC(這種方式能夠極大的提升RPC的性能,由於他不須要每次請求Client端都要新申請一個隊列,以後我會再寫一篇來詳細介紹(翻譯 o(∩_∩)o 哈哈 )這個特性。而且在SpringAMQP version 1.4.1版本以後默認使用特性,看了一下服務器上的rabbitmq版本3.3.0 這個真的老果真不支持,SpringAMQP的版本果真也是高於這個版本,問題找到。開心 , 可是怎麼解決呢?
Direct reply-to 官方介紹函數

解決方案

一: 提高rabbitmq版本,並使兩端代碼適配direct reply-to 方式

  • 難點1 python的官網沒有給例程 ,不過給了介紹也告訴瞭如何來實現
  • 難點2 服務器提高版本,已經有業務跑在上面了,我這種對rabbitmq的萌新對rabbitmq各版本升級後的改變並非很瞭解,估計是難說動領導換了。

針對難點2 我就不想了 不過難點1的我已經寫出來python如何適配direct reply-to的代碼。
更改都是在Client端,Server端仍是能夠保持不變。主要主機這幾個方面性能

  • 1 reply-to的名字更改成‘amq.rabbitmq.reply-to’這條虛擬隊列,你在rabbitmq的控制檯上是看不到這條隊列的。
  • 2 而後Client監聽這條隊列的時候要設爲爲no-ack模式。

下面是根據官方python RPC代碼更改的 適配 Direct reply-to的python代碼
Client端 python代碼fetch

# -*- coding:utf-8 -*-  
#!/usr/bin/env python
import pika
import uuid


class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        # result = self.channel.queue_declare(exclusive=True)
        # self.callback_queue = result.method.queue
        # self.channel.basic_consume(self.on_response, no_ack=True,
        #                            queue=self.callback_queue)
        # 監聽隊列爲 amp.rabbitmq.reply-to 啓動no_ack 模式
        self.channel.basic_consume(self.on_response,
                                   queue='amq.rabbitmq.reply-to',
                                   no_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         # reply_to = self.callback_queue,
                                         # 更改了隊列名字
                                         reply_to='amq.rabbitmq.reply-to',
                                         correlation_id=self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

Server端代碼 沒有改動

#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n  = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    # ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

解決辦法2 java代碼不使用默認的direct reply-to模式

這個辦法由於我不是寫java的因此我只能寫一些我在官方文檔裏面理解的東西了。就是當你不使用SpringAMQP的默認RPC模式的化須要增長Listener對象來監聽本身的隊列。

RabbitTemplate rabbitTempete=new RabbitTemplate(connectionFactory);  
            rabbitTempete.setExchange(exchangeName);  
            rabbitTempete.setRoutingKey(topic);  
           //比官方文檔多的
            Queue  replyqQueue=replyQueue();  
            admin.declareQueue(replyqQueue); 
            rabbitTempete.setReplyQueue(replyqQueue);  
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();  
            container.setConnectionFactory(connectionFactory);  
            container.setQueues(replyqQueue);  
            container.setMessageListener(rabbitTempete);  
            container.start();  
            //比官方文檔多的中止
            Object  response=rabbitTempete.convertSendAndReceive(t);

SpringAMQP書寫官方文檔
相比較要本身申請隊列本身監聽。不過我也沒試過這段代碼就不知道能不能用了。

總結

這個問題基本獲得很好的解決了。解決一個問題首先你要明白一個東西正常狀況下是一種什麼情況,而後出了問題就從前日後,從後往前,從中往兩邊等等等。而後Google,或者官方文檔,官方論壇。我我的認爲官方文檔真的是好東西。無數的淺坑的解決辦法都在官方文檔。固然深坑就不說了那就是論壇加能力加運氣才能排查出來的了。不過官方大多都是英文。真是愁人,我 增強英語能力吧。

相關文章
相關標籤/搜索