說在前面
請求處理:通知消費者訂閱信息發生了改變
源碼解析
進入這個方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor#notifyConsumerIdsChanged
/**
 * Handles a broker notification that the consumers of a group changed by asking the
 * client instance to rebalance immediately.
 *
 * @param ctx     channel context of the connection the notification arrived on
 * @param request remoting command carrying a {@code NotifyConsumerIdsChangedRequestHeader}
 * @return always {@code null}; this handler builds no response command
 * @throws RemotingCommandException declared by the signature; every exception raised in
 *                                  the body is caught and logged here
 */
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    try {
        final NotifyConsumerIdsChangedRequestHeader requestHeader =
            (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
        log.info("receive broker's notification[{}], the consumer group: {} changed, rebalance immediately",
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            requestHeader.getConsumerGroup());
        this.mqClientFactory.rebalanceImmediately();
    } catch (Exception e) {
        // The message needs a placeholder; without one the description argument was
        // silently discarded and the log line carried no detail about the failure.
        log.error("notifyConsumerIdsChanged exception: {}", RemotingHelper.exceptionSimpleDesc(e));
    }
    return null;
}
進入這個方法,org.apache.rocketmq.client.impl.factory.MQClientInstance#rebalanceImmediately
/**
 * Requests an immediate rebalance by waking up the rebalance service.
 */
public void rebalanceImmediately() {
    this.rebalanceService.wakeup();
}
進入這個方法,org.apache.rocketmq.common.ServiceThread#wakeup
public void wakeup() {if (hasNotified.compareAndSet(false, true)) {waitPoint.countDown(); // notify org.apache.rocketmq.client.impl.consumer.RebalanceService.run() 在await} }
進入這個方法,org.apache.rocketmq.client.impl.consumer.RebalanceService#run
@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval);// 負載均衡處理 =》this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end"); }
進入這個方法,負載均衡處理,org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#doRebalance
@Overridepublic void doRebalance() {if (this.rebalanceImpl != null) {// =》this.rebalanceImpl.doRebalance(false);} }
進入這個方法,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
public void doRebalance(final boolean isOrder) {// 獲取topic的訂閱信息=》Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {// 根據topic進行負載均衡處理=》this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}// 根據topic刪除消息隊列=》this.truncateMessageQueueNotMyTopic(); }
進入這個方法,根據topic進行負載均衡處理,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {// 獲取topic的消息隊列Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {// 更新負載均衡處理隊列表 =》boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {// 消息隊列發生改變=》this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}case CLUSTERING: {// 從topic訂閱信息中獲取消息隊列Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);// 獲取消費者=》List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {// 3九、消費隊列分配策略算法=》allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}// 更新處理隊列=》boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. 
allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);// 消息隊列更新=》this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;} }
進入這個方法,更新負載均衡處理隊列表,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
/**
 * Reconciles {@code processQueueTable} with the queues assigned for {@code topic}.
 *
 * <p>First pass: for entries of this topic, a queue that is no longer in {@code mqSet},
 * or (for {@code CONSUME_PASSIVELY}) whose pull has expired, is marked dropped and removed
 * if {@code removeUnnecessaryMessageQueue} agrees. Second pass: every queue in
 * {@code mqSet} that is missing from the table gets a new {@code ProcessQueue} and a
 * {@code PullRequest}. The collected pull requests are dispatched at the end.
 *
 * @param topic   topic being rebalanced
 * @param mqSet   queues assigned to this consumer for the topic
 * @param isOrder when {@code true}, a new queue is added only if {@code lock(mq)} succeeds
 * @return {@code true} if any queue was removed from or added to the table
 */
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;

    // Walk the existing message-queue -> process-queue entries.
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();

        if (mq.getTopic().equals(topic)) {
            if (!mqSet.contains(mq)) {
                // The queue is not in the new assignment, so mark it dropped.
                pq.setDropped(true);
                // Remove the queue that is no longer needed.
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                }
                // The pull request of this process queue has expired.
            } else if (pq.isPullExpired()) {
                switch (this.consumeType()) {
                    case CONSUME_ACTIVELY:
                        break;
                    case CONSUME_PASSIVELY:
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                consumerGroup, mq);
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }

            // Remove stale offset data for the queue.
            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            // Compute the next offset to pull from.
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                // Register the process queue unless another one is already present.
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    // Hand the new pull requests over for dispatching.
    this.dispatchPullRequest(pullRequestList);

    return changed;
}
進入這個方法,計算要拉取消息的下個offset,org.apache.rocketmq.client.impl.consumer.RebalancePullImpl#computePullFromWhere
/**
 * Returns the offset to start pulling from for the given queue.
 *
 * @param mq message queue; not consulted by this implementation
 * @return always {@code 0}
 */
@Override
public long computePullFromWhere(MessageQueue mq) {
    return 0;
}
往上返回到這個方法,消息隊列發生改變,org.apache.rocketmq.client.impl.consumer.RebalancePullImpl#messageQueueChanged
@Overridepublic void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();if (messageQueueListener != null) {try {// 消息隊列監聽器處理=》messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);} catch (Throwable e) {log.error("messageQueueChanged exception", e);}} }
進入這個方法,消息隊列監聽器處理,org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService.MessageQueueListenerImpl#messageQueueChanged
@Overridepublic void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {// 獲取默認mq拉去消息服務consumer的消息消費類型是集羣消費仍是廣播消費,集羣消費是爲了集羣中節點能夠負載均衡// 默認值是集羣消費MessageModel messageModel =MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();switch (messageModel) {// 廣播消費=》case BROADCASTING:MQPullConsumerScheduleService.this.putTask(topic, mqAll);break;// 集羣消費=》case CLUSTERING:MQPullConsumerScheduleService.this.putTask(topic, mqDivided);break;default:break;} }
進入這個方法,消息消費,org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService#putTask
public void putTask(String topic, Set<MessageQueue> mqNewSet) {// 遍歷拉取任務信息Iterator<Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();while (it.hasNext()) {Entry<MessageQueue, PullTaskImpl> next = it.next();if (next.getKey().getTopic().equals(topic)) {if (!mqNewSet.contains(next.getKey())) {next.getValue().setCancelled(true);it.remove();}}}for (MessageQueue mq : mqNewSet) {// 消息隊列不在消息拉取服務緩存中建立消息隊列的消息拉取服務並當即開始執行消息拉取服務if (!this.taskTable.containsKey(mq)) {PullTaskImpl command = new PullTaskImpl(mq);this.taskTable.put(mq, command);this.scheduledThreadPoolExecutor.schedule(command, 0, TimeUnit.MILLISECONDS);}} }
進入這個方法,拉取消息,org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService.PullTaskImpl#run
@Overridepublic void run() {String topic = this.messageQueue.getTopic();if (!this.isCancelled()) {PullTaskCallback pullTaskCallback =MQPullConsumerScheduleService.this.callbackTable.get(topic);if (pullTaskCallback != null) {final PullTaskContext context = new PullTaskContext();context.setPullConsumer(MQPullConsumerScheduleService.this.defaultMQPullConsumer);try {// 執行消息拉取的回調方法,這個須要客戶端實現pullTaskCallback.doPullTask(this.messageQueue, context);} catch (Throwable e) {context.setPullNextDelayTimeMillis(1000);log.error("doPullTask Exception", e);}// 出現異常,1000ms後從新拉取if (!this.isCancelled()) {MQPullConsumerScheduleService.this.scheduledThreadPoolExecutor.schedule(this,context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);} else {log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);}} else {log.warn("Pull Task Callback not exist , {}", topic);}} else {log.warn("The Pull Task is cancelled, {}", messageQueue);} }
進入這個方法,集羣消費的方式,按topic、消費組獲取消費者,org.apache.rocketmq.client.impl.factory.MQClientInstance#findConsumerIdList
public List<String> findConsumerIdList(final String topic, final String group) {// 獲取broker master地址String brokerAddr = this.findBrokerAddrByTopic(topic);if (null == brokerAddr) {// 從namesrv更新topic路由=》this.updateTopicRouteInfoFromNameServer(topic);brokerAddr = this.findBrokerAddrByTopic(topic);}if (null != brokerAddr) {try {// 獲取消費者=》return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);} catch (Exception e) {log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);}}return null; }
進入這個方法,獲取broker master地址,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddrByTopic
public String findBrokerAddrByTopic(final String topic) {// 獲取topic理由信息TopicRouteData topicRouteData = this.topicRouteTable.get(topic);if (topicRouteData != null) {List<BrokerData> brokers = topicRouteData.getBrokerDatas();if (!brokers.isEmpty()) {int index = random.nextInt(brokers.size());BrokerData bd = brokers.get(index % brokers.size());return bd.selectBrokerAddr();}}return null; }
往上返回到這個方法,從namesrv更新topic路由,org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {try {if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {TopicRouteData topicRouteData;if (isDefault && defaultMQProducer != null) {// 獲取默認的topic路由信息 =》topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);if (topicRouteData != null) {// 獲取隊列信息for (QueueData data : topicRouteData.getQueueDatas()) {// 讀寫隊列最大數量4int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {// 獲取topic路由信息=》topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);}if (topicRouteData != null) {TopicRouteData old = this.topicRouteTable.get(topic);// 判斷topic路由是否改變=》boolean changed = topicRouteDataIsChange(old, topicRouteData);if (!changed) {// 須要更新路由信息changed = this.isNeedUpdateTopicRouteInfo(topic);} else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}if (changed) {TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {// 更新broker的地址列表this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info{// topic路由信息轉換成topic發佈信息=》TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);// 遍歷生產者信息Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {// 更新topic發佈信息=》impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info{// 獲取消息隊列訂閱信息Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);// 遍歷消費者Iterator<Entry<String, 
MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {// 更新topic的訂閱信息=》impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);}} catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}} finally {this.lockNamesrv.unlock();}} else {log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);}} catch (InterruptedException e) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}return false; }
進入這個方法, 獲取默認的topic路由信息,org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long, boolean)
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();requestHeader.setTopic(topic);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);// 同步獲取topic的路由信息=》RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.TOPIC_NOT_EXIST: {if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);}break;}case ResponseCode.SUCCESS: {byte[] body = response.getBody();if (body != null) {return TopicRouteData.decode(body, TopicRouteData.class);}}default:break;}throw new MQClientException(response.getCode(), response.getRemark()); }
進入這個方法,同步獲取topic的路由信息,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync 前面介紹過了。
往上返回到這個方法,獲取topic路由信息,org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long, boolean) 前面介紹了。
往上返回到這個方法,判斷topic路由是否改變,org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteDataIsChange
/**
 * Tells whether two route snapshots differ. Both are cloned and their queue and broker
 * lists sorted before comparison, so the arguments themselves are not reordered.
 *
 * @param olddata previously cached route, may be {@code null}
 * @param nowdata freshly fetched route, may be {@code null}
 * @return {@code true} if either argument is {@code null} or the sorted clones are not equal
 */
private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
    if (olddata == null || nowdata == null) {
        return true;
    }

    final TopicRouteData previous = olddata.cloneTopicRouteData();
    final TopicRouteData current = nowdata.cloneTopicRouteData();

    Collections.sort(previous.getQueueDatas());
    Collections.sort(previous.getBrokerDatas());
    Collections.sort(current.getQueueDatas());
    Collections.sort(current.getBrokerDatas());

    final boolean identical = previous.equals(current);
    return !identical;
}
進入這個方法,需要更新路由信息,org.apache.rocketmq.client.impl.factory.MQClientInstance#isNeedUpdateTopicRouteInfo
private boolean isNeedUpdateTopicRouteInfo(final String topic) {boolean result = false;{// 遍歷生產者Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext() && !result) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {// 獲取topic的發佈信息是否須要更新=》result = impl.isPublishTopicNeedUpdate(topic);}}}{// 遍歷消費者Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext() && !result) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {// 訂閱的信息是否須要更新=》result = impl.isSubscribeTopicNeedUpdate(topic);}}}return result; }
進入這個方法,獲取topic的發佈信息是否需要更新,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#isPublishTopicNeedUpdate
/**
 * Tells whether the cached publish info of a topic has to be refreshed.
 *
 * @param topic topic to look up in {@code topicPublishInfoTable}
 * @return {@code true} when nothing is cached for the topic or the cached entry reports
 *         {@code ok() == false}
 */
@Override
public boolean isPublishTopicNeedUpdate(String topic) {
    final TopicPublishInfo cached = this.topicPublishInfoTable.get(topic);
    if (cached == null) {
        return true;
    }
    return !cached.ok();
}
往上返回到這個方法,訂閱的信息是否需要更新,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#isSubscribeTopicNeedUpdate
/**
 * Tells whether subscribe info for a topic is still missing.
 *
 * @param topic topic to check
 * @return {@code true} only when the topic is subscribed but
 *         {@code topicSubscribeInfoTable} has no entry for it; {@code false} when the
 *         subscription table is {@code null} or does not contain the topic
 */
@Override
public boolean isSubscribeTopicNeedUpdate(String topic) {
    final Map<String, SubscriptionData> subscriptions = this.rebalanceImpl.getSubscriptionInner();
    if (subscriptions == null || !subscriptions.containsKey(topic)) {
        return false;
    }
    return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
}
往上返回到這個方法,獲取消費者,org.apache.rocketmq.client.impl.MQClientAPIImpl#getConsumerIdListByGroup
public List<String> getConsumerIdListByGroup(final String addr,final String consumerGroup,final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,MQBrokerException, InterruptedException {GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();requestHeader.setConsumerGroup(consumerGroup);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);// =》RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {if (response.getBody() != null) {GetConsumerListByGroupResponseBody body =GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);return body.getConsumerIdList();}}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark()); }
往上返回到這個方法,消費隊列分配策略算法,org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy#allocate
/**
 * Chooses the message queues that one consumer of a group should consume.
 *
 * @param consumerGroup consumer group being rebalanced
 * @param currentCID    id of the consumer the allocation is computed for
 * @param mqAll         candidate message queues
 * @param cidAll        ids of the consumers in the group
 * @return the message queues allocated to {@code currentCID}
 */
List<MessageQueue> allocate(
    final String consumerGroup,
    final String currentCID,
    final List<MessageQueue> mqAll,
    final List<String> cidAll
);
有以下幾種策略:
AllocateMachineRoomNearby 一種基於機房近端優先級的分配策略代理
AllocateMessageQueueAveragely 平均散列隊列算法
AllocateMessageQueueAveragelyByCircle 循環平均哈希隊列算法
AllocateMessageQueueByConfig 按配置分配消息隊列
AllocateMessageQueueByMachineRoom 機房哈希隊列算法
AllocateMessageQueueConsistentHash 一致性哈希隊列算法
每個算法這裏就不做詳細介紹了。
往上返回到這個方法,更新處理隊列,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance 前面介紹過了。
進入這個方法,消息隊列更新,org.apache.rocketmq.client.impl.consumer.RebalancePullImpl#messageQueueChanged
@Overridepublic void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();if (messageQueueListener != null) {try {// 消息隊列監聽器處理=》messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);} catch (Throwable e) {log.error("messageQueueChanged exception", e);}} }
進入這個方法,org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService.MessageQueueListenerImpl#messageQueueChanged,前面介紹過了。
往上返回到這個方法,根據topic刪除消息隊列,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#truncateMessageQueueNotMyTopic
private void truncateMessageQueueNotMyTopic() {// 獲取訂閱信息Map<String, SubscriptionData> subTable = this.getSubscriptionInner();// 遍歷處理隊列for (MessageQueue mq : this.processQueueTable.keySet()) {if (!subTable.containsKey(mq.getTopic())) {// 刪除處理隊列ProcessQueue pq = this.processQueueTable.remove(mq);if (pq != null) {pq.setDropped(true);log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);}}} }
本篇介紹了MQPullConsumer的邏輯,下篇介紹MQPushConsumer的邏輯。
說在最後
本次解析僅代表個人觀點,僅供參考。
加入技術微信羣
釘釘技術羣