跟我學RocketMQ之消息消費源碼解析(1)

本文咱們接着分析一下RocektMQ實現消息消費的源碼細節,這部分的內容較多,所以拆分爲幾個章節分別進行講解。java

本章節重點講解DefaultMQPushConsumer的代碼邏輯。算法

DefaultMQPushConsumer使用樣例

按照慣例仍是先看一下DefaultMQPushConsumer的使用樣例。spring

@PostConstruct
    public void init() {
        defaultMQPushConsumer = new DefaultMQPushConsumer("ORDER_RESULT_NOTIFY_GROUP");
        defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
        // 從頭開始消費
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 消費模式:集羣模式
        defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        // 註冊監聽器
        defaultMQPushConsumer.registerMessageListener(messageListener);
        // 訂閱全部消息
        try {
            defaultMQPushConsumer.subscribe("ORDER_RESULT_NOTIFY_TOPIC", "*");
            defaultMQPushConsumer.start();
        } catch (MQClientException e) {
            throw new RuntimeException("[訂單結果通知消息消費者]--NotifySendConsumer加載異常!", e);
        }
        LOGGER.info("[訂單結果通知消息消費者]--NotifySendConsumer加載完成!");
    }複製代碼

初始化過程當中須要調用registerMessageListener將具體的消費實現Listener注入。數組

@Component(value = "notifySendListenerImpl")
    public class NotifySendListenerImpl implements MessageListenerConcurrently {複製代碼

...省略部分代碼...複製代碼
@Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {複製代碼
try {
                for (MessageExt msg : msgs) {
                    // 消息解碼
                    String message = new String(msg.getBody());
                    // 消費次數
                    int reconsumeTimes = msg.getReconsumeTimes();
                    String msgId = msg.getMsgId();
                    String logSuffix = ",msgId=" + msgId + ",reconsumeTimes=" + reconsumeTimes;複製代碼
LOGGER.info("[通知發送消息消費者]-OrderNotifySendProducer-接收到消息,message={},{}", message, logSuffix);
                    // 請求組裝
                    OrderResultNofityProtocol protocol = new OrderResultNofityProtocol();
                    protocol.decode(message);
                    // 參數加簽,獲取用戶privatekey
                    String privateKey = protocol.getPrivateKey();
                    String notifyUrl = protocol.getMerchantNotifyUrl();
                    String purseId = protocol.getPurseId();
                    ChargeNotifyRequest chargeNotifyRequest = new ChargeNotifyRequest();
                    chargeNotifyRequest.setChannel_orderid(protocol.getChannelOrderId())
                            .setFinish_time(DateUtil.formatDate(new Date(System.currentTimeMillis())))
                            .setOrder_status(NotifyConstant.NOTIFY_SUCCESS)
                            .setPlat_orderid(protocol.getOrderId())
                            .setSign(chargeNotifyRequest.sign(privateKey));
                    LOGGER.info("[通知發送消息消費者]-OrderNotifySendProducer-訂單結果通知入參:{},{}", chargeNotifyRequest.toString(), logSuffix);
                    // 通知發送
                    return sendNotifyByPost(reconsumeTimes, logSuffix, protocol, notifyUrl, purseId, chargeNotifyRequest);
                }
            } catch (Exception e) {
                LOGGER.error("[通知發送消息消費者]消費異常,e={}", LogExceptionWapper.getStackTrace(e));
            }
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }複製代碼

上面就是一個較爲標準的在spring框架中使用RocektMQ的DefaultMQPushConsumer進行消費的主流程。app

接下來咱們重點分析一下源碼實現。負載均衡

初始化DefaultMQPushConsumer

首先看一下DefaultMQPushConsumer的初始化過程。框架

進入DefaultMQPushConsumer.java類,查看構造方法:ide

public DefaultMQPushConsumer(final String consumerGroup) {
        this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
    }複製代碼

調用了它的同名構造,採用AllocateMessageQueueAveragely策略(平均散列隊列算法)源碼分析

public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.consumerGroup = consumerGroup;
        this.namespace = namespace;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    }複製代碼

能夠看到實際初始化是經過DefaultMQPushConsumerImpl實現的,DefaultMQPushConsumer持有一個defaultMQPushConsumerImpl的引用。ui

[DefaultMQPushConsumerImpl.java]
    public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
        // 初始化DefaultMQPushConsumerImpl,將defaultMQPushConsumer的實際引用傳入
        this.defaultMQPushConsumer = defaultMQPushConsumer;
        // 傳入rpcHook並指向本類的引用
        this.rpcHook = rpcHook;
    }
複製代碼

註冊消費監聽MessageListener

咱們接着看一下注冊消費監聽器的流程。

消費監聽接口MessageListener有兩個具體的實現,分別爲

MessageListenerConcurrently     -- 並行消費監聽
    MessageListenerOrderly          -- 順序消費監聽複製代碼

本文以MessageListenerConcurrently爲主要講解的對象。

查看MessageListenerConcurrently的註冊過程。

@Override
    public void registerMessageListener(
                MessageListenerConcurrently messageListener) {
        // 將實現指向本類引用
        this.messageListener = messageListener;
        // 進行真實註冊
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }複製代碼

接着看defaultMQPushConsumerImpl.registerMessageListener

DefaultMQPushConsumerImpl.java
    public void registerMessageListener(MessageListener messageListener) {
        this.messageListenerInner = messageListener;
    }複製代碼

能夠看到DefaultMQPushConsumerImpl將真實的messageListener實現指向它本類的messageListener引用。

訂閱topic

接着看一下訂閱topic的主流程。

topic訂閱主要經過方法subscribe實現,首先看一下DefaultMQPushConsumer的subscribe實現

@Override
    public void subscribe(String topic, String subExpression) 
                                        throws MQClientException {
        this.defaultMQPushConsumerImpl
            .subscribe(withNamespace(topic), subExpression);
    }複製代碼

能夠看到是調用了DefaultMQPushConsumerImpl的subscribe方法。

public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            // 構建主題的訂閱數據,默認爲集羣消費
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                topic, subExpression);
            // 將topic的訂閱數據進行保存
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                // 若是MQClientInstance不爲空,則向全部的broker發送心跳包,加鎖
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }複製代碼

看一下buildSubscriptionData代碼邏輯

[FilterAPI.java]
    public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
        String subString) throws Exception {
        // 構造一個SubscriptionData實體,設置topic、表達式(tag)
        SubscriptionData subscriptionData = new SubscriptionData();
        subscriptionData.setTopic(topic);
        subscriptionData.setSubString(subString);複製代碼
// 若是tag爲空或者爲"*",統一設置爲"*",即訂閱全部消息
        if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
            subscriptionData.setSubString(SubscriptionData.SUB_ALL);
        } else {
            // tag不爲空,則先按照‘|’進行分割
            String[] tags = subString.split("\\|\\|");
            if (tags.length > 0) {
                // 遍歷tag表達式數組
                for (String tag : tags) {
                    if (tag.length() > 0) {
                        String trimString = tag.trim();
                        if (trimString.length() > 0) {
                            // 將每一個tag的值設置到tagSet中
                            subscriptionData.getTagsSet().add(trimString);
                            subscriptionData.getCodeSet().add(trimString.hashCode());
                        }
                    }
                }
            } else {
                // tag解析異常
                throw new Exception("subString split error");
            }
        }
        return subscriptionData;
    }複製代碼

看一下sendHeartbeatToAllBrokerWithLock代碼邏輯

[MQClientInstance.java]
    public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                // 發送心跳包
                this.sendHeartbeatToAllBroker();
                this.uploadFilterClassSource();
            } catch (final Exception e) {
                log.error("sendHeartbeatToAllBroker exception", e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed.");
        }
    }複製代碼

能夠看到,同步發送心跳包給全部的broker,而該過程是經過RemotingClient統一實現的,經過調用RemotingClient.invokeSync實現心跳包的發送,底層是經過Netty實現的。具體細節本文不進行展開。

啓動消費客戶端

上述初始化流程執行完畢以後,經過start()方法啓動消費客戶端。

@Override
    public void start() throws MQClientException {
        // 設置消費者組
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        // 啓動消費客戶端
        this.defaultMQPushConsumerImpl.start();
        // trace處理邏輯
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }複製代碼

關於trace的處理邏輯,本文再也不展開,感興趣的同窗能夠移步 跟我學RocketMQ之消息軌跡實戰與源碼分析

接着看defaultMQPushConsumerImpl.start()方法邏輯

[DefaultMQPushConsumerImpl.java]
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={},
                 isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;複製代碼

首次啓動後,執行配置檢查,該方法爲前置校驗方法,主要進行消費屬性校驗。

this.checkConfig();複製代碼

將訂閱關係配置信息進行復制

this.copySubscription();複製代碼

若是當前爲集羣消費模式,修改實例名爲pid

if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }複製代碼

建立一個新的MQClientInstance實例,若是已經存在直接使用該存在的MQClientInstance

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);複製代碼

爲消費者負載均衡實現rebalanceImpl設置屬性

// 設置消費者組
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                // 設置消費模式
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                // 設置隊列分配策略
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                // 設置當前的MQClientInstance實例
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
複製代碼
this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                // 註冊消息過濾鉤子
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);複製代碼

處理offset存儲方式

// offsetStore不爲空則使用當前的offsetStore方式
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    // 不然根據消費方式選擇具體的offsetStore方式存儲offset
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        // 若是是廣播方式,則使用本地存儲方式
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        // 若是是集羣方式,則使用遠端broker存儲方式存儲offset
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                 // 加載當前的offset
                this.offsetStore.load();複製代碼

根據MessageListener的具體實現方式選取具體的消息拉取線程實現。

// 若是是MessageListenerOrderly順序消費接口實現
                // 消息消費服務選擇:ConsumeMessageOrderlyService(順序消息消費服務)
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } 
                // 若是是MessageListenerConcurrently並行消息消費接口實現
                // 消息消費服務選擇:ConsumeMessageConcurrentlyService(並行消息消費服務)
                else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }複製代碼

選擇並初始化完成具體的消息消費服務以後,啓動消息消費服務。consumeMessageService主要負責對消息進行消費,它的內部維護了一個線程池。

// 啓動消息消費服務
                this.consumeMessageService.start();複製代碼

接着向MQClientInstance註冊消費者,並啓動MQClientInstance。這裏再次強調

一個JVM中全部消費者、生產者持有同一個MQClientInstance,且MQClientInstance只會啓動一次

boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }複製代碼
mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;複製代碼

若是MQClientInstance已經啓動,或者已經關閉,或者啓動失敗,重複調用start會報錯。這裏也能直觀的反映出:MQClientInstance的啓動只有一次

case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }複製代碼

啓動完成執行後續收尾工做

// 訂閱關係改變,更新Nameserver的訂閱關係表
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        // 檢查客戶端狀態
        this.mQClientFactory.checkClientInBroker();
        // 發送心跳包
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // 喚醒執行消費者負載均衡
        this.mQClientFactory.rebalanceImmediately();
    }
複製代碼

copySubscription(),消息重試topic處理邏輯

消費者啓動流程較爲重要,咱們接着對其中的重點方法展開講解。這部份內容能夠暫時跳過,不影響對主流程的把控。

咱們研究一下copySubscription方法的實現細節。

[DefaultMQPushConsumerImpl.java]
    private void copySubscription() throws MQClientException {
        try {複製代碼
// 首先獲取訂閱信息
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        topic, subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }複製代碼
// 爲defaultMQPushConsumer設置具體的MessageListener實現
            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }複製代碼

根據消費類型選擇是否進行重試topic訂閱

switch (this.defaultMQPushConsumer.getMessageModel()) {複製代碼
// 若是是廣播消費模式,則不進行任何處理,即無重試
                case BROADCASTING:
                    break;複製代碼
// 若是是集羣消費模式,訂閱重試主題消息
                case CLUSTERING:
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }複製代碼

若是是集羣消費模式,會訂閱重試主題消息

獲取重試topic,規則爲 RETRYGROUPTOPIC_PREFIX + consumerGroup,即:"%RETRY%"+消費組名

爲重試topic設置訂閱關係,訂閱全部的消息;

消費者啓動的時候會自動訂閱該重試主題,並參與該topic的消息隊列負載過程。

小結

到此,咱們就DefaultMQPushConsumer的初始化、啓動、校驗以及topic訂閱、重試等代碼實現細節進行了較爲詳細的講解。

下一章節,我將帶領讀者對消息消費線程 consumeMessageService 的實現進行分析,咱們下篇文章見。

版權聲明: 原創不易,洗文可恥。除非註明,本博文章均爲原創,轉載請以連接形式標明本文地址。
相關文章
相關標籤/搜索