「 從0到1學習微服務SpringCloud 」11 補充篇 RabbitMq實現延遲消費和延遲重試

Mq的使用中,延遲隊列是不少業務都須要用到的,最近我也是剛在項目中用到,就在跟你們講講吧。git

何爲延遲隊列?

延遲隊列就是進入該隊列的消息會被延遲消費的隊列。而通常的隊列,消息一旦入隊了以後就會被消費者立刻消費。github

業務場景

延遲隊列能作什麼?最多見的是如下兩種場景:web

  • 消費

好比:用戶生成訂單以後,須要過一段時間校驗訂單的支付狀態,若是訂單仍未支付則須要及時地關閉訂單;用戶註冊成功以後,須要過一段時間好比一週後校驗用戶的使用狀況,若是發現用戶活躍度較低,則發送郵件或者短信來提醒用戶使用。spring

  • 重試

好比消費者從隊列裏消費消息時失敗了,可是想要延遲一段時間後自動重試。網絡

若是不使用延遲隊列,那麼咱們只能經過一個輪詢掃描程序去完成。這種方案既不優雅,也不方便作成統一的服務便於開發人員使用。可是使用延遲隊列的話,咱們就能夠垂手可得地完成。svg

實現思路

在介紹具體思路錢,先介紹RabbitMQ的兩個特性:Time-To-Live Extensions(消息存活時間) 和 Dead Letter Exchanges(死信交換機)學習

Time-To-Live Extensions

RabbitMQ容許咱們爲消息或者隊列設置TTL(time to live),也就是過時時間。TTL代表了一條消息可在隊列中存活的最大時間,單位爲毫秒。當某條消息被設置了TTL或者當某條消息進入了設置了TTL的隊列時,這條消息會在通過TTL秒後「死亡」,成爲Dead Letter(死信)。若是既配置了消息的TTL,又配置了隊列的TTL,那麼較小的那個值會被取用。測試

Dead Letter Exchanges

設置了TTL的消息在過時後會成爲Dead Letter。其實在RabbitMQ中,一共有三種消息的「死亡」形式:ui

  • 消息被拒絕。經過調用basic.reject或者basic.nack而且設置的requeue參數爲false。code

  • 消息由於設置了TTL而過時。

  • 消息進入了一條已經達到最大長度的隊列。

若是隊列設置了Dead Letter Exchange(DLX),那麼這些Dead Letter就會被從新publish(推送)到Dead Letter Exchange,經過Dead Letter Exchange路由到其餘隊列。

實現流程

延遲消費

延遲消費是延遲隊列最爲經常使用的使用模式。以下圖所示,生產者產生的消息首先會進入緩衝隊列(圖中紅色隊列)。經過RabbitMQ提供的TTL擴展,這些消息會被設置過時時間,等消息過時以後,這些消息會經過配置好的DLX轉發到實際消費隊列(圖中藍色隊列),以此達到延遲消費的效果。

image

延遲重試

延遲重試本質上也是延遲消費的一種。

以下圖所示,消費者發現該消息處理出現了異常,好比是由於網絡波動引發的異常。那麼若是不等待一段時間,直接就重試的話,極可能會致使在這期間內一直沒法成功,形成必定的資源浪費。那麼咱們能夠將其先放在緩衝隊列中(圖中紅色隊列),等消息通過一段的延遲時間後再次進入實際消費隊列中(圖中藍色隊列),此時因爲已通過了「較長」的時間了,異常的一些波動一般已經恢復,這些消息能夠被正常地消費。

image

代碼實現

這裏只貼上最主要的代碼,所有的代碼可查看github

1.延遲消費

Mq隊列與交換機實例建立

/**
     * 緩衝隊列
     */
    private String DELAY_BUFFER_QUEUE = "delay_buffer_queue";

    /**
     * 實際消費交換機(DLX)
     */
    private String DELAY_SERVICE_EXCHANGE = "delay_service_exchange";

    /**
     * 實際消費隊列
     */
    private String DELAY_SERVICE_QUEUE = "delay_service_queue";

    /**
     * 消息過時時間 3秒
     */
    private Integer QUEUE_EXPIRATION = 3 * 1000;

    /**
     * 實際消費隊列
     * @return
     */
    @Bean
    Queue delayServiceQueue(){
        return QueueBuilder.durable(DELAY_SERVICE_QUEUE).build();
    }

    /**
     * 實際消費交換機
     * @return
     */
    @Bean
    DirectExchange delayServiceExchange() {
        return new DirectExchange(DELAY_SERVICE_EXCHANGE);
    }

    /**
     * 實際消費隊列綁定實際消費交換機(DLX)
     * @param delayServiceQueue
     * @param delayServiceExchange
     * @return
     */
    @Bean
    Binding delayBinding(Queue delayServiceQueue, DirectExchange delayServiceExchange) {
        return BindingBuilder.bind(delayServiceQueue)
                .to(delayServiceExchange)
                .with(DELAY_SERVICE_QUEUE);
    }

    /**
     * 緩衝隊列配置
     * @return
     */
    @Bean
    Queue delayBufferQueue(){
        return QueueBuilder.durable(DELAY_BUFFER_QUEUE)
                // 死信交換機 DLX
                .withArgument("x-dead-letter-exchange", DELAY_SERVICE_EXCHANGE)
                // 目標routing key
                .withArgument("x-dead-letter-routing-key", DELAY_SERVICE_QUEUE)
                // 設置隊列的過時時間
                .withArgument("x-message-ttl", QUEUE_EXPIRATION)
                .build();
    }

監聽實際消費隊列

@Component
public class DelayMsgListener {
    @RabbitListener(queues="delay_service_queue")
    public void listenServiceMsg(Message message){
        System.out.println(new Date()+ "收到延遲消息啦:"+new String(message.getBody()));
    }
}

測試:發送消息到緩衝隊列

@Test
    public void send1(){
        System.out.println(new Date() +"發送延遲消息!!!");
        amqpTemplate.convertAndSend("delay_buffer_queue","Hello!Delay Message!");
    }

結果以下
能夠看到,在發消息後3秒(TTL),實際消費隊列接收到了消息並被消費

image

2.延遲重試

Mq隊列與交換機實例建立

  /**
     * 緩衝隊列
     */
    private String RETRY_BUFFER_QUEUE = "retry_buffer_queue";
    /**
     * 緩衝交換機
     */
    private String RETRY_BUFFER_EXCHANGE = "retry_buffer_exchange";

    /**
     * 實際消費交換機(DLX)
     */
    private String RETRY_SERVICE_EXCHANGE = "retry_service_exchange";

    /**
     * 實際消費隊列
     */
    private String RETRY_SERVICE_QUEUE = "retry_service_queue";


    /**
     * 實際消費隊列
     * @return
     */
    @Bean
    Queue retryServiceQueue(){
        return QueueBuilder.durable(RETRY_SERVICE_QUEUE).build();
    }

    /**
     * 實際消費交換機
     * @return
     */
    @Bean
    DirectExchange retryServiceExchange() {
        return new DirectExchange(RETRY_SERVICE_EXCHANGE);
    }

    /**
     * 實際消費隊列綁定實際消費交換機(DLX)
     * @param retryServiceQueue
     * @param retryServiceExchange
     * @return
     */
    @Bean
    Binding retryBinding(Queue retryServiceQueue, DirectExchange retryServiceExchange) {
        return BindingBuilder.bind(retryServiceQueue)
                .to(retryServiceExchange)
                .with(RETRY_SERVICE_QUEUE);
    }

    /**
     * 緩衝隊列配置
     * @return
     */
    @Bean
    Queue retryBufferQueue(){
        return QueueBuilder.durable(RETRY_BUFFER_QUEUE)
                // 死信交換機 DLX
                .withArgument("x-dead-letter-exchange", RETRY_SERVICE_EXCHANGE)
                // 目標routing key
                .withArgument("x-dead-letter-routing-key", RETRY_SERVICE_QUEUE)
                // 設置隊列的過時時間
                .withArgument("x-message-ttl", QUEUE_EXPIRATION)
                .build();
    }

    /**
     * 緩衝交換機
     * @return
     */
    @Bean
    DirectExchange retryBufferExchange() {
        return new DirectExchange(RETRY_BUFFER_EXCHANGE);
    }

    /**
     * 緩衝隊列綁定緩衝交換機
     * @param retryBufferQueue
     * @param retryBufferQueue
     * @return
     */
    @Bean
    Binding bufferBinding(Queue retryBufferQueue, DirectExchange retryBufferExchange) {
        return BindingBuilder.bind(retryBufferQueue)
                .to(retryBufferExchange)
                .with(RETRY_BUFFER_QUEUE);
    }

監聽實際消費隊列

@Component
public class RetryMsgListener {
    /**
     * 緩衝隊列
     */
    private String RETRY_BUFFER_QUEUE = "retry_buffer_queue";
    /**
     * 緩衝交換機
     */
    private String RETRY_BUFFER_EXCHANGE = "retry_buffer_exchange";

    @Autowired
    private MessagePropertiesConverter messagePropertiesConverter;

    @RabbitListener(queues="retry_service_queue")
    public void listenServiceMsg(@Payload Message message, Channel channel){
        try {
            System.out.println(new Date() + "收到消息:" + new String(message.getBody()));
            //TODO 業務邏輯
            //忽然出現異常
            throw  new RuntimeException("特殊異常");
        }catch (Exception e){
            Map<String,Object> headers = message.getMessageProperties().getHeaders();
            try{
                Long retryCount = getRetryCount(headers);
                //重試3次
                if(retryCount < 3){
                    retryCount += 1;
                    System.out.println("消費異常,準備重試,第"+retryCount+"次");

                    //轉換爲RabbitMQ 的Message Properties對象
                    AMQP.BasicProperties rabbitMQProperties =
                            messagePropertiesConverter.fromMessageProperties( message.getMessageProperties(), "UTF-8");
                    //設置headers
                    rabbitMQProperties.builder().headers(headers);
                    //程序異常重試
                    //這裏必須把rabbitMQProperties也傳進來,不然死信隊列沒法識別是不是同一條信息,致使重試次數沒法遞增
                    channel.basicPublish(RETRY_BUFFER_EXCHANGE,RETRY_BUFFER_QUEUE,rabbitMQProperties, message.getBody());
                }else {
                    //TODO 重試失敗,須要人工處理 (發送到失敗隊列或發郵件/信息)
                    System.out.println("已重試3次,需人工處理!");
                }
            }catch (IOException ioe){
                System.out.println("消息重試失敗!");
                ioe.printStackTrace();
            }
        }
    }

    /**
     * 獲取重試次數
     * 若是這條消息是死信,header中會有一個x-death的記錄相關信息
     * 其中包含死亡次數
     * @param headers
     * @return
     */
    private long getRetryCount(Map<String, Object> headers) {
        long retryCount = 0;
        if(null != headers) {
            if(headers.containsKey("x-death")) {
                List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death");
                if(!deathList.isEmpty()) {
                    Map<String, Object> deathEntry = deathList.get(0);
                    retryCount = (Long)deathEntry.get("count");
                }
            }
        }
        return retryCount;
    }
}

測試:發送消息到實際消費隊列

@Test
    public void send2(){
        System.out.println(new Date() +"發送延遲重試消息!!!");
        //直接發消息到實際消費隊列
        amqpTemplate.convertAndSend("retry_service_queue","Hello!Retry Message!");
    }

結果以下:
能夠看到,消費異常後,重試了3次

image

延遲隊列在實際業務中是常常被用到的,同窗們最好都學學哦,代碼已上傳github
https://github.com/zhangwenkang0/springcloud-learning-from-0-to-1/tree/master/rabbitmq-demo

若是以爲不錯,分享給你的朋友!

一個立志成大腿而天天努力奮鬥的年輕人

伴學習伴成長,成長之路你並不孤單!

掃描二維碼,關注公衆號

相關文章
相關標籤/搜索