本文我將帶領讀者朋友對RocketMQ生產者如何發送消息這一流程進行源碼層面的解析。內容偏幹,請自備白開水。java
進行消息發送的前提是先對生產者進行初始化,一段較爲常規的生產者初始化示例代碼以下apache
@Value("${rocketmq.nameServer}")
String nameSrvAddr;
@PostConstruct
public void init() {
DefaultMQProducer defaultMQProducer =
new DefaultMQProducer("PRODUCER_GROUP", true);
defaultMQProducer.setNamesrvAddr(nameSrvAddr);
// 發送失敗重試次數
defaultMQProducer.setRetryTimesWhenSendFailed(3);
try {
defaultMQProducer.start();
} catch (MQClientException e) {
throw new RuntimeException("Producer加載異常!", e);
}
}
複製代碼
咱們對初始化流程稍做分析。數組
首先初始化一個DefaultMQProducer實例,調用構造方法緩存
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) {
this(null, producerGroup, null, enableMsgTrace, null);
}
複製代碼
第二個參數爲是否開啓消息軌跡支持,關於消息軌跡的源碼解析能夠移步 《跟我學RocketMQ之消息軌跡實戰與源碼分析》。網絡
經過setNamesrvAddr(String namesrvAddr)設置nameserver地址;經過setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed)設置重發次數,默認爲2。app
[DefaultMQProducer.java]
private int retryTimesWhenSendFailed = 2;
複製代碼
接着調用start()方法啓動defaultMQProducer異步
[DefaultMQProducer.java]
@Override
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
// 啓動producer實例
this.defaultMQProducerImpl.start();
...省略traceDispatcher相關邏輯...
}
複製代碼
能夠看到是調用的defaultMQProducerImpl的start()ide
[DefaultMQProducerImpl.java]
public void start() throws MQClientException {
this.start(true);
}
複製代碼
實際調用了start的重載方法,startFactory==true函數
// MQClientInstance引用
private MQClientInstance mQClientFactory;
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
// 若是當前服務狀態爲CREATE_JUST【剛建立】
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
複製代碼
注意這句代碼源碼分析
// 判斷當前生產者組是否符合要求
// 改變生產者的實例id爲進程id
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
複製代碼
這裏檢查生產者組是否符合要求,符合則改變生產者的instanceName爲進程id,具體邏輯爲
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}
複製代碼
實例名爲配置文件配置獲得的,默認爲DEFAULT,咱們接着看start的重載方法
// 初始化一個MQ客戶端工廠,同一個clientId只有一個MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
複製代碼
這裏初始化了MQ客戶端工廠,對於同一個clientId只有一個MQClientInstance。看一下getAndCreateMQClientInstance方法。
[MQClientManager.java]
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
// 構建MQClientId
String clientId = clientConfig.buildMQClientId();
// 從clientId與MQClientInstance映射表factoryTable中獲取當前clientId對應的MQClientInstance
MQClientInstance instance = this.factoryTable.get(clientId);
// 若是MQClientInstance不存在則建立一個新的並放入映射表factoryTable中
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
複製代碼
咱們接着看下clientId是如何生成的
/**
* 構建MQ客戶端id
* clientId=客戶端ip+@+實例名+unitName(可選)
* @return
*/
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
複製代碼
能夠看到,clientId的構造規則爲:
clientId=客戶端ip+@+實例名+unitName(可選),對於同一個JVM中的不一樣消費者和不一樣生產者在啓動時候獲取到的MQClientInstance是同一個。MQClientInstance是封裝了網絡調用相關的邏輯。
咱們接着回到start方法中
// 註冊生產者,將當前生產者加入到MQClientInstance中
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
// 註冊失敗,狀態==僅建立
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 註冊成功則將當前生產者組對應的topic與發佈關係放入topicPublishInfoTable註冊表
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// 啓動MQClientFactory,若是已經啓動則不會再啓動一次
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
複製代碼
這裏向MQClientInstance進行註冊,將當前的生產者加入到MQClientInstance管理中。
經過mQClientFactory.start();啓動MQClientInstance,若是已經啓動則不會重複啓動,具體的代碼邏輯以下:
[MQClientInstance.java]
public void start() throws MQClientException {
// 同步當前實例
synchronized (this) {
switch (this.serviceState) {
// MQClientInstance狀態爲[剛建立],進行啓動操做
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service 啓動消息拉取線程
this.pullMessageService.start();
// Start rebalance service 啓動消息重負載線程
this.rebalanceService.start();
// Start push service 啓動生產者
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
// 若是當前服務的狀態爲RUNNING運行中則不重複啓動
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
複製代碼
因爲生產者和消息者實例均使用同一個MQClientInstance,所以會在MQClientInstance中同時對生產者線程、消費拉取線程、rebalance線程進行啓動操做。
到此,消息發送的必要條件:生產者啓動過程就結束了,咱們接着研究一下消息發送的流程。
消息發送的關鍵API爲 send 方法,常見的一個API聲明爲
[DefaultMQProducer.java]
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, timeout);
}
複製代碼
它調用的是 DefaultMQProducerImpl 中的send
[DefaultMQProducerImpl.java]
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
複製代碼
調用了 sendDefaultImpl 方法,方法聲明及參數解釋以下
[DefaultMQProducerImpl.java]
private SendResult sendDefaultImpl(
Message msg, // 消息發送實體
final CommunicationMode communicationMode, // 發送類別,枚舉類型
final SendCallback sendCallback, // 若是是異步發送方式,則須要實現SendCallback回調
final long timeout // 超時時間
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
複製代碼
CommunicationMode 表示發送類別
public enum CommunicationMode {
SYNC, // 同步發送
ASYNC, // 異步發送
ONEWAY, // 直接發送,不關心發送結果
}
複製代碼
咱們詳細分析一下sendDefaultImpl方法邏輯:
[DefaultMQProducerImpl.java]
long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
複製代碼
從當前的MQClientInstance中獲取broker地址,若是broker地址爲空,則向NameServer查找該Topic路由信息,咱們看一下findBrokerAddressInPublish方法
[DefaultMQProducerImpl.java]
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// 從緩存的topic路由表中獲取topic路由
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 不存在則向NameServer發起查找
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 路由表中存在路由信息
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
// 返回路由信息
return topicPublishInfo;
} else {
// 從NameServer中獲取最新的路由信息,更新路由表
// 返回當前路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
複製代碼
獲取到路由表信息後,開始進行發送前的校驗等邏輯,預先定義一些變量供後續使用
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
複製代碼
獲取發送總次數,發送次數timesTotal是根據發送類別決定的。
若是是同步發送[CommunicationMode.SYNC],則發送總次數== 1+重試次數(retryTimesWhenSendFailed);
若是是異步發送[CommunicationMode.ASYNC],則發送總次數== 1;
// 獲取發送總次數
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
複製代碼
選擇根據topic路由表及broker名稱,獲取一個messageQueue,本次發送的隊列就是選取的隊列,關於選取隊列的方法selectOneMessageQueue,咱們立刻就展開看下細節
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
// 計算一下發送消耗的時間
long costTime = beginTimestampPrev - beginTimestampFirst;
複製代碼
咱們看一下selectOneMessageQueue方法是如何進行隊列的選擇的:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// 若是啓用了broker故障延遲機制
if (this.sendLatencyFaultEnable) {
try {
// 本次須要發送的隊列的index就是SendWhichQueue自增獲得的
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// index與當前路由表中的對列總個數取模
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
// 獲取到當前對應的待發送隊列
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
// 至少選擇一個broker
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
// 獲取broker中的可寫隊列數
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
// 若是可寫隊列數>0,則選取一個隊列
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
// 可寫隊列數 <= 0 移除該broker
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
複製代碼
這段代碼的核心就是進行隊列的選取,選取的過程當中伴隨着故障檢測,對於故障broker可以作到儘量規避。
咱們回到消息發送邏輯sendDefaultImpl中,在每一次發送過程當中,計算本次發送的實際消耗時間,並與發送端設置的發送超時時間作比較。
若是設置的超時時間timeout小於實際消耗的時間,說明發送超時,代碼以下
if (timeout < costTime) {
callTimeout = true;
// 發送超時結束本次循環
break;
}
複製代碼
進行真正的消息發送流程,調用sendKernelImpl方法,代碼以下。關於sendKernelImpl邏輯在後文會展開論述。
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
複製代碼
根據發送類型進行邏輯執行
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
複製代碼
這段代碼較好理解,若是是異步方式,直接返回sendResult爲null,真實的發送結果是在回調SendCallback中獲取的;若是是ONEWAY方式,則根本不關心發送結果;
若是是同步方式,判斷髮送結果是否爲 SendStatus.SEND_OK,執行邏輯isRetryAnotherBrokerWhenNotStoreOK,這裏是消息發送失敗的重試邏輯:
若是消息未持久化重試下一個broker成功,則跳出本次循環,繼續下次重試。
此處省略異常處理邏輯,感興趣的能夠自行查看源碼。
if (sendResult != null) {
return sendResult;
}
複製代碼
若是獲取到發送結果sendResult不爲空,則返回該發送結果供業務側進行處理。
咱們重點來研究一下sendKernelImpl方法,它是消息發送的出口,也是真正發起消息發送調用的邏輯。
方法聲明以下:
private SendResult sendKernelImpl(
// 待發送的消息
final Message msg,
// 消息待發送的隊列,該隊列是經過selectOneMessageQueue選擇的
final MessageQueue mq,
// 消息發送模式
final CommunicationMode communicationMode,
// 若是是異步發送,則須要實現SendCallback
final SendCallback sendCallback,
// topic對應的路由信息表
final TopicPublishInfo topicPublishInfo,
// 發送超時時間,由客戶端指定
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
複製代碼
獲取發送真實開始時間以及brokerAddr,這裏的邏輯與sendDefaultImpl的徹底一致再也不贅述,之因此再調用一次的緣由,應當是爲了準確性,時間就不說了;可用的brokerAddr列表是的動態拉取的,應當獲取當前最新的brokerAddr。
long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
複製代碼
根據broker地址計算獲得VIP通道地址,計算方法爲ip+(默認端口號-2)
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
// 獲取消息體byte數組
byte[] prevBody = msg.getBody();
複製代碼
接着對消息進行前置處理,爲消息分配全局惟一Id,對於批量消息,它的全局惟一id是單獨生成的,後面細說。
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
複製代碼
判斷是否爲事務消息
// 獲取消息屬性,key=PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
// 判斷是否爲事務消息
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 若是是事務消息,經過sysFlag與TRANSACTION_PREPARED_TYPE按位或,計算最新的sysFlag
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
複製代碼
若是發送時註冊了發送鉤子方法,則先執行該發送鉤子邏輯進行前置加強,這種方式相似於切面的邏輯。
if (this.hasSendMessageHook()) {
// 設置消息發送上下文
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
// 若是是事務消息,則上下文中標記消息類型爲事務半消息Trans_Msg_Half
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
複製代碼
關於事務消息的發送後續會單獨發文進行分析,此處不展開
// 若是是延時消息,則標記消息類型爲延時消息Delay_Msg
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
// 執行發送前置鉤子方法
this.executeSendMessageHookBefore(context);
}
複製代碼
執行完發送前置的鉤子方法以後,開始正式執行發送邏輯,首先對消息發送請求頭進行實例化。
// 聲明並初始化消息發送請求頭
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// 設置請求頭參數:發送者組
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// 設置請求頭參數:topic
requestHeader.setTopic(msg.getTopic());
// 設置默認topic,其實就是MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC=TBW102,若是開啓了自動建立topic,則會建立該topic
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
// 默認topic對應的消息隊列數量
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
// 當前要發送的消息對應的隊列id
requestHeader.setQueueId(mq.getQueueId());
// 系統標識,前面邏輯計算獲得
requestHeader.setSysFlag(sysFlag);
// 消息誕生時間,系統當前時間
requestHeader.setBornTimestamp(System.currentTimeMillis());
// 消息flag
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
// 因爲是發送消息,因此設置爲0
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
// 是否爲批量消息
requestHeader.setBatch(msg instanceof MessageBatch);
複製代碼
若是當前消息的topic以MixAll.RETRY_GROUP_TOPIC_PREFIX開頭,
RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
代表當前topic其實是topic對應的重試topic,則執行消息重試發送相關的邏輯
// 若是當前消息topic爲重試topic
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
// 獲取重試次數
// 重試次數不爲null則清除重試次數
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
// 獲取最大重試次數
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
複製代碼
根據真實的發送類型選擇對應的消息發送方式:
首先來看一下發送方式爲:ASYNC(異步發送方式)的發送邏輯
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
// 若是消息body是壓縮的,則使用prevBody,prevBody就是真實的msgBody對應的byte[]
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
// 將壓縮的消息體恢復爲原消息體
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
複製代碼
調用MQClientInstance的getMQClientAPIImpl.sendMessage方法進行網絡通訊,並獲取發送結果
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
複製代碼
咱們直接看一下MQClientAPIImpl.sendMessage邏輯是如何處理異步消息發送的
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
// 若是消息是sendSmartMsg(org.apache.rocketmq.client.sendSmartMsg==true)
// 或者是批量消息
if (sendSmartMsg || msg instanceof MessageBatch) {
// 更換髮送請求頭
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
// 若是消息是非批量發送
// 設置消息發送命令爲RequestCode.SEND_MESSAGE
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
// 設置發送請求body爲消息的msgBody
request.setBody(msg.getBody());
switch (communicationMode) {
// 若是是ONEWAY方式,發出去不關心結果
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
// 若是是異步方式,判斷是否發送超時
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
// 調用異步消息發送方法
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
// 若是是同步發送,調用同步方法方法
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
return null;
}
複製代碼
好像還沒結束,那麼咱們就分別看一下異步方式和同步方式對應的發送方法。
...方法聲明省略,實在是太長了...
// 異步方式調用發送邏輯
this.remotingClient.invokeAsync(addr, request, timeoutMillis,
// 發送回調的真實邏輯
new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
// 若是業務發送方沒有實現sendCallback,可是有接口調用返回值response
if (null == sendCallback && response != null) {
try {
// 發送返回值sendResult爲processSendResponse處理獲得的
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
// 刷新消息發送上下文,執行發送後鉤子方法
if (context != null && sendResult != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
} catch (Throwable e) {
}
// 更新故障broker
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
return;
}
// 對於實現了sendCallback的發送端
if (response != null) {
try {
// 獲取sendResult
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
assert sendResult != null;
// 執行發送後鉤子方法
if (context != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
// 回調發送成功回調方法onSuccess
try {
sendCallback.onSuccess(sendResult);
} catch (Throwable e) {
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
} catch (Exception e) {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
// 對於處理異常的狀況,傳入sendCallback,回調其發送
// 失敗回調方法onException(e)
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, e, context, false, producer);
}
...省略其餘異常流程,大同小異...
}
});
}
複製代碼
爲了方便你們理解,這裏對invokeAsync異步處理邏輯作一個小結:
private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
// 0.執行同步發送邏輯
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
// 1.校驗返回參 斷言
assert response != null;
// 2.處理髮送結果
return this.processSendResponse(brokerName, msg, response);
}
複製代碼
關於處理髮送結果方法processSendResponse的分析咱們放到批量消息發送的部分講解
講完了異步發送方式及下方的調用邏輯,咱們回到sendKernelImpl中,繼續看其餘的發送方式。
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
複製代碼
對於ONEWAY、同步方式,處理邏輯相同,都是直接調用 MQClientAPIImpl.sendMessage 這個方法的邏輯,該方法咱們已經在上文中解釋過,就再也不贅述了,讀者能夠經過 MQClientAPIImpl.sendMessage 三級標題自行去查看。
對於同步方式執行sendMessageSync方法,該方法在上文中已經講解過;對於oneway方式執行invokeOneway方法。
invokeOneWay的真實邏輯在NettyRemotingClient.java中實現,NettyRemotingClient封裝了底層的網絡交互,關於它的其餘內容後續會在網絡通訊部分的解析文章中展開。
@Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// 根據broker地址建立NIO的通訊channel
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
// 執行發送前置鉤子方法
doBeforeRpcHooks(addr, request);
// 執行真實的網絡調用,不關心發送結果
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
複製代碼
OneWay發送方式執行完網絡通訊以後不關注返回結果,所以適用於對返回值不敏感的流程中,好比日誌上報、埋點上報等業務中。
咱們繼續回到DefaultMQProducerImpl.sendKernelImpl方法中.
// 若是註冊了發送後的鉤子函數
// 執行該鉤子函數
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
複製代碼
這段代碼發生在發送邏輯以後,不管是何種發送類型,若是包含了發送消息的鉤子方法,則將發送結果sendResult設置到發送消息上下文context中(對於sendOneWay方式,返回的sendResult爲null)。而後執行發送消息後的鉤子方法sendMessageAfter,邏輯以下:
public void executeSendMessageHookAfter(final SendMessageContext context) {
if (!this.sendMessageHookList.isEmpty()) {
for (SendMessageHook hook : this.sendMessageHookList) {
try {
hook.sendMessageAfter(context);
} catch (Throwable e) {
log.warn("failed to executeSendMessageHookAfter", e);
}
}
}
}
複製代碼
鉤子方法的註冊是經過 DefaultMQProducerImpl.registerSendMessageHook 方法註冊的,能夠註冊多個,爲一個list。所以上述executeSendMessageHookAfter方法中爲對該list的遍歷,每輪遍歷中執行該SendMessageHook的sendMessageAfter方法。
本文是源碼解析的第二篇文章,也是屬於偏硬核的一類文章,若是你能堅持讀到這裏,請給本身一個鼓勵,你已經強過不少人了。
筆者對RocketMQ的研究程度尚淺,所以不免出現紕漏,筆者會再接再勵。關於批量消息發送、事務消息發送等邏輯的分析,在接下來的文章將會陸續進行展開,咱們不見不散。
版權聲明: 原創不易,洗文可恥。除非註明,本博文章均爲原創,轉載請以連接形式標明本文地址。