RocketMQ生產者流程篇

系列文章

RocketMQ入門篇
RocketMQ生產者流程篇
RocketMQ生產者消息篇git

前言

生產者向消息隊列裏面寫入消息,不一樣的業務場景會採用不一樣的寫入策略,好比:同步發送,異步發送,延遲發送,事務消息等;本文首先從分析生產者發送消息的流程開始,而後再來介紹各類發送消息的策略。github

生產者流程

1.流程概述

生產者首先須要設置namesrv,或者指定其餘方式更新namesrv;而後從namesrv獲取topic的路由信息,路由信息包括broker以及Message Queue等信息,同時將路由信息保存在本地內存中,方便下次使用;最後從Message Queue列表中選擇合適的Queue發送消息,實現負載均衡;apache

2.啓動過程

DefaultMQProducer實例化提供了兩個參數分別是:生產者組名稱以及RPCHook,RPCHook是一個接口,具體實現交由業務端實現,兩個方法分別是:doBeforeRequest和doAfterResponse,表示在執行請求以前和接收返回以後分別執行相關邏輯;
接下來就是調用DefaultMQProducer的start方法,相關的初始化操做都在裏面進行,內部其實調用的是DefaultMQProducerImpl的start方法,具體代碼以下:segmentfault

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) {
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }

默認serviceState的狀態爲CREATE_JUST,剛進入設置爲START_FAILED,初始化完成以後設置爲RUNNING,再次初始化時會直接報錯,下面看一下具體初始化了哪些信息;數組

2.1檢查配置

這裏的檢查其實就是對producerGroup進行合法性校驗;負載均衡

2.2設置instanceName

若是producerGroup不等於默認的"CLIENT_INNER_PRODUCER",則設置DefaultMQProducer的instanceName爲進程的pid;框架

2.3建立MQClientInstance對象

首先檢查 ConcurrentMap<String/ clientId /, MQClientInstance> factoryTable中是否已經存在已clientId爲key的MQClientInstance,若是存在則取出,不存在則實例化;clientId是已ip地址,instanceName以及unitName組成的,例如:10.13.83.7@12500dom

2.4註冊producer

將DefaultMQProducerImpl註冊到MQClientInstance中,已producerGroup做爲key,註冊到ConcurrentMap<String/ group /, MQProducerInner> producerTable中,若是已經存在此Group,則拋出異常;異步

2.5初始化TopicPublishInfo

已topic名稱爲"TBW102"爲key,實例化TopicPublishInfo做爲value,存放在ConcurrentMap<String/ topic /, TopicPublishInfo> topicPublishInfoTable中,TopicPublishInfo用來存放topic的路由信息;函數

2.6啓動MQClientInstance

MQClientInstance啓動會啓動不少相關服務,具體能夠看以下代碼:

public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

默認serviceState的狀態爲CREATE_JUST,剛進入設置爲START_FAILED,初始化完成以後設置爲RUNNING,防止重複初始化;

2.6.1初始化NameServerAddr

首先判斷DefaultMQProducer是否配置了NameServerAddr,若是沒有配置會到一個地址下獲取,地址默認爲:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,相關的邏輯在MixAll類中,代碼以下:

public static String getWSAddr() {
        String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
        String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
        String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
        if (wsDomainName.indexOf(":") > 0) {
            wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
        }
        return wsAddr;
    }

正常狀況下咱們須要設置本身的地址,能夠經過以下方式設置:

System.setProperty("rocketmq.namesrv.domain", "localhost");

這種狀況下就能夠不用手動設置NameServerAddr;

2.6.2初始化RemotingClient

RemotingClient是一個接口類,底層使用的通信框架是Netty,提供了實現類NettyRemotingClient,RemotingClient在初始化的時候實例化Bootstrap,方便後續用來建立Channel;

2.6.3啓動定時器任務

總共啓動了5個定時器任務,分別是:定時更新NameServerAddr信息,定時更新topic的路由信息,定時清理下線的broker,定時持久化Consumer的Offset信息,定時調整線程池;

2.6.3啓動服務

pullMessageService和rebalanceService被用在消費端的兩個服務類,分別是:從broker拉取消息的服務和均衡消息隊列服務,負責分配消費者可消費的消息隊列;

3.發送消息

相關發送消息的代碼在DefaultMQProducerImpl的sendDefaultImpl方法中,部分代碼以下所示:

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                        ...如下代碼省略...

此方法的四個參數分別是:msg爲要發送的消息,communicationMode要使用的發送方式包括同步、異步、單向,sendCallback在異步狀況下的回調函數,timeout發送消息的超時時間;

3.1獲取topic的路由信息

經過msg中設置的topic獲取路由信息,具體代碼以下:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

首先從變量ConcurrentMap<String/ topic /, TopicPublishInfo> topicPublishInfoTable中獲取是否存在指定topic的路由信息,若是獲取不到則使用topic去nameServer獲取路由信息,若是仍是獲取不到則使用默認的topic名稱爲"TBW102"去獲取路由信息,須要使用默認名稱去獲取的狀況是沒有建立topic,須要broker自動建立topic的狀況;獲取路由信息使用的是MQClientInstance中的updateTopicRouteInfoFromNameServer方法,此方法根據topic獲取路由信息,具體鏈接哪臺nameServer,會從列表中順序的選擇nameServer,實現負載均衡;
注:名稱爲"TBW102"的topic是系統自動建立的;

3.2選擇MessageQueue

成功獲取到路由信息以後,須要從MessageQueue列表中選擇一個,在這以前獲取了默認發送消息失敗的重試次數,默認爲3次(只有發送模式是同步的狀況下),代碼以下:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

以上代碼在MQFaultStrategy,此類提供了選擇MessageQueue的策略,相關策略能夠看類變量:

private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

latencyFaultTolerance:延遲容錯對象,維護brokers的延遲信息;
sendLatencyFaultEnable:延遲容錯開關,默認不開啓;
latencyMax:延遲級別數組;
notAvailableDuration :根據延遲級別,對應broker不可用的時長;

這樣上面的這段代碼就好理解了,首先斷定是否開啓開關,若是開啓首先獲取index,index初始爲一個隨機值,後面每次+1,這樣在後面與MessageQueue長度取模的時候每一個MessageQueue能夠按順序獲取;獲取
MessageQueue以後須要斷定其對應的Broker是否可用,同時也須要和當前指定的brokerName進行匹配;若是沒有獲取到就選擇一個延遲相對小的,pickOneAtLeast會作排序處理;若是都不行就直接獲取一個MessageQueue,無論其餘條件了;

3.3發送消息

在發送以前會作超時檢測,若是前面兩步已經超時了,則直接報超時,默認超時時間是3秒;部分代碼以下:

private SendResult sendKernelImpl(final Message msg,
                                      final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }

                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            msg.setBody(prevBody);
                        }
                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;

此處的6個參數分別是:msg消息自己,mq要發送到的隊列,communicationMode發送策略,sendCallback異步回調函數,topicPublishInfo路由信息,timeout發送超時時間(3秒-前2步消耗的時間);
首先須要獲取指定broker的地址,這要才能建立channel與broker鏈接;而後就是一些hock處理;接下來就是準備發送的消息頭SendMessageRequestHeader,最後根據不一樣的發送策略執行發送消息,此處就不在進入更加深刻的分析;

總結

本文重點介紹了發送者的啓動,以及發送消息的大概流程;關於消息的發送策略,以及消息的類型包括:順序消息,事務消息等,將在後面的文章介紹。

相關文章
相關標籤/搜索