RabbitMQ系列之RPC實現

1.前一篇介紹了 RabbitMQ 中的消息確認機制;
2.本篇主要介紹一下使用 SpringBoot + RabbitMQ 怎麼實現 RPC,且詳細記錄了可能遇到的坑及解決辦法;
3.在文末提供完整實例代碼下載地址。

一. 什麼是RPC

(RPC)Remote Procedure Call Protocol 遠程過程調用協議。通俗一點解釋就是 容許一臺計算機程序遠程調用另一臺計算機的子程序,而不用去關心底層網絡通訊

二. 使用RPC場景

在一個大型的公司,系統每每是由大大小小的服務構成,不一樣的團隊維護不一樣的代碼,且部署在不一樣的機器上;
可是在作開發時候每每須要調用其餘團隊開發的方法,因爲這些服務部署在不一樣的機器上,想要調用就須要網絡通訊,並且效率優點將是須要考慮的很是重要的一塊;
這個時候 RPC 的優點就比較明顯了(RPC 主要是基於 TCP/IP 協議的,HTTP 服務主要是基於HTTP協議,在傳輸層協議 TCP 之上的)。

三. RabbitMQ實現RPC的流程

1. 流程

rabbitmq-rpc.jpeg

在 RabbitMQ 中實現 RPC 的流程很簡單:git

  1. 生產者(也稱 RPC 客戶端)發送一條帶有標籤(消息ID(correlation_id)+ 回調隊列名稱)的消息到發送隊列;
  2. 消費者(也稱 RPC 服務端)從發送隊列獲取消息並處理業務,解析標籤的信息將業務結果發送到指定的回調隊列;
  3. 生產者(也稱 RPC 客戶端)從回調隊列中根據標籤的信息(檢查correlationId 屬性,若是與request中匹配)獲取發送消息的返回結果。

2. 實現RPC的好處

  1. MQ 實現的 RPC 服務端高可用,只須要簡單地啓動多個 RPC 服務便可,不須要額外的服務註冊發現以及負載均衡;
  2. 若是原有的 MQ 的普通消息須要知道執行結果,能夠很方便地切換到 RPC 模式;
  3. RabbitMQ RPC 的工做方式很是擅長處理異步回調式的任務。

四. SpringBoot中使用RabbitMQ的RPC功能

環境介紹

macOS Sierra + SpringBoot2.1.8.RELEASE + RabbitMQ 3.8.3 + Erlang 22.3.3github

1. 客戶端

1.1 application.properties
server.port=10420 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=guest spring.rabbitmq.password=guest \# 開啓發送確認 spring.rabbitmq.publisher-confirms=true \# 開啓發送失敗退回(消息有沒有找到合適的隊列) spring.rabbitmq.publisher-returns=true
1.2 rabbitmqConfig配置類
/**
 * RPC客戶端
 *
 * @author lyf
 * @公衆號 全棧在路上
 * @GitHub https://github.com/liuyongfei1
 * @date 2020-05-25 17:20
 */
@Slf4j
@Configuration
public class RabbitConfig {

    /**
     * 設置同步RPC隊列
     */
    @Bean
    public Queue syncRPCQueue() {
        return new Queue(QueueConstants.RPC_QUEUE1);
    }

    /**
     * 設置返回隊列
     */
    @Bean
    public Queue replyQueue() {
        return new Queue(QueueConstants.RPC_QUEUE2);
    }

    /**
     * 設置交換機
     */
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(QueueConstants.RPC_EXCHANGE);
    }

    /**
     * 請求隊列和交換器綁定
     */
    @Bean
    public Binding tmpBinding() {
        return BindingBuilder.bind(syncRPCQueue()).to(exchange()).with(QueueConstants.RPC_QUEUE1);
    }

    /**
     * 返回隊列和交換器綁定
     */
    @Bean
    public Binding replyBinding() {
        return BindingBuilder.bind(replyQueue()).to(exchange()).with(QueueConstants.RPC_QUEUE2);
    }


    /**
     * 使用 RabbitTemplate發送和接收消息
     * 並設置回調隊列地址
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // 設置回調隊列地址
        template.setReplyAddress(QueueConstants.RPC_QUEUE2);
        // 設置請求超時時間爲6s
        template.setReplyTimeout(60000);
        return template;
    }


    /**
     * 給返回隊列設置監聽器
     */
    @Bean
    public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(QueueConstants.RPC_QUEUE2);
        container.setMessageListener(rabbitTemplate(connectionFactory));
        return container;
    }
}

備註:spring

  • 這裏的隊列監聽器必不可少,不然客戶端是沒法收到服務端迴應的消息。
1.3 客戶端
/**
 * RPC客戶端
 *
 * @author lyf
 * @公衆號 全棧在路上
 * @GitHub https://github.com/liuyongfei1
 * @date 2020-05-25 19:30
 */
@Slf4j
@RestController
public class RPCClient {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage")
    public String send(String message) {
        // 封裝Message,直接發送message對象
        Message newMessage = convertMessage(message);

        log.info("客戶端發送的消息:" + newMessage.toString());

        // 備註:使用sendAndReceive 這個方法發送消息時,消息的correlationId會變成系統動編制的 1,2,3 這種格式,所以經過手動set的方式沒有用
        Message result = rabbitTemplate.sendAndReceive(QueueConstants.RPC_EXCHANGE, QueueConstants.RPC_QUEUE1,
                newMessage);

        String response = "";
        if (result != null) {
            // 獲取已發送的消息的惟一消息id
            String correlationId = newMessage.getMessageProperties().getCorrelationId();

            // 提取RPC迴應內容的header
            HashMap<String, Object> headers = (HashMap<String, Object>) result.getMessageProperties().getHeaders();

            // 獲取RPC迴應消息的消息id(備註:rabbitmq的配置參數裏面必須開啓spring.rabbitmq.publisher-confirms=true,不然headers裏沒有該項)
            String msgId = (String) headers.get("spring_returned_message_correlation");

            // 客戶端從回調隊列獲取消息,匹配與發送消息correlationId相同的消息爲應答結果
            if (msgId.equals(correlationId)) {
                // 提取RPC迴應內容body
                response = new String(result.getBody());
                log.info("收到RPCServer返回的消息爲:" + response);
            }
        }
        return response;
    }

    /**
     * 將發送消息封裝成Message
     *
     * @param message
     * @return org.springframework.amqp.core.Message
     * @Author Liuyongfei
     * @Date 下午1:23 2020/5/27
     **/
    public Message convertMessage(String message) {
        MessageProperties mp = new MessageProperties();
        byte[] src = message.getBytes(Charset.forName("UTF-8"));
        // 注意:因爲在發送消息的時候,系統會自動生成消息惟一id,所以在這裏手動設置的方式是無效的
        // CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        // mp.setCorrelationId("123456");
        mp.setContentType("application/json");
        mp.setContentEncoding("UTF-8");
        mp.setContentLength((long) message.length());
        return new Message(src, mp);
    }
}

你可能會遇到的坑1
  • 使用 sendAndReceive 這個方法發送消息時,消息的 correlationId 會變成系統自動生成的 1,2,3 這種格式,所以經過手動 set 的方式沒有用。
你可能會遇到的坑2
  • 所以爲了拿到當前已發送消息的 correlationId,只能在消息發送以後注意這裏必須在消息發送以後再獲取)經過 getMessageProperties().getCorrelationId() 的方式來獲取到;

2. 服務端

2.1 application.properties
server.port=10420
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 開啓發送確認
spring.rabbitmq.publisher-confirms=true
# 開啓發送失敗退回(消息有沒有找到合適的隊列)
spring.rabbitmq.publisher-returns=true
2.2 rabbitmqConfig配置類

代碼同 RPC 客戶端的 rabbitmqConfig 配置類。json

2.3 服務端
/**
 * RPC服務端
 *
 * @author lyf
 * @公衆號 全棧在路上
 * @GitHub https://github.com/liuyongfei1
 * @date 2020-05-25 22:00
 */
@Slf4j
@Component
public class RPCServer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = QueueConstants.RPC_QUEUE1)
    public void process(Message msg) {
        log.info("Server收到發送的消息爲: " + msg.toString());

        int millis = (int) (Math.random() * 2 * 1000);
        // 模擬處理業務邏輯
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 數據處理,返回Message
        String msgBody = new String(msg.getBody());
        String newMessage = msgBody + ",sleep " + millis + " ms。";
        Message response = convertMessage(newMessage, msg.getMessageProperties().getCorrelationId());
        CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId());
        rabbitTemplate.sendAndReceive(QueueConstants.RPC_EXCHANGE, QueueConstants.RPC_QUEUE2, response, correlationData);
    }

    @RabbitListener(queues = QueueConstants.RPC_QUEUE2)
    public void receiveTopic2(Message msg) {
        System.out.println("...隊列2:" + msg.toString());
    }

    /**
     * 封裝消息
     *
     * @param s  消息
     * @param id 消息id
     * @return org.springframework.amqp.core.Message
     * @Author Liuyongfei
     * @Date 下午1:25 2020/5/27
     **/
    public Message convertMessage(String s, String id) {
        MessageProperties mp = new MessageProperties();
        byte[] src = s.getBytes(Charset.forName("UTF-8"));
        mp.setContentType("application/json");
        mp.setContentEncoding("UTF-8");
        mp.setCorrelationId(id);
        return new Message(src, mp);
    }
}

3. 客戶端向服務端發送消息

啓動 RPC 客戶端服務,使用postman 請求發送消息接口,發送一個 hello字符串:
rpcclient-postman.png微信

4. 服務端收到客戶端的消息

啓動 RPC 服務端服務,經過打斷點,查看收到的消息格式:
rpcserver-msg1.png網絡

從圖中咱們能夠看出:app

  • 生產者(RPC客戶端)發出的這條消息包含了標籤ID和回調隊列名稱,符合了 RPC 實現流程的第一步要求。

5. 服務端向指定的的回調隊列發送消息

在服務端,處理相關的業務邏輯後,須要將消息經過指定的回調隊列發送給客戶端。 一樣是經過藉助 sendAndReceive 來發送消息:負載均衡

// 數據處理,返回Message
        String msgBody = new String(msg.getBody());
        String newMessage = msgBody + ",sleep " + millis + " ms。";
        Message response = convertMessage(newMessage, msg.getMessageProperties().getCorrelationId());
        CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId());
        rabbitTemplate.sendAndReceive(QueueConstants.RPC_EXCHANGE, QueueConstants.RPC_QUEUE2, response, correlationData);
你可能會遇到的坑3
  • 必定要注意這裏使用的隊列爲回調隊列(RPC_QUEUE2);
你可能會遇到的坑4
  • 這裏在發送消息的時候必定要使用第四個參數 correlationData,不然客戶端有可能收不到數據;
  • 因爲客戶端在收到消息後要取 correlationId 與以前發出的消息的 correlationId 進行匹配,所以這裏在發送消息的時候必定要使用第四個參數 correlationData

6. 客戶端收到服務端迴應的消息

因爲客戶端已經設置了回調隊列監聽器,所以能夠監聽到 RPC 服務端返回的消息:
rabbitmq-return2.pngdom

6.1 客戶端根據correlationId來匹配消息

RPC 客戶端從回調隊列中根據標籤的信息(檢查 correlationId 屬性,若是與發送的消息 correlationId 匹配)獲取發送消息的返回結果,主要代碼以下:異步

// 獲取已發送的消息的惟一消息id
            String correlationId = newMessage.getMessageProperties().getCorrelationId();

            // 提取RPC迴應內容的header
            HashMap<String, Object> headers = (HashMap<String, Object>) result.getMessageProperties().getHeaders();

            // 獲取RPC迴應消息的消息id(備註:rabbitmq的配置參數裏面必須開啓spring.rabbitmq.publisher-confirms=true,不然headers裏沒有該項)
            String msgId = (String) headers.get("spring_returned_message_correlation");

            // 客戶端從回調隊列獲取消息,匹配與發送消息correlationId相同的消息爲應答結果
            if (msgId.equals(correlationId)) {
                // 提取RPC迴應內容body
                response = new String(result.getBody());
                log.info("收到RPCServer返回的消息爲:" + response);
            }

備註:

  • 回調隊列監聽器詳見 rabbitmqConfig 配置類。
你可能會遇到的坑5
  • 在 RPC 服務端返回的消息 headers 裏找不到 spring_returned_message_correlation 屬性:

rabbitmq-reutrn.png

那麼去確認一下在 `application.properties`裏是否開啓了發送確認:

# 開啓發送確認 
spring.rabbitmq.publisher-confirms=true 
# 開啓發送失敗退回(消息有沒有找到合適的隊列) spring.rabbitmq.publisher-returns=true

demo下載地址

  • https://github.com/liuyongfei...
  • 在本篇實例中,我將消息生產端和消費端部署爲兩個單獨的服務,你們克隆完畢後請切換到 feature/rabbitmq-rpc 分支進行啓動測試。
  • 歡迎你們關注微信公衆號:微信公衆號二維碼.jpg
相關文章
相關標籤/搜索