聊聊rocketmq的updateConsumeOffsetToBroker

本文主要研究一下rocketmq的updateConsumeOffsetToBrokerjava

updateConsumeOffsetToBroker

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.javagit

public class RemoteBrokerOffsetStore implements OffsetStore {

    //......

    /**
     * Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
     * here need to be optimized.
     */
    @Override
    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        if (null == findBrokerResult) {

            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        }

        if (findBrokerResult != null) {
            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setConsumerGroup(this.groupName);
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setCommitOffset(offset);

            if (isOneway) {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            } else {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            }
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    }

    //......
}
  • RemoteBrokerOffsetStore的updateConsumeOffsetToBroker方法首先經過mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())獲取findBrokerResult
  • 若返回null,則執行mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()),而後再執行mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())獲取findBrokerResult
  • 以後對於findBrokerResult不爲null的狀況構建UpdateConsumerOffsetRequestHeader,而後執行mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway或者mQClientFactory.getMQClientAPIImpl().updateConsumerOffset

findBrokerAddressInAdmin

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.javagithub

public class MQClientInstance {

    //......

    public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;

        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            for (Map.Entry<Long, String> entry : map.entrySet()) {
                Long id = entry.getKey();
                brokerAddr = entry.getValue();
                if (brokerAddr != null) {
                    found = true;
                    if (MixAll.MASTER_ID == id) {
                        slave = false;
                    } else {
                        slave = true;
                    }
                    break;

                }
            } // end of for
        }

        if (found) {
            return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
        }

        return null;
    }

    //......
}
  • findBrokerAddressInAdmin方法首先從brokerAddrTable獲取指定brokerName的brokerId及address的map,而後遍歷map,對於brokerAddr不爲null的標記found爲true,標記brokerId爲MixAll.MASTER_ID的slave爲false,不然爲true,最後跳出循環;若found爲true則構造FindBrokerResult返回,不然返回null

小結

  • RemoteBrokerOffsetStore的updateConsumeOffsetToBroker方法首先經過mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())獲取findBrokerResult
  • 若返回null,則執行mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()),而後再執行mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())獲取findBrokerResult
  • 以後對於findBrokerResult不爲null的狀況構建UpdateConsumerOffsetRequestHeader,而後執行mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway或者mQClientFactory.getMQClientAPIImpl().updateConsumerOffset

doc

相關文章
相關標籤/搜索