RocketMq源碼的分享

一:RocketMq架構設計

1 技術架構

rocketmq_architecture_1.png

RocketMQ架構上主要分爲四部分,如上圖所示:apache

  • Producer:消息發佈的角色,支持分佈式集羣方式部署。Producer經過MQ的負載均衡模塊選擇相應的Broker集羣隊列進行消息投遞,投遞的過程支持快速失敗而且低延遲
  • Consumer:消息消費的角色,支持分佈式集羣方式部署。支持以push推,pull拉兩種模式對消息進行消費。同時也支持集羣方式和廣播方式的消費,它提供實時消息訂閱機制,這樣基本能夠知足大多數用戶的需求。
  • NameServer:NameServer是一個很是簡單的Topic路由註冊中心,其角色相似Dubbo中的zookeeper,支持Broker的動態註冊與發現。主要包括兩個功能:Broker管理,NameServer接受Broker集羣的註冊信息而且保存下來做爲路由信息的基本數據。而後提供心跳檢測機制,檢查Broker是否還存活;路由信息管理,每一個NameServer將保存關於Broker集羣的整個路由信息和用於客戶端查詢的隊列信息。而後Producer和Conumser經過NameServer就能夠知道整個Broker集羣的路由信息,從而進行消息的投遞和消費。NameServer一般也是集羣的方式部署,各實例間相互不進行信息通信。Broker是向每一臺NameServer註冊本身的路由信息,因此每個NameServer實例上面都保存一份完整的路由信息。當某個NameServer因某種緣由下線了,Broker仍然能夠向其它NameServer同步其路由信息,Producer,Consumer仍然能夠動態感知Broker的路由的信息。
  • BrokerServer:Broker主要負責消息的存儲、投遞和查詢以及服務高可用保證,爲了實現這些功能,Broker包含了如下幾個重要子模塊。
  • Remoting Module:整個Broker的實體,負責處理來自clients端的請求。
  • Client Manager:負責管理客戶端(Producer/Consumer)和維護Consumer的Topic訂閱信息
  • Store Service:提供方便簡單的API接口處理消息存儲到物理硬盤和查詢功能。
  • HA Service:高可用服務,提供Master Broker 和 Slave Broker之間的數據同步功能。
  • Index Service:根據特定的Message key對投遞到Broker的消息進行索引服務,以提供消息的快速查詢。

rocketmq_architecture_2.png

2 部署架構

rocketmq_architecture_3.png

3 RocketMQ 網絡部署特色
  • NameServer是一個幾乎無狀態節點,可集羣部署,節點之間無任何信息同步。
  • Broker部署相對複雜,Broker分爲Master與Slave,一個Master能夠對應多個Slave,可是一個Slave只能對應一個Master,Master與Slave 的對應關係經過指定相同的BrokerName,不一樣的BrokerId 來定義,BrokerId爲0表示Master,非0表示Slave。Master也能夠部署多個。每一個Broker與NameServer集羣中的全部節點創建長鏈接,定時註冊Topic信息到全部NameServer。 注意:當前RocketMQ版本在部署架構上支持一Master多Slave,但只有BrokerId=1的從服務器纔會參與消息的讀負載。
  • Producer與NameServer集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從NameServer獲取Topic路由信息,並向提供Topic 服務的Master創建長鏈接,且定時向Master發送心跳。Producer徹底無狀態,可集羣部署。
  • Consumer與NameServer集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從NameServer獲取Topic路由信息,並向提供Topic服務的Master、Slave創建長鏈接,且定時向Master、Slave發送心跳。Consumer既能夠從Master訂閱消息,也能夠從Slave訂閱消息,消費者在向Master拉取消息時,Master服務器會根據拉取偏移量與最大偏移量的距離(判斷是否讀老消息,產生讀I/O),以及從服務器是否可讀等因素建議下一次是從Master仍是Slave拉取。
4.結合部署架構圖,描述集羣工做流程:
  • A: 啓動NameServer,NameServer起來後監聽端口,等待Broker、Producer、Consumer連上來,至關於一個路由控制中心。
  • B:Broker啓動,跟全部的NameServer保持長鏈接,定時發送心跳包。心跳包中包含當前Broker信息(IP+端口等)以及存儲全部Topic信息。註冊成功後,NameServer集羣中就有Topic跟Broker的映射關係。
  • C:收發消息前,先建立Topic,建立Topic時須要指定該Topic要存儲在哪些Broker上,也能夠在發送消息時自動建立Topic。
  • D:Producer發送消息,啓動時先跟NameServer集羣中的其中一臺創建長鏈接,並從NameServer中獲取當前發送的Topic存在哪些Broker上,輪詢從隊列列表中選擇一個隊列,而後與隊列所在的Broker創建長鏈接從而向Broker發消息。
  • E:Consumer跟Producer相似,跟其中一臺NameServer創建長鏈接,獲取當前訂閱Topic存在哪些Broker上,而後直接跟Broker創建鏈接通道,開始消費消息。

二:RocketMQ之NameServer部分

1.NameServer的初始化和啓動流程

namesrv.png

2.RouteInfoManager類

基礎:Namesrv用來存儲路由的基礎信息都放在RouteInfoManager類中,RouteInfoManager類也能夠看作是Namesrv的資源類,不少操做都是對此類中的數據進行實時更改,
主要成員變量:緩存

private final HashMap<String/\* topic \*/, List<QueueData>> topicQueueTable;  
private final HashMap<String/\* brokerName \*/, BrokerData> brokerAddrTable;  
private final HashMap<String/\* clusterName \*/, Set<String/\* brokerName \*/\>> clusterAddrTable;  
private final HashMap<String/\* brokerAddr \*/, BrokerLiveInfo> brokerLiveTable;  
private final HashMap<String/\* brokerAddr \*/, List<String>/\* Filter Server \*/\> filterServerTable;

topicQueueTable:Topic消息隊列路由信息,消息發送時根據路由表進行負載均衡
brokerAddrTable:Broker基礎信息,包含brokerName、所屬集羣名稱、主備Broker地址
clusterAddrTable:Broke集羣信息,存儲集羣中全部Broker名稱
brokerLiveTable:Broker狀態信息,NameServer每次收到心跳包時會替換該信息
filterServerTable:Broker上的FilterServer列表,用於類模式消息過濾
QQ截圖20200116014201.png服務器

3.NameServer的路由信息註冊功能

其主要方法是RouteInfoManager類的registerBroker()方法網絡

public RegisterBrokerResult registerBroker(  
    final String clusterName,  
    final String brokerAddr,  
    final String brokerName,  
    final long brokerId,  
    final String haServerAddr,  
    final TopicConfigSerializeWrapper topicConfigWrapper,  
    final List<String> filterServerList,  
    final Channel channel)

主要流程如上圖數據結構

4.NameServer的路由信息發現功能

RMQ路由發現是非實時的,當Topic發生變化後,Namesrv不知道推送給客戶端,而是由客戶端主動拉取最新的路由。
其主要方法是RouteInfoManager類的pickupTopicRouteData()方法架構

public TopicRouteData pickupTopicRouteData(final String topic)

主要流程以下圖:
QQ圖片20200116004252.pngapp

5.NameServer的broker自動剔除機制

路由刪除會從topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable中刪除與該Broker相關的信息。
其主要方法是RouteInfoManager類的scanNotActiveBroker()和onChannelDestroy(String remoteAddr, Channel channel)方法負載均衡

public void scanNotActiveBroker() {  
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();     //遍歷每一個存活broker
    while (it.hasNext()) {  
        Entry<String, BrokerLiveInfo> next = it.next();  
        long last = next.getValue().getLastUpdateTimestamp();  
        //默認超過兩分鐘,關閉和broker的channel,把本身從brokerLiveTable中剔除
        //調用 onChannelDestroy
        if ((last + BROKER\_CHANNEL\_EXPIRED\_TIME) < System.currentTimeMillis()) {  
            RemotingUtil.closeChannel(next.getValue().getChannel());  
            it.remove();  
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER\_CHANNEL\_EXPIRED\_TIME);  
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());  
        }  
    }  
}

onChannelDestroy()方法的主要流程以下:
1.申請寫鎖,根據brokerAddress從brokerLiveTable、filterServerTable移除
2.維護brokerAddrTable,遍歷brokerAddrTable,根據參數remoteAddr從brokerAddrTable中刪除brokerAddr,若是BrokerAddrs集合爲空,則從brokerAddrTable中刪除brokerName。
3.根據BrokerName,從ClusterAddrTable中找到Broker並從集羣中移除, 若是移除後,集羣中不包含任何Broker,則將該集羣從clusterAddrTable中移除。
4.根據brokerName,遍歷topicQueueTable,找到QueueData並從List<QueueData>移除,若是List<QueueData>爲空,則把相應的topic從topicQueueTable中移除。
5.釋放鎖異步

三:RocketMQ之Producer部分

1.Producer的啓動流程

1.1:總體流程以下圖
20200116144705.jpg
1.2: 在應用裏初始化DefaultMQProducer時候,會以Producer名或者RPCHook的任一個或兩個做爲參數初始化DefaultMQProducer對象,而後對DefaultMQProducer對象設置NameServer地址等參數,而後調用start方法啓動Producer,其實內部調用了DefaultMQProducerImpl.start 方法,其大體流程以下:
20200116132819.jpg分佈式

2.向Broker發送心跳消息

public void sendHeartbeatToAllBrokerWithLock() {  
  if (this.lockHeartbeat.tryLock()) {  
  try {  
  this.sendHeartbeatToAllBroker();  
  //向Filter過濾服務器發送REGISTER_MESSAGE_FILTER_CLASS請求碼,更新過濾服務器中的Filterclass文件
  this.uploadFilterClassSource();  
        } catch (final Exception e) {  
  log.error("sendHeartbeatToAllBroker exception", e);  
        } finally {  
  this.lockHeartbeat.unlock();  
        }  
 } else {  
  log.warn("lock heartBeat, but failed.");  
    }  
}

sendHeartbeatToAllBroker()的主要流程以下:
一、初始化 HeartbeatData 對象,將該 Producer 或 Consumer 的 ClientID 賦值給 HeartbeatData 對象的 clientID 變量。
二、遍歷 MQClientInstance.consumerTable和producerTable,根據每一個 MQConsumerInner 對象的值初始化ConsumerData 對象和ProducerData對象。
三、若 ConsumerData 集合和 ProducerData 集合都爲空,說明沒有 consumer或 produer,則不發送心跳信息。
四、若不是都爲空,則遍歷 MQClientInstance.brokerAddrTable 列表,向每一個 Broker 地址發送請求碼爲 HEART_BEAT 的心跳消息,可是當存在 Consumer 時才向全部 Broker 發送心跳消息,不然若不存在 Consumer 則只向主用 Broker 地址發送心跳消息。
5.根據broker返回結果,更新MQClientInstance.brokerVersionTable。

3.消息發送

3.1:消息發送的總體時序流程以下圖
20190228103418595.jpg
其中幾個主要方法文字說明以下:
3.2 消息的數據結構
rocketmq的消息封裝在org.apache.rocketmq.common.message類中,屬性:

private String topic;                   //消息所屬topic
    private int flag;                       //消息flag (沒啥用)
    private Map<String, String> properties; //擴展屬性 (消息的TAGS 消息的延遲等級。)
    private byte[] body;                    //消息主體內容
    private String transactionId;           //交易id

3.3 普通消息的發送
對於普通消息的發送,能夠從DefaultProducerImpl的send方法入手,

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {  
  return send(msg, this.defaultMQProducer.getSendMsgTimeout());  
}

發送流程中幾個主要方法以下:

3.3.1 消息驗證

驗證消息是否符合規範,包括topicName、body不能爲空、length不能爲0且最大爲4_1024_1024

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)  
  throws MQClientException {  
  if (null == msg) {  
  throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");  
    }  
  // topic  
  Validators.checkTopic(msg.getTopic());  
  if (null == msg.getBody()) {  
  throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");  
    }  
  
  if (0 == msg.getBody().length) {  
  throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");  
    }  
  
  if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {  
  throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,  
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());  
    }  
}
3.3.2 查找該消息topic的路由信息

獲取主題的路由信息,查找要發送的具體的Broker節點

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {  
  TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);  
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {  
  this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());  
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);  
        topicPublishInfo = this.topicPublishInfoTable.get(topic);  
    }  
  
  if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {  
  return topicPublishInfo;  
    } else {  
  this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);  
        topicPublishInfo = this.topicPublishInfoTable.get(topic);  
        return topicPublishInfo;  
    }  
}

生產者中若是緩存了topic的路由信息則直接返回,不然向Namesrv查詢,再未查詢到則嘗試用默認主題createTopicKey=TBW102去查詢,都查詢步到則會拋出異常。Producer關於更新和維護路由信息MQClientInstance.topicRouteTable緩存操做都在updateTopicRoutInfoFromNameServer方法中。

3.3.3 根據TopicPublishInfo選擇MessageQueue

根據路由信息選擇消息隊列,返回的消息體按照broker、序號排序。
首先採用重試機制,由retryTimesWhenSendFailed指定同步方式重試次數,異步由retryTimesWhenSendAsyncFailed指定,而後使用循環執行的方式,選擇消息隊列 、發送消息,發送成功則返回,收到異常則重試。
選擇消息隊列有2種方式:
1.sendLatencyFaultEnable=false ,默認不啓用 Broker 故障延遲機制,調用 TopicPublishlnfo的slectOneMessageQueue每次獲取index的時候都是從本地線程變量ThreadLocal中獲取,沒有的狀況下就是隨機生成一個,加1取絕對值後返回,再對隊列列表的長度取模,因此在同一線程中,會輪訓的從隊列列表獲取隊列。而若是是不一樣線程的話,index是隨機生成的,因此就是隨機從隊列列表中獲取。
能夠看到選擇隊列方法的入參有一個lastBrokerName的入參,此參數的目的是在發送消息失敗的狀況下,producer會重試再次發送,而再次發送選擇的隊列須要另選一個broker,lastBrokerName就是要過濾掉失敗的broker,選擇下一個broker的隊列進行發送消息。

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {  
  if (lastBrokerName == null) {  
  return selectOneMessageQueue();  
    } else {  
  int index = this.sendWhichQueue.getAndIncrement();  
        for (int i = 0; i < this.messageQueueList.size(); i++) {  
  int pos = Math.abs(index++) % this.messageQueueList.size();  
            if (pos < 0)  
  pos = 0;  
            MessageQueue mq = this.messageQueueList.get(pos);  
            if (!mq.getBrokerName().equals(lastBrokerName)) {  
  return mq;  
            }  
 }  return selectOneMessageQueue();  
    }  
}  
  
public MessageQueue selectOneMessageQueue() {  
  int index = this.sendWhichQueue.getAndIncrement();  
    int pos = Math.abs(index) % this.messageQueueList.size();  
    if (pos < 0)  
  pos = 0;  
    return this.messageQueueList.get(pos);  
}

2.sendLatencyFaultEnable=true ,啓用 Broker 障延遲機制,保證低延遲,調用MQFaultStrategy.selectOneMessageQueue
開啓延遲故障,每當發送完一次消息,無論成功仍是失敗,都會把這次存儲消息的broker給保存下來,記錄故障狀況下此broker須要延長多長時間才能再次發送,目前看到在代碼裏面寫死了,故障下30s以內是不能再向此broker發送消息了

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {  
//開啓此策略能夠保證低延遲
  if (this.sendLatencyFaultEnable) {  
  try {  
  int index = tpInfo.getSendWhichQueue().getAndIncrement();  
  //輪詢看能不能有低延遲的MessageQueue
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {  
  int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();  
                if (pos < 0)  
  pos = 0;  
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);  
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {  
  if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))  
  return mq;  
                }  
 }  
  //若是沒找到就隨機找一個
  final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();  
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);  
            if (writeQueueNums > 0) {  
  final MessageQueue mq = tpInfo.selectOneMessageQueue();  
                if (notBestBroker != null) {  
  mq.setBrokerName(notBestBroker);  
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);  
                }  
  return mq;  
            } else {  
  latencyFaultTolerance.remove(notBestBroker);  
            }  
 } catch (Exception e) {  
 log.error("Error occurred when selecting message queue", e);  
        }  
  
  return tpInfo.selectOneMessageQueue();  
    }  
  
  return tpInfo.selectOneMessageQueue(lastBrokerName);  
}

3.3.4 真正消息的發送
1.消息發送的入口在DefaultMQProducerimpl的sendKernerlmpl方法中:根據 MessageQueue 獲取 Broker 的網絡地址 若是 MQClientlnstance.brokeraddrTable沒緩存該 Broke 的信息,則從 NameServer 主動更新一 topic 的路由信
若是路由更新後還 找不到Broker信息,則拋出異常,提示Broer不存在

String brokerAddr = this.mQClientFactory.findBrokerAddressinPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishinfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressinPublish
            (mq.getBrokerName()); 
    }

2.爲消息分配全局惟一id,若是消息默認超 4K(compressMsgBodyOverHowmuch),會對消息體採用 zip 壓縮,並設置消息的系統標記爲 MessageSysFlag.COMPRESSED_FLAG,若是是事務Prepared消息,則設消息的系統標記爲 MessageSysFlag.TRANSACTION_PREPARED TYPE

//for MessageBatch,ID has been set in the generating process  
if (!(msg instanceof MessageBatch)) {  
  MessageClientIDSetter.setUniqID(msg);  
}  
  
boolean topicWithNamespace = false;  
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {  
  msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());  
    topicWithNamespace = true;  
}  
  
int sysFlag = 0;  
boolean msgBodyCompressed = false;  
if (this.tryToCompressMessage(msg)) {  
  sysFlag |= MessageSysFlag.COMPRESSED_FLAG;  
    msgBodyCompressed = true;  
}  
  
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);  
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {  
  sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;  
}
  1. 若是刪了消息發送鉤子函數, 則執行消息發送以前的加強邏輯 經過 DefaultMQProducerlmpl.registerSendMessageHook 註冊鉤子處理類,而且能夠註冊多個,簡單看下鉤子處理類接口
if (this.hasSendMessageHook()) {  
  context = new SendMessageContext();  
    context.setProducer(this);  
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());  
    context.setCommunicationMode(communicationMode);  
    context.setBornHost(this.defaultMQProducer.getClientIP());  
    context.setBrokerAddr(brokerAddr);  
    context.setMessage(msg);  
    context.setMq(mq);  
    context.setNamespace(this.defaultMQProducer.getNamespace());  
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);  
    if (isTrans != null && isTrans.equals("true")) {  
  context.setMsgType(MessageType.Trans\_Msg\_Half);  
    }  
  
  if (msg.getProperty("\_\_STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY\_DELAY\_TIME\_LEVEL) != null) {  
  context.setMsgType(MessageType.Delay\_Msg);  
    }  
  this.executeSendMessageHookBefore(context);  
}

4.構建消息發送請求 主要包含以下重要信息:生產者組、主題名稱、默認建立主題 Key 、該主題在單個Broker默認隊列數、隊ID (隊列序號)、消息系統標( MessageSysFlag 消息發時間 、消息標記( RocketMQ對消息中的flag不作任何處理供應用程序使用) 消息擴展屬性 、消息重試次數、是不是批量消息等

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTopic(msg.getTopic());
    requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
    requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
    requestHeader.setQueueId(mq.getQueueId());
    requestHeader.setSysFlag(sysFlag);
    requestHeader.setBornTimestamp(System.currentTimeMillis());
    requestHeader.setFlag(msg.getFlag());
    requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
    requestHeader.setReconsumeTimes(0);
    requestHeader.setUnitMode(this.isUnitMode());
    requestHeader.setBatch(msg instanceof MessageBatch);
    if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
        if (reconsumeTimes != null) {
            requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
        }
        String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
        if (maxReconsumeTimes != null) {
            requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
        }
    }

5.根據 發送方式,同步、異步、單式進行網絡傳輸,消息發送在MQClientAPIImpl的sendMessage()方法中

public SendResult sendMessage(  
  final String addr,  
    final String brokerName,  
    final Message msg,  
    final SendMessageRequestHeader requestHeader,  
    final long timeoutMillis,  
    final CommunicationMode communicationMode,  
    final SendCallback sendCallback,  
    final TopicPublishInfo topicPublishInfo,  
    final MQClientInstance instance,  
    final int retryTimesWhenSendFailed,  
    final SendMessageContext context,  
    final DefaultMQProducerImpl producer  
) throws RemotingException, MQBrokerException, InterruptedException {  
  long beginStartTime = System.currentTimeMillis();  
    RemotingCommand request = null;  
    if (sendSmartMsg || msg instanceof MessageBatch) {  
  SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);  
        request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND\_BATCH\_MESSAGE : RequestCode.SEND\_MESSAGE\_V2, requestHeaderV2);  
    } else {  
  request = RemotingCommand.createRequestCommand(RequestCode.SEND\_MESSAGE, requestHeader);  
    }  
  request.setBody(msg.getBody());  
    switch (communicationMode) {  
  case ONEWAY:  
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);  
            return null;  
        case ASYNC:  
            final AtomicInteger times = new AtomicInteger();  
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;  
            if (timeoutMillis < costTimeAsync) {  
  throw new RemotingTooMuchRequestException("sendMessage call timeout");  
            }  
  this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,  
                retryTimesWhenSendFailed, times, context, producer);  
            return null;  
        case SYNC:  
            long costTimeSync = System.currentTimeMillis() - beginStartTime;  
            if (timeoutMillis < costTimeSync) {  
  throw new RemotingTooMuchRequestException("sendMessage call timeout");  
            }  
  return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);  
        default:  
            assert false;  
            break;  
    }  
  
  return null;  
}
  1. 若是註冊了消息發送鉤子函數,執行after邏輯。注意,就算消息發送過程當中發生 RemotingException MQBrokerException InterruptedException時,該方法也會執行
if (this.hasSendMessageHook()) {
        context.setException(e);
        this.executeSendMessageHookAfter(context);
    }
相關文章
相關標籤/搜索