說在前面java
DefaultMQProducer、DefaultMQPullConsumer、DefaultMQPushConsumer 處理過程算法
源碼解析express
PullConsumerapache
public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");for (MessageQueue mq : mqs) {System.out.printf("Consume from the queue: %s%n", mq);SINGLE_MQ:while (true) {try {PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);System.out.printf("%s%n", pullResult);putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND:break;case NO_MATCHED_MSG:break;case NO_NEW_MSG:break SINGLE_MQ;case OFFSET_ILLEGAL:break;default:break;}} catch (Exception e) {e.printStackTrace();}}}consumer.shutdown(); }
進入方法,pullConsumer啓動,org.apache.rocketmq.client.consumer.DefaultMQPullConsumer#start緩存
@Overridepublic void start() throws MQClientException {// 服務啓動=》this.defaultMQPullConsumerImpl.start(); }
進入方法,服務啓動,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#start微信
public synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 檢查配置=》this.checkConfig();// copy topic訂閱配置=》this.copySubscription();if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPullConsumer.changeInstanceNameToPID();}// 建立mqclient對象=》this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());// 註冊過濾消息鉤子方法this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPullConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();} else {switch (this.defaultMQPullConsumer.getMessageModel()) {// 廣播 本地存儲case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());break;// 集羣 遠程存儲case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());break;default:break;}this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);}// offset加載=》this.offsetStore.load();// 註冊消費者=》boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}// 服務啓動=》mQClientFactory.start();log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PullConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}}
進入方法,檢查配置,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#checkConfigapp
private void checkConfig() throws MQClientException {// check consumerGroup =》Validators.checkGroup(this.defaultMQPullConsumer.getConsumerGroup());// consumerGroupif (null == this.defaultMQPullConsumer.getConsumerGroup()) {throw new MQClientException("consumerGroup is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// consumerGroup 消費組名稱不能和DEFAULT_CONSUMER同樣if (this.defaultMQPullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {throw new MQClientException("consumerGroup can not equal "+ MixAll.DEFAULT_CONSUMER_GROUP+ ", please specify another one."+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// messageModel 消費模式不能爲空,默認集羣消費if (null == this.defaultMQPullConsumer.getMessageModel()) {throw new MQClientException("messageModel is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// allocateMessageQueueStrategy 默認平均散列隊列算法if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) {throw new MQClientException("allocateMessageQueueStrategy is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}// allocateMessageQueueStrategy 30s超時if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) {throw new MQClientException("Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);} }
進入方法, copy topic訂閱配置,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#copySubscriptiondom
private void copySubscription() throws MQClientException {try {// 全部註冊的topicSet<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();if (registerTopics != null) {for (final String topic : registerTopics) {// 構建topic訂閱配置=》SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),topic, SubscriptionData.SUB_ALL);// 存儲topic訂閱配置=》this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);}}} catch (Exception e) {throw new MQClientException("subscription exception", e);} }
進入方法,構建topic訂閱配置,org.apache.rocketmq.common.filter.FilterAPI#buildSubscriptionData異步
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,String subString) throws Exception {SubscriptionData subscriptionData = new SubscriptionData();subscriptionData.setTopic(topic);subscriptionData.setSubString(subString);// 訂閱全部的topicif (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {subscriptionData.setSubString(SubscriptionData.SUB_ALL);} else {// 解析tags,多個tag用|分開String[] tags = subString.split("\\|\\|");if (tags.length > 0) {for (String tag : tags) {if (tag.length() > 0) {String trimString = tag.trim();if (trimString.length() > 0) {// 解析訂閱信息中的tagsubscriptionData.getTagsSet().add(trimString);subscriptionData.getCodeSet().add(trimString.hashCode());}}}} else {throw new Exception("subString split error");}}return subscriptionData; }
返回方法,建立mqclient對象,org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)ide
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {String clientId = clientConfig.buildMQClientId();// 從本地緩存中獲取client對象,簡單的通常會concurrentHashMap當本地緩存,性能很高MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {log.info("Created new MQClientInstance for clientId:[{}]", clientId);}}return instance; }
返回方法,offset加載,org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore#load
@Overridepublic void load() throws MQClientException {// 讀取本地的offset=》OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);log.info("load consumer's offset, {} {} {}",this.groupName,mq,offset.get());}} }
進入方法,org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore#readLocalOffset
private OffsetSerializeWrapper readLocalOffset() throws MQClientException {String content = null;try {content = MixAll.file2String(this.storePath);} catch (IOException e) {log.warn("Load local offset store file exception", e);}// 若是文件中讀取不到,從備份文件中讀取=》if (null == content || content.length() == 0) {return this.readLocalOffsetBak();} else {OffsetSerializeWrapper offsetSerializeWrapper = null;try {offsetSerializeWrapper =OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);} catch (Exception e) {log.warn("readLocalOffset Exception, and try to correct", e);return this.readLocalOffsetBak();}return offsetSerializeWrapper;} }
返回方法,註冊消費者,org.apache.rocketmq.client.impl.factory.MQClientInstance#registerConsumer
public boolean registerConsumer(final String group, final MQConsumerInner consumer) {if (null == group || null == consumer) {return false;}MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);if (prev != null) {log.warn("the consumer group[" + group + "] exist already.");return false;}return true; }
進入方法,服務啓動,org.apache.rocketmq.client.impl.factory.MQClientInstance#start 前面介紹過了,客戶以翻閱前面的章節。
進入方法,按topic找到訂閱的消費隊列,org.apache.rocketmq.client.consumer.DefaultMQPullConsumer#fetchSubscribeMessageQueues
@Overridepublic Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(topic); }
進入方法,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#fetchSubscribeMessageQueues
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {this.makeSureStateOK();return this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic); }
進入方法,org.apache.rocketmq.client.impl.MQAdminImpl#fetchSubscribeMessageQueues
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {try {// 找到topic的topic路由信息TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);if (topicRouteData != null) {Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);if (!mqList.isEmpty()) {return mqList;} else {throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);}}} catch (Exception e) {throw new MQClientException("Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),e);}throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null); }
進入方法,拉取消息,org.apache.rocketmq.client.consumer.DefaultMQPullConsumer#pullBlockIfNotFound(org.apache.rocketmq.common.message.MessageQueue, java.lang.String, long, int)
@Overridepublic PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 拉取消息,若是沒找到阻塞=》return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums); }
進入方法,拉取消息,若是沒找到阻塞,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#pullBlockIfNotFound(org.apache.rocketmq.common.message.MessageQueue, java.lang.String, long, int)
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// =》return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); }
進入方法,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#pullSyncImpl
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();if (null == mq) {throw new MQClientException("mq is null", null);}if (offset < 0) {throw new MQClientException("offset < 0", null);}if (maxNums <= 0) {throw new MQClientException("maxNums <= 0", null);}// 自動訂閱=》this.subscriptionAutomatically(mq.getTopic());int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);SubscriptionData subscriptionData;try {// 構建訂閱數據=》subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),mq.getTopic(), subExpression);} catch (Exception e) {throw new MQClientException("parse subscription error", e);}// 30s超時long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;// 拉取消息=》PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(mq,subscriptionData.getSubString(),0L,offset,maxNums,sysFlag,0,this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),timeoutMillis,CommunicationMode.SYNC,null);// 處理拉取消息請求=》this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);if (!this.consumeMessageHookList.isEmpty()) {ConsumeMessageContext consumeMessageContext = null;consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(this.groupName());consumeMessageContext.setMq(mq);consumeMessageContext.setMsgList(pullResult.getMsgFoundList());consumeMessageContext.setSuccess(false);this.executeHookBefore(consumeMessageContext);consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());consumeMessageContext.setSuccess(true);// 執行消費消息後的鉤子方法this.executeHookAfter(consumeMessageContext);}return pullResult; }
進入方法,自動訂閱,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#subscriptionAutomatically
public void subscriptionAutomatically(final String topic) {if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {try {// 構建訂閱數據=》SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),topic, SubscriptionData.SUB_ALL);this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);} catch (Exception ignore) {}} }
進入方法,構建訂閱數據,org.apache.rocketmq.common.filter.FilterAPI#buildSubscriptionData介紹過了。
進入方法,拉取消息,org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl(org.apache.rocketmq.common.message.MessageQueue, java.lang.String, long, long, int, int, long, long, long, org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.consumer.PullCallback)
public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// =》return pullKernelImpl(mq,subExpression,ExpressionType.TAG,subVersion, offset,maxNums,sysFlag,commitOffset,brokerSuspendMaxTimeMillis,timeoutMillis,communicationMode,pullCallback); }
進入方法,org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl(org.apache.rocketmq.common.message.MessageQueue, java.lang.String, java.lang.String, long, long, int, int, long, long, long, org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.consumer.PullCallback)
public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final String expressionType,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 查詢broker=》FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);if (null == findBrokerResult) {// 從namesrv更新topic路由=》this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());// 查詢broker=》findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);}if (findBrokerResult != null) {{// check version 不是tag表達式類型if (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);}}int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);requestHeader.setExpressionType(expressionType);String brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {// 找到拉取消息的broker=》brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);}// 拉取消息=》PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }
進入方法,查詢broker,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe
public FindBrokerResult findBrokerAddressInSubscribe(final String brokerName,final long brokerId,final boolean onlyThisBroker) {String brokerAddr = null;boolean slave = false;boolean found = false;// 獲取broker的緩存信息HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);if (map != null && !map.isEmpty()) {brokerAddr = map.get(brokerId);slave = brokerId != MixAll.MASTER_ID;found = brokerAddr != null;if (!found && !onlyThisBroker) {Entry<Long, String> entry = map.entrySet().iterator().next();brokerAddr = entry.getValue();slave = entry.getKey() != MixAll.MASTER_ID;found = true;}}if (found) {return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));}return null;}// 獲取broker的版本信息public int findBrokerVersion(String brokerName, String brokerAddr) {if (this.brokerVersionTable.containsKey(brokerName)) {if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {return this.brokerVersionTable.get(brokerName).get(brokerAddr);}}return 0; }
返回方法,從namesrv更新topic路由,org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)介紹過了。
返回方法,查詢broker,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe介紹過了。
返回方法,找到拉取消息的broker,org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#computPullFromWhichFilterServer
private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)throws MQClientException {// 獲取topic路由=》ConcurrentMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();if (topicRouteTable != null) {TopicRouteData topicRouteData = topicRouteTable.get(topic);List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);// 隨機策略if (list != null && !list.isEmpty()) {return list.get(randomNum() % list.size());}}throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "+ topic, null); }
返回方法,拉取消息,org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage
public PullResult pullMessage(final String addr,final PullMessageRequestHeader requestHeader,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);switch (communicationMode) {case ONEWAY:assert false;return null;case ASYNC:// =》this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);return null;case SYNC:// =》return this.pullMessageSync(addr, request, timeoutMillis);default:assert false;break;}return null; }
進入方法,org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync
private void pullMessageAsync(final String addr,final RemotingCommand request,final long timeoutMillis,final PullCallback pullCallback) throws RemotingException, InterruptedException {// 異步拉取消息=》this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {@Overridepublic void operationComplete(ResponseFuture responseFuture) {RemotingCommand response = responseFuture.getResponseCommand();if (response != null) {try {// 處理拉取響應=》PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);assert pullResult != null;pullCallback.onSuccess(pullResult);} catch (Exception e) {pullCallback.onException(e);}} else {if (!responseFuture.isSendRequestOK()) {pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));} else if (responseFuture.isTimeout()) {pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,responseFuture.getCause()));} else {pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));}}}}); }
進入方法,異步拉取消息,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeAsync介紹過了。
返回方法,處理拉取響應,org.apache.rocketmq.client.impl.MQClientAPIImpl#processPullResponse
private PullResult processPullResponse(final RemotingCommand response) throws MQBrokerException, RemotingCommandException {PullStatus pullStatus = PullStatus.NO_NEW_MSG;switch (response.getCode()) {case ResponseCode.SUCCESS:pullStatus = PullStatus.FOUND;break;case ResponseCode.PULL_NOT_FOUND:pullStatus = PullStatus.NO_NEW_MSG;break;case ResponseCode.PULL_RETRY_IMMEDIATELY:pullStatus = PullStatus.NO_MATCHED_MSG;break;case ResponseCode.PULL_OFFSET_MOVED:pullStatus = PullStatus.OFFSET_ILLEGAL;break;default:throw new MQBrokerException(response.getCode(), response.getRemark());}PullMessageResponseHeader responseHeader =(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody()); }
返回方法,處理拉取消息請求,org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,final SubscriptionData subscriptionData) {PullResultExt pullResultExt = (PullResultExt) pullResult;// 根據brokerId肯定從哪一個節點拉去消息=》this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());// 若是消息沒找到if (PullStatus.FOUND == pullResult.getPullStatus()) {ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());// 消息解碼List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);List<MessageExt> msgListFilterAgain = msgList;if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());for (MessageExt msg : msgList) {if (msg.getTags() != null) {if (subscriptionData.getTagsSet().contains(msg.getTags())) {msgListFilterAgain.add(msg);}}}}// 是否有消息過濾鉤子方法if (this.hasHook()) {FilterMessageContext filterMessageContext = new FilterMessageContext();filterMessageContext.setUnitMode(unitMode);filterMessageContext.setMsgList(msgListFilterAgain);// 執行鉤子方法=》this.executeHook(filterMessageContext);}for (MessageExt msg : msgListFilterAgain) {String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (traFlag != null && Boolean.parseBoolean(traFlag)) {msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));}MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,Long.toString(pullResult.getMinOffset()));MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,Long.toString(pullResult.getMaxOffset()));}pullResultExt.setMsgFoundList(msgListFilterAgain);}pullResultExt.setMessageBinary(null);return pullResult; }
pullConsumer啓動、拉取消息解析完畢。
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣