背景知識spring
RabbitMQ性能優化
RabbitMQ 是基於 AMQP 協議實現的一個消息隊列(Message Queue),Message Queue 是一個典型的生產者/消費者模式。生產者發佈消息,消費者消費消息,生產者和消費者之間是解耦的,互相不知道對方的存在。服務器
RPC架構
Remote Procedure Call:遠程過程調用,一次遠程過程調用的流程即客戶端發送一個請求到服務端,服務端根據請求信息進行處理後返回響應信息,客戶端收到響應信息後結束。併發
如何使用 RabbitMQ 實現 RPC?負載均衡
使用 RabbitMQ 實現 RPC,相應的角色是由生產者來做爲客戶端,消費者做爲服務端。異步
但 RPC 調用通常是同步的,客戶端和服務器也是緊密耦合的。即客戶端經過 IP/域名和端口連接到服務器,向服務器發送請求後等待服務器返回響應信息。分佈式
但 MQ 的生產者和消費者是徹底解耦的,那麼如何用 MQ 實現 RPC 呢?很明顯就是把 MQ 看成中間件實現一次雙向的消息傳遞:ide
客戶端和服務端便是生產者也是消費者。客戶端發佈請求,消費響應;服務端消費請求,發佈響應。微服務
具體實現
MQ部分的定義
請求信息的隊列
咱們須要一個隊列來存放請求信息,客戶端向這個隊列發佈請求信息,服務端消費該隊列處理請求。該隊列不須要複雜的路由規則,直接使用 RabbitMQ 默認的 direct exchange 來路由消息便可。
響應信息的隊列
存放響應信息的隊列不該只有一個。若是存在多個客戶端,不能保證響應信息被髮布請求的那個客戶端消費到。因此應爲每個客戶端建立一個響應隊列,這個隊列應該由客戶端來建立且只能由這個客戶端使用並在使用完畢後刪除,這裏可使用 RabbitMQ 提供的排他隊列(Exclusive Queue):
channel.queueDeclare(queue:"", durable:false, exclusive:true, autoDelete:false, new HashMap<>())
而且要保證隊列名惟一,聲明隊列時名稱設爲空 RabbitMQ 會生成一個惟一的隊列名。
exclusive 設爲 true 表示聲明一個排他隊列,排他隊列的特色是隻能被當前的鏈接使用,而且在鏈接關閉後被刪除。
一個簡單的 demo(使用 pull 機制)
咱們使用一個簡單的 demo 來了解客戶端和服務端的處理流程。
發佈請求
咱們在聲明隊列時爲每個客戶端聲明瞭獨有的響應隊列,那服務器在發佈響應時如何知道發佈到哪一個隊列呢?其實就是客戶端須要告訴服務端將響應發佈到哪一個隊列,RabbitMQ 提供了這個支持,消息體的 Properties 中有一個屬性 reply_to 就是用來標記回調隊列的名稱,服務器須要將響應發佈到 reply_to 指定的回調隊列中。
解決了這個問題以後咱們就能夠編寫客戶端發佈請求的代碼了:
// 定義響應回調隊列 String replyQueueName = channel.queueDeclare("", false, true, false, new HashMap<>()).getQueue(); // 設置回調隊列到 Properties AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .replyTo(replyQueueName) .build(); String request = "request"; // 發佈請求 channel.basicPublish("", "rpc_queue", properties, request.getBytes());
Direct reply-to:
RabbitMQ 提供了一種更便捷的機制來實現 RPC,不須要客戶端每次都定義回調隊列,客戶端發佈請求時將 replyTo 設爲 amq.rabbitmq.reply-to ,消費響應時也指定消費 amq.rabbitmq.reply-to ,RabbitMQ 會爲客戶端建立一個內部隊列
消費請求
接下來是服務端處理請求的部分,接收到請求後通過處理將響應信息發佈到 reply_to 指定的回調隊列:
// 服務端 Consumer 的定義 public class RpcServer extends DefaultConsumer { public RpcServer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); String response = (msg + " Received"); // 獲取回調隊列名 String replyTo = properties.getReplyTo(); // 發佈響應消息到回調隊列 this.getChannel().basicPublish("", replyTo, new AMQP.BasicProperties(), response.getBytes()); } } ... // 啓動服務端 Consumer channel.basicConsume("rpc_queue", true, new RpcServer(channel));
接收響應
客戶端如何接收服務器的響應呢?有兩種方式:1.輪詢的去 pull 回調隊列中的消息,2.異步的消費回調隊列中的消息。咱們在這裏簡單實現第一種方案。
GetResponse getResponse = null; while (getResponse == null) { getResponse = channel.basicGet(replyQueueName, true); } String response = new String(getResponse.getBody());
一個簡單的基於 RabbitMQ 的 RPC 模型已經實現了,但這個 demo 並不實用,由於客戶端每次發送完請求都要同步的輪詢等待響應消息,只能每次處理一個請求。RabbitMQ 的 pull 模式效率也比較低。
實現一個完備可用的 RPC 模式須要作的工做還有不少,要處理的關鍵點也比較複雜,有句話叫不要重複造輪子,spring 已經實現了一個完備可用的 RPC 模式的庫,接下來咱們來了解一下。順便在此給你們推薦一個Java架構方面的交流學習羣:698581634,進羣便可獲取Java架構師資料:有Spring,MyBatis,Netty源碼分析,高併發、高性能、分佈式、微服務架構的原理,JVM性能優化這些成爲架構師必備的知識體系,羣裏必定有你須要的資料,你們趕忙加羣吧。
Spring Rabbit 中的實現
和上面 demo 的 pull 模式一次只能處理一個請求相對應的:如何異步的接收響應並處理多個請求呢?關鍵點就在於咱們須要記錄請求和響應並將它們關聯起來,RabbitMQ 也提供了支持,Properties 中的另外一個屬性 correlation_id 用來標識一個消息的惟一 id。
參考 spring-rabbit 中的 convertSendAndReceive 方法的實現,爲每一次請求生成一個惟一的 correlation_id :
private final AtomicInteger messageTagProvider = new AtomicInteger(); ... String messageTag = String.valueOf(this.messageTagProvider.incrementAndGet()); ... message.getMessageProperties().setCorrelationId(messageTag);
並使用一個 ConcurrentHashMap 來維護 correlation_id 和響應信息的映射:
private final Map<String, PendingReply> replyHolder = new ConcurrentHashMap<String, PendingReply>(); ... final PendingReply pendingReply = new PendingReply(); this.replyHolder.put(correlationId, pendingReply);
PendingReply 中有一個 BlockingQueue 存放響應信息,在發送完請求信息後調用 BlockingQueue 的 pull 方法並設置超時時間來獲取響應:
private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
public Message get ( long timeout , TimeUnit unit ) throws InterruptedException { Object reply = this . queue . poll ( timeout , unit ); return reply == null ? null : processReply ( reply
);
}
在獲取響應後不論結果如何,都會將 PendingReply 從 replyHolder 中移除,防止 replyHolder 中積壓超時的響應消息:
try { reply = exchangeMessages(exchange, routingKey, message, correlationData, channel, pendingReply,messageTag); } finally { this.replyHolder.remove(messageTag); ... }
響應信息是什麼時候如何被放到這個 BlockingQueue 中的呢?看一下 RabbitTemplate 接收消息的地方:
public void onMessage(Message message) { String messageTag; if (this.correlationKey == null) { // using standard correlationId property messageTag = message.getMessageProperties().getCorrelationId(); } else { messageTag = (String) message.getMessageProperties() .getHeaders().get(this.correlationKey); } // 存在 correlation_id 才認爲是RPC的響應信息,不存在時不處理 if (messageTag == null) { logger.error("No correlation header in reply"); return; } // 從 replyHolder 中取出 correlation_id 對應的 PendingReply PendingReply pendingReply = this.replyHolder.get(messageTag); if (pendingReply == null) { if (logger.isWarnEnabled()) { logger.warn("Reply received after timeout for " + messageTag); } throw new AmqpRejectAndDontRequeueException("Reply received after timeout"); } else { restoreProperties(message, pendingReply); // 將響應信息 add 到 BlockingQueue 中 pendingReply.reply(message); } }
以上的 spring 代碼隱去了不少額外部分的處理和細節,只關注關鍵的部分。
至此一個完整可用的由 RabbitMQ 做爲中間件實現的 RPC 模式就完成了。
總結
服務端
服務端的實現比較簡單,和通常的 Consumer 的區別只在於須要將請求回覆到 replyTo 指定的 queue 中並帶上消息標識 correlation_id 便可
服務端的一點小優化:
超時的處理是由客戶端來實現的,那服務端有沒有能夠優化的地方呢?
答案是有的:若是咱們的服務端處理比較耗時,如何判斷客戶端是否還在等待響應呢?
咱們可使用 passive 參數去檢查 replyTo 的 queue 是否存在,由於客戶端聲明的是內部隊列,客戶端若是斷掉連接了這個 queue 就不存在了,這時服務端就無需處理這個消息了。
客戶端
客戶端承擔了更多的工做量,包括:
好在 spring 已經實現了一套完備可靠的代碼,咱們在清楚了流程和關鍵點以後,能夠直接使用 spring 提供的 RabbitTemplate ,無需本身實現。
使用 MQ 實現 RPC 的意義
經過 MQ 實現 RPC 看起來比客戶端和服務器直接通信要複雜一些,那咱們爲何要這樣作呢?或者說這樣作有什麼好處:
原文連接:https://mp.weixin.qq.com/s/40SIlST9JNgBvq276ERWTA?utm_source=tuicool&utm_medium=referral
全文完