本文主要分析RocketMQ中如何保證消息有序。
RocketMQ的版本爲:4.2.0 release。
還是老規矩,先把分析過程的時序圖擺出來:
1 DefaultMQProducer#send:發送消息,入參中有自定義的消息隊列選擇器。
// DefaultMQProducer#send public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, selector, arg); }
1.1 DefaultMQProducerImpl#makeSureStateOK:確保Producer的狀態是運行狀態-ServiceState.RUNNING。
// DefaultMQProducerImpl#makeSureStateOK private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The producer service state not OK, "+ this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } }
1.2 DefaultMQProducerImpl#tryToFindTopicPublishInfo:根據Topic獲取發佈Topic用到的路由信息。
// DefaultMQProducerImpl#tryToFindTopicPublishInfo private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);// 爲空則從 NameServer更新獲取,false,不傳入 defaultMQProducer topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {// 有了路由信息並且狀態OK,則返回 return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
1.3 調用自定義消息隊列選擇器的select方法。
// DefaultMQProducerImpl#sendSelectImpl MessageQueue mq = null; try { mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); } catch (Throwable e) { throw new MQClientException("select message queue throwed exception.", e); } // Producer#main SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);
1.4 DefaultMQProducerImpl#sendKernelImpl:發送消息的核心實現方法。
// DefaultMQProducerImpl#sendKernelImpl ...... switch (communicationMode) { case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; ......
1.4.1 MQClientAPIImpl#sendMessage:發送消息。
// MQClientAPIImpl#sendMessage ...... switch (communicationMode) {// 根據發送消息的模式(同步/異步)選擇不一樣的方式,默認是同步 case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeSync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); ......
1.4.1.1 MQClientAPIImpl#sendMessageSync:發送同步消息。
// MQClientAPIImpl#sendMessageSync private SendResult sendMessageSync( final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; return this.processSendResponse(brokerName, msg, response); }
1.4.1.1.1 NettyRemotingClient#invokeSync:構造RemotingCommand,調用的方式是同步。
// NettyRemotingClient#invokeSync RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); if (this.rpcHook != null) { this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); } return response;
1 DefaultMQPushConsumer#registerMessageListener:把Consumer傳入的消息監聽器加入到messageListener中。
// DefaultMQPushConsumer#registerMessageListener public void registerMessageListener(MessageListenerOrderly messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); }
1.1 DefaultMQPushConsumerImpl#registerMessageListener:把Consumer傳入的消息監聽器加入到messageListenerInner中。
// DefaultMQPushConsumerImpl#registerMessageListener public void registerMessageListener(MessageListener messageListener) { this.messageListenerInner = messageListener; }
2 DefaultMQPushConsumer#start:啓動Consumer。
// DefaultMQPushConsumer#start public void start() throws MQClientException { this.defaultMQPushConsumerImpl.start(); }
2.1 DefaultMQPushConsumerImpl#start:啓動ConsumerImpl。
// DefaultMQPushConsumerImpl#start switch (this.serviceState) { case CREATE_JUST:// 剛剛建立 ...... if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {// 有序消息服務 this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {// 併發無序消息服務 this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } ...... this.consumeMessageService.start();// 啓動消息服務 ...... mQClientFactory.start();// 啓動MQClientInstance ......
2.1.1 new ConsumeMessageOrderlyService():構造順序消息服務。
// ConsumeMessageOrderlyService#ConsumeMessageOrderlyService public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor(// 主消息消費線程池,正常執行收到的ConsumeRequest。多線程 this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); }
2.1.2 ConsumeMessageOrderlyService#start:啓動順序消息消費服務;集羣模式下定時向broker請求鎖住當前正在消費的隊列。
// DefaultMQPushConsumerImpl#start this.consumeMessageService.start(); // ConsumeMessageOrderlyService#start public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically();// 定時向broker發送批量鎖住當前正在消費的隊列集合的消息 } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } }
2.1.2.1 ConsumeMessageOrderlyService#lockMQPeriodically:定時向broker發送批量鎖住當前正在消費的隊列集合的消息。
2.1.2.1.1 RebalanceImpl#lockAll:鎖住所有正在消費的隊列。
// ConsumeMessageOrderlyService#lockMQPeriodically if (!this.stopped) { this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); } // RebalanceImpl#lockAll HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();// 根據brokerName從processQueueTable獲取正在消費的隊列集合 ...... Set<MessageQueue> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);// 向Broker發送鎖住消息隊列的指令 for (MessageQueue mq : lockOKMQSet) { ProcessQueue processQueue = this.processQueueTable.get(mq); if (processQueue != null) { if (!processQueue.isLocked()) { log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq); } processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } ......
2.1.3 MQClientInstance#start:啓動MQClientInstance。過程較複雜,放到大標題四中分析。
// DefaultMQPushConsumerImpl#start mQClientFactory.start();
1 MQClientInstance#start:啓動客戶端實例MQClientInstance。
// MQClientInstance#start synchronized (this) { switch (this.serviceState) { case CREATE_JUST: ...... // Start pull service 啓動拉取消息服務 this.pullMessageService.start(); // Start rebalance service 啓動消費端負載均衡服務 this.rebalanceService.start(); ......
1.1 PullMessageService#run:啓動拉取消息服務。實際調用的是DefaultMQPushConsumerImpl的pullMessage方法。
// PullMessageService#run public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); } // PullMessageService#pullMessage private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest);// 調用DefaultMQPushConsumerImpl的pullMessage } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }
1.1.1.1 DefaultMQPushConsumerImpl#pullMessage:拉取消息。提交到ConsumeMessageOrderlyService的線程池consumeExecutor中執行。
// DefaultMQPushConsumerImpl#pullMessage ...... PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; ...... DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); ......
1.1.1.1.1.1.1 ConsumeRequest#run:處理消息消費的線程。
// ConsumeMessageOrderlyService.ConsumeRequest#run List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); ...... long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try { this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",this.messageQueue); break; } status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);// 實際消費消息的地方,回調消息監聽器的consumeMessage方法 } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs,messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); } ......
1.2 RebalanceService#run:啓動消費端負載均衡服務。
// RebalanceService#run public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); } // MQClientInstance#doRebalance public void doRebalance() { for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } } // DefaultMQPushConsumerImpl#doRebalance public void doRebalance() { if (!this.pause) { this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); } }
1.2.1.1.1 RebalanceImpl#doRebalance:負載均衡服務類處理。
// RebalanceImpl#doRebalance public void doRebalance(final boolean isOrder) { Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); } // RebalanceImpl#rebalanceByTopic switch (messageModel) { case BROADCASTING: { Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); if (mqSet != null) { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);// 根據Toipc去除queue if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet); } } else { ...... // RebalanceImpl#updateProcessQueueTableInRebalance this.dispatchPullRequest(pullRequestList);// RebalancePushImpl分發消息
1.2.1.1.1.1.1.1 RebalancePushImpl#dispatchPullRequest:RebalancePushImpl分發。
// RebalancePushImpl#dispatchPullRequest public void dispatchPullRequest(List<PullRequest> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); } }
相比Producer的發送流程,Consumer的接收流程稍微複雜一點。通過上面的源碼分析,可以知道RocketMQ是怎樣保證消息有序的:
1.通過RebalanceImpl的lockAll方法,每隔一段時間定時鎖住當前消費端正在消費的隊列,並把本地隊列ProcessQueue的locked屬性設置爲true,保證broker中的每個消息隊列只對應一個消費端;
2.另外,消費端也是通過鎖,保證每個ProcessQueue同一時刻只有一個線程消費。