說在前面apache
請求處理 直接消費消息 微信
源碼解析併發
進入這個方法,直接消費消息,org.apache.rocketmq.client.impl.ClientRemotingProcessor#consumeMessageDirectlyide
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final ConsumeMessageDirectlyResultRequestHeader requestHeader =(ConsumeMessageDirectlyResultRequestHeader) request.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);// 請求體編碼=》final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));// 當即消費消息=》ConsumeMessageDirectlyResult result =this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName());if (null != result) {response.setCode(ResponseCode.SUCCESS);response.setBody(result.encode());} else {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup()));}return response; }
進入這個方法,當即消費消息,org.apache.rocketmq.client.impl.factory.MQClientInstance#consumeMessageDirectlythis
public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg,final String consumerGroup,final String brokerName) {// 獲取消費組的消費者MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);if (null != mqConsumerInner) {DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;// 當即消費消息=》ConsumeMessageDirectlyResult result = consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName);return result;}return null; }
進入這個方法,當即消費消息,併發消費,org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#consumeMessageDirectly編碼
@Overridepublic ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();result.setOrder(false);result.setAutoCommit(true);List<MessageExt> msgs = new ArrayList<MessageExt>();msgs.add(msg);MessageQueue mq = new MessageQueue();mq.setBrokerName(brokerName);mq.setTopic(msg.getTopic());mq.setQueueId(msg.getQueueId());ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(mq);// 重置重試topic=》this.resetRetryTopic(msgs);final long beginTime = System.currentTimeMillis();log.info("consumeMessageDirectly receive new message: {}", msg);try {// 消費消息,須要開發實現本身的消息消費邏輯ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(msgs, context);if (status != null) {switch (status) {case CONSUME_SUCCESS:result.setConsumeResult(CMResult.CR_SUCCESS);break;case RECONSUME_LATER:result.setConsumeResult(CMResult.CR_LATER);break;default:break;}} else {result.setConsumeResult(CMResult.CR_RETURN_NULL);}} catch (Throwable e) {result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);result.setRemark(RemotingHelper.exceptionSimpleDesc(e));log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,mq), e);}result.setSpentTimeMills(System.currentTimeMillis() - beginTime);log.info("consumeMessageDirectly Result: {}", result);return result; }
進入這個方法,重置重試topic,org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#resetRetryTopiccode
public void resetRetryTopic(final List<MessageExt> msgs) {final String groupTopic = MixAll.getRetryTopic(consumerGroup);for (MessageExt msg : msgs) {String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);if (retryTopic != null && groupTopic.equals(msg.getTopic())) {msg.setTopic(retryTopic);}} }
進入這個方法,順序消費消息,org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#consumeMessageDirectlyorm
@Overridepublic ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();result.setOrder(true);List<MessageExt> msgs = new ArrayList<MessageExt>();msgs.add(msg);MessageQueue mq = new MessageQueue();mq.setBrokerName(brokerName);mq.setTopic(msg.getTopic());mq.setQueueId(msg.getQueueId());ConsumeOrderlyContext context = new ConsumeOrderlyContext(mq);final long beginTime = System.currentTimeMillis();log.info("consumeMessageDirectly receive new message: {}", msg);try {// 消費消息,須要開發實現本身的消息消費邏輯=》ConsumeOrderlyStatus status = this.messageListener.consumeMessage(msgs, context);if (status != null) {switch (status) {case COMMIT:result.setConsumeResult(CMResult.CR_COMMIT);break;case ROLLBACK:result.setConsumeResult(CMResult.CR_ROLLBACK);break;case SUCCESS:result.setConsumeResult(CMResult.CR_SUCCESS);break;case SUSPEND_CURRENT_QUEUE_A_MOMENT:result.setConsumeResult(CMResult.CR_LATER);break;default:break;}} else {result.setConsumeResult(CMResult.CR_RETURN_NULL);}} catch (Throwable e) {result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);result.setRemark(RemotingHelper.exceptionSimpleDesc(e));log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageOrderlyService.this.consumerGroup,msgs,mq), e);}result.setAutoCommit(context.isAutoCommit());result.setSpentTimeMills(System.currentTimeMillis() - beginTime);log.info("consumeMessageDirectly Result: {}", result);return result; }
往上返回到這個方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor#consumeMessageDirectly結束。blog
說在最後開發
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣