1.前一篇介紹了 RabbitMQ 中的消息確認機制;
2.本篇主要介紹一下使用 SpringBoot + RabbitMQ 怎麼實現 RPC,且詳細記錄了可能遇到的坑及解決辦法;
3.在文末提供完整實例代碼下載地址。
(RPC)Remote Procedure Call Protocol 遠程過程調用協議。通俗一點解釋就是 容許一臺計算機程序遠程調用另一臺計算機的子程序,而不用去關心底層網絡通訊。
在一個大型的公司,系統每每是由大大小小的服務構成,不一樣的團隊維護不一樣的代碼,且部署在不一樣的機器上;
可是在作開發時候每每須要調用其餘團隊開發的方法,因爲這些服務部署在不一樣的機器上,想要調用就須要網絡通訊,並且效率優點將是須要考慮的很是重要的一塊;
這個時候 RPC 的優點就比較明顯了(RPC 主要是基於 TCP/IP 協議的,HTTP 服務主要是基於HTTP協議,在傳輸層協議 TCP 之上的)。
在 RabbitMQ 中實現 RPC 的流程很簡單:git
macOS Sierra + SpringBoot2.1.8.RELEASE + RabbitMQ 3.8.3 + Erlang 22.3.3github
server.port=10420 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=guest spring.rabbitmq.password=guest \# 開啓發送確認 spring.rabbitmq.publisher-confirms=true \# 開啓發送失敗退回(消息有沒有找到合適的隊列) spring.rabbitmq.publisher-returns=true
/** * RPC客戶端 * * @author lyf * @公衆號 全棧在路上 * @GitHub https://github.com/liuyongfei1 * @date 2020-05-25 17:20 */ @Slf4j @Configuration public class RabbitConfig { /** * 設置同步RPC隊列 */ @Bean public Queue syncRPCQueue() { return new Queue(QueueConstants.RPC_QUEUE1); } /** * 設置返回隊列 */ @Bean public Queue replyQueue() { return new Queue(QueueConstants.RPC_QUEUE2); } /** * 設置交換機 */ @Bean public TopicExchange exchange() { return new TopicExchange(QueueConstants.RPC_EXCHANGE); } /** * 請求隊列和交換器綁定 */ @Bean public Binding tmpBinding() { return BindingBuilder.bind(syncRPCQueue()).to(exchange()).with(QueueConstants.RPC_QUEUE1); } /** * 返回隊列和交換器綁定 */ @Bean public Binding replyBinding() { return BindingBuilder.bind(replyQueue()).to(exchange()).with(QueueConstants.RPC_QUEUE2); } /** * 使用 RabbitTemplate發送和接收消息 * 並設置回調隊列地址 */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); // 設置回調隊列地址 template.setReplyAddress(QueueConstants.RPC_QUEUE2); // 設置請求超時時間爲6s template.setReplyTimeout(60000); return template; } /** * 給返回隊列設置監聽器 */ @Bean public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(QueueConstants.RPC_QUEUE2); container.setMessageListener(rabbitTemplate(connectionFactory)); return container; } }
備註:spring
/** * RPC客戶端 * * @author lyf * @公衆號 全棧在路上 * @GitHub https://github.com/liuyongfei1 * @date 2020-05-25 19:30 */ @Slf4j @RestController public class RPCClient { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage") public String send(String message) { // 封裝Message,直接發送message對象 Message newMessage = convertMessage(message); log.info("客戶端發送的消息:" + newMessage.toString()); // 備註:使用sendAndReceive 這個方法發送消息時,消息的correlationId會變成系統動編制的 1,2,3 這種格式,所以經過手動set的方式沒有用 Message result = rabbitTemplate.sendAndReceive(QueueConstants.RPC_EXCHANGE, QueueConstants.RPC_QUEUE1, newMessage); String response = ""; if (result != null) { // 獲取已發送的消息的惟一消息id String correlationId = newMessage.getMessageProperties().getCorrelationId(); // 提取RPC迴應內容的header HashMap<String, Object> headers = (HashMap<String, Object>) result.getMessageProperties().getHeaders(); // 獲取RPC迴應消息的消息id(備註:rabbitmq的配置參數裏面必須開啓spring.rabbitmq.publisher-confirms=true,不然headers裏沒有該項) String msgId = (String) headers.get("spring_returned_message_correlation"); // 客戶端從回調隊列獲取消息,匹配與發送消息correlationId相同的消息爲應答結果 if (msgId.equals(correlationId)) { // 提取RPC迴應內容body response = new String(result.getBody()); log.info("收到RPCServer返回的消息爲:" + response); } } return response; } /** * 將發送消息封裝成Message * * @param message * @return org.springframework.amqp.core.Message * @Author Liuyongfei * @Date 下午1:23 2020/5/27 **/ public Message convertMessage(String message) { MessageProperties mp = new MessageProperties(); byte[] src = message.getBytes(Charset.forName("UTF-8")); // 注意:因爲在發送消息的時候,系統會自動生成消息惟一id,所以在這裏手動設置的方式是無效的 // CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); // mp.setCorrelationId("123456"); mp.setContentType("application/json"); mp.setContentEncoding("UTF-8"); mp.setContentLength((long) message.length()); return new Message(src, mp); } }
server.port=10420 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 開啓發送確認 spring.rabbitmq.publisher-confirms=true # 開啓發送失敗退回(消息有沒有找到合適的隊列) spring.rabbitmq.publisher-returns=true
代碼同 RPC 客戶端的 rabbitmqConfig 配置類。json
/** * RPC服務端 * * @author lyf * @公衆號 全棧在路上 * @GitHub https://github.com/liuyongfei1 * @date 2020-05-25 22:00 */ @Slf4j @Component public class RPCServer { @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = QueueConstants.RPC_QUEUE1) public void process(Message msg) { log.info("Server收到發送的消息爲: " + msg.toString()); int millis = (int) (Math.random() * 2 * 1000); // 模擬處理業務邏輯 try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } // 數據處理,返回Message String msgBody = new String(msg.getBody()); String newMessage = msgBody + ",sleep " + millis + " ms。"; Message response = convertMessage(newMessage, msg.getMessageProperties().getCorrelationId()); CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId()); rabbitTemplate.sendAndReceive(QueueConstants.RPC_EXCHANGE, QueueConstants.RPC_QUEUE2, response, correlationData); } @RabbitListener(queues = QueueConstants.RPC_QUEUE2) public void receiveTopic2(Message msg) { System.out.println("...隊列2:" + msg.toString()); } /** * 封裝消息 * * @param s 消息 * @param id 消息id * @return org.springframework.amqp.core.Message * @Author Liuyongfei * @Date 下午1:25 2020/5/27 **/ public Message convertMessage(String s, String id) { MessageProperties mp = new MessageProperties(); byte[] src = s.getBytes(Charset.forName("UTF-8")); mp.setContentType("application/json"); mp.setContentEncoding("UTF-8"); mp.setCorrelationId(id); return new Message(src, mp); } }
啓動 RPC 客戶端服務,使用postman 請求發送消息接口,發送一個 hello
字符串:微信
啓動 RPC 服務端服務,經過打斷點,查看收到的消息格式:網絡
從圖中咱們能夠看出:app
在服務端,處理相關的業務邏輯後,須要將消息經過指定的回調隊列發送給客戶端。 一樣是經過藉助 sendAndReceive 來發送消息:負載均衡
// 數據處理,返回Message String msgBody = new String(msg.getBody()); String newMessage = msgBody + ",sleep " + millis + " ms。"; Message response = convertMessage(newMessage, msg.getMessageProperties().getCorrelationId()); CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId()); rabbitTemplate.sendAndReceive(QueueConstants.RPC_EXCHANGE, QueueConstants.RPC_QUEUE2, response, correlationData);
RPC_QUEUE2
);correlationData
,不然客戶端有可能收不到數據;correlationData
;因爲客戶端已經設置了回調隊列監聽器,所以能夠監聽到 RPC 服務端返回的消息:dom
RPC 客戶端從回調隊列中根據標籤的信息(檢查 correlationId 屬性,若是與發送的消息 correlationId 匹配)獲取發送消息的返回結果,主要代碼以下:異步
// 獲取已發送的消息的惟一消息id String correlationId = newMessage.getMessageProperties().getCorrelationId(); // 提取RPC迴應內容的header HashMap<String, Object> headers = (HashMap<String, Object>) result.getMessageProperties().getHeaders(); // 獲取RPC迴應消息的消息id(備註:rabbitmq的配置參數裏面必須開啓spring.rabbitmq.publisher-confirms=true,不然headers裏沒有該項) String msgId = (String) headers.get("spring_returned_message_correlation"); // 客戶端從回調隊列獲取消息,匹配與發送消息correlationId相同的消息爲應答結果 if (msgId.equals(correlationId)) { // 提取RPC迴應內容body response = new String(result.getBody()); log.info("收到RPCServer返回的消息爲:" + response); }
備註:
那麼去確認一下在 `application.properties`裏是否開啓了發送確認:
# 開啓發送確認 spring.rabbitmq.publisher-confirms=true # 開啓發送失敗退回(消息有沒有找到合適的隊列) spring.rabbitmq.publisher-returns=true
feature/rabbitmq-rpc
分支進行啓動測試。