RocketMQ Source Code Analysis: Consumer and Producer Processing Flow (Part 4)

Preface

This part walks through the processing flow of DefaultMQProducer, DefaultMQPullConsumer, and DefaultMQPushConsumer.

 

Source Code Analysis

PushConsumer

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("Jodie_topic_1023", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // A value such as 2017_0422_221800 is in the wrong format; yyyyMMddHHmmss is expected
        consumer.setConsumeTimestamp("20170422221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

Step into the constructor that creates the DefaultMQPushConsumer, org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#DefaultMQPushConsumer(java.lang.String)

    public DefaultMQPushConsumer(final String consumerGroup) {
        // By default, message queues are allocated evenly (averagely) among the consumers
        this(consumerGroup, null, new AllocateMessageQueueAveragely());
    }
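To make "allocated evenly" concrete, here is a minimal sketch of an averaging allocation. It is not the AllocateMessageQueueAveragely source; the class name, the method name, and the use of plain integers as queue ids are assumptions made for illustration. The idea is that each consumer gets a contiguous slice, and the first (queues % consumers) consumers take one extra queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: hypothetical names, not the RocketMQ implementation.
class AverageAllocationSketch {

    // Returns the contiguous slice of queueIds assigned to the consumer at
    // position `index` when the queues are divided among `consumerCount` consumers.
    static List<Integer> allocate(List<Integer> queueIds, int consumerCount, int index) {
        List<Integer> result = new ArrayList<>();
        int total = queueIds.size();
        int base = total / consumerCount;   // queues every consumer gets at least
        int mod = total % consumerCount;    // the first `mod` consumers get one extra
        int size = index < mod ? base + 1 : base;
        int start = index < mod ? index * (base + 1) : index * base + mod;
        for (int i = 0; i < size; i++) {
            result.add(queueIds.get(start + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> queues = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
        for (int c = 0; c < 3; c++) {
            System.out.println("consumer " + c + " -> " + allocate(queues, 3, c));
        }
    }
}
```

With 8 queues and 3 consumers, the slices are sizes 3, 3, and 2. When there are more consumers than queues, the trailing consumers receive nothing.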

Step into org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#subscribe(java.lang.String, java.lang.String)

    @Override
    public void subscribe(String topic, String subExpression) throws MQClientException {
        this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
    }

Step into org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#subscribe(java.lang.String, java.lang.String)

    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            // Build the subscription data for the topic (clustering consumption by default)
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                topic, subExpression);
            // Store the subscription data for the topic
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                // Send a heartbeat to all brokers while holding the lock
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

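Building the topic subscription data (clustering consumption by default), org.apache.rocketmq.common.filter.FilterAPI#buildSubscriptionData, was covered in an earlier part.

Sending a heartbeat to all brokers, org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBrokerWithLock, was also covered in an earlier part.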

Step into org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start, which delegates to DefaultMQPushConsumerImpl#start shown here

    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;

                // Check the configuration
                this.checkConfig();

                // Copy the subscription configuration
                this.copySubscription();

                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }

                // Get or create the client instance used by this consumer
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                // Register the message filter hooks
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            // Broadcasting: offsets are stored in a local file
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            // Clustering: offsets are stored remotely on the broker
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                // Load the offsets
                this.offsetStore.load();

                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                // Start the message consumption service
                this.consumeMessageService.start();

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }
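The serviceState switch in start() is a small state machine: the state is set to START_FAILED before initialization and only becomes RUNNING once every step succeeds, so a second call (or a call after a failed start) is rejected. The following is a minimal sketch of that guard alone, under the assumption that all initialization can be represented as a single Runnable. The class and method names are hypothetical, and the reset to CREATE_JUST on a duplicate group name is not modelled.

```java
// Illustrative sketch only: a hypothetical class that models the start() guard.
class StartGuardSketch {

    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private ServiceState state = ServiceState.CREATE_JUST;

    // Runs initSteps once; any later call fails because the state has moved on.
    synchronized void start(Runnable initSteps) {
        switch (state) {
            case CREATE_JUST:
                // Pessimistic: if initSteps throws, the state stays START_FAILED.
                state = ServiceState.START_FAILED;
                initSteps.run();
                state = ServiceState.RUNNING;
                break;
            default:
                throw new IllegalStateException("service state not OK, maybe started once: " + state);
        }
    }

    synchronized ServiceState state() {
        return state;
    }

    public static void main(String[] args) {
        StartGuardSketch consumer = new StartGuardSketch();
        consumer.start(() -> System.out.println("initializing"));
        System.out.println("state after first start: " + consumer.state());
        try {
            consumer.start(() -> { });
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
    }
}
```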

Step into the configuration check, org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#checkConfig

    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());

        if (null == this.defaultMQPushConsumer.getConsumerGroup()) {
            throw new MQClientException(
                "consumerGroup is null"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        // The consumer group name must not equal DEFAULT_CONSUMER
        if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
            throw new MQClientException(
                "consumerGroup can not equal "
                    + MixAll.DEFAULT_CONSUMER_GROUP
                    + ", please specify another one."
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        // A message model must be set; the default is clustering
        if (null == this.defaultMQPushConsumer.getMessageModel()) {
            throw new MQClientException(
                "messageModel is null"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        // consumeFromWhere must be set; the default is to consume from the last offset
        if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {
            throw new MQClientException(
                "consumeFromWhere is null"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        // consumeTimestamp must parse as yyyyMMddHHmmss; the default is half an hour ago
        Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS);
        if (null == dt) {
            throw new MQClientException(
                "consumeTimestamp is invalid, the valid format is yyyyMMddHHmmss,but received "
                    + this.defaultMQPushConsumer.getConsumeTimestamp()
                    + " " + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
        }

        // allocateMessageQueueStrategy: the strategy that allocates message queues to consumers
        if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {
            throw new MQClientException(
                "allocateMessageQueueStrategy is null"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        // subscription
        if (null == this.defaultMQPushConsumer.getSubscription()) {
            throw new MQClientException(
                "subscription is null"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        // messageListener
        if (null == this.defaultMQPushConsumer.getMessageListener()) {
            throw new MQClientException(
                "messageListener is null"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        // Whether the listener consumes in order
        boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly;
        // Whether the listener consumes concurrently
        boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently;
        if (!orderly && !concurrently) {
            throw new MQClientException(
                "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        // consumeThreadMin: minimum number of consumer threads, must be in [1, 1000], default 20
        if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
            || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
            throw new MQClientException(
                "consumeThreadMin Out of range [1, 1000]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        // consumeThreadMax: maximum number of consumer threads, must be in [1, 1000], default 64
        if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
            throw new MQClientException(
                "consumeThreadMax Out of range [1, 1000]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        // consumeThreadMin can't be larger than consumeThreadMax
        if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
            throw new MQClientException(
                "consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") "
                    + "is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")",
                null);
        }

        // consumeConcurrentlyMaxSpan: maximum offset span for concurrent consumption, must be in [1, 65535], default 2000
        if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
            || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
            throw new MQClientException(
                "consumeConcurrentlyMaxSpan Out of range [1, 65535]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        // pullThresholdForQueue: flow-control limit on cached messages per queue, must be in [1, 65535], default 1000
        if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) {
            throw new MQClientException(
                "pullThresholdForQueue Out of range [1, 65535]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        // pullThresholdForTopic
        if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) {
            if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) {
                throw new MQClientException(
                    "pullThresholdForTopic Out of range [1, 6553500]"
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                    null);
            }
        }

        // pullThresholdSizeForQueue: flow-control limit on cached message size per queue in MiB, must be in [1, 1024], default 100
        if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) {
            throw new MQClientException(
                "pullThresholdSizeForQueue Out of range [1, 1024]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) {
            // pullThresholdSizeForTopic
            if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) {
                throw new MQClientException(
                    "pullThresholdSizeForTopic Out of range [1, 102400]"
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                    null);
            }
        }

        // pullInterval: interval between pulls, must be in [0, 65535], default 0
        if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {
            throw new MQClientException(
                "pullInterval Out of range [0, 65535]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        // consumeMessageBatchMaxSize: maximum number of messages per consume call, must be in [1, 1024], default 1
        if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1
            || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
            throw new MQClientException(
                "consumeMessageBatchMaxSize Out of range [1, 1024]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        // pullBatchSize: number of messages per pull, must be in [1, 1024], default 32
        if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {
            throw new MQClientException(
                "pullBatchSize Out of range [1, 1024]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }
    }
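The consumeTimestamp check is the one that the sample's "wrong time format" comment refers to. As a rough sketch of what that check amounts to, the snippet below parses a value with the yyyyMMddHHmmss pattern and returns null on failure. It assumes, without quoting the source, that UtilAll.parseDate behaves like a SimpleDateFormat parse that yields null when parsing fails; the class and method names here are hypothetical.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Illustrative sketch only: a hypothetical stand-in for the timestamp validation.
class ConsumeTimestampSketch {

    // Returns the parsed date, or null if the value does not match yyyyMMddHHmmss.
    static Date parseOrNull(String value) {
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
        try {
            return format.parse(value);
        } catch (ParseException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("20170422221800 valid: " + (parseOrNull("20170422221800") != null));
        System.out.println("2017_0422_221800 valid: " + (parseOrNull("2017_0422_221800") != null));
    }
}
```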

Back in start(), the subscription configuration is copied, org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#copySubscription

    private void copySubscription() throws MQClientException {
        try {
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    // Build the subscription data
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        topic, subString);
                    // Store the subscription data
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }

            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }

            switch (this.defaultMQPushConsumer.getMessageModel()) {
                // Broadcasting: no retry topic is subscribed
                case BROADCASTING:
                    break;
                // Clustering
                case CLUSTERING:
                    // Retry topic for this consumer group
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    // Build the subscription data for the retry topic
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        retryTopic, SubscriptionData.SUB_ALL);
                    // Store the subscription data
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }
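The net effect of copySubscription is that a clustering consumer ends up subscribed to one more topic than it asked for: the retry topic of its own group. A minimal sketch of that outcome follows, using plain maps instead of SubscriptionData. The class and method names are hypothetical; the "%RETRY%" prefix is what MixAll.getRetryTopic prepends to the group name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: plain maps stand in for the rebalance subscription table.
class RetryTopicSketch {

    enum MessageModel { BROADCASTING, CLUSTERING }

    static final String RETRY_PREFIX = "%RETRY%";

    // Returns topic -> sub-expression, adding the group's retry topic in clustering mode.
    static Map<String, String> effectiveSubscriptions(Map<String, String> userSubs,
                                                      String consumerGroup,
                                                      MessageModel model) {
        Map<String, String> result = new LinkedHashMap<>(userSubs);
        if (model == MessageModel.CLUSTERING) {
            result.put(RETRY_PREFIX + consumerGroup, "*");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> subs = new LinkedHashMap<>();
        subs.put("Jodie_topic_1023", "*");
        System.out.println(effectiveSubscriptions(subs, "CID_JODIE_1", MessageModel.CLUSTERING));
        System.out.println(effectiveSubscriptions(subs, "CID_JODIE_1", MessageModel.BROADCASTING));
    }
}
```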

Back in start(), the message consumption service is started, org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#start

    public void start() {
        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                // Clean up expired messages
                cleanExpireMsg();
            }

        // The consume timeout is 15 minutes by default
        }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
    }

Step into the cleanup of expired messages, org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#cleanExpireMsg

    private void cleanExpireMsg() {
        Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
            this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<MessageQueue, ProcessQueue> next = it.next();
            ProcessQueue pq = next.getValue();
            // Clean the expired messages of each process queue
            pq.cleanExpiredMsg(this.defaultMQPushConsumer);
        }
    }

Step into org.apache.rocketmq.client.impl.consumer.ProcessQueue#cleanExpiredMsg

    public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
        if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
            return;
        }

        int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
        for (int i = 0; i < loop; i++) {
            MessageExt msg = null;
            try {
                this.lockTreeMap.readLock().lockInterruptibly();
                try {
                    if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                        msg = msgTreeMap.firstEntry().getValue();
                    } else {
                        break;
                    }
                } finally {
                    this.lockTreeMap.readLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("getExpiredMsg exception", e);
            }

            try {
                // Send the expired message back to the broker with delay level 3
                pushConsumer.sendMessageBack(msg, 3);
                log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
                try {
                    this.lockTreeMap.writeLock().lockInterruptibly();
                    try {
                        if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                            try {
                                removeMessage(Collections.singletonList(msg));
                            } catch (Exception e) {
                                log.error("send expired msg exception", e);
                            }
                        }
                    } finally {
                        this.lockTreeMap.writeLock().unlock();
                    }
                } catch (InterruptedException e) {
                    log.error("getExpiredMsg exception", e);
                }
            } catch (Exception e) {
                log.error("send expired msg exception", e);
            }
        }
    }
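Stripped of locking and network calls, cleanExpiredMsg is a bounded scan over the head of a sorted map: at most 16 messages per run, always looking only at the lowest offset, and stopping at the first message that has not timed out. The sketch below models just that control flow. The map of offset to consume-start time, the class name, and the method name are assumptions for illustration; "sending back" is represented by collecting the offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only: a simplified model of the head-of-queue expiry scan.
class ExpiredScanSketch {

    // Removes up to 16 expired entries from the head of the map and returns their offsets.
    static List<Long> removeExpired(TreeMap<Long, Long> startTimeByOffset, long now, long timeoutMillis) {
        List<Long> expired = new ArrayList<>();
        int loop = Math.min(startTimeByOffset.size(), 16);
        for (int i = 0; i < loop; i++) {
            Map.Entry<Long, Long> head = startTimeByOffset.firstEntry();
            // Only the lowest offset is examined; stop as soon as it is still within the timeout.
            if (head == null || now - head.getValue() <= timeoutMillis) {
                break;
            }
            expired.add(head.getKey());
            startTimeByOffset.remove(head.getKey());
        }
        return expired;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> queue = new TreeMap<>();
        queue.put(1L, 0L);
        queue.put(2L, 0L);
        queue.put(3L, 900L);
        System.out.println("expired offsets: " + removeExpired(queue, 1000L, 500L));
        System.out.println("remaining: " + queue.keySet());
    }
}
```

One consequence of examining only the head: an expired message sitting behind a fresh one is not touched until the fresh one is consumed or expires itself.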

Step into sending the message back, org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#sendMessageBack(org.apache.rocketmq.common.message.MessageExt, int)

    @Override
    public void sendMessageBack(MessageExt msg, int delayLevel)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
    }

Step into org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#sendMessageBack

    public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            // Find the master broker address by brokerName, or fall back to the host that stored the message
            String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
            // The last argument is the maximum number of reconsume attempts (16 by default)
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
                this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
        } catch (Exception e) {
            log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

            // Fallback: build a new message addressed to the retry topic
            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

            String originMsgId = MessageAccessor.getOriginMessageId(msg);
            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

            newMsg.setFlag(msg.getFlag());
            MessageAccessor.setProperties(newMsg, msg.getProperties());
            MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

            // Send the message through the internal producer
            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
        }
    }
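In the fallback path the delay level is computed as 3 + reconsumeTimes, so each retry waits longer than the previous one. The sketch below shows how that level could map to an actual delay. It assumes the broker's default messageDelayLevel table ("1s 5s 10s ... 2h") and assumes that levels above the table size are clamped to the last entry; a broker configured differently will behave differently. The class and method names are hypothetical.

```java
// Illustrative sketch only: maps the fallback delay level to a delay string
// under an ASSUMED default broker delay table.
class RetryDelaySketch {

    // Assumed broker default for messageDelayLevel; configurable on a real broker.
    static final String[] DEFAULT_DELAY_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    // Mirrors newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()) from the fallback path.
    static int fallbackDelayLevel(int reconsumeTimes) {
        return 3 + reconsumeTimes;
    }

    // Levels are 1-based; out-of-range levels are clamped (an assumption of this sketch).
    static String delayFor(int level) {
        int clamped = Math.min(Math.max(level, 1), DEFAULT_DELAY_LEVELS.length);
        return DEFAULT_DELAY_LEVELS[clamped - 1];
    }

    public static void main(String[] args) {
        for (int times = 0; times < 4; times++) {
            int level = fallbackDelayLevel(times);
            System.out.println("reconsumeTimes=" + times + " -> level " + level + " -> " + delayFor(level));
        }
    }
}
```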

Step into org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack

    public void consumerSendMessageBack(
        final String addr,
        final MessageExt msg,
        final String consumerGroup,
        final int delayLevel,
        final long timeoutMillis,
        final int maxConsumeRetryTimes
    ) throws RemotingException, MQBrokerException, InterruptedException {
        ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

        requestHeader.setGroup(consumerGroup);
        requestHeader.setOriginTopic(msg.getTopic());
        requestHeader.setOffset(msg.getCommitLogOffset());
        requestHeader.setDelayLevel(delayLevel);
        requestHeader.setOriginMsgId(msg.getMsgId());
        requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);

        // Synchronous invocation
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

Back in sendMessageBack, the fallback path sends the message with org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)

    @Override
    public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg);
    }

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message) was covered in an earlier part.

Back in start(), the orderly variant of the consumption service is org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#start

    public void start() {
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    // Lock the message queues periodically
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }

Step into the periodic queue locking, org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#lockMQPeriodically

    public synchronized void lockMQPeriodically() {
        if (!this.stopped) {
            this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
        }
    }

Step into org.apache.rocketmq.client.impl.consumer.RebalanceImpl#lockAll

    public void lockAll() {
        // Group the process queues by broker name
        HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();

        Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, Set<MessageQueue>> entry = it.next();
            final String brokerName = entry.getKey();
            final Set<MessageQueue> mqs = entry.getValue();

            if (mqs.isEmpty())
                continue;

            // Find the address of the master broker
            FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
            if (findBrokerResult != null) {
                LockBatchRequestBody requestBody = new LockBatchRequestBody();
                requestBody.setConsumerGroup(this.consumerGroup);
                requestBody.setClientId(this.mQClientFactory.getClientId());
                requestBody.setMqSet(mqs);

                try {
                    // Lock the message queues in a batch
                    Set<MessageQueue> lockOKMQSet =
                        this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);

                    for (MessageQueue mq : lockOKMQSet) {
                        ProcessQueue processQueue = this.processQueueTable.get(mq);
                        if (processQueue != null) {
                            if (!processQueue.isLocked()) {
                                log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
                            }

                            // Mark the process queue as locked
                            processQueue.setLocked(true);
                            processQueue.setLastLockTimestamp(System.currentTimeMillis());
                        }
                    }
                    // Mark the queues that failed to lock as unlocked
                    for (MessageQueue mq : mqs) {
                        if (!lockOKMQSet.contains(mq)) {
                            ProcessQueue processQueue = this.processQueueTable.get(mq);
                            if (processQueue != null) {
                                processQueue.setLocked(false);
                                log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("lockBatchMQ exception, " + mqs, e);
                }
            }
        }
    }
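The second half of lockAll reconciles two sets: the queues the client asked to lock and the queues the broker reports as locked. Every requested queue ends up explicitly marked as locked or unlocked. A minimal sketch of that reconciliation follows, with strings standing in for MessageQueue and a map of booleans standing in for the ProcessQueue lock flag; all names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Illustrative sketch only: strings model message queues, booleans model lock flags.
class LockResultSketch {

    // Returns queue -> locked flag for every requested queue.
    static Map<String, Boolean> applyLockResult(Set<String> requested, Set<String> lockedOk) {
        Map<String, Boolean> lockedFlags = new TreeMap<>();
        for (String mq : requested) {
            // A queue is locked only if the broker confirmed it; otherwise it is marked unlocked.
            lockedFlags.put(mq, lockedOk.contains(mq));
        }
        return lockedFlags;
    }

    public static void main(String[] args) {
        Set<String> requested = new HashSet<>(Arrays.asList("broker-a:0", "broker-a:1", "broker-a:2"));
        Set<String> lockedOk = new HashSet<>(Arrays.asList("broker-a:0", "broker-a:2"));
        System.out.println(applyLockResult(requested, lockedOk));
    }
}
```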

Step into grouping the process queues by broker name, org.apache.rocketmq.client.impl.consumer.RebalanceImpl#buildProcessQueueTableByBrokerName

    private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
        HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();
        for (MessageQueue mq : this.processQueueTable.keySet()) {
            Set<MessageQueue> mqs = result.get(mq.getBrokerName());
            if (null == mqs) {
                mqs = new HashSet<MessageQueue>();
                result.put(mq.getBrokerName(), mqs);
            }

            mqs.add(mq);
        }

        return result;
    }
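The get / null-check / put sequence above is the pre-Java 8 way of writing a group-by. For comparison, here is a minimal sketch of the same grouping with Map.computeIfAbsent. The nested Queue class is a hypothetical stand-in for MessageQueue that carries only a broker name and a queue id.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: Queue is a hypothetical stand-in for MessageQueue.
class GroupByBrokerSketch {

    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    // Groups queue ids by broker name, creating each set on first use.
    static Map<String, Set<Integer>> groupByBroker(List<Queue> queues) {
        Map<String, Set<Integer>> result = new HashMap<>();
        for (Queue q : queues) {
            result.computeIfAbsent(q.brokerName, name -> new HashSet<>()).add(q.queueId);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Queue> queues = Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1), new Queue("broker-b", 0));
        System.out.println(groupByBroker(queues));
    }
}
```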

Back in lockAll, the master broker address is looked up, org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe

    public FindBrokerResult findBrokerAddressInSubscribe(
        final String brokerName,
        final long brokerId,
        final boolean onlyThisBroker
    ) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;

        // Read the cached broker addresses
        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            brokerAddr = map.get(brokerId);
            slave = brokerId != MixAll.MASTER_ID;
            found = brokerAddr != null;

            if (!found && !onlyThisBroker) {
                Entry<Long, String> entry = map.entrySet().iterator().next();
                brokerAddr = entry.getValue();
                slave = entry.getKey() != MixAll.MASTER_ID;
                found = true;
            }
        }

        if (found) {
            return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
        }

        return null;
    }

    // Look up the version of the broker
    public int findBrokerVersion(String brokerName, String brokerAddr) {
        if (this.brokerVersionTable.containsKey(brokerName)) {
            if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {
                return this.brokerVersionTable.get(brokerName).get(brokerAddr);
            }
        }
        return 0;
    }
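The onlyThisBroker flag decides whether the lookup may fall back to any other known address of the same broker name. lockAll passes true with MASTER_ID, so a missing master yields null and the lock attempt is skipped. The sketch below isolates that decision for a single broker name; the class and method names are hypothetical and the addresses are made-up examples.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: models the address lookup for one broker name.
class BrokerLookupSketch {

    static final long MASTER_ID = 0L; // same value as MixAll.MASTER_ID

    // Returns the address for brokerId, or any known address if fallback is allowed, else null.
    static String find(Map<Long, String> addrById, long brokerId, boolean onlyThisBroker) {
        if (addrById == null || addrById.isEmpty()) {
            return null;
        }
        String addr = addrById.get(brokerId);
        if (addr == null && !onlyThisBroker) {
            // Fallback: take whichever address the table iterates first.
            addr = addrById.values().iterator().next();
        }
        return addr;
    }

    public static void main(String[] args) {
        Map<Long, String> slaveOnly = new HashMap<>();
        slaveOnly.put(1L, "10.0.0.2:10911"); // hypothetical slave address
        System.out.println("strict lookup: " + find(slaveOnly, MASTER_ID, true));
        System.out.println("with fallback: " + find(slaveOnly, MASTER_ID, false));
    }
}
```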

Back in lockAll, the message queues are locked in a batch, org.apache.rocketmq.client.impl.MQClientAPIImpl#lockBatchMQ

    public Set<MessageQueue> lockBatchMQ(
        final String addr,
        final LockBatchRequestBody requestBody,
        final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);

        request.setBody(requestBody.encode());
        // Synchronous invocation
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
                Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet();
                return messageQueues;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

Back in start(), org.apache.rocketmq.client.impl.factory.MQClientInstance#start was covered in an earlier part.

Step into org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#updateTopicSubscribeInfoWhenSubscriptionChanged

    private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            }
        }
    }

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer) was covered in an earlier part.

org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBrokerWithLock was covered in an earlier part.

Step into org.apache.rocketmq.client.impl.factory.MQClientInstance#rebalanceImmediately

    public void rebalanceImmediately() {
        this.rebalanceService.wakeup();
    }
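rebalanceImmediately does not run the rebalance itself; it only wakes the rebalance service thread, which otherwise sleeps between rounds. The sketch below illustrates that wait-with-early-wakeup pattern using a plain monitor. It is not the ServiceThread source: the class name, the method bodies, and the 20-second interval used in the demo are assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: a hypothetical wait/wakeup helper, not RocketMQ's ServiceThread.
class WakeupSketch {

    private final Object lock = new Object();
    private boolean notified = false;

    // Called from other threads to cut the current (or next) wait short.
    void wakeup() {
        synchronized (lock) {
            notified = true;
            lock.notifyAll();
        }
    }

    // Waits up to intervalMillis, returning early if wakeup() has been called.
    void waitForRunning(long intervalMillis) throws InterruptedException {
        synchronized (lock) {
            long deadline = System.currentTimeMillis() + intervalMillis;
            while (!notified) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    break;
                }
                lock.wait(remaining);
            }
            notified = false; // consume the notification
        }
    }

    // Starts a worker that would sleep 20 s, wakes it, and reports whether it returned early.
    static boolean wakesEarly() {
        WakeupSketch service = new WakeupSketch();
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                service.waitForRunning(20_000);
                done.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
        service.wakeup();
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("worker woke early: " + wakesEarly());
    }
}
```

Because the flag is checked before waiting, a wakeup that arrives before the worker starts waiting is not lost.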

The wakeup triggers the rebalance, org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance, which was covered earlier; please refer to the previous parts.

This returns to org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start, which ends here.

 

Closing Remarks

This analysis reflects only my own understanding and is for reference only.

 
