在展示常用的三種消息類型示例之前,我們先來說一說RocketMQ的幾個重要概念:
- PullConsumer與PushConsumer:主要區別在於Pull與Push的區別。對於PullConsumer,消費者會主動從broker中拉取消息進行消費。而對於PushConsumer,會封裝包含消息獲取、消息處理以及其他相關操作的接口給程序調用
- Tag: Tag可以看做是一個子主題(sub-topic),可以進一步細化主題下的相關子業務,提高程序的靈活性和可擴展性
- Broker:RocketMQ的核心組件之一。用來從生產者處接收消息,存儲消息以及將消息推送給消費者。同時RocketMQ的broker也用來存儲消息相關的數據,比如消費者組、消費處理的偏移量、主題以及消息隊列等
- Name Server: 可以看做是一個信息路由器。生產者和消費者從NameServer中查找對應的主題以及相應的broker
這裏我們不玩虛的,直接給出三種類型的生產者、消費者代碼實例(在官網例子的基礎上做了些許改動並加了註釋說明):
生產者代碼
/** * 多種類型組合消息測試 * @author ziyuqi * */ public class MultiTypeProducer { public static void main(String[] args) throws Exception { // 順序消息生產者 FIFO OrderedProducer orderedProducer = new OrderedProducer(); orderedProducer.produce(); // 廣播消息生產者 /*BroadcastProducer broadcastProducer = new BroadcastProducer(); broadcastProducer.produce();*/ // 定時任務消息生產者 /*ScheduledProducer scheduledProducer = new ScheduledProducer(); scheduledProducer.produce();*/ } } /** * 按順序發送消息的生產者 * @author ziyuqi * */ class OrderedProducer { public void produce() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("GroupD"); producer.setNamesrvAddr("localhost:9876"); producer.start(); String[] tags = new String[] {"tagA", "tagB", "tagC", "tagD", "tagE"}; for (int i=0; i<50; i++) { Message message = new Message("OrderedTopic", tags[i % tags.length], "KEY" + i, ("Ordered Msg:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(message, new MessageQueueSelector() { /** * 所謂的順序,只能保證同一MessageQueue放入的消息知足FIFO。該方法返回應該將消息放入那個MessageQueue,最後一個參數爲send傳入的最後一個參數 * 若是須要全局保持FIFO,則全部消息應該依次放入同一隊列中去mqs隊列中的同一下標 */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 消息被分開放入多個隊列,每一個隊列中的消息保證按順序被消費FIFO /*int index = (Integer) arg % mqs.size(); System.out.println("QueueSize:" + mqs.size()); return mqs.get(index);*/ // 消息所有放入同一隊列,全局保持順序性 return mqs.get(0); } }, i); System.out.println(sendResult); } producer.shutdown(); } } /** * 廣播生產者 * @author ziyuqi * */ class BroadcastProducer { public void produce() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("GroupA"); // 也必須設置nameServer producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i=0; i<50; i++) { Message message = new Message("BroadcastTopic", "tagA", "OrderID188", ("Ordered Msg:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(message); System.out.println(sendResult); } producer.shutdown(); } } /** * 
定時消息發送者 * @author ziyuqi * */ class ScheduledProducer { public void produce() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("GroupA"); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i=0; i<50; i++) { Message message = new Message("scheduledTopic", ("Message:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 設置投遞的延遲時間 message.setDelayTimeLevel(3); SendResult sendResult = producer.send(message); System.out.println(sendResult); } producer.shutdown(); } }
消費者代碼
public class MultiTypeConsumer { public static void main(String[] args) throws Exception { // 按順序消費者 OrderedConsumer orderedConsumer = new OrderedConsumer(); orderedConsumer.consume(); // 廣播消費者 /*BroadcastConsumer broadcastConsumer = new BroadcastConsumer(); broadcastConsumer.consume();*/ // 定時任務消費者 /*ScheduledConsumer scheduledConsumer = new ScheduledConsumer(); scheduledConsumer.consume();*/ } } /** * 按順序的消費者 * @author ziyuqi * */ class OrderedConsumer { public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupD"); /* * 設置從哪裏開始消費 : * 當設置爲: ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setNamesrvAddr("localhost:9876"); // 設置定於的主題和tag(必須顯示指定tag) consumer.subscribe("OrderedTopic", "tagA || tagB || tagC || tagD || tagE"); consumer.setMessageListener(new MessageListenerOrderly() { AtomicLong num = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { /** * 設置是否自動提交: 默認自動提交,提交以後消息就不可以被再次消費。 * 非自動提交時,消息可能會被重複消費 */ context.setAutoCommit(false); this.num.incrementAndGet(); try { for (MessageExt msg : msgs) { System.out.println("Received:num=" + this.num.get() +", queueId=" + msg.getQueueId() + ", Keys=" + msg.getKeys() + ", value=" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET)); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } /*try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }*/ if (this.num.get() % 3 == 0) { // return ConsumeOrderlyStatus.ROLLBACK; } else if (this.num.get() % 4 == 0) { return ConsumeOrderlyStatus.COMMIT; } else if (this.num.get() % 5 == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } // 非主動提交的時候,SUCCESS不會致使隊列消息提交,消息未提交就能夠被循環消費 return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); } } class 
BroadcastConsumer { public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 即便是廣播形式下,nameServer仍是要設置 consumer.setNamesrvAddr("localhost:9876"); // 設置消費的消息類型爲廣播類消息 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("BroadcastTopic", "tagA || tagB || tagC"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { System.out.println("Received:" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET)); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } } /** * 定時任務消費者 * @author ziyuqi * */ class ScheduledConsumer { public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("scheduledTopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { System.out.println("Received:[" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET) + "]" + (System.currentTimeMillis() - msg.getStoreTimestamp()) + " ms later!"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
結合我上面的測試代碼,以及我在測試中主要針對順序消費的疑惑和源碼調試,我這裏簡單分析下順序消費者的相關執行過程,大體的執行步驟如下:
消費者啓動
我們知道每次consumer創建之後,都會調用consumer.start()方法來啓動消費者。跟進代碼嵌套,不難發現最終會進入DefaultMQPushConsumerImpl的start方法中,該方法的主要代碼如下:
/**
 * Starts this push consumer.
 *
 * <p>When the service state is {@code CREATE_JUST}, the method checks the
 * configuration, obtains the client instance, wires the rebalance component
 * and the pull API wrapper, selects and loads an offset store, creates the
 * consume-message service that matches the registered listener type, registers
 * this consumer with the client instance and finally starts the client
 * instance. Afterwards it refreshes topic subscription info, checks the client
 * in the broker, sends heartbeats and triggers an immediate rebalance.
 *
 * @throws MQClientException if the consumer group is already registered, or if
 *         the state is {@code RUNNING}, {@code START_FAILED} or
 *         {@code SHUTDOWN_ALREADY}
 */
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        // Only a consumer whose state is CREATE_JUST runs the start-up sequence
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            // Marked as failed up front; switched to RUNNING only at the end of this branch
            this.serviceState = ServiceState.START_FAILED;

            // Configuration check
            this.checkConfig();

            this.copySubscription();

            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            // Use the caller-supplied offset store if there is one; otherwise pick one by
            // message model: local file for BROADCASTING, remote broker for CLUSTERING
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();

            // The listener type decides between the orderly and the concurrent consume service
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            this.consumeMessageService.start();

            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                // Registration failed: reset the state and stop the consume service again
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            // The key call: start the MQ client factory, which begins pulling messages
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();
}
MQClient啓動
從上一段源碼可以發現,最終調用了mQClientFactory.start()。我們繼續跟進該方法,發現實際調用的是MQClientInstance.start():
/**
 * Starts this client instance.
 *
 * <p>When the service state is {@code CREATE_JUST}, the name server address is
 * fetched if none is configured, and then the request-response channel, the
 * scheduled tasks, the pull-message service, the rebalance service and the
 * internal producer are started in that order. {@code RUNNING} and
 * {@code SHUTDOWN_ALREADY} are no-ops.
 *
 * @throws MQClientException if the state is {@code START_FAILED}
 */
public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                // Marked as failed up front; switched to RUNNING only at the end of this branch
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service - the key point: this calls pullMessageService.start()
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
消息拉取
根據上一段代碼的註釋,我們進入到核心的消息推送代碼PullMessageService的start方法(實際上PullMessageService是一個線程服務,start之後真正執行的是它的run方法):
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); // 重點轉移到該方法具體推送實現 } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); } private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); // 調用默認的拉消息消費者實現 } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }
我們繼續跟進DefaultMQPushConsumerImpl的pullMessage方法:
/**
 * Issues an asynchronous pull for the given request and handles the result in
 * a callback (excerpt; two sections are omitted).
 *
 * <p>NOTE(review): {@code processQueue}, {@code subscriptionData},
 * {@code subExpression}, {@code sysFlag} and {@code commitOffsetValue} are
 * presumably defined in the omitted sections - confirm against the full source.
 *
 * @param pullRequest the request describing which queue and offset to pull
 */
public void pullMessage(final PullRequest pullRequest) {
    // ... omitted

    final long beginTimestamp = System.currentTimeMillis();

    // This callback is where the pulled messages are actually handed over for consumption
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            // Nothing usable in the result: pull again right away
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            // Submit a consume-request task to the thread pool
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);

                            // Schedule the next pull: delayed when a pull interval is configured, else immediately
                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }

                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);
                        }

                        break;
                    case NO_NEW_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        // Drop the process queue now; the offset fix-up below runs as a delayed task
                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);

                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            // Failures on retry topics are not logged; the pull is re-scheduled in every case
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }

            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    };

    // ... omitted

    try {
        this.pullAPIWrapper.pullKernelImpl( // Parameters of the core pull implementation (pull mode, callback, ...); the messages are requested remotely via Netty and the callback is invoked once the request succeeds
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
}
以上代碼的註釋標出了三個重點的地方,具體的處理流程大致是這樣:首先,this.pullAPIWrapper.pullKernelImpl這個方法定義了具體的消息拉取策略,內部實現其實會根據消息類型去拉取消息。對於默認的集羣消息模式,實際會調用Netty進行消息拉取,拉取結束後會調用註釋中的回調函數進行處理。最終實際會進入DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest,而對於順序消息消費,實際上會進入ConsumeMessageOrderlyService的submitConsumeRequest方法。該方法直接向消費線程池中放入一個消費請求任務。
消費請求任務
我們繼續跟進ConsumeRequest消費請求任務的具體實現:
/**
 * Consume-request task of the orderly consume service.
 *
 * <p>Under a per-message-queue lock object it repeatedly takes a batch of
 * messages from the process queue, invokes the registered orderly listener
 * while holding the process queue's consume lock, derives a return type for
 * the hooks, and passes the listener's status to {@code processConsumeResult},
 * whose return value decides whether the loop continues. In CLUSTERING mode
 * the loop only runs while the process queue is locked and the lock has not
 * expired; otherwise a later lock attempt and re-consume is scheduled.
 */
@Override
public void run() {
    if (this.processQueue.isDropped()) {
        log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
    }

    // One lock object per message queue: consumption of a single queue is serialized
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) {
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
            final long beginTime = System.currentTimeMillis();
            for (boolean continueConsume = true; continueConsume; ) {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    break;
                }

                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && !this.processQueue.isLocked()) {
                    log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }

                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && this.processQueue.isLockExpired()) {
                    log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }

                // Stop after consuming continuously for too long and re-submit the request for later
                long interval = System.currentTimeMillis() - beginTime;
                if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                    break;
                }

                final int consumeBatchSize =
                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                if (!msgs.isEmpty()) {
                    final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                    ConsumeOrderlyStatus status = null;

                    ConsumeMessageContext consumeMessageContext = null;
                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext = new ConsumeMessageContext();
                        consumeMessageContext
                            .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                        consumeMessageContext.setMq(messageQueue);
                        consumeMessageContext.setMsgList(msgs);
                        consumeMessageContext.setSuccess(false);
                        // init the consume context type
                        consumeMessageContext.setProps(new HashMap<String, String>());
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                    }

                    long beginTimestamp = System.currentTimeMillis();
                    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                    boolean hasException = false;
                    try {
                        this.processQueue.getLockConsume().lock();
                        if (this.processQueue.isDropped()) {
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                this.messageQueue);
                            // Leaves the for loop; the finally block still releases the consume lock
                            break;
                        }

                        // Invoke the registered listener to consume the messages and obtain its result
                        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                    } catch (Throwable e) {
                        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                            RemotingHelper.exceptionSimpleDesc(e),
                            ConsumeMessageOrderlyService.this.consumerGroup,
                            msgs,
                            messageQueue);
                        hasException = true;
                    } finally {
                        this.processQueue.getLockConsume().unlock();
                    }

                    if (null == status
                        || ConsumeOrderlyStatus.ROLLBACK == status
                        || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                        log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
                            ConsumeMessageOrderlyService.this.consumerGroup,
                            msgs,
                            messageQueue);
                    }

                    // Derive the return type reported to the hooks from status, exception flag and elapsed time
                    long consumeRT = System.currentTimeMillis() - beginTimestamp;
                    if (null == status) {
                        if (hasException) {
                            returnType = ConsumeReturnType.EXCEPTION;
                        } else {
                            returnType = ConsumeReturnType.RETURNNULL;
                        }
                    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                        returnType = ConsumeReturnType.TIME_OUT;
                    } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                        returnType = ConsumeReturnType.FAILED;
                    } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                        returnType = ConsumeReturnType.SUCCESS;
                    }

                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                    }

                    // A missing status (null return or exception) is treated as "suspend the queue for a moment"
                    if (null == status) {
                        status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }

                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext.setStatus(status.toString());
                        consumeMessageContext
                            .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                    }

                    ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                        .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                    // Process the listener's result
                    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                } else {
                    continueConsume = false;
                }
            }
        } else {
            if (this.processQueue.isDropped()) {
                log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }

            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
        }
    }
}
可以看出,開始時會調用我們實現的MessageListener對拉取到的消息進行消費,消費完成之後會拿到消費結果,並對消費結果進行處理。
消費結果處理(COMMIT ROLLBACK)
我們直接跟進消費結果處理代碼:
/**
 * Applies the status returned by the orderly listener to the process queue.
 *
 * <p>The handling differs depending on {@code context.isAutoCommit()}. When a
 * commit yields a non-negative offset and the process queue has not been
 * dropped, the offset store is updated with that offset.
 *
 * @param msgs           the messages that were passed to the listener
 * @param status         the status returned by the listener
 * @param context        the orderly context used for this consume call
 * @param consumeRequest the consume request that owns the queues
 * @return {@code true} if the caller should keep consuming, {@code false} if a
 *         later consume request has been scheduled instead
 */
public boolean processConsumeResult(
    final List<MessageExt> msgs,
    final ConsumeOrderlyStatus status,
    final ConsumeOrderlyContext context,
    final ConsumeRequest consumeRequest
) {
    boolean continueConsume = true;
    long commitOffset = -1L;
    if (context.isAutoCommit()) { // Auto-commit case
        switch (status) {
            case COMMIT:
            case ROLLBACK:
                log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
                    consumeRequest.getMessageQueue());
                // Intentional fall-through: with auto-commit, COMMIT and ROLLBACK are handled like SUCCESS
            case SUCCESS:
                commitOffset = consumeRequest.getProcessQueue().commit();
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                break;
            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                if (checkReconsumeTimes(msgs)) {
                    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                    this.submitConsumeRequestLater(
                        consumeRequest.getProcessQueue(),
                        consumeRequest.getMessageQueue(),
                        context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                } else {
                    commitOffset = consumeRequest.getProcessQueue().commit();
                }
                break;
            default:
                break;
        }
    } else {
        switch (status) { // Manual commit: each returned status is handled differently
            case SUCCESS:
                // Only statistics are updated; nothing is committed
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                break;
            case COMMIT:
                commitOffset = consumeRequest.getProcessQueue().commit();
                break;
            case ROLLBACK:
                consumeRequest.getProcessQueue().rollback();
                this.submitConsumeRequestLater(
                    consumeRequest.getProcessQueue(),
                    consumeRequest.getMessageQueue(),
                    context.getSuspendCurrentQueueTimeMillis());
                continueConsume = false;
                break;
            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                if (checkReconsumeTimes(msgs)) {
                    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                    this.submitConsumeRequestLater(
                        consumeRequest.getProcessQueue(),
                        consumeRequest.getMessageQueue(),
                        context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                }
                break;
            default:
                break;
        }
    }

    if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
    }

    return continueConsume;
}
因爲我們例子中寫的是非自動提交,我們就來看看非自動提交下ROLLBACK和COMMIT的具體實現(對應ProcessQueue的相關方法):
/**
 * Moves every entry of {@code consumingMsgOrderlyTreeMap} back into
 * {@code msgTreeMap} and clears {@code consumingMsgOrderlyTreeMap}, under the
 * write lock. An interruption while acquiring the lock is logged and the
 * method returns without changing anything.
 */
public void rollback() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            /*
             * Author's debugging note: when KEY2 is being consumed, num == 3, so the
             * listener returns ROLLBACK and this method is entered. At that point:
             *   this.msgTreeMap holds all messages not yet consumed: KEY3 --- KEY49
             *   this.consumingMsgOrderlyTreeMap holds all messages consumed in order so far: KEY0 --- KEY2
             * So a rollback does not only put the current message back for re-consumption;
             * every previously processed (but uncommitted) message is put back as well.
             * This explains the repeated consumption observed earlier once auto-commit
             * is set to false.
             */
            this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
            this.consumingMsgOrderlyTreeMap.clear();
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("rollback exception", e);
    }
}

/**
 * Commits everything in {@code consumingMsgOrderlyTreeMap}: decrements the
 * message count and size counters accordingly and clears the map, under the
 * write lock.
 *
 * @return the last key of {@code consumingMsgOrderlyTreeMap} plus one, or
 *         {@code -1} if that key is {@code null} or the thread is interrupted
 *         while acquiring the lock
 */
public long commit() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            // Offset of the last message among those consumed in order
            // NOTE(review): if consumingMsgOrderlyTreeMap is a java.util.TreeMap, lastKey() throws on an
            // empty map instead of returning null, so the null check below would never fire - confirm
            Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
            // Subtract the consumed-but-uncommitted messages from the message count; what remains can still be consumed
            msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
            // Subtract the body length of every message being committed from the total size
            for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
                msgSize.addAndGet(0 - msg.getBody().length);
            }
            // Clear the consumed-but-uncommitted messages
            this.consumingMsgOrderlyTreeMap.clear();
            if (offset != null) {
                return offset + 1;
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("commit exception", e);
    }

    return -1;
}
至此,整個簡單的消費流程分析完成。
消費流程源碼分析總結
- Pull OR Push:即使是Push模式的Consumer,其最終實現還是通過Pull的方式來進行的
- Netty:集羣模式的遠程消息獲取是通過Netty來實現的
RocketMQ常用的三種消息生產消費模式到現在我們就基本分析完了。個人認爲順序消息消費給需要順序執行的流程的異步實現提供了強有力的支持,這一點特別適用於阿里當前的相關領域。當然RocketMQ也不是盡善盡美的,我個人在測試的時候發現順序消息消費的性能不算特別高,當然具體什麼原因只有留到後續分析了。還有,因爲這個項目開始是阿里內部研發的,可能源碼註釋相比於其他開源項目還是要少一些,也沒有那麼清楚,以至於consumer.setConsumeFromWhere這個方法不同設值的具體區別在哪我還沒有探究出來(想想Spring的事務隔離級別以及傳播特性相關常量的註釋,基本一看就懂了)。限於篇幅,加上我要趕去上班,就不再繼續深究了(後面繼續)。