跟我學RocketMQ之消息消費源碼解析(2)

本文咱們接着分析RocketMQ消息消費的邏輯。併發

接上文,DefaultMQPushConsumerImpl啓動過程當中,啓動了consumeMessageService消息消費線程。異步

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                this.consumeMessageService.start();複製代碼

能夠看到,是根據MessageListener的具體實現選擇具體的consumeMessageService實現,咱們重點講解並行消費服務ConsumeMessageConcurrentlyService。ide

ConsumeMessageConcurrentlyService初始化

首先看一下ConsumeMessageConcurrentlyService的成員變量,具體的解釋寫在註釋上函數

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
        private static final InternalLogger log = ClientLogger.getLog();複製代碼
// 消費推模式實現類
        private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;複製代碼
// 消費者引用
        private final DefaultMQPushConsumer defaultMQPushConsumer;複製代碼
// 併發消息事件監聽回調
        private final MessageListenerConcurrently messageListener;複製代碼
// 消息消費任務隊列
        private final BlockingQueue<Runnable> consumeRequestQueue;複製代碼
// 消息消費線程池
        private final ThreadPoolExecutor consumeExecutor;複製代碼
// 消息消費組
        private final String consumerGroup;複製代碼
// 添加消費任務到consumeExecutor定時調度器
        private final ScheduledExecutorService scheduledExecutorService;複製代碼
// 定時刪除過時任務線程池
        private final ScheduledExecutorService cleanExpireMsgExecutors;複製代碼

接着看它的構造方法:性能

public ConsumeMessageConcurrentlyService(
        DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerConcurrently messageListener) {複製代碼
// 初始化defaultMQPushConsumerImpl,messageListener
        // 本地引用指向外部具體實現
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListener;
        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();複製代碼
// 消費者組
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        // 初始化消費請求隊列爲LinkedBlockingQueue無界隊列
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();複製代碼
// 初始化線程池,指向消費調度線程池
        this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_"));複製代碼
// 初始化消費定時任務線程池,線程數=1
        this.scheduledExecutorService = 
            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
        // 初始化清除過時消息線程池,線程數=1
        this.cleanExpireMsgExecutors = 
            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
    }複製代碼

ConsumeMessageConcurrentlyService啓動

ConsumeMessageConcurrentlyService啓動邏輯爲start方法this

public void start() {
        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {複製代碼
@Override
            public void run() {
                cleanExpireMsg();
            }複製代碼
},
         // 15min 消費超時
         this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
    }複製代碼

能夠看到start方法是對cleanExpireMsgExecutors進行處理,開啓清除過時消息的調度過程。spa

咱們重點看一下cleanExpireMsg方法。.net

private void cleanExpireMsg() {
        Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
            this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<MessageQueue, ProcessQueue> next = it.next();
            ProcessQueue pq = next.getValue();
            pq.cleanExpiredMsg(this.defaultMQPushConsumer);
        }
    }複製代碼

能夠看到,cleanExpireMsg方法定時對ProcessQueue進行處理,將其中的消息進行清理。這部份內容不是講解的重點,暫時打住。線程

ConsumeMessageConcurrentlyService消息消費

咱們重點研究一下ConsumeMessageConcurrentlyService的消息消費過程。日誌

ConsumeMessageConcurrentlyService的消息消費過程主要方法爲submitConsumeRequest。

經過submitConsumeRequest提交消費請求進行消費過程。

@Override
    public void submitConsumeRequest(
        // 消息列表 默認一次從服務端拉取最多32條消息
        final List<MessageExt> msgs,            
        // 消息處理隊列    
        final ProcessQueue processQueue,    
        // 消息所屬的消息隊列        
        final MessageQueue messageQueue,      
        // 是否轉發到消費線程池 併發消費時忽略該參數      
        final boolean dispatchToConsume) {      複製代碼

獲取批量消費數量,這個值爲ConsumeMessageBatchMaxSize,默認爲1

final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();複製代碼

若是消息的大小小於等於consumeBatchSize,組裝消費請求,提交到消費線程池中進行消費操做。

若是一場則稍後再次提交消費請求,經過方法submitConsumeRequestLater實現。

if (msgs.size() <= consumeBatchSize) {
            // 拉取的消息小於等於consumeBatchSize(默認爲1)
            // 提交消費請求到線程池中進行消費
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }複製代碼

若是拉取的消息條數大於consumeBatchSize,則對拉取到消息進行分頁處理;

每頁大小爲:consumeBatchSize。

經過循環迭代的方式,建立多個ConsumeRequest消費請求任務,提交到消費線程池中。

} else {
            // 拉取的消息大於consumeBatchSize 進行分頁提交任務到線程池
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }
                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    // 提交消費任務到消費線程池
                    this.consumeExecutor.submit(consumeRequest);複製代碼

若是觸發拒絕提交異常,則稍後繼續提交。實際上,因爲任務隊列是LinkedBlockingQueue無界隊列,所以理論上不會出現拒絕提交。

} catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }
                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }複製代碼

submitConsumeRequestLater

這裏插入對submitConsumeRequestLater的解釋,這一部分能夠直接選擇跳過,對主流程沒有影響。

private void submitConsumeRequestLater(final ConsumeRequest consumeRequest) {複製代碼
this.scheduledExecutorService.schedule(new Runnable() {複製代碼
@Override
            public void run() {
                ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
            }
        }, 5000, TimeUnit.MILLISECONDS);
    }複製代碼

能夠看到經過scheduledExecutorService進行調度,每5秒再次提交一次消息消費請求。

咱們能夠看到消費消息服務的核心代碼爲

this.consumeExecutor.submit(consumeRequest);複製代碼

根據咱們對線程池調度的瞭解,能夠知道submit接受一個Runnable接口實現,也就是這裏的ConsumeRequest;經過調用該Runnable的run方法實現具體的調度邏輯。

咱們接着看一下ConsumeRequest的run方法。

ConsumeRequest.run()

大段代碼預警.....

@Override
        public void run() {複製代碼

step1. 首先檢查processQueue的dropped是否爲true,若是是true,則中止消費,直接return。

當發生消息rebalance時,會設置dropped==true,這麼作的目的是防止消費者消費不屬於本身的消息隊列。

if (this.processQueue.isDropped()) {
                log.info("the message queue not be able to consume, 
                because it's dropped. group={} {}", 
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    this.messageQueue);
                return;
            }

複製代碼
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
            ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
            ConsumeConcurrentlyStatus status = null;
            defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
            ConsumeMessageContext consumeMessageContext = null;複製代碼

step2. 若是消費者存在鉤子函數,則經過 executeHookBefore 調用該鉤子函數

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext = new ConsumeMessageContext();
                consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
                consumeMessageContext.setProps(new HashMap<String, String>());
                consumeMessageContext.setMq(messageQueue);
                consumeMessageContext.setMsgList(msgs);
                consumeMessageContext.setSuccess(false);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
            }複製代碼

step3. 【重點!】此處的代碼是消費的核心部分。

首先判斷msgs是否爲空,若是不爲空,則迭代msgs,設置消費開始時間戳,回調客戶端實現的MessageListenerConcurrently.consumeMessage方法執行具體消費邏輯,得到其消費結果status。

long beginTimestamp = System.currentTimeMillis();
            boolean hasException = false;
            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
            try {
                if (msgs != null && !msgs.isEmpty()) {
                    for (MessageExt msg : msgs) {
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }複製代碼
// 經過Collections.unmodifiableList將msgs包裝爲不可修改的視圖
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);複製代碼
// 若是消費執行異常則hasException = true;
            } catch (Throwable e) {
                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                    RemotingHelper.exceptionSimpleDesc(e),
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                hasException = true;
            }
            // 計算消費耗時
            long consumeRT = System.currentTimeMillis() - beginTimestamp;複製代碼

step4. 根據具體的status返回值進行後續處理:

// 若是status爲空,且hasException==true,則返回ConsumeReturnType.EXCEPTION,
            // 不然返回 ConsumeReturnType.RETURNNULL
            if (null == status) {
                if (hasException) {
                    returnType = ConsumeReturnType.EXCEPTION;
                } else {
                    returnType = ConsumeReturnType.RETURNNULL;
                }
            // 消費超時
            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                returnType = ConsumeReturnType.TIME_OUT;
            // 業務側返回RECONSUME_LATER,須要從新消費,returnType爲消費失敗
            } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                returnType = ConsumeReturnType.FAILED;
            // 業務側返回CONSUME_SUCCESS,消費成功,returnType爲消費成功
            } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                returnType = ConsumeReturnType.SUCCESS;
            }
            ......
            // 若是客戶端返回的status爲null,則賦值爲RECONSUME_LATER,以便重複消費。
            if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }複製代碼

step5. 若是存在鉤子函數,則執行鉤子函數executeHookAfter

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.setStatus(status.toString());
                consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
            }複製代碼
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);複製代碼

step6. 執行消費邏輯以後,再次判斷processQueue的dropped狀態;若是爲true,則不進行任何處理;當非true時,調用processConsumeResult對消費結果進行處理。

if (!processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            } else {
                log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
            }
        }複製代碼

之因此當processQueue的dropped狀態爲true時不作任何處理,是由於當processQueue.dropped==true時,說明此時可能出現了新消費者的加入/原消費者down機等狀況,致使原先消費者的隊列在rebalance以後分配給了新的消費者。那麼,這部分消息會被從新消費,所以此處就不須要作多餘的處理,等待從新消費就能夠了。

processConsumeResult解析消費結果

到processConsumeResult方法,就進入本文的結束部分,即:解析消費結果。

這部分的邏輯主要是消費進度offset進行處理。

public void processConsumeResult(
        // 並行消費結果
        final ConsumeConcurrentlyStatus status,
        // 並行消費上下文
        final ConsumeConcurrentlyContext context,
        // 消費請求
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();複製代碼
if (consumeRequest.getMsgs().isEmpty())
            return;複製代碼

判斷消費結果,若是是CONSUMESUCCESS則設置ackIndex=msgs.size()-1;若是是RECONSUMELATER則設置ackIndex=-1。爲發送消息確認ACK作準備。

switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }複製代碼

根據消費類型,進行處理,若是是廣播模式:業務側返回RECONSUME_LATER不會從新消費,只會打印告警日誌;

若是是集羣模式,消息消費成功不執行sendMessageBack;當業務側返回RECONSUME_LATER時,這批消息須要將ACK發送給broker。

須要將它們從新封裝爲consumeRequest,延遲五秒後從新消費。

switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }複製代碼

最後,從ProcessQueue中將這批成功消費的消息移除,經過offset更新消費進度;以便後續可以從上次的消費位點繼續消費,避免重複消費。

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }複製代碼

submitConsumeRequestLater解析

咱們看一下submitConsumeRequestLater這個方法又作了哪些處理。

private void submitConsumeRequestLater(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue
    ) {複製代碼
this.scheduledExecutorService.schedule(new Runnable() {複製代碼
@Override
            public void run() {
                ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
            }
        }, 5000, TimeUnit.MILLISECONDS);
    }複製代碼

能夠看到就是在這個方法中調用了submitConsumeRequest進行了消息消費處理。這樣咱們的消費流程就完美的閉環了。

小結

本文咱們主要講解了ConsumeMessageConcurrentlyService消息消費服務是如何異步地對消息進行消費,着重分析了它的生命週期以及消費狀態的流轉過程。

到此,咱們還有一個問題沒有解決,那就是ConsumeMessageConcurrentlyService消費的消息是從何處得到的?

這裏就涉及到RocketMQ消息消費時的消息拉取流程,這個流程也是異步的,RocketMQ中大量使用了異步線程模型。這種方式便於理解,也有利於性能的提高,該異步流程咱們會在接下來的文章中繼續分析。


版權聲明:
原創不易,洗文可恥。除非註明,本博文章均爲原創,轉載請以連接形式標明本文地址。
博客地址: wuwenliang.net
相關文章
相關標籤/搜索