匠心零度 轉載請註明原創出處,謝謝!git
說明: rocketmq系列都將會以rocketmq-4.1.0-incubating進行介紹。github
在閱讀源碼時作了必定的註釋,公衆號【匠心零度】回覆:rocketmq,可得到基於rocketmq4.1.0加詳細中文代碼註釋 。歡迎你們 star、fork !緩存
廝大說過消息中間件的本質消息中間件大道至簡:一發一存一消費 ,今天主要來討論下發,就是RocketMQ網絡部署圖中用顏色標記的部分。bash
上面的圖大概就是producer發送message到broker的核心邏輯了。服務器
問題思考:網絡
把broker相關信息緩存到客戶端減小了與namesrv的交互,可是也下降了broker變化的實時性了,如何突然有一臺broker不可用了會怎麼樣呢?(後續看看rocketmq的處理),爲何producer發送會那麼快呢?本質是因爲netty的writeAndFlush?producer如何作到異步發送?同步發送?oneway發送的呢?若是發送失敗會怎麼處理呢?異步
因爲發送還涉及到定時發送,順序發送,批量發送等狀況,本篇考慮到篇幅問題就是通常的發送邏輯講解,後面繼續分享其餘狀況。ide
閱讀本篇前應該重點閱讀下:RocketMQ(二):RPC通信。函數
如何在本地調試以前文章也分享過了,在此就不提了,發送的邏輯相對於存儲以及消費來講是最簡單的(直接根據一條線不斷的跟下去基本就差很少了),而存儲最複雜,其次消費(這些過程可能一條線很差找,後續分享)。ui
備註: 能夠參考RocketMQ快速入門便可。
/**
* Start this producer instance.
* </p>
*
* <strong>
* Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke * this method before sending or querying messages. * </strong> * </p> * * @throws MQClientException if there is any unexpected error. */ @Override public void start() throws MQClientException { this.defaultMQProducerImpl.start(); } 複製代碼
主要作了下列事情(核心事情):
producer是以Message對象進行發送的,看看Message構造:
public Message() {
}
public Message(String topic, byte[] body) {
this(topic, "", "", 0, body, true);
}
public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
this.topic = topic;
this.flag = flag;
this.body = body;
if (tags != null && tags.length() > 0)
this.setTags(tags);
if (keys != null && keys.length() > 0)
this.setKeys(keys);
this.setWaitStoreMsgOK(waitStoreMsgOK);
}
public Message(String topic, String tags, byte[] body) {
this(topic, tags, "", 0, body, true);
}
public Message(String topic, String tags, String keys, byte[] body) {
this(topic, tags, keys, 0, body, true);
}
複製代碼
備註: 主要就是topic、tags、以及body真實內容等。
SendResult sendResult = producer.send(msg);
複製代碼
進行發送處理。下面咱們重點看看send如何處理。
發送的幾種方式:同步 異步 oneway(應該選擇哪一種,須要本身根據狀況進行判斷)
以同步發送爲例子,默認超時時間爲3s,
SendResult sendResult = producer.send(msg);
複製代碼
這個就是發送的觸發方法,咱們一直跟進去就好了,**第一初步感覺:**經過跟蹤進去第一感受就是涉及到了JUC相關使用,大量運用享元模式(本質一個map進行緩存)以及netty使用。
核心邏輯:
代碼就不大量複製了,須要的github裏面獲取基於rocketmq4.1.0加詳細中文代碼註釋 。歡迎你們 star、fork !
判斷服務是否可用? 不可用直接結束流程。
消息的驗證:
獲取topic路由信息
緩存中有就獲取,沒有就namesrv交互一次(也可能2次)因爲topic信息在broker服務端不必定存在,若是不存在就用默認的(TBW102)。
封裝請求頭信息:
// Namesrv 根據Topic獲取Broker Name、隊列數(包含讀隊列與寫隊列)
public static final int GET_ROUTEINTO_BY_TOPIC = 105;
複製代碼
namesrv服務端接受到這個請求的處理狀況。
最後獲得的路由信息相似下面的:
發送模式是sync 會有3次其餘1次
//發送模式是sync 會有3次其餘1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
複製代碼
選擇一個queue
如何選擇發送那個broker的那個queueid上面?(客戶端本身負載),因爲broker相關信息緩存在客戶端裏面,問題來了(因爲30s會同步一次信息,那麼在30s以內broker出現問題會怎麼樣呢? )rocketmq是這樣處理的:sendLatencyFaultEnable開關是否打開
1.打開--> 有多長時間內不可用狀況
2.不打開(默認)-->直接隨機一個(若是帶了lastBrokerName不爲空 儘可能換不是這個broker的,若是都沒有又是隨機一個)
調用sendKernelImpl發送消息 發送消息核心
根據broker的name獲取到ip地址,若是通道沒有創建而且保存。
設置設置UNIQ_id,裏面保護客戶端ip地址信息。
發送的時候 會有鉤子函數提供執行(禁止消息鉤子 ,發送消息鉤子(executeSendMessageHookBefore、executeSendMessageHookAfter)。
構建SendMessageRequestHeader,包括生成消息時間戳,因此各各機器時間最好一致,(這樣後期也能夠查下broker接受消息花了多少時間)。
根據發送消息模式,選擇發送方式
下面此次主要看同步發送狀況。
若是1狀況執行nettywriteAndFlush發送成功者跳出來,到達3狀況進行等等最多等待3s。這裏何時喚醒呢? 實際上是在broker狀況響應客戶端的時候進行喚醒的:
備註: 這裏使用CountDownLatch異步轉同步的。
若是是2狀況表示發送失敗,直接喚醒3狀況不進行阻塞了(最後拋異常表示發送失敗)
更新broker可用時間
retryAnotherBrokerWhenNotStoreOK狀況判斷
若是設置爲retryAnotherBrokerWhenNotStoreOK爲true以後,在發送失敗的時候,會選擇換一個broker。
以下異常continue,進行發送消息重試
客戶端發送流程大概到這裏就分析完成了。
若是讀完以爲有收穫的話,歡迎點贊、關注、加公衆號【匠心零度】,查閱更多精彩歷史!!!
加入知識星球,一塊兒探討!