先來看下producer核心的類設計,以下圖:java
一、核心發佈消息的類DefaultMQProducer
,繼承自MQProducer接口,此接口定義了一系列發送消息的方法,如普通消息,順序消息,延時消息等,最終進行網絡通訊會交給MQClientAPIImpl
處理。spring
二、rocketmq從4.1.3版本開始又支持了事務消息,由TransactionMQProducer
類提供(以後會有專門的文章進行詳細解讀事務消息)shell
咱們看到DefaultMQProducer
繼承了一個客戶端的公共配置類ClientConfig
(與consumer公用),其實就是一個普通的javaBean,既能夠代碼中設置屬性,也能夠集成spring來配置服務器
參數名 | 默認值 | 說明 |
---|---|---|
namesrvAddr | 無 | nameserver的地址列表,用分號隔開 |
clientIP | 本機ip地址 | 客戶端ip地址,有時候沒法識別,須要手動配置 |
instanceName | DEFAULT | 客戶端實例名稱,客戶端建立的多個 Producer、Consumer 實際是共用一個內部實例(這個實例包含網絡鏈接、線程資源等) |
clientCallbackExecutorThreads | cpu核數 | 通訊層客戶端處理請求的線程數 |
pollNameServerInterval | 30000 | 輪詢nameserver的時間間隔,單位ms |
heartbeatBrokerInterval | 30000 | 向broker發送心跳的時間間隔,單位ms |
persistConsumerOffsetInterval | 5000 | 持久化 Consumer 消費進度間隔時間,單位ms |
producer獨有的配置:網絡
參數名 | 默認值 | 說明 |
---|---|---|
producerGroup | DEFAULT_PRODUCER | Producer組名,相同分組的producer應該有相同的發送消息邏輯 |
createTopicKey | AUTO_CREATE_TOPIC_KEY | 自動建立topic時,以此默認topic爲模板建立指定topic |
defaultTopicQueueNums | 4 | 自動建立topic隊列數量 |
sendMsgTimeout | 3000 | 發送消息的超時時間,單位ms |
compressMsgBodyOverHowmuch | 4098 | 消息體超過多大會進行壓縮,單位字節 |
retryTimesWhenSendFailed | 2 | 同步發送消息,發送失敗重試次數 |
retryTimesWhenSendAsyncFailed | 2 | 異步發送消息,發送失敗的重試次數 |
retryAnotherBrokerWhenNotStoreOK | false | 同步發送消息,消息存儲失敗是否重試其餘broker |
maxMessageSize | 4194304 | 客戶端限制消息的大小,默認4M |
TransactionListener | 事務消息時,必須設置的回查監聽器 |
咱們在建立producer時必需要指定一個group,這裏有兩個做用:負載均衡
簡單說明下整個啓動流程:dom
一、首先在DefaultMQProducerImpl
中會作一些參數校驗,如group是否合法;而後會建立MQClientInstance
實例,此實例包含網絡鏈接、線程資源等,相同的clientId會共享此實例,因此經過MQClientManager
來管理。異步
二、核心的啓動流程在MQClientInstance
類中,若是nameserver地址沒有配置的話,會先經過靜態的http服務器地址去抓取nameserver的地址;再則啓動netty客戶端。工具
三、啓動一些定時任務,跟producer有關的以下幾個:this
RocketMQ 有多種配置方式能夠令客戶端找到 NameServer, 而後經過 NameServer 再找到 Broker,分別以下,
優先級由高到低,高優優先級會覆蓋低優先級
一、代碼中指定 Name Server 地址
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
二、啓動參數指定
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
三、環境變量指定 Name Server 地址
export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
四、HTTP 靜態服務器尋址(默認)
若是以上三種都沒有設置name server的地址,客戶端啓動後先會訪問一個靜態http服務器獲取name server的地址,而後會啓動一個定時任務訪問這個靜態 HTTP 服務器,地址以下:
http://jmenv.tbsite.net:8080/rocketmq/nsaddr
這是默認的地址,固然你也能夠更改,作以下設置:
代碼:
System.setProperty("rocketmq.namesrv.domain","localhost"); System.setProperty("rocketmq.namesrv.domain.subgroup","nameServer")
或者啓動參數指定:
-Drocketmq.namesrv.domain=localhost -Drocketmq.namesrv.domain.subgroup=nameServer
以上設置後http服務器地址就變成:
http://localhsot:8080/rocketmq/nameServer
這個 URL 的返回內容格式以下:
192.168.0.1:9876;192.168.0.2:9876
客戶端每隔 2 分鐘訪問一次這個 HTTP 服務器,並更新本地的 Name Server 地址。
推薦使用 HTTP 靜態服務器尋址方式,好處是客戶端部署簡單,且 Name Server 集羣能夠熱升級。
一、broker在啓動的時候經過參數autoCreateTopicEnable
設置是否自動建立topic,默認爲true,此時會建立一個名爲TBW102
(4.3版本已經更名爲AUTO_CREATE_TOPIC_KEY
)的topic(參見類TopicConfigManager
),broker在向namesrv註冊時會把默認的topic註冊上去。若是設置false,則不會註冊。
二、producer在發送消息時會在本地獲取路由信息,第一次發送的話本地確定沒有,就會去namesrv獲取,若是此時namesrv也沒有,則會獲取TBW102的topic信息(參見DefaultMQProducerImpl.tryToFindTopicPublishInfo),以此爲模板建立topic,而後選擇topic下的一臺broker發,broker建立後,會經過心跳註冊到namesrv上。
三、若是autoCreateTopicEnable設置false的話,producer發送消息會報找不到路由的異常,此時必須手動建立topic。
能夠經過管理工具mqadmin來手動建立topic
sh mqadmin updateTopic -c [集羣名稱] -n [nameserver地址] -t [topic名稱] -w [寫隊列數] -r [讀隊列數]
這裏講一下自動建立的topic的隊列數如何設置,首先broker建立的模板topic=AUTO_CREATE_TOPIC_KEY
的隊列是8,參見類TopicConfigManager:
public TopicConfigManager(BrokerController brokerController) { //省略無關代碼 if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; topicConfig.setPerm(perm); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } //省略無關代碼 }
BrokerConfig:
private int defaultTopicQueueNums = 8;
DefaultMQProducer端默認知道要建立的topic的隊列數是4
private volatile int defaultTopicQueueNums = 4;
在MQClientInstance
類的方法updateTopicRouteInfoFromNameServer
中有這樣一段邏輯:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { //省略無關代碼 for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } //省略無關代碼 }
建立隊列是取二者最小的一個,也就是4,因此要設置topic的隊列數量,很明顯了設置broker的defaultTopicQueueNums的值和DefaultMQProducer的defaultTopicQueueNums值就能夠了。這是自動建立Topic時隊列數的設置方法,上面也提到生成環境通常不會開啓自動建立Topic的功能,能夠經過上面的手動建立Topic的指令來設置讀寫隊列數。你可能注意到了Topic下有讀寫隊兩個隊列數,分別表明上面意思呢?讀寫隊列實際上是個邏輯概念,一個broker下topic的總隊列數是以寫隊列爲準,而讀隊列意思是容許多少隊列能夠被消費者消費,也就是說讀多寫少的狀況下,沒有問題,隊列均可以被消費掉,若是寫多讀少的話,那麼就會存在隊列不會被消費的狀況。
前面咱們講到了如何獲取topic的路由信息,如何建立topic的隊列數,一個topic下有多個隊列,又能夠分佈在不一樣的broker上面,因此topic的總隊列數應該是全部broker上的topic下隊列數的總和。
備註:若是手動在每一個broker上分別建立topic的話,相同topic在不一樣broker上的隊列數能夠不同。
那麼問題來了,在發送消息時根據怎麼樣的策略來選擇一個隊列發送呢?rocketmq提供了一個MQFaultStrategy
策略類來負責選擇隊列,這裏會有一個參數sendLatencyFaultEnable
是否開啓延遲故障,
該值默認爲false
,在不開啓的狀況下,相同線程發送消息是輪詢topic下的全部隊列,不一樣線程發送是隨機的,核心代碼以下:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { //省略沒必要要的代碼...... } return tpInfo.selectOneMessageQueue(lastBrokerName); } //以上代碼邏輯參見類MQFaultStrategy.selectOneMessageQueue public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //省略沒必要要的代碼...... } } public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); } //以上代碼邏輯參見類TopicPublishInfo public int getAndIncrement() { Integer index = this.threadLocalIndex.get();//ThreadLocal中獲取 if (null == index) {//爲空,隨機生成一個 index = Math.abs(random.nextInt()); if (index < 0) index = 0; this.threadLocalIndex.set(index); } index = Math.abs(index + 1); if (index < 0) index = 0; this.threadLocalIndex.set(index); return index; } //以上代碼參見類ThreadLocalIndex
每次獲取index的時候都是從本地線程變量ThreadLocal中獲取,沒有的狀況下就是隨機生成一個,加1取絕對值後返回,再對隊列列表的長度取模,因此在同一線程中,會輪訓的從隊列列表獲取隊列。而若是是不一樣線程的話,index是隨機生成的,因此就是隨機從隊列列表中獲取。以下圖所示:
能夠看到選擇隊列方法的入參有一個lastBrokerName
的入參,此參數的目的是在發送消息失敗的狀況下,producer會重試再次發送,而再次發送選擇的隊列須要另選一個broker,lastBrokerName
就是要過濾掉失敗的broker,選擇下一個broker的隊列進行發送消息。
producer的send方法自己支持內部重試,重試邏輯以下:
一、最大重試次數默認2次,能夠經過參數retryTimesWhenSendFailed
設置
二、發送失敗,則輪詢到下一個broker,若是此時只有一個broker在線呢?那就會輪訓這個broker下的其餘隊列。
三、這個方法的總耗時時間不超過 sendMsgTimeout 設置的值,默認爲3s。
若是發送消息,broker返回結果超時,這種超時不會進行重試了;若是是方法自己耗時超過sendMsgTimeout ,還將來得及調用發送消息,此時的超時也不會重試。
以上策略其實也很難保證同步發送消息必定成功,若是應用要保證消息不丟失,最好先把消息存儲到db,後臺啓線程定時重試,確保消息必定存儲到broker。