深刻理解RocketMq普通消息和順序消息使用,原理,優化

1. 背景

最近一直再作一些系統上的壓測,並對一些問題作了優化,從這些裏面收穫了一些不少好的優化經驗,後續的文章都會以這方面爲主。緩存

此次打壓的過程當中收穫比較的大的是,對RocketMq的一些優化。最開始咱們公司使用的是RabbitMq,再一些流量高峯的場景下,發現隊列堆積比較嚴重,致使RabbitMq掛了。爲了應對這個場景,最終咱們引入了阿里雲的RocketMq,RocketMq能夠處理能夠處理不少消息堆積,而且服務的穩定不掛也能夠由阿里雲保證。引入了RocketMq了以後,的確解決了隊列堆積致使消息隊列宕機的問題。安全

原本覺得使用了RocketMq以後,能夠萬事無憂,可是其實在打壓過程當中發現了很多問題,這裏先提幾個問題,你們帶着這幾個問題在文中去尋找答案:併發

  1. 在RocketMq中,若是消息隊列發生堆積,consumer會發生什麼樣的影響?
  2. 在RocketMq中,普通消息和順序消息有沒有什麼辦法提高消息消費速度?
  3. 消息失敗重試次數怎麼設置較爲合理?順序消息和普通消息有不一樣嗎?

2. 普通消息 VS 順序消息

再RocketMq中提供了多種消息類型讓咱們進行配置:app

  • 普通消息:沒有特殊功能的消息。
  • 分區順序消息:以分區緯度保持順序進行消費的消息。
  • 全局順序消息:全局順序消息能夠看做是隻分一個區,始終再同一個分區上進行消費。
  • 定時/延時消息:消息能夠延遲一段特定時間進行消費。
  • 事務消息:二階段事務消息,先進行prepare投遞消息,此時不能進行消息消費,當二階段發出commit或者rollback的時候纔會進行消息的消費或者回滾。

雖然配置種類比較繁多,可是使用得仍是普通消息和分區順序消息。後續主要講得也是這兩種消息。異步

2.1 發送消息

2.1.1 普通消息

普通消息的發送的代碼比較簡單,以下所示:ide

public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("test_group_producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        
        Message msg =
                new Message("Test_Topic", "test_tag", ("Hello World").getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
        producer.shutdown();
    }

其內部核心代碼爲:高併發

private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 1. 根據 topic找到publishInfo
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // 若是是同步 就三次 不然就1次
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            // 循環
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        // 更新延遲
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } 
                } else {
                    break;
                }
            }
        // 省略
            
    }

主要流程以下:學習

  • Step 1: 根據Topic 獲取TopicPublishInfo,TopicPublishInfo中有咱們的Topic發佈消息的信息(),這個數據先從本地獲取若是本地沒有,則從NameServer去拉取,而且定時每隔20s會去獲取TopicPublishInfo。fetch

  • Step 2: 獲取總共執行次數(用於重試),若是發送方式是同步,那麼總共次數會有3次,其餘狀況只會有1次。優化

  • Step 3: 從MessageQueue中選取一個進行發送,MessageQueue的概念能夠等同於Kafka的partion分區,看做發送消息的最小粒度。這個選擇有兩種方式:

    • 根據發送延遲進行選擇,若是上一次發送的Broker是可用的,則從當前Broker選擇遍歷循環選擇一個,若是不可用那麼須要選擇一個延遲最低的Broker從當前Broker上選擇MessageQueue。
    • 經過輪訓的方式進行選擇MessageQueue。
  • Step 4: 將Message發送至選擇出來的MessageQueue上的Broker。

  • Step 5: 更新Broker的延遲。

  • Step 6: 根據不一樣的發送方式來處理結果:

    • Async: 異步發送,經過callBack關心結果,因此這裏不進行處理。
    • OneWay: 顧名思義,就是單向發送,只須要發給broker,不須要關心結果,這裏連callback都不須要。
    • Sync: 同步發送,須要關心結果,根據結果判斷是否須要進行重試,而後回到Step3。

能夠看見Rocketmq發送普通消息的流程比較清晰簡單,下面來看看順序消息。

2.1.2 順序消息

順序消息分爲分區順序消息和全局順序消息,全局順序消息比較容易理解,也就是哪條消息先進入,哪條消息就會先被消費,符合咱們的FIFO,不少時候全局消息的實現代價很大,因此就出現了分區順序消息。分區順序消息的概念能夠以下圖所示:

咱們經過對消息的key,進行hash,相同hash的消息會被分配到同一個分區裏面,固然若是要作全局順序消息,咱們的分區只須要一個便可,因此全局順序消息的代價是比較大的。

對RocketMq熟悉的小夥伴會發現,它其實並無提供順序消息發送相關的API,可是在阿里雲的RocketMq版本提供了順序消息的API,原理比較簡單,其實也是對現有API的一個封裝:

SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg,
                    Object shardingKey) {
                    int select = Math.abs(shardingKey.hashCode());
                    if (select < 0) {
                        select = 0;
                    }
                    return mqs.get(select % mqs.size());
                }
            }, shardingKey);

能夠看見順序消息將MessageQueue的選擇交由咱們發送方去作,因此咱們直接利用咱們shardingKey的hashCode進行發送分區。

3.1 消費消息

3.1.1 普通消息

普通消息使用比較簡單,以下面代碼所示:

public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Test_Consumer");
        consumer.subscribe("TopicTest", "*");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.setConsumeThreadMin(10);
        consumer.setConsumeThreadMax(10);
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
  • Step1:首先新建一個DefaultMQPushConsumer,並註冊對應的Topic和NameServer的信息。
  • Step2: 註冊消息監聽器,再RocketMq中有兩種消息監聽器,一個是MessageListenerConcurrently,用於咱們普通消息併發消費,還有一個是MessageListenerOrderly,用於咱們順序消息。這裏咱們使用的MessageListenerConcurrently。
  • Step3: 設置ConsumeThread大小,用於控制咱們的線程池去消費他。
  • Step4: 啓動Consumer。

啓動Consumer以後,咱們就開始真正的從Broker去進行消費了,可是咱們如何從Broker去消費的呢?首先在咱們的第一步裏面咱們訂閱了一個Topic,咱們就會定時去刷新Topic的相關信息好比MessageQueue的變動,而後將對應的MessageQueue分配給當前Consumer:

// 這個數據 是10s更新一次 從內存中獲取
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // 這個數據實時去拉取
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                    //經過默認的分配策略進行分配
                        allocateResult =
                                strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll,
                                        cidAll);
                    } catch (Throwable e) {
                        log.error(
                                "AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
                                strategy.getName(), e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);

這裏首先向Broker拿到當前消費全部的ConsumerId默認是對應機器的Ip+實例名字,Broker中的ConsumerId信息是Consumer經過心跳定時進行上報得來的,而後根據消費分配策略將消息分配給Consumer,這裏默認是平均分配,將咱們分配到的消息隊列,記錄在 processQueueTable中,若是出現了新增,那麼咱們須要建立一個PullRequst表明這拉取消息的請求,異步去處理:

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }

                this.removeDirtyOffset(mq);
                ProcessQueue pq = new ProcessQueue();
                // 這裏就是獲取咱們第一次應該拿什麼offset
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }

        this.dispatchPullRequest(pullRequestList);

在PullService中會不斷的從PullRequestQueue拿取數據,而後進行拉取數據。

while (!this.isStopped()) {
            try {
                // rebalance 以後第一次向這個隊列放數據 後續消費的時候會繼續放
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

拉取數據以後,這裏會給PullCallBack進行響應:

PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);

                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }

若是這裏成功拉取到消息的話,咱們首先將拉取的消息存入到咱們的ProcessQueue中,ProcessQueue用於咱們消費者處理的狀態以及待處理的消息,而後提交到咱們的Consumer線程池中進行真正的業務邏輯消費,而後再提交一個PullRequest用於咱們下次消費。

你們看到這裏有沒有發現這個模式和咱們的netty中的單線程accpet,多個線程來處理業務邏輯很類似,其原理都是同樣,由一個線程不斷的去拉取,而後由咱們業務上定義的線程池進行處理。以下圖所示:

咱們發現咱們拉取消息實際上是一個循環的過程,這裏就來到了第一個問題,若是消息隊列消費的速度跟不上消息發送的速度,那麼就會出現消息堆積,不少同窗根據過程來看可能會覺得,咱們的拉取消息一直在進行,因爲咱們的消費速度比較慢,會有不少message以隊列的形式存在於咱們的內存中,那麼會致使咱們的JVM出現OOM也就是內存溢出。

那麼到底會不會出現OOM呢?實際上是不會的,RocketMq對安全性方面作得很好,有下面兩段代碼:

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            System.out.println(cachedMessageCount + ":"+pullRequest);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            return;
        }

        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            return;
        }

首先是會判斷當前內存緩存的Message數量是否大於限制的值默認是1000,若是大於則延遲一段時間再次提交pullRequest。 而後判斷當前內存緩存的Size大小是否大於了某個值,默認是100M,若是大於也會延遲一段時間再次提交pullRequest。 因此在咱們consumer上若是出現消息堆積,基本也沒有什麼影響。

那咱們想一想第二個問題應該怎麼解決呢?再普通消息的場景下,如何提高消費速度?

  • 首先確定是須要提高咱們自己的處理速度,處理速度提高,消費速度天然就會提高。
  • 其次是要設置一個合理大小的consumer線程池,過小的話機器的資源得不到充分利用,太大的話線程的上下文切換可能會很快,通常來講根據消費者的業務來判斷,若是是cpu密集型線程設置cpu大小就好,若是是io密集型設置兩倍cpu大小。
  • 還有個就是MessageQueue,細心的同窗確定在上面看見咱們消費者消費消息以前,會被分配MessageQueue來進行獲取消費,因此天然而然就會想到,若是多分配一點MessageQueue數量是否是就會加快咱們的消費速度,其實MessageQueue對於咱們的普通消息消費提高幫助是很小的,由於全部的消費請求會被提交到線程池裏面去消費,MessageQueue再多也無濟於事,除非當咱們的Consumer機器不少的時候,MessageQueue數量小於Consumer機器的時候,這個時候增長MessageQueue纔會有提高效果,正所謂讓咱們的機器雨露均沾嘛。

3.1.1.1普通消息-消費結果處理

在rocketmq中對消息的消費結果處理也比較重要,這裏仍是先提三個問題:

  • 咱們的普通消息是怎麼處理結果的呢?
  • 若是消費失敗會怎麼辦呢?
  • 在普通消息消費的時候,是併發處理,若是出現offset靠後的消息先被消費完,可是咱們的offset靠前的尚未被消費完,這個時候出現了宕機,咱們的offset靠前的這部分數據是否會丟失呢?也就是下次消費的時候是否會從offset靠後的沒有被消費的開始消費呢?若是不是的話,rocketmq是怎麼作到的呢?

首先咱們來看第一個問題,怎麼處理消費結果,在processResult中有以下代碼:

public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();
        switch (status) {
            case CONSUME_SUCCESS:
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);

                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }
  • Step 1: 首先獲取ackIndex,即確認成功的數量,默認是int的最大數,表明着所有成功。
  • Step 2: 獲取 ConsumeConcurrentlyStatus,根據不一樣的狀態進行處理,ConsumeConcurrentlyStatus有兩個:
    • CONSUME_SUCCESS: 表明着消費成功,記錄成功的TPS和失敗的TPS。
    • RECONSUME_LATER: 表明着須要從新消費,通常是失敗纔會返回這個狀態,記錄失敗的TPS。
  • Step 3: 而後根據消息類型,進行不一樣的邏輯重試,消息消費類型有兩種:
    • BROADCASTING: 廣播消費,廣播消費不會進行重試,這裏會直接打一個warn日誌而後丟棄。
    • CLUSTERING:集羣消費,這裏會首先將失敗的消息發送回當前的topic,若是發送失敗,這裏會繼續進行本地消費重試。若是在Broker中發現這個消息重試次數已經達到上限,就會將這個消息發送至RetryTopic,而後由RetryTopic發送至死信隊列。
  • Step 4: 獲取message的offset,更新當前消費進度

在上面的第四步中,若是不深刻進去看內部邏輯,這裏會誤覺得,他會將當前消息的offset給更新到最新的消費進度,那問題三中說的中間的offset是有可能被丟失的,但其實是不會發生的,具體的邏輯保證在removeMessage中:

public long removeMessage(final List<MessageExt> msgs) {
        long result = -1;
        final long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (!msgTreeMap.isEmpty()) {
                    result = this.queueOffsetMax + 1;
                    int removedCnt = 0;
                    for (MessageExt msg : msgs) {
                        MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                        if (prev != null) {
                            removedCnt--;
                            msgSize.addAndGet(0 - msg.getBody().length);
                        }
                    }
                    msgCount.addAndGet(removedCnt);

                    if (!msgTreeMap.isEmpty()) {
                        result = msgTreeMap.firstKey();
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (Throwable t) {
            log.error("removeMessage exception", t);
        }
        return result;
    }

在removeMessage中經過msgTreeMap去作了一個保證,msgTreeMap是一個TreeMap,根據offset升序排序,若是treeMap中有值的話,他返回的offset就會是當前msgTreeMap中的firstKey,而不是當前的offset,從而就解決了問題三。

上面的過程總結爲下圖所示:

3.1.2 順序消息

順序消息的消費前面過程和普通消息基本同樣,這裏咱們須要關注的是將消息丟給咱們消費線程池以後的邏輯:

final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                // 省略
                List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                // 省略
            }

能夠發現這裏比普通消息多了一個步驟,那就是加鎖,這裏會獲取到以messageQueue爲緯度去加鎖,而後去咱們的processQueue中獲取到咱們的Message, 這裏也是用的咱們的msgTreeMap, 獲取的最小offset的Message。

因此咱們以前的線程池提升併發速度的策略在這裏沒有用了,那麼應該怎麼辦呢?既然咱們加鎖是以messageQueue爲緯度,那麼增長MessageQueue就行了,因此這裏的提高消費速度恰好和普通消息相反,再普通消息中提高Messagequeue可能效果並無那麼大,可是在順序消息的消費中提高就很大了。

咱們在壓測的時候,發現順序消息消費很慢,消息堆積很嚴重,通過調試發現阿里雲上的rocketmq默認讀寫隊列爲16,咱們consumer機器有10臺,每一個consumer線程池大小爲10,理論併發應該有100,可是因爲順序消息的緣由致使實際併發只有16,最後找阿里的技術人員將讀寫隊列擴至100,這樣充分利用咱們的資源,極大的增長了順序消息消費的速度,消息基本不會再堆積。

3.1.2.1 順序消息-消費結果處理

順序消息的結果處理和普通消息的處理流程,稍有不一樣,代碼以下:

public boolean processConsumeResult(
        final List<MessageExt> msgs,
        final ConsumeOrderlyStatus status,
        final ConsumeOrderlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        boolean continueConsume = true;
        long commitOffset = -1L;
        if (context.isAutoCommit()) {
            switch (status) {
                case SUCCESS:
                    commitOffset = consumeRequest.getProcessQueue().commit();
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    if (checkReconsumeTimes(msgs)) {
                        consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                        this.submitConsumeRequestLater(
                            consumeRequest.getProcessQueue(),
                            consumeRequest.getMessageQueue(),
                            context.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                    } else {
                        commitOffset = consumeRequest.getProcessQueue().commit();
                    }
                    break;
                default:
                    break;
            }
        } else {
            switch (status) {
                case SUCCESS:
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    if (checkReconsumeTimes(msgs)) {
                        consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                        this.submitConsumeRequestLater(
                            consumeRequest.getProcessQueue(),
                            consumeRequest.getMessageQueue(),
                            context.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                    }
                    break;
                default:
                    break;
            }
        }

        if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
        }

        return continueConsume;
    }
  • Step 1: 判斷當前offset是不是自動提交更新,通常autoCommit不須要設置,默認是自動提交,除非有特別的需求才會作這樣一個設置。
  • Step 2: 若是是自動提交,須要判斷狀態:
    • SUCCESS: 若是是成功狀態則獲取當前須要提交的offset,而後記錄到OK的TPS中
    • SUSPEND_CURRENT_QUEUE_A_MOMENT:注意在普通消息中若是失敗會返回RECONSUME_LATER,有什麼不一樣呢?再這個狀態下面,並不會向當前topic再次發送,而是會在本地線程池再次提交一個ConsumeRequest,延遲重試,這裏默認時間是1s。若是大於了最大重試次數這裏會將數據發送至RetryTopic。
  • Step 3: 若是不是自動提交的話,和步驟2相似,可是不會獲取提交的offset。
  • Step 4: 更新offset。

這裏回到咱們的第三個問題,如何設置消息消費的重試次數呢?因爲咱們直接使用的阿里雲的mq,因此咱們又包裝了一層,方便接入。再接入層中咱們最開始統一配置了最大重試2000次,這裏設置2000次的緣由主要是想讓咱們的消息隊列儘可能無限重試,由於咱們默認消息基本最終會成功,可是爲了以防萬一,因此這裏設置了一個較大的數值2000次。設置2000次對於咱們的普通消息,基本沒什麼影響,由於他會從新投遞至broker,可是咱們的順序消息是不行的,若是順序消息設置重試2000次,當遇到了這種不可能成功的消息的時候就會致使消息一直在本地進行重試,而且因爲對隊列加鎖了,因此當前MessageQueue將會一直被阻塞,致使後續消息不會被消費,若是設置2000次那麼至少會阻塞半個小時以上。因此這裏應該將順序消息設置一個較小的值,目前咱們設置爲16。

4. 最後

以前沒怎麼看過Rocketmq的源碼,通過此次打壓,從Rocketmq中學習到了不少精妙優秀的設計,將一些經驗提煉成了文中的一些問題,但願你們能仔細閱讀,找到答案。

若是你們以爲這篇文章對你有幫助,你的關注和轉發是對我最大的支持,O(∩_∩)O:

å

相關文章
相關標籤/搜索