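This source-code study is driven by questions. The questions are:
What is a Consumer Group?
How does a Consumer pull work?
Does the Consumer support push?
How does a Consumer consume a single queue in parallel?
How does a Consumer filter messages?
How does a Consumer guarantee that a message is consumed by only one service in the Group?
How is Consumer load balancing implemented?
What happens when consumption fails?
Can a Consumer rewind and consume again?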
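Consumer: a message consumer is responsible for consuming messages. It is usually a back-end system that consumes asynchronously.
Consumer Group: the collective name for a class of Consumers that usually consume the same kind of messages with the same consumption logic. In general, the number of Consumers in a group should not exceed the number of queues in the subscribed topic; otherwise some Consumers will sit idle.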
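After analyzing the Producer, the Consumer pull path feels familiar.
Main logic
1. Look up the broker route information for the MessageQueue.
2. Build the request header from the call parameters.
3. Delegate to Netty to fetch the messages from the broker.
Code walkthrough
MQPullConsumer.pull requires the caller to specify both the MessageQueue and the offset (the position in the queue).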
PullResult pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
Now look at what a pull returns: the messages fetched this time (MessageExt) and the position information (offsets).

    public class PullResult {
        // pull status
        private final PullStatus pullStatus;
        // offset to start the next pull from
        private final long nextBeginOffset;
        // minimum offset
        private final long minOffset;
        // maximum offset
        private final long maxOffset;
        // messages found by this pull
        private List<MessageExt> msgFoundList;
    }

MQPullConsumer.pull
-> DefaultMQPullConsumer.pull
-> DefaultMQPullConsumerImpl.pull
-> DefaultMQPullConsumerImpl.pullSyncImpl
-> DefaultMQPullConsumerImpl.pullKernelImpl

    public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // look up the broker address
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }

        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }

            // build the pull request header
            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }

            // delegate to Netty to fetch the messages
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

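Pulling is simple: one request, one response, and the Consumer manages the offset itself. In general, the number of Consumers in a Consumer Group should not exceed the number of MessageQueues.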
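To make "the Consumer manages the offset" concrete, here is a minimal sketch that does not use the RocketMQ client at all. The class `PullOffsetSketch`, its `Result` type and the in-memory list standing in for a MessageQueue are all hypothetical; only the idea of feeding `nextBeginOffset` back into the next pull comes from `PullResult` above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in: a plain list plays the role of one MessageQueue on a broker. */
final class PullOffsetSketch {

    /** Mirrors the two PullResult fields the loop needs. */
    static final class Result {
        final List<String> msgs;
        final long nextBeginOffset;

        Result(List<String> msgs, long nextBeginOffset) {
            this.msgs = msgs;
            this.nextBeginOffset = nextBeginOffset;
        }
    }

    /** Returns at most maxNums messages starting at offset, plus the offset to pull from next. */
    static Result pull(List<String> queue, long offset, int maxNums) {
        int from = (int) Math.min(offset, queue.size());
        int to = Math.min(from + Math.max(maxNums, 0), queue.size());
        return new Result(new ArrayList<>(queue.subList(from, to)), to);
    }

    /** Drains the queue; the caller, not the broker, remembers where it is. */
    static List<String> drain(List<String> queue, int maxNums) {
        List<String> consumed = new ArrayList<>();
        long offset = 0;
        while (true) {
            Result r = pull(queue, offset, maxNums);
            if (r.msgs.isEmpty()) {
                break; // nothing new: a real application would wait and pull again
            }
            consumed.addAll(r.msgs);
            offset = r.nextBeginOffset; // the application carries the offset forward
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<String> queue = Arrays.asList("m0", "m1", "m2", "m3", "m4");
        System.out.println(drain(queue, 2));
    }
}
```

If the application loses `offset` (for example on restart), it has to persist and restore it itself, which is exactly the bookkeeping the push wrapper described below takes over.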
Push Consumer
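A kind of Consumer. The application usually registers a Listener interface with the Consumer object; as soon as a message arrives, the Consumer object calls back the Listener method. In the JMS standard this is the onMessage method of MessageListener.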
Pull Consumer
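A kind of Consumer. The application usually calls the Consumer's pull method itself to fetch messages from the Broker, so the application stays in control.
Every RocketMQ Consumer actually pulls messages from the Broker. To receive messages in real time, RocketMQ uses long polling, which gives the same timeliness as a push model. This long polling is similar to the way WebQQ sends and receives messages. For more background, see "Comet: server push technology based on HTTP long connections".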
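The long-polling idea can be sketched in a few lines: the "broker" holds a pull request until a message arrives or a suspend timeout elapses, instead of answering "nothing new" straight away. This is only an illustration built on a JDK `BlockingQueue`; `LongPollingSketch` and its methods are hypothetical and say nothing about how the Broker actually suspends requests.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-memory broker that suspends a pull until data arrives or a timeout elapses. */
final class LongPollingSketch {
    private final BlockingQueue<String> store = new LinkedBlockingQueue<>();

    void produce(String msg) {
        store.add(msg);
    }

    /** Returns a message as soon as one exists, or null after suspendMillis without one. */
    String pull(long suspendMillis) {
        try {
            return store.poll(suspendMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LongPollingSketch broker = new LongPollingSketch();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200); // the message shows up while the pull is suspended
            } catch (InterruptedException e) {
                return;
            }
            broker.produce("hello");
        });
        producer.start();

        long start = System.nanoTime();
        String msg = broker.pull(5000); // returns when the message arrives, not after 5 s
        long waitedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(msg + " received after about " + waitedMs + " ms");
        producer.join();
    }
}
```

The consumer sees the message almost as soon as it is produced, yet the connection is always initiated by the consumer, which is why the result feels like push while remaining a pull.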
Although every RocketMQ consumer is implemented with pull, the client wraps it in a push interface. Let's first look at how it is used.

    public static void main(String[] args) throws InterruptedException, MQClientException {
        /**
         * One application creates one Consumer and maintains it itself;
         * it can be a global object or a singleton.
         * Note: the application must make sure the ConsumerGroupName is unique.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testmerchantLeagueConsumerGroup");
        consumer.setNamesrvAddr("ip:port");

        /**
         * Subscribe to messages of the given topic whose tag is TagA or TagB.
         */
        consumer.subscribe("broker-a", "TagB || TagA");

        /**
         * Decide whether a Consumer that starts for the first time consumes from
         * the head or the tail of the queue.
         * If it is not the first start, it continues from the last consumed position.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // the real message handling logic lives here
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * By default msgs holds a single message; set consumeMessageBatchMaxSize
             * to receive messages in batches.
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals("broker-a")) {
                    // consumption logic for this topic
                    if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                        // handle TagA
                        String message = new String(msg.getBody());
                        System.out.println(message);
                    } else if (msg.getTags() != null && msg.getTags().equals("TagB")) {
                        // handle TagB
                        String message = new String(msg.getBody());
                        System.out.println(message);
                    }
                }
                // tell the MQ server that consumption succeeded
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // start() must be called once before the Consumer is used
        consumer.start();
    }

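How RocketMQ implements push:
Message pulling logic
The client maintains a pullRequestQueue and puts an initial pullRequest into it. When a pull succeeds, the pullRequest is updated with the next offset and put back into the pullRequestQueue. A separate thread watches the pullRequestQueue and keeps pulling messages whenever the queue is not empty.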
DefaultMQPushConsumer.start
-> DefaultMQPushConsumerImpl.start
-> MQClientInstance.start
-> PullMessageService.start
Let's look at the run method of PullMessageService:

    // blocking queue of pending pull requests
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        // pull messages as long as there is a request
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            } catch (InterruptedException e) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

Where is something put into pullRequestQueue?
In the same class, the put happens in executePullRequestImmediately, which executePullRequestLater calls after a delay:

    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    }

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

Tracing the callers of these methods shows that the onSuccess callback inside pullMessage (which run invokes) prepares the pullRequest for the next round and puts it back into pullRequestQueue.

    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        // on success, advance the pullRequest to the next offset
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                            boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispathToConsume);

                            // put the request back into pullRequestQueue
                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }

                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);
                        }

                        break;
                    case NO_NEW_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);

                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }
        // the rest of the callback is not shown in this excerpt

That covers fetching messages. Next, how is the MessageListener's consume method triggered?
Still in the callback inside DefaultMQPushConsumerImpl.pullMessage, the following code hands the messages to consumeMessageService:

    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
        pullResult.getMsgFoundList(),
        processQueue,
        pullRequest.getMessageQueue(),
        dispathToConsume);

It builds a ConsumeRequest and submits it to a thread pool for consumption:

    @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }

                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }

                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

Finally, the run method of ConsumeRequest is where the listener's consumeMessage gets called:

    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

That completes the whole pull logic.
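Stripped of statistics and error handling, the loop walked through above boils down to: take a request from the queue, pull, hand the messages to a consume thread pool, advance the offset, and put the request back. The sketch below reproduces only that shape with JDK classes. `PushOverPullSketch`, its simplified `PullRequest` and the list used as a broker queue are hypothetical, and the short sleep merely stands in for the broker-side suspension of long polling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class PushOverPullSketch {

    /** Simplified stand-in for PullRequest: only the next offset matters here. */
    static final class PullRequest {
        long nextOffset;
    }

    /** Pulls everything from brokerQueue, "pushes" it to a listener, returns what the listener saw (sorted). */
    static List<String> run(List<String> brokerQueue, int maxNums) {
        LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
        ExecutorService consumeExecutor = Executors.newFixedThreadPool(4);
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(brokerQueue.size());

        Thread pullService = new Thread(() -> {
            try {
                while (true) {
                    PullRequest request = pullRequestQueue.take();      // blocks until a request exists
                    int from = (int) Math.min(request.nextOffset, brokerQueue.size());
                    int to = Math.min(from + Math.max(maxNums, 0), brokerQueue.size());
                    List<String> found = new ArrayList<>(brokerQueue.subList(from, to));
                    request.nextOffset = to;                            // advance for the next round
                    if (found.isEmpty()) {
                        Thread.sleep(10);                               // stand-in for long-polling suspension
                    } else {
                        consumeExecutor.submit(() -> {                  // the "listener" runs off the pull thread
                            received.addAll(found);
                            for (int i = 0; i < found.size(); i++) {
                                done.countDown();
                            }
                        });
                    }
                    pullRequestQueue.put(request);                      // re-enqueue: this keeps the loop alive
                }
            } catch (InterruptedException e) {
                // interrupted by run(): stop pulling
            }
        });
        pullService.setDaemon(true);
        pullService.start();
        pullRequestQueue.add(new PullRequest());                        // the initial request

        try {
            done.await(5, TimeUnit.SECONDS);
            pullService.interrupt();
            pullService.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumeExecutor.shutdown();
        }
        List<String> result = new ArrayList<>(received);
        Collections.sort(result);                                       // arrival order is not deterministic
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("m0", "m1", "m2", "m3", "m4"), 2));
    }
}
```

The result is sorted before it is returned because batches are consumed on different threads, so the order in which the listener sees them is not guaranteed.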
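Single-queue parallel consumption: the code in the previous section is already the example. Put simply, the messages are submitted to a thread pool without blocking the pull thread, so the messages of a single queue are consumed in parallel.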
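As a rough illustration of the batching in submitConsumeRequest, the sketch below splits one pulled list into batches of at most `batchSize` and runs each batch on a pool thread. `BatchSplitSketch` and `split` are hypothetical names, and rejection handling is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BatchSplitSketch {

    /** Splits msgs into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += batchSize) {
            int to = Math.min(from + batchSize, msgs.size());
            batches.add(new ArrayList<>(msgs.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> pulled = Arrays.asList(1, 2, 3, 4, 5);
        ExecutorService consumeExecutor = Executors.newFixedThreadPool(3);
        for (List<Integer> batch : split(pulled, 2)) {
            // each batch becomes one task, so batches from the same queue run concurrently
            consumeExecutor.submit(() ->
                System.out.println(Thread.currentThread().getName() + " consumes " + batch));
        }
        consumeExecutor.shutdown();
        consumeExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Because batches run concurrently, messages from one queue may finish out of order; this fits the concurrent listener shown earlier, not an ordered one.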
Message filtering: the entry point is again in DefaultMQPushConsumerImpl.pullMessage

    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);

Looking inside, there is logic that filters the messages:

    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

            // filter again on the client by the subscribed tags
            List<MessageExt> msgListFilterAgain = msgList;
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }

            // message filter hooks
            if (this.hasHook()) {
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(unitMode);
                filterMessageContext.setMsgList(msgListFilterAgain);
                this.executeHook(filterMessageContext);
            }

            for (MessageExt msg : msgListFilterAgain) {
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                    Long.toString(pullResult.getMinOffset()));
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                    Long.toString(pullResult.getMaxOffset()));
            }

            pullResultExt.setMsgFoundList(msgListFilterAgain);
        }

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }

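The tag re-check in processPullResult can be tried out in isolation. The sketch below assumes a subscription expression of the form "TagB || TagA" (as in the usage example earlier) is split on "||" into a tag set, and that "*" or an empty expression means "accept everything"; `TagFilterSketch`, `Msg`, `parseTags` and `filter` are hypothetical names, not client API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class TagFilterSketch {

    /** Minimal stand-in for MessageExt: only the tag and body are needed. */
    static final class Msg {
        final String tags;
        final String body;

        Msg(String tags, String body) {
            this.tags = tags;
            this.body = body;
        }
    }

    /** Assumption: "A || B" becomes {A, B}; null, empty or "*" becomes the empty set (no tag filtering). */
    static Set<String> parseTags(String subExpression) {
        Set<String> tags = new HashSet<>();
        if (subExpression == null || subExpression.trim().isEmpty() || "*".equals(subExpression.trim())) {
            return tags;
        }
        for (String tag : subExpression.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                tags.add(trimmed);
            }
        }
        return tags;
    }

    /** Same rule as the loop above: with a non-empty tag set, untagged or unmatched messages are dropped. */
    static List<Msg> filter(List<Msg> msgs, Set<String> tagsSet) {
        if (tagsSet.isEmpty()) {
            return msgs;
        }
        List<Msg> kept = new ArrayList<>(msgs.size());
        for (Msg msg : msgs) {
            if (msg.tags != null && tagsSet.contains(msg.tags)) {
                kept.add(msg);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Msg> pulled = Arrays.asList(
            new Msg("TagA", "a"), new Msg("TagC", "c"), new Msg(null, "untagged"), new Msg("TagB", "b"));
        for (Msg msg : filter(pulled, parseTags("TagB || TagA"))) {
            System.out.println(msg.tags + " -> " + msg.body);
        }
    }
}
```

As far as I understand, the reason for checking again on the client is that the broker matches tags by hash code, so an exact string comparison on the consumer side weeds out hash collisions; treat that explanation as background rather than something the excerpt above proves.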
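Consumed by only one service in the Group: in clustering mode each MessageQueue of a topic is assigned to exactly one Consumer in the Group, so a message is consumed by only one service in the Group.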
Load balancing, the concept:
A consumer consumes several MessageQueues at the same time. When the MessageQueues of the topic change, the set of MessageQueues it consumes is adjusted dynamically.

    // RebalanceImpl
    public void doRebalance(final boolean isOrder) {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }

We only care about clustering mode here.
Main logic:
1. Get all MessageQueues of the topic.
2. Get the information of all Consumers in the same ConsumerGroup.
3. Assign MessageQueues to this Consumer according to the configured strategy.

    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                // all MessageQueues of this topic
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // ids of all consumers in the same consumerGroup
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        // assign MessageQueues to the current Consumer according to the strategy
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

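RocketMQ ships several allocation strategies:
Implementation class | Strategy |
---|---|
AllocateMessageQueueAveragelyByCircle | Round-robin average allocation |
AllocateMessageQueueByMachineRoom | Allocation by machine room |
AllocateMessageQueueConsistentHash | Consistent-hash allocation |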
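To see what "assign according to the strategy" means in practice, here is a sketch of a plain contiguous average allocation, the idea behind the client's AllocateMessageQueueAveragely strategy (not listed in the table above; to my knowledge it is the default). It is written from memory with queues reduced to integers, so read it as an illustration of the arithmetic rather than a copy of the library code. `AverageAllocateSketch` and `allocate` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AverageAllocateSketch {

    /**
     * Every consumer runs this locally over the same sorted inputs, so all consumers
     * reach a consistent split without talking to each other.
     * Returns the queues for currentCid: contiguous blocks, the first (size % consumers)
     * consumers get one extra queue. Unknown cid or surplus consumers get an empty list.
     */
    static List<Integer> allocate(String currentCid, List<Integer> mqAll, List<String> cidAll) {
        List<Integer> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result;
        }
        int mod = mqAll.size() % cidAll.size();
        int averageSize;
        if (mqAll.size() <= cidAll.size()) {
            averageSize = 1;
        } else if (mod > 0 && index < mod) {
            averageSize = mqAll.size() / cidAll.size() + 1;
        } else {
            averageSize = mqAll.size() / cidAll.size();
        }
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex); // negative for surplus consumers
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get(startIndex + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> queues = Arrays.asList(0, 1, 2, 3);
        List<String> consumers = Arrays.asList("c0", "c1", "c2");
        for (String cid : consumers) {
            System.out.println(cid + " -> " + allocate(cid, queues, consumers));
        }
        // with more consumers than queues, the last consumer gets nothing
        System.out.println("c2 of 3 on 2 queues -> " + allocate("c2", Arrays.asList(0, 1), consumers));
    }
}
```

The last call shows why a group should not have more Consumers than queues: the surplus Consumer is assigned nothing and sits idle.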
This section draws on "Distributed message queue RocketMQ source analysis, part 4: Consumer load balancing and how it differs from Kafka's Consumer load balancing".
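Consumption failure: when a Consumer fails to consume a message, a retry mechanism is needed so that the message gets consumed again. Failures usually fall into the following cases:
The message itself is the cause, for example deserialization fails or the message data cannot be processed (say a phone top-up where the number in the message has been cancelled). This kind of error usually calls for skipping the message and moving on to others. Retrying the failed message immediately will almost certainly fail again, so a timed retry works best, for example retrying after 10 s.
A downstream dependency is unavailable, for example the DB connection is down or an external system is unreachable. Here skipping the current message does not help, because consuming the other messages will fail as well. The application is advised to sleep 30 s before consuming the next message, which eases the retry load on the Broker.
In the code, handling depends on the consume status. When the listener returns nothing (null), the message is retried:

    if (null == status) {
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        // treat a null status as "retry later"
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
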

    public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();

        if (consumeRequest.getMsgs().isEmpty())
            return;

        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    // send the failed message back to the broker for a later retry
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);

                    // sending back failed: retry consumption locally later
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

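The part of processConsumeResult that is easiest to misread is how `status` and `ackIndex` together decide which messages of a batch are retried. The sketch below isolates just that rule. `AckIndexSketch` and `indexesToRetry` are hypothetical, and the claim that the context's ackIndex defaults to Integer.MAX_VALUE (meaning "everything succeeded") is my recollection of the client, not something shown in the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

final class AckIndexSketch {

    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /**
     * Returns the positions in the batch that would be sent back for retry
     * (clustering mode) or logged and dropped (broadcasting mode).
     */
    static List<Integer> indexesToRetry(Status status, int ackIndex, int batchSize) {
        List<Integer> retry = new ArrayList<>();
        if (batchSize <= 0) {
            return retry;
        }
        if (status == null) {
            status = Status.RECONSUME_LATER;   // a null return is treated as "retry later"
        }
        if (status == Status.CONSUME_SUCCESS) {
            if (ackIndex >= batchSize) {
                ackIndex = batchSize - 1;      // clamp: the whole batch succeeded
            }
        } else {
            ackIndex = -1;                     // nothing acknowledged: retry the whole batch
        }
        for (int i = ackIndex + 1; i < batchSize; i++) {
            retry.add(i);
        }
        return retry;
    }

    public static void main(String[] args) {
        System.out.println(indexesToRetry(Status.CONSUME_SUCCESS, Integer.MAX_VALUE, 3));
        System.out.println(indexesToRetry(Status.CONSUME_SUCCESS, 0, 3));
        System.out.println(indexesToRetry(Status.RECONSUME_LATER, Integer.MAX_VALUE, 3));
    }
}
```

So a listener that processes a batch partially can return CONSUME_SUCCESS with ackIndex set to the last position it handled, and only the messages after that position are retried.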
Rewinding consumption means consuming again messages that the Consumer has already consumed successfully, because the business requires it. To support this, the Broker must keep messages after delivering them to the Consumer. Re-consumption is usually time-based: for example, after a Consumer system failure is repaired, the data from one hour ago needs to be consumed again, so the Broker has to offer a way to move the consumption progress back by time. RocketMQ supports rewinding by time with millisecond precision, and the progress can be moved either backward or forward.
Logic: ask the broker for the offsets that match the parameters, then reset the consume offset to them, which rewinds consumption.

    public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
        final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
        throws RemotingException, MQClientException, InterruptedException {
        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        requestHeader.setTimestamp(timestamp);
        requestHeader.setForce(isForce);

        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
        if (isC) {
            request.setLanguage(LanguageCode.CPP);
        }

        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                if (response.getBody() != null) {
                    ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
                    return body.getOffsetTable();
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }

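The client code above only sends the timestamp; turning a timestamp into an offset happens on the broker and is not shown in this walkthrough. One plausible way to do it, assuming messages in a queue are stored in non-decreasing time order, is a binary search for the first message stored at or after the requested time. The sketch below shows that idea only; `OffsetByTimeSketch` and `searchOffset` are hypothetical and make no claim about the broker's actual implementation.

```java
final class OffsetByTimeSketch {

    /**
     * storeTimestamps[i] is the store time of the message at offset i, sorted ascending.
     * Returns the first offset whose store time is >= timestamp, or the array length
     * when every message is older (meaning: start from the next message to arrive).
     */
    static long searchOffset(long[] storeTimestamps, long timestamp) {
        int low = 0;
        int high = storeTimestamps.length;
        while (low < high) {
            int mid = (low + high) >>> 1;
            if (storeTimestamps[mid] < timestamp) {
                low = mid + 1;   // everything up to mid is too old
            } else {
                high = mid;      // mid is a candidate; keep looking to the left
            }
        }
        return low;
    }

    public static void main(String[] args) {
        long[] storeTimes = {100L, 200L, 300L, 400L};
        // rewinding to t=250 resumes consumption at the message stored at t=300
        System.out.println(searchOffset(storeTimes, 250L));
    }
}
```

Resetting the consume offset to the value returned for "one hour ago" is what makes the Consumer see that hour of messages again.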