本文主要研究一下rocketmq的PushConsumerImpl
io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
public class PushConsumerImpl implements PushConsumer { private final DefaultMQPushConsumer rocketmqPushConsumer; private final KeyValue properties; private boolean started = false; private final Map<String, MessageListener> subscribeTable = new ConcurrentHashMap<>(); private final ClientConfig clientConfig; public PushConsumerImpl(final KeyValue properties) { this.rocketmqPushConsumer = new DefaultMQPushConsumer(); this.properties = properties; this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); String accessPoints = clientConfig.getOmsAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); String consumerGroup = clientConfig.getRmqConsumerGroup(); if (null == consumerGroup || consumerGroup.isEmpty()) { throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); } this.rocketmqPushConsumer.setConsumerGroup(consumerGroup); this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes()); this.rocketmqPushConsumer.setConsumeTimeout(clientConfig.getRmqMessageConsumeTimeout()); this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums()); this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums()); String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPushConsumer.setInstanceName(consumerId); properties.put(PropertyKeys.CONSUMER_ID, consumerId); this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl()); } @Override public KeyValue properties() { return properties; } @Override public void resume() { this.rocketmqPushConsumer.resume(); } @Override public void suspend() { this.rocketmqPushConsumer.suspend(); } @Override public boolean isSuspended() { return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause(); } @Override public 
PushConsumer attachQueue(final String queueName, final MessageListener listener) { this.subscribeTable.put(queueName, listener); try { this.rocketmqPushConsumer.subscribe(queueName, "*"); } catch (MQClientException e) { throw new OMSRuntimeException("-1", String.format("RocketMQ push consumer can't attach to %s.", queueName)); } return this; } @Override public synchronized void startup() { if (!started) { try { this.rocketmqPushConsumer.start(); } catch (MQClientException e) { throw new OMSRuntimeException("-1", e); } } this.started = true; } @Override public synchronized void shutdown() { if (this.started) { this.rocketmqPushConsumer.shutdown(); } this.started = false; } //...... }
io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
class MessageListenerImpl implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> rmqMsgList, ConsumeConcurrentlyContext contextRMQ) { MessageExt rmqMsg = rmqMsgList.get(0); BytesMessage omsMsg = OMSUtil.msgConvert(rmqMsg); MessageListener listener = PushConsumerImpl.this.subscribeTable.get(rmqMsg.getTopic()); if (listener == null) { throw new OMSRuntimeException("-1", String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic())); } final KeyValue contextProperties = OMS.newKeyValue(); final CountDownLatch sync = new CountDownLatch(1); contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name()); ReceivedMessageContext context = new ReceivedMessageContext() { @Override public KeyValue properties() { return contextProperties; } @Override public void ack() { sync.countDown(); contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); } @Override public void ack(final KeyValue properties) { sync.countDown(); contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS)); } }; long begin = System.currentTimeMillis(); listener.onMessage(omsMsg, context); long costs = System.currentTimeMillis() - begin; long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000; try { sync.await(Math.max(0, timeoutMills - costs), TimeUnit.MILLISECONDS); } catch (InterruptedException ignore) { } return ConsumeConcurrentlyStatus.valueOf(contextProperties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS)); } }
org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
/**
 * Starts this push consumer.
 *
 * <p>Only the {@code CREATE_JUST} state performs the start-up sequence: validate the
 * configuration, copy subscriptions, obtain the client instance, configure the
 * rebalance implementation, set up the pull wrapper and the offset store, create the
 * consume message service matching the registered listener type, register this
 * consumer with the client instance and start the client instance. Afterwards the
 * subscription info is refreshed, the client is checked against the broker, a
 * heartbeat is sent and a rebalance is triggered.
 *
 * @throws MQClientException if the state is {@code RUNNING}, {@code START_FAILED} or
 *                           {@code SHUTDOWN_ALREADY}, or if registering the consumer
 *                           group with the client instance fails
 */
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            // Mark as failed up front: the state only becomes RUNNING at the end of this
            // branch, so an exception in any step below leaves START_FAILED behind.
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            this.copySubscription();

            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

            // Hand the consumer's group, model and allocation strategy to the rebalance implementation.
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            // Use the offset store configured on the consumer if there is one; otherwise
            // pick one by message model: local file for BROADCASTING, broker-side for CLUSTERING.
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        // NOTE(review): offsetStore is left unchanged here; load() below would
                        // fail if it is still null - confirm no other message model exists.
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();

            // The consume service is chosen by the interface the registered listener implements.
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            this.consumeMessageService.start();

            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                // Registration was refused: reset the state to CREATE_JUST and stop the
                // consume service that was just started before reporting the error.
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            // start() is only accepted in CREATE_JUST; every other known state is rejected.
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    // Reached after a successful start (or from the default branch above).
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();
}
org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
/**
 * Consumes a single message by invoking the registered concurrent listener directly
 * and reports how the invocation ended.
 *
 * @param msg        message to consume
 * @param brokerName broker name recorded in the message queue handed to the listener
 * @return result carrying the consume outcome, an exception remark (if the listener
 *         threw) and the elapsed time in milliseconds
 */
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
    final ConsumeMessageDirectlyResult outcome = new ConsumeMessageDirectlyResult();
    outcome.setOrder(false);
    outcome.setAutoCommit(true);

    // The listener API works on batches, so wrap the single message in a list.
    final List<MessageExt> batch = new ArrayList<MessageExt>();
    batch.add(msg);

    final MessageQueue queue = new MessageQueue();
    queue.setBrokerName(brokerName);
    queue.setTopic(msg.getTopic());
    queue.setQueueId(msg.getQueueId());

    final ConsumeConcurrentlyContext listenerContext = new ConsumeConcurrentlyContext(queue);

    this.resetRetryTopic(batch);

    final long startedAt = System.currentTimeMillis();

    log.info("consumeMessageDirectly receive new message: {}", msg);

    try {
        final ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(batch, listenerContext);
        if (status == null) {
            outcome.setConsumeResult(CMResult.CR_RETURN_NULL);
        } else if (status == ConsumeConcurrentlyStatus.CONSUME_SUCCESS) {
            outcome.setConsumeResult(CMResult.CR_SUCCESS);
        } else if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) {
            outcome.setConsumeResult(CMResult.CR_LATER);
        }
        // Any other status leaves the consume result untouched.
    } catch (Throwable e) {
        // A failing listener is reported through the result instead of being propagated.
        outcome.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
        outcome.setRemark(RemotingHelper.exceptionSimpleDesc(e));

        log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
            RemotingHelper.exceptionSimpleDesc(e),
            this.consumerGroup,
            batch,
            queue), e);
    }

    outcome.setSpentTimeMills(System.currentTimeMillis() - startedAt);

    log.info("consumeMessageDirectly Result: {}", outcome);

    return outcome;
}
org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
public class ClientRemotingProcessor implements NettyRequestProcessor { private final Logger log = ClientLogger.getLog(); private final MQClientInstance mqClientFactory; public ClientRemotingProcessor(final MQClientInstance mqClientFactory) { this.mqClientFactory = mqClientFactory; } @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.CHECK_TRANSACTION_STATE: return this.checkTransactionState(ctx, request); case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED: return this.notifyConsumerIdsChanged(ctx, request); case RequestCode.RESET_CONSUMER_CLIENT_OFFSET: return this.resetOffset(ctx, request); case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT: return this.getConsumeStatus(ctx, request); case RequestCode.GET_CONSUMER_RUNNING_INFO: return this.getConsumerRunningInfo(ctx, request); case RequestCode.CONSUME_MESSAGE_DIRECTLY: return this.consumeMessageDirectly(ctx, request); default: break; } return null; } private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class); final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody())); ConsumeMessageDirectlyResult result = this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName()); if (null != result) { response.setCode(ResponseCode.SUCCESS); response.setBody(result.encode()); } else { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup())); } return response; } //...... }
org/apache/rocketmq/client/impl/factory/MQClientInstance.java
/**
 * Lets the push consumer registered under {@code consumerGroup} consume {@code msg}
 * directly through its consume message service.
 *
 * @param msg           message to consume
 * @param consumerGroup group used to look the consumer up in {@code consumerTable}
 * @param brokerName    broker name forwarded to the consume message service
 * @return the result produced by the consume message service, or {@code null} when no
 *         consumer is registered for the group or the registered consumer is not a
 *         {@code DefaultMQPushConsumerImpl}
 */
public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg,
    final String consumerGroup,
    final String brokerName) {
    MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
    // instanceof covers both the missing entry (null) and a consumer of another type,
    // which previously failed with a ClassCastException on the unchecked downcast.
    if (mqConsumerInner instanceof DefaultMQPushConsumerImpl) {
        DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
        return consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName);
    }

    return null;
}
（實現MessageListenerConcurrently接口）的consumeMessage方法
（實現MessageListenerOrderly接口）的consumeMessage方法