說在前面
發送消息處理器
源碼解析
進入這個方法,org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest
@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:// 消費者發送消息返回=》return this.consumerSendMsgBack(ctx, request);default:// 解析請求消息頭,隊序列化作了些優化,消息頭中字段過多,字段過長併發狀況下會對序列化效率產生影響=》SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return null;}// 構建消息上下文=》mqtraceContext = buildMsgContext(ctx, requestHeader);// 發送消息以前執行鉤子方法=》this.executeSendMessageHookBefore(ctx, request, mqtraceContext);RemotingCommand response;if (requestHeader.isBatch()) {// 批量消息發送=》response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {// 發送消息=》response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);}// 執行發送消息以後的鉤子方法=》this.executeSendMessageHookAfter(response, mqtraceContext);return response;} }
進入這個方法,消費者發送消息返回,org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final ConsumerSendMsgBackRequestHeader requestHeader =(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {ConsumeMessageContext context = new ConsumeMessageContext();context.setConsumerGroup(requestHeader.getGroup());context.setTopic(requestHeader.getOriginTopic());context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);context.setCommercialRcvTimes(1);context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));// 執行消費消息的鉤子方法this.executeConsumeMessageHookAfter(context);}// 獲取消費組的訂閱配置信息=》SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());if (null == subscriptionGroupConfig) {response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));return response;}// 無寫權限if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");return response;}if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}// 重試topic=%RETRY%+消費組String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();int topicSysFlag = 0;if (requestHeader.isUnitMode()) {topicSysFlag = 
TopicSysFlag.buildSysFlag(false, true);}// 在發送消息返回後建立topic配置信息=》TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);if (null == topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("topic[" + newTopic + "] not exist");return response;}// 無寫權限if (!PermName.isWriteable(topicConfig.getPerm())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));return response;}// 按offset查詢消息=》MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());if (null == msgExt) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("look message by offset failed, " + requestHeader.getOffset());return response;}final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);if (null == retryTopic) {MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());}msgExt.setWaitStoreMsgOK(false);int delayLevel = requestHeader.getDelayLevel();// 最大重試次數,默認16次int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();}// 重試16次後if (msgExt.getReconsumeTimes() >= maxReconsumeTimes|| delayLevel < 0) {// 建立%DLQ%+消費組 topicnewTopic = MixAll.getDLQTopic(requestHeader.getGroup());queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;// 建立topic配置信息在發送消息返回=》topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,DLQ_NUMS_PER_GROUP,PermName.PERM_WRITE, 0);if (null == topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("topic[" + newTopic + "] not exist");return response;}} else {if (0 == delayLevel) {delayLevel = 3 + 
msgExt.getReconsumeTimes();}msgExt.setDelayTimeLevel(delayLevel);}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(newTopic);msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());MessageAccessor.setProperties(msgInner, msgExt.getProperties());msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));msgInner.setQueueId(queueIdInt);msgInner.setSysFlag(msgExt.getSysFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);String originMsgId = MessageAccessor.getOriginMessageId(msgExt);MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);// 存儲消息=》PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);if (putMessageResult != null) {switch (putMessageResult.getPutMessageStatus()) {case PUT_OK:String backTopic = msgExt.getTopic();String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);if (correctTopic != null) {backTopic = correctTopic;}this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;default:break;}response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(putMessageResult.getPutMessageStatus().name());return response;}response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("putMessageResult is null");return response; }
進入這個方法,執行消費消息的鉤子方法,org.apache.rocketmq.broker.processor.SendMessageProcessor#executeConsumeMessageHookAfter
public void executeConsumeMessageHookAfter(final ConsumeMessageContext context) {if (hasConsumeMessageHook()) {for (ConsumeMessageHook hook : this.consumeMessageHookList) {try {// 客戶能夠實現本身的消費消息後的鉤子方法hook.consumeMessageAfter(context);} catch (Throwable e) {// Ignore}}} }
往上返回到這個方法,獲取消費組的訂閱配置信息,org.apache.rocketmq.broker.subscription.SubscriptionGroupManager#findSubscriptionGroupConfig,前面介紹過了。
往上返回到這個方法,在發送消息返回後建立topic配置信息,org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageBackMethod
public TopicConfig createTopicInSendMessageBackMethod(final String topic,final int clientDefaultTopicQueueNums,final int perm,final int topicSysFlag) {// 獲取topic配置信息TopicConfig topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;boolean createNew = false;try {if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;topicConfig = new TopicConfig(topic);topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);topicConfig.setPerm(perm);topicConfig.setTopicSysFlag(topicSysFlag);log.info("create new topic {}", topicConfig);// 存儲重試的tpic配置信息this.topicConfigTable.put(topic, topicConfig);createNew = true;// 修飾數據的版本號this.dataVersion.nextVersion();// 持久化=》this.persist();} finally {this.lockTopicConfigTable.unlock();}}} catch (InterruptedException e) {log.error("createTopicInSendMessageBackMethod exception", e);}// 若是topic配置信息是從新建立的,註冊到broker集羣中=》if (createNew) {this.brokerController.registerBrokerAll(false, true,true);}return topicConfig; }
進入這個方法,若是topic配置信息是從新建立的,註冊到broker集羣中,org.apache.rocketmq.broker.BrokerController#registerBrokerAll,前面介紹過了。
往上返回到這個方法,按offset查詢消息,org.apache.rocketmq.store.DefaultMessageStore#lookMessageByOffset(long),前面介紹過了。
往上返回到這個方法,建立topic配置信息在發送消息返回,org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageBackMethod前面介紹過了。
往上返回到這個方法,存儲消息,org.apache.rocketmq.store.DefaultMessageStore#putMessage前面介紹過了。
往上返回到這個方法,解析請求消息頭,對序列化做了些優化(消息頭中字段過多、字段過長,在併發情況下會對序列化效率產生影響),org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#parseRequestHeader
/**
 * Decodes the send-message header carried by {@code request}.
 *
 * <p>{@code SEND_BATCH_MESSAGE} and {@code SEND_MESSAGE_V2} requests carry a
 * V2 header, which is decoded and then converted to the V1 form.
 * {@code SEND_MESSAGE} requests (and V2 requests whose V2 decode yields
 * {@code null}) are decoded directly as a V1 header. Any other request code
 * yields {@code null}.
 *
 * @param request the incoming remoting command
 * @return the V1 request header, or {@code null} for an unrelated request code
 * @throws RemotingCommandException if decoding the header fails
 */
protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
    throws RemotingCommandException {
    final int code = request.getCode();
    final boolean carriesV2Header =
        code == RequestCode.SEND_BATCH_MESSAGE || code == RequestCode.SEND_MESSAGE_V2;

    // Not a send-message request at all.
    if (!carriesV2Header && code != RequestCode.SEND_MESSAGE) {
        return null;
    }

    SendMessageRequestHeaderV2 headerV2 = null;
    if (carriesV2Header) {
        headerV2 =
            (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
    }

    // A decoded V2 header is converted; otherwise decode the V1 header directly.
    if (headerV2 != null) {
        return SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(headerV2);
    }
    return (SendMessageRequestHeader) request.decodeCommandCustomHeader(SendMessageRequestHeader.class);
}
返回到這個方法,發送消息以前執行鉤子方法,org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#executeSendMessageHookBefore
public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,SendMessageContext context) {if (hasSendMessageHook()) {for (SendMessageHook hook : this.sendMessageHookList) {try {// 解析消息頭,對序列化的優化final SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (null != requestHeader) {context.setProducerGroup(requestHeader.getProducerGroup());context.setTopic(requestHeader.getTopic());context.setBodyLength(request.getBody().length);context.setMsgProps(requestHeader.getProperties());context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));context.setBrokerAddr(this.brokerController.getBrokerAddr());context.setQueueId(requestHeader.getQueueId());}// 執行發送消息以前的鉤子方法hook.sendMessageBefore(context);if (requestHeader != null) {requestHeader.setProperties(context.getMsgProps());}} catch (Throwable e) {// Ignore}}} }
返回到這個方法,批量消息發送,org.apache.rocketmq.broker.processor.SendMessageProcessor#sendBatchMessage
private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,final RemotingCommand request,final SendMessageContext sendMessageContext,final SendMessageRequestHeader requestHeader) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();response.setOpaque(request.getOpaque());response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));log.debug("Receive SendMessage request command {}", request);// 開始接受發送請求的時間,默認是當即發送final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();if (this.brokerController.getMessageStore().now() < startTimstamp) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));return response;}response.setCode(-1);// 消息檢查=》super.msgCheck(ctx, requestHeader, response);if (response.getCode() != -1) {return response;}int queueIdInt = requestHeader.getQueueId();// 查詢topic配置TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();}// topic長度大於127,非法if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {response.setCode(ResponseCode.MESSAGE_ILLEGAL);response.setRemark("message topic length too long " + requestHeader.getTopic().length());return response;}// 若是是重試topic,非法if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {response.setCode(ResponseCode.MESSAGE_ILLEGAL);response.setRemark("batch request does not 
support retry group " + requestHeader.getTopic());return response;}MessageExtBatch messageExtBatch = new MessageExtBatch();messageExtBatch.setTopic(requestHeader.getTopic());messageExtBatch.setQueueId(queueIdInt);int sysFlag = requestHeader.getSysFlag();if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;}messageExtBatch.setSysFlag(sysFlag);messageExtBatch.setFlag(requestHeader.getFlag());MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));messageExtBatch.setBody(request.getBody());messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());messageExtBatch.setBornHost(ctx.channel().remoteAddress());messageExtBatch.setStoreHost(this.getStoreHost());messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());// 存儲批量消息=》PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);// 解析存儲結果=》return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt); }
進入這個方法,消息檢查,org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#msgCheck
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,final SendMessageRequestHeader requestHeader, final RemotingCommand response) {if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending message is forbidden");return response;}// 不是自動建立的topic不能自動發送消息=》if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";log.warn(errorMsg);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(errorMsg);return response;}// 查詢topic配置TopicConfig topicConfig =this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (null == topicConfig) {int topicSysFlag = 0;if (requestHeader.isUnitMode()) {// 根據是否是重試topic設置topicSysFlagif (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);} else {topicSysFlag = TopicSysFlag.buildSysFlag(true, false);}}log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());// 建立topic配置在發送消息以後=》topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(requestHeader.getTopic(),requestHeader.getDefaultTopic(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.getDefaultTopicQueueNums(), topicSysFlag);if (null == topicConfig) {// 若是是重試topicif (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {// 在發送消息以後建立topic配置=》topicConfig =this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,topicSysFlag);}}if (null == 
topicConfig) {response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;}}int queueIdInt = requestHeader.getQueueId();int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());if (queueIdInt >= idValid) {String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",queueIdInt,topicConfig.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(errorInfo);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(errorInfo);return response;}return response; }
進入這個方法,建立topic配置在發送消息以後,org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageMethod前面介紹過了。
接下篇。
說在最後
本次解析僅代表個人觀點,僅供參考。
加入技術微信羣
釘釘技術羣