說在前面java
DefaultMQProducer、DefaultMQPullConsumer、DefaultMQPushConsumer 處理過程算法
源碼解析apache
PushConsumer緩存
public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");consumer.subscribe("Jodie_topic_1023", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//wrong time format 2017_0422_221800consumer.setConsumeTimestamp("20170422221800");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n"); }
進入方法,建立DefaultMQPushConsumer,org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#DefaultMQPushConsumer(java.lang.String)微信
public DefaultMQPushConsumer(final String consumerGroup) {// 這裏採用平均散列隊列算法this(consumerGroup, null, new AllocateMessageQueueAveragely()); }
進入方法,org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#subscribe(java.lang.String, java.lang.String)併發
@Overridepublic void subscribe(String topic, String subExpression) throws MQClientException {this.defaultMQPushConsumerImpl.subscribe(topic, subExpression); }
進入方法,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#subscribe(java.lang.String, java.lang.String)app
public void subscribe(String topic, String subExpression) throws MQClientException {try {// 構建topic訂閱數據,默認集羣消費=》SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic, subExpression);// 存儲topic訂閱數據=》this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);if (this.mQClientFactory != null) {// 同步向全部的broker發送心跳監測=》this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();}} catch (Exception e) {throw new MQClientException("subscription exception", e);} }
進入方法,構建topic訂閱數據,默認集羣消費,org.apache.rocketmq.common.filter.FilterAPI#buildSubscriptionData介紹過了。負載均衡
進入方法,同步向全部的broker發送心跳監測,org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBrokerWithLock介紹過了。ide
進入方法,org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#startoop
public synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;// 檢查配置=》this.checkConfig();// copy訂閱配置=》this.copySubscription();if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}// 建立消費者this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());// 註冊過濾消息的鉤子方法this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:// 廣播 本地存儲this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:// 集羣 遠程存儲this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}// 加載offset=》this.offsetStore.load();if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}// 消息消息服務啓動=》this.consumeMessageService.start();boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown();throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.checkClientInBroker();this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();this.mQClientFactory.rebalanceImmediately(); }
進入方法,檢查配置,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#checkConfig
private void checkConfig() throws MQClientException {Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());if (null == this.defaultMQPushConsumer.getConsumerGroup()) {throw new MQClientException("consumerGroup is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// 消費組名稱不能和DEFAULT_CONSUMER同樣if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {throw new MQClientException("consumerGroup can not equal "+ MixAll.DEFAULT_CONSUMER_GROUP+ ", please specify another one."+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// 必須指定消息消費模式,默認集羣消費if (null == this.defaultMQPushConsumer.getMessageModel()) {throw new MQClientException("messageModel is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// 默認從最後的offset消費if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {throw new MQClientException("consumeFromWhere is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// 默認消費時間半小時以前Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS);if (null == dt) {throw new MQClientException("consumeTimestamp is invalid, the valid format is yyyyMMddHHmmss,but received "+ this.defaultMQPushConsumer.getConsumeTimestamp()+ " " + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);}// allocateMessageQueueStrategy 能夠執行消息分配算法if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {throw new MQClientException("allocateMessageQueueStrategy is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// subscriptionif (null == this.defaultMQPushConsumer.getSubscription()) {throw new MQClientException("subscription is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// messageListenerif (null == this.defaultMQPushConsumer.getMessageListener()) {throw new MQClientException("messageListener is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// 書否順序消費boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly;// 是否集羣消費boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently;if (!orderly && !concurrently) {throw new MQClientException("messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// consumeThreadMin 消費最小線程數大於1小於1000,默認20if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1|| this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {throw new MQClientException("consumeThreadMin Out of range [1, 1000]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// consumeThreadMax 消費最大線程數大於1小於1000,默認64if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {throw new MQClientException("consumeThreadMax Out of range [1, 1000]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// consumeThreadMin can't be larger than consumeThreadMax 消費最小線程數不能大於最大線程數if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {throw new MQClientException("consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") "+ "is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")",null);}// consumeConcurrentlyMaxSpan 併發消費最大 大於1小於65535,默認2000if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1|| this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {throw new MQClientException("consumeConcurrentlyMaxSpan Out of range [1, 65535]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// pullThresholdForQueue 拉取消息批量大小 大於1小於65536 默認1000if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) {throw new MQClientException("pullThresholdForQueue Out of range [1, 65535]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// pullThresholdForTopicif (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) {if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) {throw new MQClientException("pullThresholdForTopic Out of range [1, 6553500]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}}// pullThresholdSizeForQueue 拉取消息批量大小 大於1M小於1G,默認100if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) {throw new MQClientException("pullThresholdSizeForQueue Out of range [1, 1024]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) {// pullThresholdSizeForTopicif (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) {throw new MQClientException("pullThresholdSizeForTopic Out of range [1, 102400]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}}// pullInterval 拉取消息頻次 大於0小於65535 默認0if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {throw new MQClientException("pullInterval Out of range [0, 65535]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// consumeMessageBatchMaxSize 消費消息批量最大 大於1小於1024 默認1if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1|| this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {throw new MQClientException("consumeMessageBatchMaxSize Out of range [1, 1024]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// pullBatchSize 批量拉取大小 大於1小於1024,默認32if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {throw new MQClientException("pullBatchSize Out of range [1, 1024]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);} }
返回方法,copy訂閱配置,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#copySubscription
private void copySubscription() throws MQClientException {try {Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();if (sub != null) {for (final Map.Entry<String, String> entry : sub.entrySet()) {final String topic = entry.getKey();final String subString = entry.getValue();// 構建訂閱配置=》SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic, subString);// 存儲訂閱配置=》this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);}}if (null == this.messageListenerInner) {this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();}switch (this.defaultMQPushConsumer.getMessageModel()) {// 廣播case BROADCASTING:break;// 集羣消費case CLUSTERING:// 重試topicfinal String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());// 構建重試topic訂閱配置SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),retryTopic, SubscriptionData.SUB_ALL);// 存儲訂閱配置=》this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);break;default:break;}} catch (Exception e) {throw new MQClientException("subscription exception", e);} }
返回方法,消息消息服務啓動,org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#start
public void start() {this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {// 清除過時的消息=》cleanExpireMsg();}// 15min 消費超時}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); }
進入方法,清除過時的消息,org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#cleanExpireMsg
private void cleanExpireMsg() {Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();while (it.hasNext()) {Map.Entry<MessageQueue, ProcessQueue> next = it.next();ProcessQueue pq = next.getValue();// =》pq.cleanExpiredMsg(this.defaultMQPushConsumer);} }
進入方法,org.apache.rocketmq.client.impl.consumer.ProcessQueue#cleanExpiredMsg
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {return;}int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;for (int i = 0; i < loop; i++) {MessageExt msg = null;try {this.lockTreeMap.readLock().lockInterruptibly();try {if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {msg = msgTreeMap.firstEntry().getValue();} else {break;}} finally {this.lockTreeMap.readLock().unlock();}} catch (InterruptedException e) {log.error("getExpiredMsg exception", e);}try {// 發送消息=》pushConsumer.sendMessageBack(msg, 3);log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());try {this.lockTreeMap.writeLock().lockInterruptibly();try {if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {try {removeMessage(Collections.singletonList(msg));} catch (Exception e) {log.error("send expired msg exception", e);}}} finally {this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {log.error("getExpiredMsg exception", e);}} catch (Exception e) {log.error("send expired msg exception", e);}} }
進入方法,發送消息,org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#sendMessageBack(org.apache.rocketmq.common.message.MessageExt, int)
@Overridepublic void sendMessageBack(MessageExt msg, int delayLevel)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null); }
進入方法,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#sendMessageBack
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {try {// 按brokerName找到master broker地址String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());// =》this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,// 最大消費次數,默認16this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());} catch (Exception e) {log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);// 建立重試topic消息Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());String originMsgId = MessageAccessor.getOriginMessageId(msg);MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);newMsg.setFlag(msg.getFlag());MessageAccessor.setProperties(newMsg, msg.getProperties());MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());// 發送消息=》this.mQClientFactory.getDefaultMQProducer().send(newMsg);} }
進入方法,org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack
public void consumerSendMessageBack(final String addr,final MessageExt msg,final String consumerGroup,final int delayLevel,final long timeoutMillis,final int maxConsumeRetryTimes) throws RemotingException, MQBrokerException, InterruptedException {ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);requestHeader.setGroup(consumerGroup);requestHeader.setOriginTopic(msg.getTopic());requestHeader.setOffset(msg.getCommitLogOffset());requestHeader.setDelayLevel(delayLevel);requestHeader.setOriginMsgId(msg.getMsgId());requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);// 同步執行RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark()); }
返回方法,發送消息,org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
@Overridepublic SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// =》return this.defaultMQProducerImpl.send(msg); }
進入方法,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message)介紹過了。
返回方法,org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#start
public void start() {if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {// 鎖定消費隊列=》ConsumeMessageOrderlyService.this.lockMQPeriodically();}}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);} }
進入方法,鎖定消費隊列,org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#lockMQPeriodically
public synchronized void lockMQPeriodically() {if (!this.stopped) {// =》this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();} }
進入方法,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#lockAll
public void lockAll() {// 按brokerName構建處理隊列=》HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();while (it.hasNext()) {Entry<String, Set<MessageQueue>> entry = it.next();final String brokerName = entry.getKey();final Set<MessageQueue> mqs = entry.getValue();if (mqs.isEmpty())continue;// 找到broker master地址=》FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);if (findBrokerResult != null) {LockBatchRequestBody requestBody = new LockBatchRequestBody();requestBody.setConsumerGroup(this.consumerGroup);requestBody.setClientId(this.mQClientFactory.getClientId());requestBody.setMqSet(mqs);try {// 批量鎖定消息隊列=》Set<MessageQueue> lockOKMQSet =this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);for (MessageQueue mq : lockOKMQSet) {ProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {if (!processQueue.isLocked()) {log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);}// 鎖定處理隊列processQueue.setLocked(true);processQueue.setLastLockTimestamp(System.currentTimeMillis());}}// 消息隊列鎖定失敗=》for (MessageQueue mq : mqs) {if (!lockOKMQSet.contains(mq)) {ProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {processQueue.setLocked(false);log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);}}}} catch (Exception e) {log.error("lockBatchMQ exception, " + mqs, e);}}} }
進入方法,按brokerName構建處理隊列,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#buildProcessQueueTableByBrokerName
private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();for (MessageQueue mq : this.processQueueTable.keySet()) {Set<MessageQueue> mqs = result.get(mq.getBrokerName());if (null == mqs) {mqs = new HashSet<MessageQueue>();result.put(mq.getBrokerName(), mqs);}mqs.add(mq);}return result; }
返回方法,找到broker master地址,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe
public FindBrokerResult findBrokerAddressInSubscribe(final String brokerName,final long brokerId,final boolean onlyThisBroker) {String brokerAddr = null;boolean slave = false;boolean found = false;// 獲取broker的緩存信息HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);if (map != null && !map.isEmpty()) {brokerAddr = map.get(brokerId);slave = brokerId != MixAll.MASTER_ID;found = brokerAddr != null;if (!found && !onlyThisBroker) {Entry<Long, String> entry = map.entrySet().iterator().next();brokerAddr = entry.getValue();slave = entry.getKey() != MixAll.MASTER_ID;found = true;}}if (found) {return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));}return null;}// 獲取broker的版本信息public int findBrokerVersion(String brokerName, String brokerAddr) {if (this.brokerVersionTable.containsKey(brokerName)) {if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {return this.brokerVersionTable.get(brokerName).get(brokerAddr);}}return 0; }
返回方法,批量鎖定消息隊列,org.apache.rocketmq.client.impl.MQClientAPIImpl#lockBatchMQ
public Set<MessageQueue> lockBatchMQ(final String addr,final LockBatchRequestBody requestBody,final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);request.setBody(requestBody.encode());// 同步執行=》RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);switch (response.getCode()) {case ResponseCode.SUCCESS: {LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet();return messageQueues;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark()); }
進入方法,org.apache.rocketmq.client.impl.factory.MQClientInstance#start介紹過了。
進入方法,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#updateTopicSubscribeInfoWhenSubscriptionChanged
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);}} }
進入方法,org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)介紹過了。
進入方法,org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBrokerWithLock介紹過了。
進入方法,org.apache.rocketmq.client.impl.factory.MQClientInstance#rebalanceImmediately
public void rebalanceImmediately() {this.rebalanceService.wakeup(); }
進入方法,負載均衡處理,org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance 前面介紹過了,請翻閱前面章節。
返回方法,
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start結束。
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣