其實最好的學習方式就是互相交流,最近也有跟網友討論了一些關於 RocketMQ 消息拉取與重平衡的問題,我姑且在這裏寫下個人一些總結。java
以前發表了一篇關於重平衡的文章:「Kafka 重平衡機制」,裏面有說到 RocketMQ 重平衡機制是每隔 20s 從任意一個 Broker 節點獲取消費組的消費 ID 以及訂閱信息,再根據這些訂閱信息進行分配,而後將分配到的信息封裝成 pullRequest 對象 pull 到 pullRequestQueue 隊列中,拉取線程喚醒後執行拉取任務,流程圖以下:apache
可是其中有一些是沒有詳細說的,好比每次拉消息都要等 20s 嗎?真的有個網友問了我以下問題:後端
很顯然他的項目是用了 push 模式進行消息拉取,要回答這個問題,就要從 RockeMQ 的消息拉取提及:緩存
RocketMQ 的 push 模式的實現是基於 pull 模式,只不過在 pull 模式上套了一層,因此RocketMQ push 模式並非真正意義上的 」推模式「,所以,在 push 模式下,消費者拉取完消息後,立馬就有開始下一個拉取任務,並不會真的等 20s 重平衡後才拉取,至於 push 模式是怎麼實現的,那就從源碼去找答案。併發
以前有寫過一篇文章:「RocketMQ爲何要保證訂閱關係的一致性?」,裏面有說過 消息拉取是從 PullRequestQueue 阻塞隊列中取出 PullRequest 拉取任務進行消息拉取的,但 PullRequest 是怎麼放進 PullRequestQueue 阻塞隊列中的呢?學習
RocketMQ 一共提供瞭如下方法:fetch
org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately:this
public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
從調用鏈發現,除了重平衡會調用該方法以外,在 push 模式下,PullCallback 回調對象中的 onSuccess 方法在消息消費時,也調用了該方法:spa
org.apache.rocketmq.client.consumer.PullCallback#onSuccess:線程
case FOUND:
// 若是本次拉取消息爲空,則繼續將pullRequest放入阻塞隊列中 if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { // 將消息放入消費者消費線程去執行 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(// pullResult.getMsgFoundList(), // processQueue, // pullRequest.getMessageQueue(), // dispathToConsume); // 將pullRequest放入阻塞隊列中 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); }
當從 broker 拉取到消息後,若是消息被過濾掉,則繼續將pullRequest放入阻塞隊列中繼續循環執行消息拉取任務,不然將消息放入消費者消費線程去執行,在pullRequest放入阻塞隊列中。
case NO_NEW_MESSAGE:
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
若是從 broker 端沒有可拉取的新消息或者沒有匹配到消息,則將pullRequest放入阻塞隊列中繼續循環執行消息拉取任務。
從以上消息消費邏輯能夠看出,當消息處理完後,當即將 pullRequest 從新放入阻塞隊列中,所以這就很好解釋爲何 push 模式能夠持續拉取消息了:
在 push 模式下消息消費完後,還會調用該方法從新將 PullRequest 對象放進 PullRequestQueue 阻塞隊列中,不斷地從 broker 中拉取消息,實現 push 效果。
繼續再想一個問題,若是重平衡後,發現某個隊列被新的消費者分配了,怎麼辦,總不能繼續從該隊列中拉取消息吧?
RocketMQ 重平衡後會檢查 pullRequest 是否還在新分配的列表中,若是不在,則丟棄,調用 isDrop() 可查出該pullRequest是否已丟棄:
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage:
final ProcessQueue processQueue = pullRequest.getProcessQueue(); if (processQueue.isDropped()) { log.info("the pull request[{}] is dropped.", pullRequest.toString()); return; }
在消息拉取以前,首先判斷該隊列是否被丟棄,若是已丟棄,則直接放棄本次拉取任務。
那何時隊列被丟棄呢?
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<MessageQueue, ProcessQueue> next = it.next(); MessageQueue mq = next.getKey(); ProcessQueue pq = next.getValue(); if (mq.getTopic().equals(topic)) { // 判斷當前緩存 MessageQueue 是否包含在最新的 mqSet 中,若是不存在則將隊列丟棄 if (!mqSet.contains(mq)) { pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); } } else if (pq.isPullExpired()) { // 若是隊列拉取過時則丟棄 switch (this.consumeType()) { case CONSUME_ACTIVELY: break; case CONSUME_PASSIVELY: pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", consumerGroup, mq); } break; default: break; } } } }
updateProcessQueueTableInRebalance 方法在重平衡時執行,用於更新 processQueueTable,它是當前消費者的隊列緩存列表,以上方法邏輯判斷當前緩存 MessageQueue 是否包含在最新的 mqSet 中,若是不包含其中,則說明通過此次重平衡後,該隊列被分配給其它消費者了,或者拉取時間間隔太大過時了,則調用 setDropped(true) 方法將隊列置爲丟棄狀態。
可能你會問,processQueueTable 跟 pullRequest 裏面 processQueue 有什麼關聯,往下看:
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:
// 新建 ProcessQueue ProcessQueue pq = new ProcessQueue(); long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { // 將ProcessQueue放入processQueueTable中 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); // 將ProcessQueue放入pullRequest拉取任務對象中 pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } }
能夠看出,重平衡時會建立 ProcessQueue 對象,將其放入 processQueueTable 緩存隊列表中,再將其放入 pullRequest 拉取任務對象中,也就是 processQueueTable 中的 ProcessQueue 與 pullRequest 的中 ProcessQueue 是同一個對象。
以前在羣裏有個網友提了這個問題:
我當時回答他 RocketMQ 正常也是沒有重複消費,但後來發現其實 RocketMQ 在某些狀況下,也是會出現消息重複消費的現象。
前面講到,RocketMQ 消息消費時,會將消息放進消費線程中去執行,代碼以下:
org.apache.rocketmq.client.consumer.PullCallback#onSuccess:
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(// pullResult.getMsgFoundList(), // processQueue, // pullRequest.getMessageQueue(), // dispathToConsume);
ConsumeMessageService 類實現消息消費的邏輯,它有兩個實現類:
// 併發消息消費邏輯實現類 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService; // 順序消息消費邏輯實現類 org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
先看併發消息消費相關處理邏輯:
ConsumeMessageConcurrentlyService:
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run:
if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; } // 消息消費邏輯 // ... // 若是隊列被設置爲丟棄狀態,則不提交消息消費進度 if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); }
ConsumeRequest 是一個繼承了 Runnable 的類,它是消息消費核心邏輯的實現類,submitConsumeRequest 方法將 ConsumeRequest 放入 消費線程池中執行消息消費,從它的 run 方法中可看出,若是在執行消息消費邏輯中有節點加入,重平衡後該隊列被分配給其它節點進行消費了,此時的隊列被丟棄,則不提交消息消費進度,由於以前已經消費了,此時就會形成消息重複消費的狀況。
再來看看順序消費相關處理邏輯:
ConsumeMessageOrderlyService:
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run:
public void run() { // 判斷隊列是否被丟棄 if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { // 若是不是廣播模式,且隊列已加鎖且鎖沒有過時 if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { final long beginTime = System.currentTimeMillis(); for (boolean continueConsume = true; continueConsume; ) { // 再次判斷隊列是否被丟棄 if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } // 消息消費處理邏輯 // ... continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); } else { continueConsume = false; } } } else { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); } } }
RocketMQ 順序消息消費會將隊列鎖定,當隊列獲取鎖以後才能進行消費,因此,即便消息在消費過程當中有節點加入,重平衡後該隊列被分配給其它節點進行消費了,此時的隊列被丟棄,依然不會形成重複消費。
更多精彩文章請關注做者維護的公衆號「後端進階」,這是一個專一後端相關技術的公衆號。
關注公衆號並回復「後端」免費領取後端相關電子書籍。
歡迎分享,轉載請保留出處。