https://zhuanlan.zhihu.com/p/25140744 中剖析過,consumer的每一個實例是靠隊列分配來決定如何消費消息的。那麼消費進度具體是如何管理的,又是如何保證消息成功消費的(RocketMQ有保證消息確定消費成功的特性(失敗則重試)?java
本文將詳細解析消息具體是如何ack的,又是如何保證消費確定成功的。git
因爲以上工做全部的機制都實如今PushConsumer中,因此本文的原理均只適用於RocketMQ中的PushConsumer即Java客戶端中的DefaultPushConsumer。 若使用了PullConsumer模式,相似的工做如何ack,如何保證消費等均須要使用方本身實現。github
注:廣播消費和集羣消費的處理有部分區別,如下均特指集羣消費(CLSUTER),廣播(BROADCASTING)下部分可能不適用。數據庫
PushConsumer爲了保證消息確定消費成功,只有使用方明確表示消費成功,RocketMQ纔會認爲消息消費成功。中途斷電,拋出異常等都不會認爲成功——即都會從新投遞。緩存
消費的時候,咱們須要注入一個消費回調,具體sample代碼以下:運維
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); doMyJob();//執行真正消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
業務實現消費回調的時候,當且僅當此回調函數返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ纔會認爲這批消息(默認是1條)是消費完成的。(具體如何ACK見後面章節)ide
若是這時候消息消費失敗,例如數據庫異常,餘額不足扣款失敗等一切業務認爲消息須要重試的場景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會認爲這批消息消費失敗了。函數
爲了保證消息是確定被至少消費成功一次,RocketMQ會把這批消息重發回Broker(topic不是原topic而是這個消費租的RETRY topic),在延遲的某個時間點(默認是10秒,業務可設置)後,再次投遞到這個ConsumerGroup。而若是一直這樣重複消費都持續失敗到必定次數(默認16次),就會投遞到DLQ死信隊列。應用能夠監控死信隊列來作人工干預。oop
注:性能
當新實例啓動的時候,PushConsumer會拿到本消費組broker已經記錄好的消費進度(consumer offset),按照這個進度發起本身的第一次Pull請求。
若是這個消費進度在Broker並無存儲起來,證實這個是一個全新的消費組,這時候客戶端有幾個策略能夠選擇:
CONSUME_FROM_LAST_OFFSET //默認策略,從該隊列最尾開始消費,即跳過歷史消息 CONSUME_FROM_FIRST_OFFSET //從隊列最開始開始消費,即歷史消息(還儲存在broker的)所有消費一遍 CONSUME_FROM_TIMESTAMP//從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時之前
因此,社區中常常有人問:「爲何我設了CONSUME_FROM_LAST_OFFSET,歷史的消息仍是被消費了」? 緣由就在於只有全新的消費組纔會使用到這些策略,老的消費組都是按已經存儲過的消費進度繼續消費。
對於老消費組想跳過歷史消息能夠採用如下兩種方法:
RocketMQ是以consumer group+queue爲單位是管理消費進度的,以一個consumer offset標記這個這個消費組在這條queue上的消費進度。
若是某已存在的消費組出現了新消費實例的時候,依靠這個組的消費進度,就能夠判斷第一次是從哪裏開始拉取的。
每次消息成功後,本地的消費進度會被更新,而後由定時器定時同步到broker,以此持久化消費進度。
可是每次記錄消費進度的時候,只會把一批消息中最小的offset值爲消費進度值,以下圖:
這鐘方式和傳統的一條message單獨ack的方式有本質的區別。性能上提高的同時,會帶來一個潛在的重複問題——因爲消費進度只是記錄了一個下標,就可能出現拉取了100條消息如 2101-2200的消息,後面99條都消費結束了,只有2101消費一直沒有結束的狀況。
在這種狀況下,RocketMQ爲了保證消息確定被消費成功,消費進度職能維持在2101,直到2101也消費結束了,本地的消費進度纔會一會兒更新到2200。
在這種設計下,就有消費大量重複的風險。如2101在尚未消費完成的時候消費實例忽然退出(機器斷電,或者被kill)。這條queue的消費進度仍是維持在2101,當queue從新分配給新的實例的時候,新的實例從broker上拿到的消費進度仍是維持在2101,這時候就會又從2101開始消費,2102-2200這批消息實際上已經被消費過仍是會投遞一次。
對於這個場景,3.2.6以前的RocketMQ無能爲力,因此業務必需要保證消息消費的冪等性,這也是RocketMQ官方屢次強調的態度。
實際上,從源碼的角度上看,RocketMQ多是考慮過這個問題的,截止到3.2.6的版本的源碼中,能夠看到爲了緩解這個問題的影響面,DefaultMQPushConsumer中有個配置consumeConcurrentlyMaxSpan
/** * Concurrently max span offset.it has no effect on sequential consumption */ private int consumeConcurrentlyMaxSpan = 2000;
這個值默認是2000,當RocketMQ發現本地緩存的消息的最大值-最小值差距大於這個值(2000)的時候,會觸發流控——也就是說若是頭尾都卡住了部分消息,達到了這個閾值就再也不拉取消息。
但做用實際頗有限,像剛剛這個例子,2101的消費是死循環,其餘消費很是正常的話,是無能爲力的。一旦退出,在不人工干預的狀況下,2101後全部消息所有重複。
對於這個卡消費進度的問題,最顯而易見的解法是設定一個超時時間,達到超時時間的那個消費看成消費失敗處理。
後來RocketMQ顯然也發現了這個問題,而RocketMQ在3.5.8以後也就是採用這樣的方案去解決這個問題。
核心源碼以下:
//ConsumeMessageConcurrentlyService.java public void start() { this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { @Override public void run() { cleanExpireMsg(); } }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); } //ConsumeMessageConcurrentlyService.java private void cleanExpireMsg() { Iterator<Map.Entry<MessageQueue, ProcessQueue>> it = this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator(); while (it.hasNext()) { Map.Entry<MessageQueue, ProcessQueue> next = it.next(); ProcessQueue pq = next.getValue(); pq.cleanExpiredMsg(this.defaultMQPushConsumer); } } //ProcessQueue.java public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) { if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) { return; } int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16; for (int i = 0; i < loop; i++) { MessageExt msg = null; try { this.lockTreeMap.readLock().lockInterruptibly(); try { if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) { msg = msgTreeMap.firstEntry().getValue(); } else { break; } } finally { this.lockTreeMap.readLock().unlock(); } } catch (InterruptedException e) { log.error("getExpiredMsg exception", e); } try { pushConsumer.sendMessageBack(msg, 3); log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); try { this.lockTreeMap.writeLock().lockInterruptibly(); try { if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) { try { msgTreeMap.remove(msgTreeMap.firstKey()); } catch (Exception e) { log.error("send expired msg exception", e); } } } finally { this.lockTreeMap.writeLock().unlock(); } } catch (InterruptedException e) { log.error("getExpiredMsg exception", e); } } catch (Exception e) { log.error("send expired msg exception", e); } } }
經過源碼看這個方案,其實能夠看出有幾個不太完善的問題: