消息重試分爲兩種:Producer發送消息的重試 和 Consumer消息消費的重試。java
Producer端重試是指: Producer往MQ上發消息沒有發送成功,好比網絡緣由致使生產者發送消息到MQ失敗。數據庫
看一下代碼:apache
@Slf4j public class RocketMQTest { /** * 生產者組 */ private static String PRODUCE_RGROUP = "test_producer"; public static void main(String[] args) throws Exception { //一、建立生產者對象 DefaultMQProducer producer = new DefaultMQProducer(PRODUCE_RGROUP); //設置重試次數(默認2次) producer.setRetryTimesWhenSendFailed(3000); //綁定name server producer.setNamesrvAddr("74.49.203.55:9876"); producer.start(); //建立消息 Message message = new Message("topic_family", ("小小今年3歲" ).getBytes()); //發送 這裏填寫超時時間是5毫秒 因此每次都會發送失敗 SendResult sendResult = producer.send(message,5); log.info("輸出生產者信息={}",sendResult); } }
超時重試
針對網上說的超時異常會重試的說法都是錯誤的,想一想都以爲可怕,我查的因此文章都說超時異常都會重試,難道這麼多人都沒有去測試一下 或者去看個源碼。服務器
我發現這個問題,是由於我上面超時時間設置爲5毫秒 ,按照正常確定會報超時異常,但我設置1次重試和3000次的重試,雖然最終都會報下面異常,但輸出錯誤時間報網絡
顯然不該該是一個級別。但測試發現不管我設置的多少次的重試次數,報異常的時間都差很少。異步
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
針對這個疑惑,我去看了源碼以後,才恍然大悟。函數
/** * 說明 抽取部分代碼 */ private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) { //一、獲取當前時間 long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev ; //二、去服務器看下有沒有主題消息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; //三、經過這裏能夠很明顯看出 若是不是同步發送消息 那麼消息重試只有1次 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; //四、根據設置的重試次數,循環再去獲取服務器主題消息 for (times = 0; times < timesTotal; times++) { MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); beginTimestampPrev = System.currentTimeMillis(); long costTime = beginTimestampPrev - beginTimestampFirst; //五、先後時間對比 若是先後時間差 大於 設置的等待時間 那麼直接跳出for循環了 這就說明鏈接超時是不進行屢次鏈接重試的 if (timeout < costTime) { callTimeout = true; break; } //六、若是超時直接報錯 if (callTimeout) { throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); } } }
經過這段源碼很明顯能夠看出如下幾點測試
異步發送
那麼重試次數只有1次超時異常也是不會再去重試
。真是實踐出真知!!!this
消費端比較有意思,並且在實際開發過程當中,咱們也更應該考慮的是消費端的重試。3d
消費者端的失敗主要分爲2種狀況,Exception
和 Timeout
。
@Slf4j @Component public class Consumer { /** * 消費者實體對象 */ private DefaultMQPushConsumer consumer; /** * 消費者組 */ public static final String CONSUMER_GROUP = "test_consumer"; /** * 經過構造函數 實例化對象 */ public Consumer() throws MQClientException { consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr("47.99.203.55:9876;47.99.203.55:9877"); //訂閱topic和 tags( * 表明全部標籤)下信息 consumer.subscribe("topic_family", "*"); //註冊消費的監聽 並在此監聽中消費信息,並返回消費的狀態信息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { //一、獲取消息 Message msg = msgs.get(0); try { //二、消費者獲取消息 String body = new String(msg.getBody(), "utf-8"); //三、獲取重試次數 int count = ((MessageExt) msg).getReconsumeTimes(); log.info("當前消費重試次數爲 = {}", count); //四、這裏設置重試大於3次 那麼經過保存數據庫 人工來兜底 if (count >= 2) { log.info("該消息已經重試3次,保存數據庫。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //直接拋出異常 throw new Exception("=======這裏出錯了============"); //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); //啓動監聽 consumer.start(); } }
這裏的代碼意思很明顯: 主動拋出一個異常,而後若是超過3次,那麼就不繼續重試下去,而是將該條記錄保存到數據庫由人工來兜底。
看下運行結果
注意
消費者和生產者的重試仍是有區別的,主要有兩點
一、默認重試次數:Product默認是2次,而Consumer默認是16次。
二、重試時間間隔:Product是馬上重試,而Consumer是有必定時間間隔的。它照1S,5S,10S,30S,1M,2M····2H
進行重試。
說明
這裏的超時異常並不是真正意義上的超時,它指的是指獲取消息後,由於某種緣由沒有給RocketMQ返回消費的狀態,即沒有return ConsumeConcurrentlyStatus.CONSUME_SUCCESS
或 return ConsumeConcurrentlyStatus.RECONSUME_LATER
。
那麼 RocketMQ會認爲該消息沒有發送,會一直髮送。由於它會認爲該消息根本就沒有發送給消費者,因此確定沒消費。
作這個測試很簡單。
//一、消費者得到消息 String body = new String(msg.getBody(), "utf-8"); //二、獲取重試次數 int count = ((MessageExt) msg).getReconsumeTimes(); log.info("當前消費重試次數爲 = {}", count); //三、這裏睡眠60秒 Thread.sleep(60000); log.info("休眠60秒 看還能不能走到這裏。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body); //返回成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
當得到 當前消費重試次數爲 = 0 後 , 關掉該進程。再從新啓動該進程,那麼依然可以獲取該條消息
consumer消費者 當前消費重試次數爲 = 0 休眠60秒 看還能不能走到這裏。topic=topic_family,keys=1a2b3c4d5f,msg=小小今年3歲
只要本身變優秀了,其餘的事情纔會跟着好起來(上將2)