一個應用盡量用一個Topic,而消息子類型則能夠用tags來標識。java
tags可由應用自行設置,只有生產者在發送消息設置了tags,消費方在訂閱消息時才能夠利用tags經過broker作消息過濾:算法
message.setTags("TagA");
每一個消息在業務層面的惟一標識碼要設置到keys字段,方便往後定位消息丟失問題。spring
服務器會爲每一個消息建立哈希索引,應用能夠經過topic、key來查詢這條消息內容,以及消息被誰消費。數據庫
哈希索引,請保證key儘量惟一,避免潛在的哈希衝突。後端
// 訂單Id String orderId = "20034568923546"; message.setKeys(orderId);
消息發送成功或者失敗要打印消息日誌,務必要打印SendResult和key字段。send消息方法只要不拋異常,就表明發送成功。發送成功會有多個狀態,在sendResult裏定義。如下對每一個狀態進行說明:緩存
消息發送成功。要注意的是消息發送成功也不意味着它是可靠的。要確保不會丟失任何消息,還應啓用同步Master服務器或同步刷盤,即SYNC_MASTER或SYNC_FLUSH。安全
消息發送成功可是服務器刷盤超時。此時消息已經進入服務器隊列(內存),只有服務器宕機,消息纔會丟失。消息存儲配置參數中能夠設置刷盤方式和同步刷盤時間長度,若是Broker服務器設置了刷盤方式爲同步刷盤,即FlushDiskType=SYNC_FLUSH(默認爲異步刷盤方式),當Broker服務器未在同步刷盤時間內(默認爲5s)完成刷盤,則將返回該狀態——刷盤超時。服務器
消息發送成功,可是服務器同步到Slave時超時。此時消息已經進入服務器隊列,只有服務器宕機,消息纔會丟失。若是Broker服務器的角色是同步Master,即SYNC_MASTER(默認是異步Master即ASYNC_MASTER),而且從Broker服務器未在同步刷盤時間(默認爲5秒)內完成與主服務器的同步,則將返回該狀態——數據同步到Slave服務器超時。網絡
消息發送成功,可是此時Slave不可用。若是Broker服務器的角色是同步Master,即SYNC_MASTER(默認是異步Master服務器即ASYNC_MASTER),但沒有配置slave Broker服務器,則將返回該狀態——無Slave服務器可用。數據結構
Producer的send方法自己支持內部重試,重試邏輯以下:
必定程度上保證了消息能夠發送成功。若是業務對消息可靠性要求比較高,建議應用增長相應的重試邏輯:好比調用send同步方法發送失敗時,嘗試將消息存儲到DB,而後由後臺線程定時重試,確保消息必定到達Broker。
那麼DB重試方案爲何沒有集成到MQ客戶端內部,而要求應用本身完成?
消息發送過程:
因此,一次消息發送的耗時時間是上述三個步驟的總和,而某些場景要求耗時很是短,可是對可靠性要求並不高,例如日誌收集類應用,此類應用能夠採用oneway形式調用,oneway形式只發送請求不等待應答,而發送請求在客戶端實現層面僅僅是一個os系統調用的開銷,即將數據寫入客戶端的socket緩衝區,此過程耗時一般在微秒級。
RocketMQ沒法避免消息重複(Exactly-Once),因此若是業務對消費重複很是敏感,務必要在業務層面進行去重處理。能夠藉助關係數據庫進行去重。首先須要肯定消息的惟一鍵,能夠是msgId,也能夠是消息內容中的惟一標識字段,例如訂單Id等。在消費以前判斷惟一鍵是否在關係數據庫中存在。若是不存在則插入,並消費,不然跳過。(實際過程要考慮原子性問題,判斷是否存在能夠嘗試插入,若是報主鍵衝突,則插入失敗,直接跳過)
msgId必定是全局惟一標識符,可是實際使用中,可能會存在相同的消息有兩個不一樣msgId的狀況(消費者主動重發、因客戶端重投機制致使的重複等),這種狀況就須要使業務字段進行重複消費。
絕大部分消息消費行爲都屬於 IO 密集型,便可能是操做數據庫,或者調用 RPC,這類消費行爲的消費速度在於後端數據庫或者外系統的吞吐量,經過增長消費並行度,能夠提升總的消費吞吐量,可是並行度增長到必定程度,反而會降低。因此,應用必需要設置合理的並行度。 以下有幾種修改消費並行度的方法:
某些業務流程若是支持批量方式消費,則能夠很大程度上提升消費吞吐量,例如訂單扣款類應用,一次處理一個訂單耗時 1 s,一次處理 10 個訂單可能也只耗時 2 s,這樣便可大幅度提升消費的吞吐量,經過設置 consumer的 consumeMessageBatchMaxSize 返個參數,默認是 1,即一次只消費一條消息,例如設置爲 N,那麼每次消費的消息數小於等於 N。
發生消息堆積時,若是消費速度一直追不上發送速度,若是業務對數據要求不高的話,能夠選擇丟棄不重要的消息。例如,當某個隊列的消息數堆積到100000條以上,則嘗試丟棄部分或所有消息,這樣就能夠快速追上發送消息的速度。示例代碼以下:
public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { long offset = msgs.get(0).getQueueOffset(); String maxOffset = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET); long diff = Long.parseLong(maxOffset) - offset; if (diff > 100000) { // TODO 消息堆積狀況的特殊處理 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } // TODO 正常消費過程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
舉例以下,某條消息的消費過程以下:
這條消息的消費過程當中有4次與 DB的 交互,若是按照每次 5ms 計算,那麼總共耗時 20ms,假設業務計算耗時 5ms,那麼總過耗時 25ms,因此若是能把 4 次 DB 交互優化爲 2 次,那麼總耗時就能夠優化到 15ms,即整體性能提升了 40%。因此應用若是對時延敏感的話,能夠把DB部署在SSD硬盤,相比於SCSI磁盤,前者的RT會小不少。
若是消息量較少,建議在消費入口方法打印消息,消費耗時等,方便後續排查問題。
public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { log.info("RECEIVE_MSG_BEGIN: " + msgs.toString()); // TODO 正常消費過程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
若是能打印每條消息消費耗時,那麼在排查消費慢等線上問題時,會更方便。
第一件須要注意的事情是,不一樣的消費者組能夠獨立的消費一些 topic,而且每一個消費者組都有本身的消費偏移量,請確保同一組內的每一個消費者訂閱信息保持一致。
消費者將鎖定每一個消息隊列,以確保他們被逐個消費,雖然這將會致使性能降低,可是當你關心消息順序的時候會頗有用。咱們不建議拋出異常,你能夠返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 做爲替代。
顧名思義,消費者將併發消費這些消息,建議你使用它來得到良好性能,咱們不建議拋出異常,你能夠返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 做爲替代。
對於併發的消費監聽器,你能夠返回 RECONSUME_LATER 來通知消費者如今不能消費這條消息,而且但願能夠稍後從新消費它。而後,你能夠繼續消費其餘消息。對於有序的消息監聽器,由於你關心它的順序,因此不能跳過消息,可是你能夠返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告訴消費者等待片刻。
不建議阻塞監聽器,由於它會阻塞線程池,並最終可能會終止消費進程
消費者使用 ThreadPoolExecutor 在內部對消息進行消費,因此你能夠經過設置 setConsumeThreadMin 或 setConsumeThreadMax 來改變它。
當創建一個新的消費者組時,須要決定是否須要消費已經存在於 Broker 中的歷史消息CONSUME_FROM_LAST_OFFSET 將會忽略歷史消息,並消費以後生成的任何消息。CONSUME_FROM_FIRST_OFFSET 將會消費每一個存在於 Broker 中的信息。你也可使用 CONSUME_FROM_TIMESTAMP 來消費在指定時間戳後產生的消息。
Broker 角色分爲 ASYNC_MASTER(異步主機)、SYNC_MASTER(同步主機)以及SLAVE(從機)。若是對消息的可靠性要求比較嚴格,能夠採用 SYNC_MASTER加SLAVE的部署方式。若是對消息可靠性要求不高,能夠採用ASYNC_MASTER加SLAVE的部署方式。若是隻是測試方便,則能夠選擇僅ASYNC_MASTER或僅SYNC_MASTER的部署方式。
SYNC_FLUSH(同步刷新)相比於ASYNC_FLUSH(異步處理)會損失不少性能,可是也更可靠,因此須要根據實際的業務場景作好權衡。
參數名 | 默認值 | 說明 |
---|---|---|
listenPort | 10911 | 接受客戶端鏈接的監聽端口 |
namesrvAddr | null | nameServer 地址 |
brokerIP1 | 網卡的 InetAddress | 當前 broker 監聽的 IP |
brokerIP2 | 跟 brokerIP1 同樣 | 存在主從 broker 時,若是在 broker 主節點上配置了 brokerIP2 屬性,broker 從節點會鏈接主節點配置的 brokerIP2 進行同步 |
brokerName | null | broker 的名稱 |
brokerClusterName | DefaultCluster | 本 broker 所屬的 Cluser 名稱 |
brokerId | 0 | broker id, 0 表示 master, 其餘的正整數表示 slave |
storePathCommitLog | $HOME/store/commitlog/ | 存儲 commit log 的路徑 |
storePathConsumerQueue | $HOME/store/consumequeue/ | 存儲 consume queue 的路徑 |
mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |
deleteWhen | 04 | 在天天的什麼時間刪除已經超過文件保留時間的 commit log |
fileReservedTime | 72 | 以小時計算的文件保留時間 |
brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保證在收到確認生產者以前將消息刷盤。ASYNC_FLUSH 模式下的 broker 則利用刷盤一組消息的模式,能夠取得更好的性能。 |
RocketMQ 中,Name Servers 被設計用來作簡單的路由管理。其職責包括:
相對於RocketMQ的Broker集羣,生產者和消費者都是客戶端。本小節主要描述生產者和消費者公共的行爲配置。
RocketMQ能夠令客戶端找到Name Server, 而後經過Name Server再找到Broker。以下所示有多種配置方式,優先級由高到低,高優先級會覆蓋低優先級。
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876"); consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
客戶端啓動後,會定時訪問一個靜態HTTP服務器,地址以下:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,這個URL的返回內容以下:
192.168.0.1:9876;192.168.0.2:9876
客戶端默認每隔2分鐘訪問一次這個HTTP服務器,並更新本地的Name Server地址。URL已經在代碼中硬編碼,可經過修改/etc/hosts文件來改變要訪問的服務器,例如在/etc/hosts增長以下配置:
10.232.22.67 jmenv.taobao.net
推薦使用HTTP靜態服務器尋址方式,好處是客戶端部署簡單,且Name Server集羣能夠熱升級。
DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都繼承於ClientConfig類,ClientConfig爲客戶端的公共配置類。客戶端的配置都是get、set形式,每一個參數均可以用spring來配置,也能夠在代碼中配置,例如namesrvAddr這個參數能夠這樣配置,producer.setNamesrvAddr(「192.168.0.1:9876」),其餘參數同理。
參數名 | 默認值 | 說明 |
---|---|---|
namesrvAddr | Name Server地址列表,多個NameServer地址用分號隔開 | |
clientIP | 本機IP | 客戶端本機IP地址,某些機器會發生沒法識別客戶端IP地址狀況,須要應用在代碼中強制指定 |
instanceName | DEFAULT | 客戶端實例名稱,客戶端建立的多個Producer、Consumer實際是共用一個內部實例(這個實例包含網絡鏈接、線程資源等) |
clientCallbackExecutorThreads | 4 | 通訊層異步回調線程數 |
pollNameServerInteval | 30000 | 輪詢Name Server間隔時間,單位毫秒 |
heartbeatBrokerInterval | 30000 | 向Broker發送心跳間隔時間,單位毫秒 |
persistConsumerOffsetInterval | 5000 | 持久化Consumer消費進度間隔時間,單位毫秒 |
參數名 | 默認值 | 說明 |
---|---|---|
producerGroup | DEFAULT_PRODUCER | Producer組名,多個Producer若是屬於一個應用,發送一樣的消息,則應該將它們歸爲同一組 |
createTopicKey | TBW102 | 在發送消息時,自動建立服務器不存在的topic,須要指定Key,該Key可用於配置發送消息所在topic的默認路由。 |
defaultTopicQueueNums | 4 | 在發送消息,自動建立服務器不存在的topic時,默認建立的隊列數 |
sendMsgTimeout | 10000 | 發送消息超時時間,單位毫秒 |
compressMsgBodyOverHowmuch | 4096 | 消息Body超過多大開始壓縮(Consumer收到消息會自動解壓縮),單位字節 |
retryAnotherBrokerWhenNotStoreOK | FALSE | 若是發送消息返回sendResult,可是sendStatus!=SEND_OK,是否重試發送 |
retryTimesWhenSendFailed | 2 | 若是消息發送失敗,最大重試次數,該參數只對同步發送模式起做用 |
maxMessageSize | 4MB | 客戶端限制的消息大小,超過報錯,同時服務端也會限制,因此須要跟服務端配合使用。 |
transactionCheckListener | 事務消息回查監聽器,若是發送事務消息,必須設置 | |
checkThreadPoolMinSize | 1 | Broker回查Producer事務狀態時,線程池最小線程數 |
checkThreadPoolMaxSize | 1 | Broker回查Producer事務狀態時,線程池最大線程數 |
checkRequestHoldMax | 2000 | Broker回查Producer事務狀態時,Producer本地緩衝請求隊列大小 |
RPCHook | null | 該參數是在Producer建立時傳入的,包含消息發送前的預處理和消息響應後的處理兩個接口,用戶能夠在第一個接口中作一些安全控制或者其餘操做。 |
參數名 | 默認值 | 說明 |
---|---|---|
consumerGroup | DEFAULT_CONSUMER | Consumer組名,多個Consumer若是屬於一個應用,訂閱一樣的消息,且消費邏輯一致,則應該將它們歸爲同一組 |
messageModel | CLUSTERING | 消費模型支持集羣消費和廣播消費兩種 |
consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Consumer啓動後,默認從上次消費的位置開始消費,這包含兩種狀況:一種是上次消費的位置未過時,則消費從上次停止的位置進行;一種是上次消費位置已通過期,則從當前隊列第一條消息開始消費 |
consumeTimestamp | 半個小時前 | 只有當consumeFromWhere值爲CONSUME_FROM_TIMESTAMP時才起做用。 |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法實現策略 |
subscription | 訂閱關係 | |
messageListener | 消息監聽器 | |
offsetStore | 消費進度存儲 | |
consumeThreadMin | 10 | 消費線程池最小線程數 |
consumeThreadMax | 20 | 消費線程池最大線程數 |
consumeConcurrentlyMaxSpan | 2000 | 單隊列並行消費容許的最大跨度 |
pullThresholdForQueue | 1000 | 拉消息本地隊列緩存消息最大數 |
pullInterval | 0 | 拉消息間隔,因爲是長輪詢,因此爲0,可是若是應用爲了流控,也能夠設置大於0的值,單位毫秒 |
consumeMessageBatchMaxSize | 1 | 批量消費,一次消費多少條消息 |
pullBatchSize | 32 | 批量拉消息,一次最多拉多少條 |
參數名 | 默認值 | 說明 |
---|---|---|
consumerGroup | DEFAULT_CONSUMER | Consumer組名,多個Consumer若是屬於一個應用,訂閱一樣的消息,且消費邏輯一致,則應該將它們歸爲同一組 |
brokerSuspendMaxTimeMillis | 20000 | 長輪詢,Consumer拉消息請求在Broker掛起最長時間,單位毫秒 |
consumerTimeoutMillisWhenSuspend | 30000 | 長輪詢,Consumer拉消息請求在Broker掛起超過指定時間,客戶端認爲超時,單位毫秒 |
consumerPullTimeoutMillis | 10000 | 非長輪詢,拉消息超時時間,單位毫秒 |
messageModel | BROADCASTING | 消息支持兩種模式:集羣消費和廣播消費 |
messageQueueListener | 監聽隊列變化 | |
offsetStore | 消費進度存儲 | |
registerTopics | 註冊的topic集合 | |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法實現策略 |
字段名 | 默認值 | 說明 |
---|---|---|
Topic | null | 必填,消息所屬topic的名稱 |
Body | null | 必填,消息體 |
Tags | null | 選填,消息標籤,方便服務器過濾使用。目前只支持每一個消息設置一個tag |
Keys | null | 選填,表明這條消息的業務關鍵詞,服務器會根據keys建立哈希索引,設置後,能夠在Console系統根據Topic、Keys來查詢消息,因爲是哈希索引,請儘量保證key惟一,例如訂單號,商品Id等。 |
Flag | 0 | 選填,徹底由應用來設置,RocketMQ不作干預 |
DelayTimeLevel | 0 | 選填,消息延時級別,0表示不延時,大於0會延時特定的時間纔會被消費 |
WaitStoreMsgOK | TRUE | 選填,表示消息是否在服務器落盤後才返回應答。 |
本小節主要介紹系統(JVM/OS)相關的配置。
推薦使用最新發布的JDK 1.8版本。經過設置相同的Xms和Xmx值來防止JVM調整堆大小以得到更好的性能。簡單的JVM配置以下所示:
-server -Xms8g -Xmx8g -Xmn4g
若是您不關心RocketMQ Broker的啓動時間,還有一種更好的選擇,就是經過「預觸摸」Java堆以確保在JVM初始化期間每一個頁面都將被分配。那些不關心啓動時間的人能夠啓用它:
-XX:+AlwaysPreTouch
禁用偏置鎖定可能會減小JVM暫停,
-XX:-UseBiasedLocking
至於垃圾回收,建議使用帶JDK 1.8的G1收集器。
-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30
這些GC選項看起來有點激進,但事實證實它在咱們的生產環境中具備良好的性能。另外不要把-XX:MaxGCPauseMillis的值設置過小,不然JVM將使用一個小的年輕代來實現這個目標,這將致使很是頻繁的minor GC,因此建議使用rolling GC日誌文件:
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m
若是寫入GC文件會增長代理的延遲,能夠考慮將GC日誌文件重定向到內存文件系統:
-Xloggc:/dev/shm/mq_gc_%p.log123
os.sh腳本在bin文件夾中列出了許多內核參數,能夠進行微小的更改而後用於生產用途。下面的參數須要注意,更多細節請參考/proc/sys/vm/*的文檔