rocketmq源碼解析consumer、producer處理過程②

說在前面java

DefaultMQProducer、DefaultMQPullConsumer、DefaultMQPushConsumer 處理過程git

 

源碼解析github

進入方法,獲取默認的topic路由信息,org.apache.rocketmq.client.impl.MQClientAPIImpl#getDefaultTopicRouteInfoFromNameServerapache

public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)throws RemotingException, MQClientException, InterruptedException {return getTopicRouteInfoFromNameServer(topic, timeoutMillis, false);    }

進入方法,org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long, boolean)微信




public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();requestHeader.setTopic(topic);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);//        同步獲取topic的路由信息=》RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.TOPIC_NOT_EXIST: {if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);}break;}case ResponseCode.SUCCESS: {byte[] body = response.getBody();if (body != null) {return TopicRouteData.decode(body, TopicRouteData.class);}}default:break;}throw new MQClientException(response.getCode(), response.getRemark());    }

進入方法,同步獲取topic的路由信息,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync介紹過了。app

返回方法,獲取topic路由信息,org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long, boolean)介紹過了。異步

返回方法,判斷topic路由是否改變,org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteDataIsChangeasync

private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {if (olddata == null || nowdata == null)return true;TopicRouteData old = olddata.cloneTopicRouteData();TopicRouteData now = nowdata.cloneTopicRouteData();Collections.sort(old.getQueueDatas());Collections.sort(old.getBrokerDatas());Collections.sort(now.getQueueDatas());Collections.sort(now.getBrokerDatas());return !old.equals(now);}

返回方法,按brokerName選擇一個消息隊列,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueueide

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {//        =》return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);    }

進入方法,org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueueui



public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().getAndIncrement();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;MessageQueue mq = tpInfo.getMessageQueueList().get(pos);if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))return mq;}}//                找到一個brokerfinal String notBestBroker = latencyFaultTolerance.pickOneAtLeast();//                獲取寫隊列數量int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {//                    選擇一個消息隊列=》final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}//            選擇一個消息隊列=》return tpInfo.selectOneMessageQueue();}//        按brokerName選擇一個消息隊列=》return tpInfo.selectOneMessageQueue(lastBrokerName);    }

進入方法,選擇一個消息隊列,org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue()

public MessageQueue selectOneMessageQueue() {int index = this.sendWhichQueue.getAndIncrement();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;return this.messageQueueList.get(pos);    }

返回方法,發送,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl













private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();//        按brokerName找到master地址=》String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());if (null == brokerAddr) {//            找到topic的發佈信息=》tryToFindTopicPublishInfo(mq.getTopic());//            找到broker的master地址=》brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;if (brokerAddr != null) {brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();try {//for MessageBatch,ID has been set in the generating processif (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}int sysFlag = 0;boolean msgBodyCompressed = false;//                默認對4k的消息壓縮=》if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true;}//                是不是事務消息final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}//                    執行發送消息以前的鉤子方法實現=》this.executeSendMessageHookBefore(context);}SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);//                若是是重試的topicif (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {//                    重試消費次數String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}//                    最大重試消費次數String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult = null;switch (communicationMode) {case ASYNC:Message tmpMessage = msg;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);msg.setBody(prevBody);}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}//                        發送消息=》sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,//                            重試2次this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}//                        同步發送=》sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}if (this.hasSendMessageHook()) {context.setSendResult(sendResult);//                    執行發送消息後的鉤子方法this.executeSendMessageHookAfter(context);}return sendResult;} catch (RemotingException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (InterruptedException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {msg.setBody(prevBody);}}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);    }

進入方法,按brokerName找到master地址,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInPublish

public String findBrokerAddressInPublish(final String brokerName) {HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);if (map != null && !map.isEmpty()) {return map.get(MixAll.MASTER_ID);}return null;    }

進入方法,找到topic的發佈信息,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());//           從namesrv更新topic路由信息=》this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {//            從namesrv更新topic路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}    }

進入方法,從namesrv更新topic路由信息,org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String)

public boolean updateTopicRouteInfoFromNameServer(final String topic) {//        =》return updateTopicRouteInfoFromNameServer(topic, false, null);    }

進入方法,org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)





public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {try {if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {TopicRouteData topicRouteData;if (isDefault && defaultMQProducer != null) {//                        獲取默認的topic路由信息 =》topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);if (topicRouteData != null) {//                            獲取隊列信息for (QueueData data : topicRouteData.getQueueDatas()) {//                                讀寫隊列最大數量4int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {//                        獲取topic路由信息=》topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);}if (topicRouteData != null) {TopicRouteData old = this.topicRouteTable.get(topic);//                        判斷topic路由是否改變=》boolean changed = topicRouteDataIsChange(old, topicRouteData);if (!changed) {//                            須要更新路由信息changed = this.isNeedUpdateTopicRouteInfo(topic);} else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}if (changed) {TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {//                                更新broker的地址列表this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info{//                                topic路由信息轉換成topic發佈信息=》TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);//                                遍歷生產者信息Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {//                                        更新topic發佈信息=》impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info{//                                獲取消息隊列訂閱信息Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);//                                遍歷消費者Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {//                                        更新topic的訂閱信息=》impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);}} catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}} finally {this.lockNamesrv.unlock();}} else {log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);}} catch (InterruptedException e) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}return false;    }

進入方法,獲取默認的topic路由信息,org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long, boolean)




public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();requestHeader.setTopic(topic);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);//        同步獲取topic的路由信息=》RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.TOPIC_NOT_EXIST: {if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);}break;}case ResponseCode.SUCCESS: {byte[] body = response.getBody();if (body != null) {return TopicRouteData.decode(body, TopicRouteData.class);}}default:break;}throw new MQClientException(response.getCode(), response.getRemark());    }

返回方法,獲取topic路由信息,org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long, boolean)介紹過了。

返回方法,判斷topic路由是否改變,org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteDataIsChange

private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {if (olddata == null || nowdata == null)return true;TopicRouteData old = olddata.cloneTopicRouteData();TopicRouteData now = nowdata.cloneTopicRouteData();Collections.sort(old.getQueueDatas());Collections.sort(old.getBrokerDatas());Collections.sort(now.getQueueDatas());Collections.sort(now.getBrokerDatas());return !old.equals(now);}

進入方法,須要更新路由信息,org.apache.rocketmq.client.impl.factory.MQClientInstance#isNeedUpdateTopicRouteInfo


private boolean isNeedUpdateTopicRouteInfo(final String topic) {boolean result = false;{//            遍歷生產者Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext() && !result) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {//                    獲取topic的發佈信息是否須要更新=》result = impl.isPublishTopicNeedUpdate(topic);}}}{//            遍歷消費者Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext() && !result) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {//                    訂閱的信息是否須要更新=》result = impl.isSubscribeTopicNeedUpdate(topic);}}}return result;    }

返回方法,找到broker的master地址,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInPublish

進入方法,發送消息,org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage(java.lang.String, java.lang.String, org.apache.rocketmq.common.message.Message, org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader, long, org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.producer.SendCallback, org.apache.rocketmq.client.impl.producer.TopicPublishInfo, org.apache.rocketmq.client.impl.factory.MQClientInstance, int, org.apache.rocketmq.client.hook.SendMessageContext, org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl)public String findBrokerAddressInPublish(final String brokerName) {HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);if (map != null && !map.isEmpty()) {return map.get(MixAll.MASTER_ID);}return null;    }


public SendResult sendMessage(final String addr,final String brokerName,final Message msg,final SendMessageRequestHeader requestHeader,final long timeoutMillis,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final MQClientInstance instance,final int retryTimesWhenSendFailed,final SendMessageContext context,final DefaultMQProducerImpl producer) throws RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();RemotingCommand request = null;if (sendSmartMsg || msg instanceof MessageBatch) {SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);} else {request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);}request.setBody(msg.getBody());switch (communicationMode) {case ONEWAY://                單途請求=》this.remotingClient.invokeOneway(addr, request, timeoutMillis);return null;case ASYNC:final AtomicInteger times = new AtomicInteger();long costTimeAsync = System.currentTimeMillis() - beginStartTime;//                發送超時if (timeoutMillis < costTimeAsync) {throw new RemotingTooMuchRequestException("sendMessage call timeout");}//                異步發送=》this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, context, producer);return null;case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTimeSync) {throw new RemotingTooMuchRequestException("sendMessage call timeout");}//                同步發送=》return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);default:assert false;break;}return null;    }

進入方法,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway介紹過了。

返回方法,異步發送,org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageAsync





private void sendMessageAsync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final MQClientInstance instance,final int retryTimesWhenSendFailed,final AtomicInteger times,final SendMessageContext context,final DefaultMQProducerImpl producer) throws InterruptedException, RemotingException {//        異步發送this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {@Overridepublic void operationComplete(ResponseFuture responseFuture) {RemotingCommand response = responseFuture.getResponseCommand();if (null == sendCallback && response != null) {try {//                       處理髮送響應=》SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);if (context != null && sendResult != null) {context.setSendResult(sendResult);//                            執行發送消息後的鉤子方法context.getProducer().executeSendMessageHookAfter(context);}} catch (Throwable e) {}producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);return;}if (response != null) {try {//                        處理髮送響應=》SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);assert sendResult != null;if (context != null) {context.setSendResult(sendResult);//                            執行發送消息後的鉤子方法context.getProducer().executeSendMessageHookAfter(context);}try {//                            執行回調=》sendCallback.onSuccess(sendResult);} catch (Throwable e) {}producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);} catch (Exception e) {producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);//                        異常處理=》onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, e, context, false, producer);}} else {producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);if (!responseFuture.isSendRequestOK()) {MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, ex, context, true, producer);} else if (responseFuture.isTimeout()) {MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",responseFuture.getCause());onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, ex, context, true, producer);} else {MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, ex, context, true, producer);}}}});    }

進入方法,異步發送,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeAsync介紹過了。

返回方法,處理髮送響應,org.apache.rocketmq.client.impl.MQClientAPIImpl#processSendResponse




private SendResult processSendResponse(final String brokerName,final Message msg,final RemotingCommand response) throws MQBrokerException, RemotingCommandException {switch (response.getCode()) {case ResponseCode.FLUSH_DISK_TIMEOUT:case ResponseCode.FLUSH_SLAVE_TIMEOUT:case ResponseCode.SLAVE_NOT_AVAILABLE: {}case ResponseCode.SUCCESS: {SendStatus sendStatus = SendStatus.SEND_OK;switch (response.getCode()) {case ResponseCode.FLUSH_DISK_TIMEOUT:sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;break;case ResponseCode.FLUSH_SLAVE_TIMEOUT:sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;break;case ResponseCode.SLAVE_NOT_AVAILABLE:sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;break;case ResponseCode.SUCCESS:sendStatus = SendStatus.SEND_OK;break;default:assert false;break;}SendMessageResponseHeader responseHeader =(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId());String uniqMsgId = MessageClientIDSetter.getUniqID(msg);if (msg instanceof MessageBatch) {StringBuilder sb = new StringBuilder();for (Message message : (MessageBatch) msg) {sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message));}uniqMsgId = sb.toString();}SendResult sendResult = new SendResult(sendStatus,uniqMsgId,responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());sendResult.setTransactionId(responseHeader.getTransactionId());String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);if (regionId == null || regionId.isEmpty()) {regionId = MixAll.DEFAULT_TRACE_REGION_ID;}if (traceOn != null && traceOn.equals("false")) {sendResult.setTraceOn(false);} else {sendResult.setTraceOn(true);}sendResult.setRegionId(regionId);return sendResult;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());    }

返回方法,異常處理,org.apache.rocketmq.client.impl.MQClientAPIImpl#onExceptionImpl


private void onExceptionImpl(final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final MQClientInstance instance,final int timesTotal,final AtomicInteger curTimes,final Exception e,final SendMessageContext context,final boolean needRetry,final DefaultMQProducerImpl producer) {int tmp = curTimes.incrementAndGet();if (needRetry && tmp <= timesTotal) {String retryBrokerName = brokerName;//by default, it will send to the same brokerif (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send//                按brokerName查找消息隊列=》MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);retryBrokerName = mqChosen.getBrokerName();}//            按brokerName查找master地址=》String addr = instance.findBrokerAddressInPublish(retryBrokerName);log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,retryBrokerName);try {request.setOpaque(RemotingCommand.createNewRequestId());//                異步發送消息=》sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,timesTotal, curTimes, context, producer);} catch (InterruptedException e1) {onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,context, false, producer);} catch (RemotingConnectException e1) {producer.updateFaultItem(brokerName, 3000, true);onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,context, true, producer);} catch (RemotingTooMuchRequestException e1) {onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,context, false, producer);} catch (RemotingException e1) {producer.updateFaultItem(brokerName, 3000, true);onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,context, true, producer);}} else {if (context != null) {context.setException(e);//                執行發送消息後的鉤子方法context.getProducer().executeSendMessageHookAfter(context);}try {//                執行回調sendCallback.onException(e);} catch (Exception ignored) {}}    }

進入方法,按brokerName查找消息隊列,org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue介紹過了。

返回方法,按brokerName查找master地址,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInPublish

public String findBrokerAddressInPublish(final String brokerName) {HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);if (map != null && !map.isEmpty()) {return map.get(MixAll.MASTER_ID);}return null;    }

返回方法,異步發送消息,org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageAsync介紹過了。

返回方法,同步發送,org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageSync

private SendResult sendMessageSync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request) throws RemotingException, MQBrokerException, InterruptedException {//        =》RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;//       處理請求響應=》return this.processSendResponse(brokerName, msg, response);    }

進入方法,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync介紹過了。

返回方法,處理請求響應,org.apache.rocketmq.client.impl.MQClientAPIImpl#processSendResponse介紹過了。

返回方法,org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message) 生產者啓動、發送消息解析完畢。

 

說在最後

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索