說在前面
發送消息處理器
源碼解析
返回方法,在發送消息以後建立topic配置,org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageBackMethod介紹過了。
返回方法,存儲批量消息,org.apache.rocketmq.store.DefaultMessageStore#putMessages介紹過了。
返回方法,解析存儲結果,org.apache.rocketmq.broker.processor.SendMessageProcessor#handlePutMessageResult
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,RemotingCommand request, MessageExt msg,SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,int queueIdInt) {if (putMessageResult == null) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("store putMessage return null");return response;}boolean sendOK = false;switch (putMessageResult.getPutMessageStatus()) {// Successcase PUT_OK:sendOK = true;response.setCode(ResponseCode.SUCCESS);break;case FLUSH_DISK_TIMEOUT:response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);sendOK = true;break;case FLUSH_SLAVE_TIMEOUT:response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);sendOK = true;break;case SLAVE_NOT_AVAILABLE:response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);sendOK = true;break;// Failedcase CREATE_MAPEDFILE_FAILED:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("create mapped file failed, server is busy or broken.");break;case MESSAGE_ILLEGAL:case PROPERTIES_SIZE_EXCEEDED:response.setCode(ResponseCode.MESSAGE_ILLEGAL);response.setRemark("the message is illegal, maybe msg body or properties length not matched. 
msg body length limit 128k, msg properties length limit 32k.");break;case SERVICE_NOT_AVAILABLE:response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);response.setRemark("service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");break;case OS_PAGECACHE_BUSY:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");break;case UNKNOWN_ERROR:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UNKNOWN_ERROR");break;default:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UNKNOWN_ERROR DEFAULT");break;}String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);if (sendOK) {this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),putMessageResult.getAppendMessageResult().getWroteBytes());this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());response.setRemark(null);responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());responseHeader.setQueueId(queueIdInt);responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());// 響應=》doResponse(ctx, request, response);if (hasSendMessageHook()) {sendMessageContext.setMsgId(responseHeader.getMsgId());sendMessageContext.setQueueId(responseHeader.getQueueId());sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * 
commercialBaseCount;sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);sendMessageContext.setCommercialSendTimes(incValue);sendMessageContext.setCommercialSendSize(wroteSize);sendMessageContext.setCommercialOwner(owner);}return null;} else {if (hasSendMessageHook()) {int wroteSize = request.getBody().length;int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);sendMessageContext.setCommercialSendTimes(incValue);sendMessageContext.setCommercialSendSize(wroteSize);sendMessageContext.setCommercialOwner(owner);}}return response; }
進入方法,響應,org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#doResponse
/**
 * Writes {@code response} to the channel unless the request is a one-way RPC.
 *
 * <p>Any {@link Throwable} thrown by the write call is caught and logged together with the
 * request and the response; it is not propagated to the caller.
 *
 * @param ctx      channel context the response is written to
 * @param request  the request being answered; checked with {@code isOnewayRPC()}
 * @param response the response command to write and flush
 */
protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
    final RemotingCommand response) {
    // Nothing is written back for one-way requests.
    if (request.isOnewayRPC()) {
        return;
    }

    try {
        ctx.writeAndFlush(response);
    } catch (Throwable writeFailure) {
        log.error("SendMessageProcessor process request over, but response failed", writeFailure);
        log.error(request.toString());
        log.error(response.toString());
    }
}
返回方法,發送消息,org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage介紹過了。
返回方法,執行發送消息以後的鉤子方法,org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#executeSendMessageHookAfter
public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) {if (hasSendMessageHook()) {for (SendMessageHook hook : this.sendMessageHookList) {try {if (response != null) {final SendMessageResponseHeader responseHeader =(SendMessageResponseHeader) response.readCustomHeader();context.setMsgId(responseHeader.getMsgId());context.setQueueId(responseHeader.getQueueId());context.setQueueOffset(responseHeader.getQueueOffset());context.setCode(response.getCode());context.setErrorMsg(response.getRemark());}hook.sendMessageAfter(context);} catch (Throwable e) {// Ignore}}} }
返回方法,org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest結束。
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣