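1. RocketMQ Source Code Analysis: Message Sending and Receiving

1. Overview

  • Producer sends messages. The focus is the synchronous send path; code that concerns asynchronous/Oneway sending or transactional messages is skipped.
  • Broker receives messages.

The overall send interaction is shown in the sequence diagram below:

(Sequence diagram)

2. Producer sends messages:

(Sequence diagram)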

/*
 * Instantiate with a producer group name.
 */
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
DefaultMQProducer#send(Message)
@Override
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    Validators.checkMessage(msg, this);             //a
    msg.setTopic(withNamespace(msg.getTopic()));    //b
    return this.defaultMQProducerImpl.send(msg);    //c
}
public static String wrapNamespace(String namespace, String resourceWithOutNamespace) {
        if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resourceWithOutNamespace)) {
            return resourceWithOutNamespace;
        }

        if (isSystemResource(resourceWithOutNamespace) || isAlreadyWithNamespace(resourceWithOutNamespace, namespace)) {
            return resourceWithOutNamespace;
        }

        String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithOutNamespace);
        StringBuffer strBuffer = new StringBuffer();

        if (isRetryTopic(resourceWithOutNamespace)) {   // check whether this is a retry topic
            strBuffer.append(MixAll.RETRY_GROUP_TOPIC_PREFIX);
        }

        if (isDLQTopic(resourceWithOutNamespace)) {     // check whether this is a dead-letter (DLQ) topic
            strBuffer.append(MixAll.DLQ_GROUP_TOPIC_PREFIX);
        }

        return strBuffer.append(namespace).append(NAMESPACE_SEPARATOR).append(resourceWithoutRetryAndDLQ).toString();
}

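In the source above:

At a, the message is validated, including that the message and its topic are not empty.

At b, the topic is wrapped with the namespace by NamespaceUtil#wrapNamespace(). If the namespace is empty, or the topic is a system resource or already carries the namespace, the topic is returned unchanged. Otherwise the namespace is prepended to the topic; for a retry or dead-letter topic, the corresponding prefix is kept in front of the namespace.

At c, the message is sent synchronously. DefaultMQProducer#send(Message) is a thin wrapper around DefaultMQProducerImpl#send(Message).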
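
To make the wrapping rule concrete, here is a minimal standalone sketch of what wrapNamespace does to a topic name. It is not the library code: the class name is hypothetical, the three constants are assumed to match MixAll.RETRY_GROUP_TOPIC_PREFIX, MixAll.DLQ_GROUP_TOPIC_PREFIX and NamespaceUtil.NAMESPACE_SEPARATOR, the system-resource check is left out, and the "already wrapped" check is simplified.

```java
final class NamespaceWrapSketch {
    // Assumed to mirror MixAll.RETRY_GROUP_TOPIC_PREFIX, MixAll.DLQ_GROUP_TOPIC_PREFIX
    // and NamespaceUtil.NAMESPACE_SEPARATOR.
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";
    static final char SEPARATOR = '%';

    static String wrap(String namespace, String resource) {
        if (namespace == null || namespace.isEmpty() || resource == null || resource.isEmpty()) {
            return resource;
        }
        // Split off a retry/DLQ prefix so it can be put back in front of the namespace.
        String prefix = "";
        String bare = resource;
        if (resource.startsWith(RETRY_PREFIX)) {
            prefix = RETRY_PREFIX;
            bare = resource.substring(RETRY_PREFIX.length());
        } else if (resource.startsWith(DLQ_PREFIX)) {
            prefix = DLQ_PREFIX;
            bare = resource.substring(DLQ_PREFIX.length());
        }
        // Simplified "already wrapped" check; the system-resource check is omitted.
        if (bare.startsWith(namespace + SEPARATOR)) {
            return resource;
        }
        return prefix + namespace + SEPARATOR + bare;
    }

    public static void main(String[] args) {
        System.out.println(wrap("ns", "TopicA"));
        System.out.println(wrap("ns", "%RETRY%groupA"));
        System.out.println(wrap("", "TopicA"));
    }
}
```

The point to notice is that a retry or DLQ prefix stays at the very front, so prefix-based checks such as startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) keep working on a namespaced topic.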

DefaultMQProducerImpl#send()
/**
 * DEFAULT SYNC -------------------------------------------------------
 */
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}


public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}


private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
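    // Verify that the Producer is in the running state
    this.makeSureStateOK();
    // Validate the message format
    Validators.checkMessage(msg, this.defaultMQProducer);

    // Invocation ID; used in the log output below to mark entries that belong to the same send
    final long invokeID = random.nextLong();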
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // Obtain the Topic route information     <a>
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;         // the queue finally selected for sending
        Exception exception = null;
        SendResult sendResult = null;   // result of the most recent send
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;   // total attempts: 3 by default for SYNC (1 + 2 retries), otherwise 1
        int times = 0;                                  // index of the current attempt
        String[] brokersSent = new String[timesTotal];  // broker name chosen on each attempt
        // Try to send up to timesTotal times, until one attempt succeeds
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // Select the queue to send the message to   <b>
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
    
                    // Call the core send method     <c>
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // Update the broker availability information; queue selection takes the broker's send latency into account
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
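                } catch (RemotingException e) {// log the exception, update the broker availability information, continue the loop
                    // When a RemotingException is thrown and the send is retried, the message may be delivered more than once. For example, the send may time out (RemotingTimeoutException) even though the Broker actually received and processed the message. The Consumer must therefore consume idempotently.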
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {// log the exception, update the broker availability information, continue the loop
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
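                } catch (MQBrokerException e) {// log the exception, update the broker availability information; some response codes are retried, the rest return or rethrow and end the loop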
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                        // the following response codes all retry the send
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        default:// if there is a send result, return it; otherwise rethrow the exception
                            if (sendResult != null) {
                                return sendResult;
                            }

                            throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());

                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }

        // If there is a send result, return it
        if (sendResult != null) {
            return sendResult;
        }

        // Throw an exception that reflects the cause of the failure
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }

    // No Namesrv address is configured
    List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
    if (null == nsList || nsList.isEmpty()) {
        throw new MQClientException(
            "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
    }

    // No route information was found for the topic
    throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
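
The retry loop in sendDefaultImpl above can be condensed into a small standalone sketch. It assumes a hypothetical Attempt interface in place of sendKernelImpl and keeps only two aspects of the real method: the attempt count (1 + retryTimesWhenSendFailed for SYNC, otherwise 1) and the fact that the timeout is a budget shared by all attempts. Unlike the real code, this sketch retries on every unsuccessful result; the real loop retries a non-SEND_OK result only when isRetryAnotherBrokerWhenNotStoreOK() is true.

```java
final class SendRetrySketch {
    /** Hypothetical stand-in for one send attempt; returns true when the send succeeded. */
    interface Attempt {
        boolean send(int attemptIndex) throws Exception;
    }

    /** Only SYNC sends are retried inside the loop. */
    static int timesTotal(boolean sync, int retryTimesWhenSendFailed) {
        return sync ? 1 + retryTimesWhenSendFailed : 1;
    }

    /** Returns the index of the successful attempt, or -1 if all attempts failed or the budget ran out. */
    static int sendWithRetry(Attempt attempt, int timesTotal, long timeoutMillis) {
        long begin = System.currentTimeMillis();
        for (int times = 0; times < timesTotal; times++) {
            long cost = System.currentTimeMillis() - begin;
            if (timeoutMillis < cost) {
                return -1; // the timeout covers all attempts together, not each one
            }
            try {
                if (attempt.send(times)) {
                    return times;
                }
            } catch (Exception e) {
                // a failed attempt falls through to the next iteration
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int total = timesTotal(true, 2);
        System.out.println(total);
        System.out.println(sendWithRetry(i -> i == 2, total, 3000L));
    }
}
```

Because the budget is shared, a slow first attempt leaves less time for the retries, which is why sendDefaultImpl passes timeout - costTime down to sendKernelImpl.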

The two send methods above wrap sendDefaultImpl.

At <b>, MQFaultStrategy#selectOneMessageQueue() is called. At <c>, the core send method is called.

Next, a closer look at <a>, the method that obtains the Topic route information:

DefaultMQProducerImpl#tryToFindTopicPublishInfo()
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // Look up the Topic publish information in the cache
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    // If no usable Topic publish information is cached, fetch it from Namesrv once
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    // If the Topic publish information obtained is usable, return it
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
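        // When Namesrv has no route for the topic, fall back to the Topic publish information of DefaultMQProducer#createTopicKey. The purpose: when the Broker has automatic Topic creation enabled, it creates the Topic on receiving the message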
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

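This method obtains the Topic publish information. The cache topicPublishInfoTable is consulted first; if nothing usable is found there, the information is fetched from Namesrv.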
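
The lookup order (cache, then Namesrv for the topic, then Namesrv for the default topic) can be sketched without any RocketMQ classes. Everything here is hypothetical: the class name, the use of a plain Function as a stand-in for the name server, and the representation of a route as a list of strings. The default topic name "TBW102" is used on the assumption that it is the default value of createTopicKey.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

final class RouteCacheSketch {
    private final ConcurrentMap<String, List<String>> cache = new ConcurrentHashMap<>();
    // Hypothetical name-server lookup; returns an empty list when the topic is unknown.
    private final Function<String, List<String>> nameServer;
    private final String defaultTopic;

    RouteCacheSketch(Function<String, List<String>> nameServer, String defaultTopic) {
        this.nameServer = nameServer;
        this.defaultTopic = defaultTopic;
    }

    List<String> find(String topic) {
        List<String> queues = cache.get(topic);
        if (queues == null || queues.isEmpty()) {
            // Cache miss: ask the name server for this topic once.
            queues = nameServer.apply(topic);
            if (queues.isEmpty()) {
                // Still nothing: borrow the route of the default topic.
                queues = nameServer.apply(defaultTopic);
            }
            if (!queues.isEmpty()) {
                cache.put(topic, queues);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        Function<String, List<String>> fakeNameServer = topic ->
            "TBW102".equals(topic)
                ? Arrays.asList("broker-a:0", "broker-a:1")
                : Collections.<String>emptyList();
        RouteCacheSketch routes = new RouteCacheSketch(fakeNameServer, "TBW102");
        System.out.println(routes.find("NewTopic"));
    }
}
```

A second call to find for the same topic is served from the cache and does not contact the name server again.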

Next, a closer look at <b>, which selects the queue to send to. The class diagram of MQFaultStrategy:

(Class diagram)

MQFaultStrategy
public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();
    // Latency fault tolerance; tracks the send latency of each Broker
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    // Switch for send-latency fault tolerance
    private boolean sendLatencyFaultEnable = false;

    // Latency level thresholds (ms)
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    // Unavailable durations (ms), one per latency level
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }

    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    public long[] getLatencyMax() {
        return latencyMax;
    }

    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }

    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    /**
     * @Description Select a message queue based on the Topic route information
     * @param       tpInfo
     * @param       lastBrokerName
     * @return      org.apache.rocketmq.common.message.MessageQueue
     **/
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                // Pick a queue on an available broker; when lastBrokerName is set, the broker name must equal lastBrokerName
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                // Pick a relatively good broker and take one of its queues, without checking the queue's availability
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            // Pick any message queue, regardless of availability
            return tpInfo.selectOneMessageQueue();
        }

        // Fault tolerance disabled: pick a queue with lastBrokerName as a hint (TopicPublishInfo tries a different broker on retry), without checking availability
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    /**
     * @Description Update the latency fault-tolerance information
     * @param       brokerName
     * @param       currentLatency
     * @param       isolation
     * @return      void
     **/
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    /**
     * @Description Compute the unavailable duration for a given latency
     * @param       currentLatency
     * @return      long
     **/
    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }
}

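This is the Producer's fault-tolerance strategy for sending. It is disabled by default, i.e. sendLatencyFaultEnable=false.

When the strategy is enabled, selection prefers a queue on an available broker; failing that, it picks a relatively good broker and takes one of its queues; as a last resort it returns a queue from any broker.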
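
The first and last tiers of that selection can be sketched as follows. This is a simplification under stated assumptions: brokers are plain strings, the set of unavailable brokers is passed in directly instead of being asked of LatencyFaultTolerance, and the middle tier (pickOneAtLeast) is left out. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

final class QueueSelectSketch {
    /** Round-robin over brokers, preferring one that is not currently marked unavailable. */
    static String select(List<String> brokers, Set<String> unavailable, AtomicInteger counter) {
        int index = counter.getAndIncrement();
        for (int i = 0; i < brokers.size(); i++) {
            // floorMod keeps the position non-negative even if the counter overflows.
            int pos = Math.floorMod(index++, brokers.size());
            String broker = brokers.get(pos);
            if (!unavailable.contains(broker)) {
                return broker;
            }
        }
        // Every broker is penalised: fall back to plain round-robin.
        return brokers.get(Math.floorMod(counter.getAndIncrement(), brokers.size()));
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("broker-a", "broker-b", "broker-c");
        Set<String> unavailable = new HashSet<>(Collections.singletonList("broker-a"));
        AtomicInteger counter = new AtomicInteger();
        for (int i = 0; i < 4; i++) {
            System.out.println(select(brokers, unavailable, counter));
        }
    }
}
```

Even when every broker is penalised the method still returns a broker, which matches the original's behaviour of never failing a send purely because of latency bookkeeping.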

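The #updateFaultItem method updates the latency fault-tolerance information. When a send takes too long, the Broker is treated as unavailable for the next N seconds. With the default latencyMax and notAvailableDuration arrays, the mapping is:

Producer send latency    Broker unavailable duration
>= 15000 ms              600000 ms
>= 3000 ms               180000 ms
>= 2000 ms               120000 ms
>= 1000 ms               60000 ms
>= 550 ms                30000 ms
>= 100 ms                0 ms
>= 50 ms                 0 ms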
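
As a worked example of the table, the lookup can be reproduced in isolation. The two arrays and the isolation rule (treat the latency as 30000 ms) are taken from the MQFaultStrategy listing above; the class name and the penalty helper are hypothetical wrappers for illustration.

```java
final class LatencyPenaltySketch {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    /** Scan from the highest threshold down and return the first matching duration. */
    static long notAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    /** With isolation the measured latency is ignored and 30000 ms is used instead. */
    static long penalty(long currentLatency, boolean isolation) {
        return notAvailableDuration(isolation ? 30000 : currentLatency);
    }

    public static void main(String[] args) {
        System.out.println(penalty(30, false));
        System.out.println(penalty(550, false));
        System.out.println(penalty(1200, false));
        System.out.println(penalty(10, true));
    }
}
```

Note the consequence of the isolation rule: a send that fails with an exception is scored as 30000 ms, which falls in the top row of the table, so the broker is avoided for 600000 ms regardless of how quickly the failure occurred.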

Next, the latency fault-tolerance interface and its implementation:

LatencyFaultTolerance
// Latency fault-tolerance interface
public interface LatencyFaultTolerance<T> {
    /**
     * Update the latency and unavailable duration of an object
     * @param name  the object
     * @param currentLatency the latency
     * @param notAvailableDuration  the unavailable duration
     */
    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);

    /**
     * Whether the object is available
     * @param name  the object
     * @return  true if the object is available
     */
    boolean isAvailable(final T name);

    /**
     * Remove the object
     * @param name the object
     */
    void remove(final T name);

    /**
     * Pick one object
     * @return  the object
     */
    T pickOneAtLeast();
}
LatencyFaultToleranceImpl
// Latency fault-tolerance implementation. Tracks the fault information of each object
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
    // Table of per-object fault information
    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);

    // Index used when picking an object
    private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();

    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

    @Override
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        return true;
    }

    @Override
    public void remove(final String name) {
        this.faultItemTable.remove(name);
    }

    /**
     * Pick a relatively good object
     * @return
     */
    @Override
    public String pickOneAtLeast() {
        // Copy the fault items into a list
        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
        List<FaultItem> tmpList = new LinkedList<FaultItem>();
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {
            // Shuffle first, then sort
            Collections.shuffle(tmpList);
            Collections.sort(tmpList);

            // Pick from the first (better) half of the sorted list
            final int half = tmpList.size() / 2;
            if (half <= 0) {
                return tmpList.get(0).getName();
            } else {
                final int i = this.whichItemWorst.getAndIncrement() % half;
                return tmpList.get(i).getName();
            }
        }

        return null;
    }

    @Override
    public String toString() {
        return "LatencyFaultToleranceImpl{" +
            "faultItemTable=" + faultItemTable +
            ", whichItemWorst=" + whichItemWorst +
            '}';
    }

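    // Fault information of an object: its name, latency, and the time at which it becomes available again
    class FaultItem implements Comparable<FaultItem> {
        // object name
        private final String name;
        // latency
        private volatile long currentLatency;
        // time at which the object becomes available again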
        private volatile long startTimestamp;

        public FaultItem(final String name) {
            this.name = name;
        }

        /**
         * Compare objects
         * Priority: availability > latency > time of becoming available
         * @param other
         * @return
         */
        @Override
        public int compareTo(final FaultItem other) {
            if (this.isAvailable() != other.isAvailable()) {
                if (this.isAvailable())
                    return -1;

                if (other.isAvailable())
                    return 1;
            }

            if (this.currentLatency < other.currentLatency)
                return -1;
            else if (this.currentLatency > other.currentLatency) {
                return 1;
            }

            if (this.startTimestamp < other.startTimestamp)
                return -1;
            else if (this.startTimestamp > other.startTimestamp) {
                return 1;
            }

            return 0;
        }

        /**
         * Whether the object is available: true once the current time has reached startTimestamp
         * @return
         */
        public boolean isAvailable() {
            return (System.currentTimeMillis() - startTimestamp) >= 0;
        }

        @Override
        public int hashCode() {
            int result = getName() != null ? getName().hashCode() : 0;
            result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
            result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
            return result;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o)
                return true;
            if (!(o instanceof FaultItem))
                return false;

            final FaultItem faultItem = (FaultItem) o;

            if (getCurrentLatency() != faultItem.getCurrentLatency())
                return false;
            if (getStartTimestamp() != faultItem.getStartTimestamp())
                return false;
            return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;

        }

        @Override
        public String toString() {
            return "FaultItem{" +
                "name='" + name + '\'' +
                ", currentLatency=" + currentLatency +
                ", startTimestamp=" + startTimestamp +
                '}';
        }

        public String getName() {
            return name;
        }

        public long getCurrentLatency() {
            return currentLatency;
        }

        public void setCurrentLatency(final long currentLatency) {
            this.currentLatency = currentLatency;
        }

        public long getStartTimestamp() {
            return startTimestamp;
        }

        public void setStartTimestamp(final long startTimestamp) {
            this.startTimestamp = startTimestamp;
        }
    }
}

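The ordering used by FaultItem#compareTo and the "better half" rule of pickOneAtLeast can be tried out in a compact sketch. It is an illustration under assumptions, not the library code: the Item class is a hypothetical immutable stand-in for FaultItem, the current time is passed in so that the result is deterministic, a plain int replaces ThreadLocalIndex, and the shuffle before sorting (which only affects ties) is omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class FaultOrderSketch {
    /** Hypothetical immutable stand-in for FaultItem. */
    static final class Item {
        final String name;
        final long latency;
        final long availableAt;

        Item(String name, long latency, long availableAt) {
            this.name = name;
            this.latency = latency;
            this.availableAt = availableAt;
        }

        boolean isAvailable(long now) {
            return now - availableAt >= 0;
        }
    }

    /** Available items first, then lower latency, then earlier time of becoming available. */
    static Comparator<Item> order(final long now) {
        return Comparator
            .comparing((Item item) -> !item.isAvailable(now)) // false (available) sorts first
            .thenComparingLong(item -> item.latency)
            .thenComparingLong(item -> item.availableAt);
    }

    /** Sort, then rotate through the better half of the list. */
    static String pickOneAtLeast(List<Item> items, long now, int counter) {
        if (items.isEmpty()) {
            return null;
        }
        List<Item> sorted = new ArrayList<>(items);
        sorted.sort(order(now));
        int half = sorted.size() / 2;
        if (half <= 0) {
            return sorted.get(0).name;
        }
        return sorted.get(Math.floorMod(counter, half)).name;
    }

    public static void main(String[] args) {
        long now = 10_000L;
        List<Item> items = Arrays.asList(
            new Item("broker-a", 500, now + 1_000),
            new Item("broker-b", 300, now - 1),
            new Item("broker-c", 100, now + 5_000),
            new Item("broker-d", 50, now - 1));
        for (int counter = 0; counter < 3; counter++) {
            System.out.println(pickOneAtLeast(items, now, counter));
        }
    }
}
```

Rotating through the better half, rather than always taking the single best entry, spreads the load so that one recovering broker does not receive every fallback send.
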
Next, the core send method, DefaultMQProducerImpl#sendKernelImpl():

DefaultMQProducerImpl#sendKernelImpl()
// Core send method. This is where the network request is actually issued and the message is sent to the Broker.
private SendResult sendKernelImpl(final Message msg,
                                      final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        // Look up the broker address
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            // Whether to use the broker's VIP channel; the broker serves on two ports
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                // For a MessageBatch, the ID was already set while the batch was built
                if (!(msg instanceof MessageBatch)) {
                    // Not a batch: assign a unique ID
                    MessageClientIDSetter.setUniqID(msg);
                }

                boolean topicWithNamespace = false;
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }

                // Message compression
                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }

                // Transactional message
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

                // Forbidden-check hook before sending
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                // Pre-send hook logic
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    context.setNamespace(this.defaultMQProducer.getNamespace());
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }

                // Build the send-message request header
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }
                // Send according to the communication mode
                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        }

                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                // Post-send hook logic
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                // Return the send result
                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
            }
        }

        // The broker address could not be resolved: throw an exception
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
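Both branches in the listing above apply the same deadline rule before the remote call: the time already spent since beginStartTime is subtracted from the caller's timeout, and the send is aborted if nothing is left. A minimal sketch of that rule, under stated assumptions: `TimeoutBudget` and `remainingMillis` are hypothetical names introduced here for illustration, and `IllegalStateException` stands in for RocketMQ's `RemotingTooMuchRequestException`.

```java
// Hypothetical helper, not part of RocketMQ: mirrors the elapsed-time check in sendKernelImpl.
final class TimeoutBudget {
    private TimeoutBudget() {}

    /**
     * Returns the time left for the remote call, or throws if the budget is already spent.
     * Mirrors: cost = now - beginStartTime; if (timeout < cost) throw ...; pass (timeout - cost) on.
     */
    static long remainingMillis(long timeoutMillis, long beginStartTime, long now) {
        long cost = now - beginStartTime;
        if (timeoutMillis < cost) {
            // Stand-in for RemotingTooMuchRequestException (assumption for this sketch).
            throw new IllegalStateException("sendKernelImpl call timeout");
        }
        return timeoutMillis - cost;
    }

    public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        // Pretend 1200 ms were spent on compression, hooks and route lookup.
        System.out.println(remainingMillis(3000, begin, begin + 1200));
    }
}
```

The remote call therefore never receives the full timeout, only what is left of it, so the total time seen by the caller stays bounded by the value it passed in.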

3. Broker Receives Messages

Receive sequence diagram:

Receive sequence diagram

Corresponding class diagram:

Class diagram

SendMessageProcessor#sendMessage
// Handle an incoming message request
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException {
    SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default:
            // Parse the request header
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }

            // Build the send context, which is used by the hooks
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            // hook: logic run before the message is handled
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

            RemotingCommand response;
            // Handle the send-message request (batch or single)
            if (requestHeader.isBatch()) {
                response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
            }

            // hook: logic run after the message is handled
            this.executeSendMessageHookAfter(response, mqtraceContext);
            return response;
    }
}

// Handle a single send-message request and return the response
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                        final RemotingCommand request,
                                        final SendMessageContext sendMessageContext,
                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

    // Create and initialize the response
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

    response.setOpaque(request.getOpaque());

    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("receive SendMessage request command, {}", request);

    // If the broker has not yet started accepting send requests, reply with SYSTEM_ERROR
    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }

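    // Validate the message against the topic configuration         <a>
    // (-1 is a sentinel: msgCheck only overwrites the code when validation fails)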
    response.setCode(-1);
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) {
        return response;
    }

    final byte[] body = request.getBody();

    // If the queue id is negative, pick one of the write queues at random
    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    if (queueIdInt < 0) {
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }
    // Build the MessageExtBrokerInner
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    // Handle retry topics and dead-letter routing
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return response;
    }

    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    PutMessageResult putMessageResult = null;
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    // Reject prepared (transactional) messages if the broker is configured to forbid them
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (traFlag != null && Boolean.parseBoolean(traFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);      // <b>
    }

    // Translate the store result into the response
    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

}


private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
                                      RemotingCommand request,
                                      MessageExt msg, TopicConfig topicConfig) {
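    // Handling for RETRY topics: once the maximum number of reconsume attempts is reached,
    // the topic is rewritten to "%DLQ%" + group name, i.e. the message goes to the dead-letter queue (DLQ)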
    String newTopic = requestHeader.getTopic();
    if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        // Look up the subscription group configuration
        String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark(
                "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return false;
        }

        // Determine the maximum number of reconsume attempts
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }
        int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
        if (reconsumeTimes >= maxReconsumeTimes) {      // maximum reconsume attempts reached
            newTopic = MixAll.getDLQTopic(groupName);
            int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE, 0
            );
            msg.setTopic(newTopic);
            msg.setQueueId(queueIdInt);
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return false;
            }
        }
    }
    int sysFlag = requestHeader.getSysFlag();
    if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
        sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
    }
    msg.setSysFlag(sysFlag);
    return true;
}

private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
                                                   RemotingCommand request, MessageExt msg,
                                                   SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
                                                   int queueIdInt) {
    if (putMessageResult == null) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store putMessage return null");
        return response;
    }
    boolean sendOK = false;

    switch (putMessageResult.getPutMessageStatus()) {
        // Success
        case PUT_OK:
            sendOK = true;
            response.setCode(ResponseCode.SUCCESS);
            break;
        case FLUSH_DISK_TIMEOUT:
            response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
            sendOK = true;
            break;
        case FLUSH_SLAVE_TIMEOUT:
            response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
            sendOK = true;
            break;
        case SLAVE_NOT_AVAILABLE:
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            sendOK = true;
            break;

        // Failed
        case CREATE_MAPEDFILE_FAILED:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("create mapped file failed, server is busy or broken.");
            break;
        case MESSAGE_ILLEGAL:
        case PROPERTIES_SIZE_EXCEEDED:
            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
            response.setRemark(
                "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
            break;
        case SERVICE_NOT_AVAILABLE:
            response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
            response.setRemark(
                "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
            break;
        case OS_PAGECACHE_BUSY:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
            break;
        case UNKNOWN_ERROR:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UNKNOWN_ERROR");
            break;
        default:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UNKNOWN_ERROR DEFAULT");
            break;
    }

    String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
    if (sendOK) {
        // Statistics
        this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
        this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
            putMessageResult.getAppendMessageResult().getWroteBytes());
        this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());

        // Response
        response.setRemark(null);

        responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
        responseHeader.setQueueId(queueIdInt);
        responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());

        doResponse(ctx, request, response);

        // hook: record the successful send in the context
        if (hasSendMessageHook()) {
            sendMessageContext.setMsgId(responseHeader.getMsgId());
            sendMessageContext.setQueueId(responseHeader.getQueueId());
            sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());

            int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
            int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
            int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;

            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
        
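        // The response has already been written to the producer by doResponse(ctx, request, response),
        // so null is returned here. If writing the response fails, the error is only logged, which helps
        // diagnose cases where the broker stored the message but the reply did not get through.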
        return null;
    } else {
        // hook: record the failed send in the context
        if (hasSendMessageHook()) {
            int wroteSize = request.getBody().length;
            int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);

            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
    }
    return response;
}
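One detail of handlePutMessageResult in the listing above is easy to miss: four store statuses are treated as "stored" (sendOK = true), even though three of them report a flush or replication problem. The sketch below restates that classification in isolation. `PutStatusSketch`, its nested enum and `countsAsStored` are hypothetical names; the enum only copies the status names that appear in the listing.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical restatement of the switch in handlePutMessageResult; not RocketMQ code.
final class PutStatusSketch {
    enum PutStatus {
        PUT_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE,
        CREATE_MAPEDFILE_FAILED, MESSAGE_ILLEGAL, PROPERTIES_SIZE_EXCEEDED,
        SERVICE_NOT_AVAILABLE, OS_PAGECACHE_BUSY, UNKNOWN_ERROR
    }

    // Statuses for which the listing sets sendOK = true.
    private static final Set<PutStatus> STORED = EnumSet.of(
        PutStatus.PUT_OK,
        PutStatus.FLUSH_DISK_TIMEOUT,
        PutStatus.FLUSH_SLAVE_TIMEOUT,
        PutStatus.SLAVE_NOT_AVAILABLE);

    private PutStatusSketch() {}

    static boolean countsAsStored(PutStatus status) {
        return STORED.contains(status);
    }

    public static void main(String[] args) {
        for (PutStatus s : PutStatus.values()) {
            System.out.println(s + " -> stored=" + countsAsStored(s));
        }
    }
}
```

In the "stored" cases the broker updates its statistics and fills in msgId, queueId and queueOffset; only the response code tells the producer that the flush or the slave did not complete as expected.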

sendMessage and sendBatchMessage follow essentially the same logic; the difference is that sendBatchMessage does not support RETRY-type messages or transactional messages.
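The RETRY handling that only sendMessage performs (handleRetryAndDLQ above) comes down to two small rules: a retry topic whose reconsume count has reached the limit is renamed to the group's dead-letter topic, and a queue is picked with the same `Math.abs(random % 99999999) % n` expression used for negative queue ids. A minimal sketch, assuming the prefix values "%RETRY%" and "%DLQ%" for `MixAll.RETRY_GROUP_TOPIC_PREFIX` and `MixAll.DLQ_GROUP_TOPIC_PREFIX`; `RetryRouting`, `resolveTopic` and `pickQueue` are hypothetical names.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the topic rewrite in handleRetryAndDLQ; not RocketMQ code.
final class RetryRouting {
    static final String RETRY_PREFIX = "%RETRY%"; // assumed value of MixAll.RETRY_GROUP_TOPIC_PREFIX
    static final String DLQ_PREFIX = "%DLQ%";     // assumed value of MixAll.DLQ_GROUP_TOPIC_PREFIX

    private RetryRouting() {}

    /** Returns the topic the message is stored under. */
    static String resolveTopic(String topic, int reconsumeTimes, int maxReconsumeTimes) {
        if (topic == null || !topic.startsWith(RETRY_PREFIX)) {
            return topic;                          // not a retry topic: unchanged
        }
        if (reconsumeTimes < maxReconsumeTimes) {
            return topic;                          // still allowed to retry
        }
        String groupName = topic.substring(RETRY_PREFIX.length());
        return DLQ_PREFIX + groupName;             // limit reached: dead-letter topic
    }

    /** Same expression as in the listing: the remainder is taken before abs, so the result is never negative. */
    static int pickQueue(int randomValue, int queueCount) {
        return Math.abs(randomValue % 99999999) % queueCount;
    }

    public static void main(String[] args) {
        System.out.println(resolveTopic("%RETRY%orderGroup", 16, 16));
        System.out.println(pickQueue(ThreadLocalRandom.current().nextInt(), 4));
    }
}
```

Taking the remainder before `Math.abs` matters: `Math.abs(Integer.MIN_VALUE)` is still negative, whereas the remainder is always small enough for `abs` to be safe.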

Digging further into the msgCheck method called at <a>:

AbstractSendMessageProcessor#msgCheck
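// Validate the request, mainly against the topic configuration: whether the broker has write permission, whether the topic config exists, and whether the queue id is valid.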
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
    final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
    // Reject the request when the broker is not writable and the topic is an order topic
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
        && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
            + "] sending message is forbidden");
        return response;
    }

    // Check whether messages may be sent to this topic. Currently only {@link MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC} is disallowed
    if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
        String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
        log.warn(errorMsg);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorMsg);
        return response;
    }

    TopicConfig topicConfig =
        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) { // no topicConfig exists, so try to create one
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            } else {
                topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
            }
        }

        log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
        // Create the topic config from requestHeader and ctx
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
            requestHeader.getTopic(),
            requestHeader.getDefaultTopic(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            requestHeader.getDefaultTopicQueueNums(), topicSysFlag);

        if (null == topicConfig) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicConfig =
                    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                        requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
                        topicSysFlag);
            }
        }

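        // If there is still no config, return an error asking for the topic to be created manually.
        // Creation can fail, for example when the defaultTopic config does not exist, or exists but does not allow inheritance.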
        if (null == topicConfig) {
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }
    }

    // Check that the queue id is in range
    int queueIdInt = requestHeader.getQueueId();
    int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
    if (queueIdInt >= idValid) {
        String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
            queueIdInt,
            topicConfig.toString(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);

        return response;
    }
    return response;
}

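msgCheck validates the request, mainly against the topic configuration: whether the broker has write permission, whether a topic config exists (creating one if possible), and whether the queue id is in range. On success it leaves the response code untouched, which is why the caller sets it to -1 beforehand and compares it afterwards.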
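The queue-id rule and the -1 sentinel used by the caller can be sketched together. This is an illustration under assumptions, not the broker's code: `MsgCheckSketch`, `checkQueueId` and the numeric value of `QUEUE_ID_ILLEGAL` are hypothetical (the listing uses ResponseCode.SYSTEM_ERROR for this case).

```java
// Hypothetical sketch of the queue-id check in msgCheck and the caller's sentinel; not RocketMQ code.
final class MsgCheckSketch {
    static final int UNSET = -1;            // sentinel the caller sets before the check
    static final int QUEUE_ID_ILLEGAL = 1;  // placeholder error code, value chosen for illustration only

    private MsgCheckSketch() {}

    /**
     * Returns the response code after the check: unchanged when the id is acceptable,
     * an error code when queueId >= max(writeQueueNums, readQueueNums).
     */
    static int checkQueueId(int responseCode, int queueId, int writeQueueNums, int readQueueNums) {
        int idValid = Math.max(writeQueueNums, readQueueNums);
        if (queueId >= idValid) {
            return QUEUE_ID_ILLEGAL;
        }
        return responseCode;
    }

    public static void main(String[] args) {
        int code = checkQueueId(UNSET, 8, 4, 8);
        // The caller treats "still UNSET" as "validation passed".
        System.out.println(code == UNSET ? "passed" : "rejected with code " + code);
    }
}
```

Two consequences follow from the comparison as written in the listing: a negative queue id passes the check (sendMessage later replaces it with a random write queue), and an id between writeQueueNums and readQueueNums also passes, because the bound is the larger of the two.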

Digging further into the DefaultMessageStore#putMessage method called at <b>:

DefaultMessageStore#putMessage
/**
 * @Description Wrapper for storing a message; the actual write is delegated to CommitLog.
 * @param       msg
 * @return      org.apache.rocketmq.store.PutMessageResult
 **/
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so putMessage is forbidden");
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    // Slave brokers do not accept writes
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    // Is the store currently writable?
    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
        this.printTimes.set(0);
    }

    // Topic name too long (more than Byte.MAX_VALUE = 127 characters)
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }

    // Properties string too long (more than Short.MAX_VALUE = 32767 characters)
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    // Is the OS page cache busy?
    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
    // Append the message to the CommitLog
    PutMessageResult result = this.commitLog.putMessage(msg);

    long eclipseTime = this.getSystemClock().now() - beginTime;
    if (eclipseTime > 500) {
        log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}

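DefaultMessageStore#putMessage is a thin wrapper around storage: it runs a series of precondition checks, and the actual write is performed by CommitLog.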
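The order of those precondition checks decides which status a producer sees when several conditions hold at once. The sketch below restates the order from the listing as a pure function. `StoreGuards`, `check` and the `OK` constant are hypothetical; the other status names are copied from the listing, and the logging and counters are left out.

```java
// Hypothetical restatement of the guard order in DefaultMessageStore#putMessage; not RocketMQ code.
final class StoreGuards {
    enum Status { OK, SERVICE_NOT_AVAILABLE, MESSAGE_ILLEGAL, PROPERTIES_SIZE_EXCEEDED, OS_PAGECACHE_BUSY }

    private StoreGuards() {}

    static Status check(boolean shutdown, boolean slave, boolean writeable,
                        String topic, String properties, boolean pageCacheBusy) {
        // Shutdown, slave role and a non-writable store all map to the same status.
        if (shutdown || slave || !writeable) {
            return Status.SERVICE_NOT_AVAILABLE;
        }
        // The topic length is limited to Byte.MAX_VALUE (127) characters.
        if (topic.length() > Byte.MAX_VALUE) {
            return Status.MESSAGE_ILLEGAL;
        }
        // The properties string is limited to Short.MAX_VALUE (32767) characters; null is allowed.
        if (properties != null && properties.length() > Short.MAX_VALUE) {
            return Status.PROPERTIES_SIZE_EXCEEDED;
        }
        if (pageCacheBusy) {
            return Status.OS_PAGECACHE_BUSY;
        }
        return Status.OK; // only now would the message be handed to the CommitLog
    }

    public static void main(String[] args) {
        System.out.println(check(false, false, true, "TopicTest", null, false));
        System.out.println(check(false, true, true, "TopicTest", null, false));
    }
}
```

On the broker side, handlePutMessageResult maps both MESSAGE_ILLEGAL and PROPERTIES_SIZE_EXCEEDED to the same response code, so the producer cannot tell the two apart from the code alone.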
