RocketMQ(5)---RocketMQ重試機制

RocketMQ重試機制

消息重試分爲兩種:Producer發送消息的重試 Consumer消息消費的重試java

1、Producer端重試

Producer端重試是指: Producer往MQ上發消息沒有發送成功,好比網絡緣由致使生產者發送消息到MQ失敗。數據庫

看一下代碼:apache

@Slf4j
public class RocketMQTest {
    /**
     * 生產者組
     */
    private static String PRODUCE_RGROUP = "test_producer";
  
    public static void main(String[] args) throws Exception {
        //一、建立生產者對象
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCE_RGROUP);
        //設置重試次數(默認2次)
        producer.setRetryTimesWhenSendFailed(3000);
        //綁定name server
        producer.setNamesrvAddr("74.49.203.55:9876");
        producer.start();
        //建立消息
        Message message = new Message("topic_family", ("小小今年3歲" ).getBytes());
        //發送 這裏填寫超時時間是5毫秒 因此每次都會發送失敗
        SendResult sendResult = producer.send(message,5);
        log.info("輸出生產者信息={}",sendResult);
    }
}

超時重試 針對網上說的超時異常會重試的說法都是錯誤的,想一想都以爲可怕,我查的因此文章都說超時異常都會重試,難道這麼多人都沒有去測試一下 或者去看個源碼。服務器

我發現這個問題,是由於我上面超時時間設置爲5毫秒 ,按照正常確定會報超時異常,但我設置1次重試和3000次的重試,雖然最終都會報下面異常,但輸出錯誤時間報網絡

顯然不該該是一個級別。但測試發現不管我設置的多少次的重試次數,報異常的時間都差很少。異步

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

針對這個疑惑,我去看了源碼以後,才恍然大悟。函數

/**
     * 說明 抽取部分代碼
     */
    private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) {
        
        //一、獲取當前時間
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev ;
        //二、去服務器看下有沒有主題消息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            //三、經過這裏能夠很明顯看出 若是不是同步發送消息 那麼消息重試只有1次
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            //四、根據設置的重試次數,循環再去獲取服務器主題消息
            for (times = 0; times < timesTotal; times++) {
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                beginTimestampPrev = System.currentTimeMillis();
                long costTime = beginTimestampPrev - beginTimestampFirst;
                //五、先後時間對比 若是先後時間差 大於 設置的等待時間 那麼直接跳出for循環了 這就說明鏈接超時是不進行屢次鏈接重試的
                if (timeout < costTime) {
                    callTimeout = true;
                    break;

                }
                //六、若是超時直接報錯
                if (callTimeout) {
                    throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
                }
        }
    }

經過這段源碼很明顯能夠看出如下幾點測試

  1. 若是是異步發送 那麼重試次數只有1次
  2. 對於同步而言,超時異常也是不會再去重試
  3. 若是發生重試是在一個for 循環裏去重試,因此它是當即重試而不是隔一段時間去重試。

真是實踐出真知!!!this


2、 Consumer端重試

消費端比較有意思,並且在實際開發過程當中,咱們也更應該考慮的是消費端的重試。3d

消費者端的失敗主要分爲2種狀況,ExceptionTimeout

一、Exception

@Slf4j
@Component
public class Consumer {
    /**
     * 消費者實體對象
     */
    private DefaultMQPushConsumer consumer;
    /**
     * 消費者組
     */
    public static final String CONSUMER_GROUP = "test_consumer";
    /**
     * 經過構造函數 實例化對象
     */
    public Consumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr("47.99.203.55:9876;47.99.203.55:9877");
        //訂閱topic和 tags( * 表明全部標籤)下信息
        consumer.subscribe("topic_family", "*");
        //註冊消費的監聽 並在此監聽中消費信息,並返回消費的狀態信息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            //一、獲取消息
            Message msg = msgs.get(0);
            try {
                //二、消費者獲取消息
                String body = new String(msg.getBody(), "utf-8");
                //三、獲取重試次數
                int count = ((MessageExt) msg).getReconsumeTimes();
                log.info("當前消費重試次數爲 = {}", count);
                //四、這裏設置重試大於3次 那麼經過保存數據庫 人工來兜底
                if (count >= 2) {
                    log.info("該消息已經重試3次,保存數據庫。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                //直接拋出異常
                throw new Exception("=======這裏出錯了============");
                //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        //啓動監聽
        consumer.start();
    }
}

這裏的代碼意思很明顯: 主動拋出一個異常,而後若是超過3次,那麼就不繼續重試下去,而是將該條記錄保存到數據庫由人工來兜底。

看下運行結果

注意 消費者和生產者的重試仍是有區別的,主要有兩點

一、默認重試次數:Product默認是2次,而Consumer默認是16次

二、重試時間間隔:Product是馬上重試,而Consumer是有必定時間間隔的。它照1S,5S,10S,30S,1M,2M····2H進行重試。

二、Timeout

說明 這裏的超時異常並不是真正意義上的超時,它指的是指獲取消息後,由於某種緣由沒有給RocketMQ返回消費的狀態,即沒有return ConsumeConcurrentlyStatus.CONSUME_SUCCESSreturn ConsumeConcurrentlyStatus.RECONSUME_LATER

那麼 RocketMQ會認爲該消息沒有發送,會一直髮送。由於它會認爲該消息根本就沒有發送給消費者,因此確定沒消費。

作這個測試很簡單。

//一、消費者得到消息
        String body = new String(msg.getBody(), "utf-8");
        //二、獲取重試次數
        int count = ((MessageExt) msg).getReconsumeTimes();
        log.info("當前消費重試次數爲 = {}", count);
        //三、這裏睡眠60秒
        Thread.sleep(60000);
       log.info("休眠60秒 看還能不能走到這裏。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body);
        //返回成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

當得到 當前消費重試次數爲 = 0 後 , 關掉該進程。再從新啓動該進程,那麼依然可以獲取該條消息

consumer消費者  當前消費重試次數爲 = 0
休眠60秒 看還能不能走到這裏。topic=topic_family,keys=1a2b3c4d5f,msg=小小今年3歲




只要本身變優秀了,其餘的事情纔會跟着好起來(上將2)
相關文章
相關標籤/搜索