使用 RabbitMQ 實現 RPC

使用 RabbitMQ 實現 RPC

 

背景知識spring

RabbitMQ性能優化

RabbitMQ 是基於 AMQP 協議實現的一個消息隊列(Message Queue),Message Queue 是一個典型的生產者/消費者模式。生產者發佈消息,消費者消費消息,生產者和消費者之間是解耦的,互相不知道對方的存在。服務器

使用 RabbitMQ 實現 RPC

 

RPC架構

Remote Procedure Call:遠程過程調用,一次遠程過程調用的流程即客戶端發送一個請求到服務端,服務端根據請求信息進行處理後返回響應信息,客戶端收到響應信息後結束。併發

使用 RabbitMQ 實現 RPC

 

如何使用 RabbitMQ 實現 RPC?負載均衡

使用 RabbitMQ 實現 RPC,相應的角色是由生產者來做爲客戶端,消費者做爲服務端。異步

但 RPC 調用通常是同步的,客戶端和服務器也是緊密耦合的。即客戶端經過 IP/域名和端口連接到服務器,向服務器發送請求後等待服務器返回響應信息。分佈式

但 MQ 的生產者和消費者是徹底解耦的,那麼如何用 MQ 實現 RPC 呢?很明顯就是把 MQ 看成中間件實現一次雙向的消息傳遞:ide

使用 RabbitMQ 實現 RPC

 

客戶端和服務端便是生產者也是消費者。客戶端發佈請求,消費響應;服務端消費請求,發佈響應。微服務

具體實現

MQ部分的定義

請求信息的隊列

咱們須要一個隊列來存放請求信息,客戶端向這個隊列發佈請求信息,服務端消費該隊列處理請求。該隊列不須要複雜的路由規則,直接使用 RabbitMQ 默認的 direct exchange 來路由消息便可。

響應信息的隊列

存放響應信息的隊列不該只有一個。若是存在多個客戶端,不能保證響應信息被髮布請求的那個客戶端消費到。因此應爲每個客戶端建立一個響應隊列,這個隊列應該由客戶端來建立且只能由這個客戶端使用並在使用完畢後刪除,這裏可使用 RabbitMQ 提供的排他隊列(Exclusive Queue):

channel.queueDeclare(queue:"", durable:false, exclusive:true, autoDelete:false, new HashMap<>())

而且要保證隊列名惟一,聲明隊列時名稱設爲空 RabbitMQ 會生成一個惟一的隊列名。

exclusive 設爲 true 表示聲明一個排他隊列,排他隊列的特色是隻能被當前的鏈接使用,而且在鏈接關閉後被刪除。

一個簡單的 demo(使用 pull 機制)

咱們使用一個簡單的 demo 來了解客戶端和服務端的處理流程。

發佈請求

  • 編寫代碼前的一個小問題

咱們在聲明隊列時爲每個客戶端聲明瞭獨有的響應隊列,那服務器在發佈響應時如何知道發佈到哪一個隊列呢?其實就是客戶端須要告訴服務端將響應發佈到哪一個隊列,RabbitMQ 提供了這個支持,消息體的 Properties 中有一個屬性 reply_to 就是用來標記回調隊列的名稱,服務器須要將響應發佈到 reply_to 指定的回調隊列中。

解決了這個問題以後咱們就能夠編寫客戶端發佈請求的代碼了:

// 定義響應回調隊列
String replyQueueName = channel.queueDeclare("", false, true, false, new HashMap<>()).getQueue();
// 設置回調隊列到 Properties
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
 .replyTo(replyQueueName)
 .build();
String request = "request";
// 發佈請求
channel.basicPublish("", "rpc_queue", properties, request.getBytes());

Direct reply-to:

RabbitMQ 提供了一種更便捷的機制來實現 RPC,不須要客戶端每次都定義回調隊列,客戶端發佈請求時將 replyTo 設爲 amq.rabbitmq.reply-to ,消費響應時也指定消費 amq.rabbitmq.reply-to ,RabbitMQ 會爲客戶端建立一個內部隊列

消費請求

接下來是服務端處理請求的部分,接收到請求後通過處理將響應信息發佈到 reply_to 指定的回調隊列:

// 服務端 Consumer 的定義
public class RpcServer extends DefaultConsumer {
 public RpcServer(Channel channel) {
 super(channel);
 }
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 String msg = new String(body);
 String response = (msg + " Received");
 // 獲取回調隊列名
 String replyTo = properties.getReplyTo();
 // 發佈響應消息到回調隊列
 this.getChannel().basicPublish("", replyTo, new AMQP.BasicProperties(), response.getBytes());
 }
}
...
// 啓動服務端 Consumer
channel.basicConsume("rpc_queue", true, new RpcServer(channel));

接收響應

客戶端如何接收服務器的響應呢?有兩種方式:1.輪詢的去 pull 回調隊列中的消息,2.異步的消費回調隊列中的消息。咱們在這裏簡單實現第一種方案。

GetResponse getResponse = null;
while (getResponse == null) {
 getResponse = channel.basicGet(replyQueueName, true);
}
String response = new String(getResponse.getBody());

一個簡單的基於 RabbitMQ 的 RPC 模型已經實現了,但這個 demo 並不實用,由於客戶端每次發送完請求都要同步的輪詢等待響應消息,只能每次處理一個請求。RabbitMQ 的 pull 模式效率也比較低。

實現一個完備可用的 RPC 模式須要作的工做還有不少,要處理的關鍵點也比較複雜,有句話叫不要重複造輪子,spring 已經實現了一個完備可用的 RPC 模式的庫,接下來咱們來了解一下。順便在此給你們推薦一個Java架構方面的交流學習羣:698581634,進羣便可獲取Java架構師資料:有Spring,MyBatis,Netty源碼分析,高併發、高性能、分佈式、微服務架構的原理,JVM性能優化這些成爲架構師必備的知識體系,羣裏必定有你須要的資料,你們趕忙加羣吧。

Spring Rabbit 中的實現

和上面 demo 的 pull 模式一次只能處理一個請求相對應的:如何異步的接收響應並處理多個請求呢?關鍵點就在於咱們須要記錄請求和響應並將它們關聯起來,RabbitMQ 也提供了支持,Properties 中的另外一個屬性 correlation_id 用來標識一個消息的惟一 id。

參考 spring-rabbit 中的 convertSendAndReceive 方法的實現,爲每一次請求生成一個惟一的 correlation_id :

private final AtomicInteger messageTagProvider = new AtomicInteger();
...
String messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
...
message.getMessageProperties().setCorrelationId(messageTag);

並使用一個 ConcurrentHashMap 來維護 correlation_id 和響應信息的映射:

private final Map<String, PendingReply> replyHolder = new ConcurrentHashMap<String, PendingReply>();
...
final PendingReply pendingReply = new PendingReply();
this.replyHolder.put(correlationId, pendingReply);

PendingReply 中有一個 BlockingQueue 存放響應信息,在發送完請求信息後調用 BlockingQueue 的 pull 方法並設置超時時間來獲取響應:

private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);

public Message get ( long timeout , TimeUnit unit ) throws InterruptedException { Object reply = this . queue . poll ( timeout , unit ); return reply == null ? null : processReply ( reply

);

}

在獲取響應後不論結果如何,都會將 PendingReply 從 replyHolder 中移除,防止 replyHolder 中積壓超時的響應消息:

try {
 reply = exchangeMessages(exchange, routingKey, message, correlationData, channel, pendingReply,messageTag);
} finally {
 this.replyHolder.remove(messageTag);
 ...
}

響應信息是什麼時候如何被放到這個 BlockingQueue 中的呢?看一下 RabbitTemplate 接收消息的地方:

public void onMessage(Message message) {
String messageTag;
 if (this.correlationKey == null) { // using standard correlationId property
 messageTag = message.getMessageProperties().getCorrelationId();
 } else {
 messageTag = (String) message.getMessageProperties()
 .getHeaders().get(this.correlationKey);
 }
 // 存在 correlation_id 才認爲是RPC的響應信息,不存在時不處理
 if (messageTag == null) {
 logger.error("No correlation header in reply");
 return;
 }
 // 從 replyHolder 中取出 correlation_id 對應的 PendingReply
 PendingReply pendingReply = this.replyHolder.get(messageTag);
 if (pendingReply == null) {
 if (logger.isWarnEnabled()) {
 logger.warn("Reply received after timeout for " + messageTag);
 }
 throw new AmqpRejectAndDontRequeueException("Reply received after timeout");
 }
 else {
 restoreProperties(message, pendingReply);
 // 將響應信息 add 到 BlockingQueue 中
 pendingReply.reply(message);
 }
}

以上的 spring 代碼隱去了不少額外部分的處理和細節,只關注關鍵的部分。

至此一個完整可用的由 RabbitMQ 做爲中間件實現的 RPC 模式就完成了。

總結

服務端

服務端的實現比較簡單,和通常的 Consumer 的區別只在於須要將請求回覆到 replyTo 指定的 queue 中並帶上消息標識 correlation_id 便可

服務端的一點小優化:

超時的處理是由客戶端來實現的,那服務端有沒有能夠優化的地方呢?

答案是有的:若是咱們的服務端處理比較耗時,如何判斷客戶端是否還在等待響應呢?

咱們可使用 passive 參數去檢查 replyTo 的 queue 是否存在,由於客戶端聲明的是內部隊列,客戶端若是斷掉連接了這個 queue 就不存在了,這時服務端就無需處理這個消息了。

客戶端

客戶端承擔了更多的工做量,包括:

  • 聲明 replyTo 隊列(使用 amq.rabbitmq.reply-to 會簡單不少)
  • 維護請求和響應消息(使用惟一的 correlation_id 來關聯)
  • 消費服務端的返回
  • 處理超時等異常狀況(使用BlockingQueue來阻塞獲取)

好在 spring 已經實現了一套完備可靠的代碼,咱們在清楚了流程和關鍵點以後,能夠直接使用 spring 提供的 RabbitTemplate ,無需本身實現。

使用 MQ 實現 RPC 的意義

經過 MQ 實現 RPC 看起來比客戶端和服務器直接通信要複雜一些,那咱們爲何要這樣作呢?或者說這樣作有什麼好處:

  1. 將客戶端和服務器解耦:客戶端只是發佈一個請求到 MQ 並消費這個請求的響應。並不關心具體由誰來處理這個請求,MQ 另外一端的請求的消費者能夠隨意替換成任何能夠處理請求的服務器,並不影響到客戶端。
  2. 減輕服務器的壓力:傳統的 RPC 模式中若是客戶端和請求過多,服務器的壓力會過大。由 MQ 做爲中間件的話,過多的請求而是被 MQ 消化掉,服務器能夠控制消費請求的頻次,並不會影響到服務器。
  3. 服務器的橫向擴展更加容易:若是服務器的處理能力不能知足請求的頻次,只須要增長服務器來消費 MQ 的消息便可,MQ會幫咱們實現消息消費的負載均衡。
  4. 能夠看出 RabbitMQ 對於 RPC 模式的支持也是比較友好地,
  5. amq.rabbitmq.reply-to , reply_to , correlation_id 這些特性都說明了這一點,再加上 spring-rabbit 的實現,可讓咱們很簡單的使用消息隊列模式的 RPC 調用。

原文連接:https://mp.weixin.qq.com/s/40SIlST9JNgBvq276ERWTA?utm_source=tuicool&utm_medium=referral

全文完

相關文章
相關標籤/搜索