RocketMQ消息隊列的最佳實踐

1 生產者

1.1 發送消息注意事項

1 Tags的使用

一個應用盡量用一個Topic,而消息子類型則能夠用tags來標識。java

tags可由應用自行設置,只有生產者在發送消息設置了tags,消費方在訂閱消息時才能夠利用tags經過broker作消息過濾:算法

message.setTags("TagA");

2 Keys的使用

每一個消息在業務層面的惟一標識碼要設置到keys字段,方便往後定位消息丟失問題。spring

服務器會爲每一個消息建立哈希索引,應用能夠經過topic、key來查詢這條消息內容,以及消息被誰消費。數據庫

哈希索引,請保證key儘量惟一,避免潛在的哈希衝突。後端

// 訂單Id   
String orderId = "20034568923546";   
message.setKeys(orderId);

3 日誌的打印

​消息發送成功或者失敗要打印消息日誌,務必要打印SendResult和key字段。send消息方法只要不拋異常,就表明發送成功。發送成功會有多個狀態,在sendResult裏定義。如下對每一個狀態進行說明:緩存

  • SEND_OK

消息發送成功。要注意的是消息發送成功也不意味着它是可靠的。要確保不會丟失任何消息,還應啓用同步Master服務器或同步刷盤,即SYNC_MASTER或SYNC_FLUSH。安全

  • FLUSH_DISK_TIMEOUT

消息發送成功可是服務器刷盤超時。此時消息已經進入服務器隊列(內存),只有服務器宕機,消息纔會丟失。消息存儲配置參數中能夠設置刷盤方式和同步刷盤時間長度,若是Broker服務器設置了刷盤方式爲同步刷盤,即FlushDiskType=SYNC_FLUSH(默認爲異步刷盤方式),當Broker服務器未在同步刷盤時間內(默認爲5s)完成刷盤,則將返回該狀態——刷盤超時。服務器

  • FLUSH_SLAVE_TIMEOUT

消息發送成功,可是服務器同步到Slave時超時。此時消息已經進入服務器隊列,只有服務器宕機,消息纔會丟失。若是Broker服務器的角色是同步Master,即SYNC_MASTER(默認是異步Master即ASYNC_MASTER),而且從Broker服務器未在同步刷盤時間(默認爲5秒)內完成與主服務器的同步,則將返回該狀態——數據同步到Slave服務器超時。網絡

  • SLAVE_NOT_AVAILABLE

消息發送成功,可是此時Slave不可用。若是Broker服務器的角色是同步Master,即SYNC_MASTER(默認是異步Master服務器即ASYNC_MASTER),但沒有配置slave Broker服務器,則將返回該狀態——無Slave服務器可用。數據結構

1.2 消息發送失敗處理方式

Producer的send方法自己支持內部重試,重試邏輯以下:

  • 至多重試2次(同步發送爲2次,異步發送爲0次)。
  • 若是發送失敗,則輪轉到下一個Broker。這個方法的總耗時時間不超過sendMsgTimeout設置的值,默認10s。
  • 若是自己向broker發送消息產生超時異常,就不會再重試。

必定程度上保證了消息能夠發送成功。若是業務對消息可靠性要求比較高,建議應用增長相應的重試邏輯:好比調用send同步方法發送失敗時,嘗試將消息存儲到DB,而後由後臺線程定時重試,確保消息必定到達Broker。

那麼DB重試方案爲何沒有集成到MQ客戶端內部,而要求應用本身完成?

  • MQ的客戶端設計爲無狀態模式,方便任意的水平擴展,且對機器資源的消耗僅僅是cpu、內存、網絡
  • 若是MQ客戶端內部集成一個KV存儲模塊,那麼數據只有同步落盤才能較可靠,而同步落盤自己性能開銷較大,因此一般會採用異步落盤,又因爲應用關閉過程不受MQ運維人員控制,可能常常會發生 kill -9 這樣暴力方式關閉,形成數據沒有及時落盤而丟失
  • Producer所在機器的可靠性較低,通常爲虛擬機,不適合存儲重要數據。綜上,推薦重試過程交由應用控制

1.3選擇oneway形式發送

消息發送過程:

  • 客戶端發送請求到服務器
  • 服務器處理請求
  • 服務器向客戶端返回應答

因此,一次消息發送的耗時時間是上述三個步驟的總和,而某些場景要求耗時很是短,可是對可靠性要求並不高,例如日誌收集類應用,此類應用能夠採用oneway形式調用,oneway形式只發送請求不等待應答,而發送請求在客戶端實現層面僅僅是一個os系統調用的開銷,即將數據寫入客戶端的socket緩衝區,此過程耗時一般在微秒級。

2 消費者

2.1 消費過程冪等

RocketMQ沒法避免消息重複(Exactly-Once),因此若是業務對消費重複很是敏感,務必要在業務層面進行去重處理。能夠藉助關係數據庫進行去重。首先須要肯定消息的惟一鍵,能夠是msgId,也能夠是消息內容中的惟一標識字段,例如訂單Id等。在消費以前判斷惟一鍵是否在關係數據庫中存在。若是不存在則插入,並消費,不然跳過。(實際過程要考慮原子性問題,判斷是否存在能夠嘗試插入,若是報主鍵衝突,則插入失敗,直接跳過)

msgId必定是全局惟一標識符,可是實際使用中,可能會存在相同的消息有兩個不一樣msgId的狀況(消費者主動重發、因客戶端重投機制致使的重複等),這種狀況就須要使業務字段進行重複消費。

2.2 消費速度慢的處理方式

1 提升消費並行度

絕大部分消息消費行爲都屬於 IO 密集型,便可能是操做數據庫,或者調用 RPC,這類消費行爲的消費速度在於後端數據庫或者外系統的吞吐量,經過增長消費並行度,能夠提升總的消費吞吐量,可是並行度增長到必定程度,反而會降低。因此,應用必需要設置合理的並行度。 以下有幾種修改消費並行度的方法:

  • 同一個 ConsumerGroup 下,經過增長 Consumer 實例數量來提升並行度(須要注意的是超過訂閱隊列數的 Consumer 實例無效)。能夠經過加機器,或者在已有機器啓動多個進程的方式。
  • 提升單個 Consumer 的消費並行線程,經過修改參數 consumeThreadMin、consumeThreadMax實現。

2 批量方式消費

某些業務流程若是支持批量方式消費,則能夠很大程度上提升消費吞吐量,例如訂單扣款類應用,一次處理一個訂單耗時 1 s,一次處理 10 個訂單可能也只耗時 2 s,這樣便可大幅度提升消費的吞吐量,經過設置 consumer的 consumeMessageBatchMaxSize 返個參數,默認是 1,即一次只消費一條消息,例如設置爲 N,那麼每次消費的消息數小於等於 N。

3 跳過非重要消息

發生消息堆積時,若是消費速度一直追不上發送速度,若是業務對數據要求不高的話,能夠選擇丟棄不重要的消息。例如,當某個隊列的消息數堆積到100000條以上,則嘗試丟棄部分或所有消息,這樣就能夠快速追上發送消息的速度。示例代碼以下:

public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        long offset = msgs.get(0).getQueueOffset();
        String maxOffset =
                msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
        long diff = Long.parseLong(maxOffset) - offset;
        if (diff > 100000) {
            // TODO 消息堆積狀況的特殊處理
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // TODO 正常消費過程
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

4 優化每條消息消費過程

舉例以下,某條消息的消費過程以下:

  • 根據消息從 DB 查詢【數據 1】
  • 根據消息從 DB 查詢【數據 2】
  • 複雜的業務計算
  • 向 DB 插入【數據 3】
  • 向 DB 插入【數據 4】

這條消息的消費過程當中有4次與 DB的 交互,若是按照每次 5ms 計算,那麼總共耗時 20ms,假設業務計算耗時 5ms,那麼總過耗時 25ms,因此若是能把 4 次 DB 交互優化爲 2 次,那麼總耗時就能夠優化到 15ms,即整體性能提升了 40%。因此應用若是對時延敏感的話,能夠把DB部署在SSD硬盤,相比於SCSI磁盤,前者的RT會小不少。

2.3 消費打印日誌

若是消息量較少,建議在消費入口方法打印消息,消費耗時等,方便後續排查問題。

public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
        // TODO 正常消費過程
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

若是能打印每條消息消費耗時,那麼在排查消費慢等線上問題時,會更方便。

2.4 其餘消費建議

1 關於消費者和訂閱

​第一件須要注意的事情是,不一樣的消費者組能夠獨立的消費一些 topic,而且每一個消費者組都有本身的消費偏移量,請確保同一組內的每一個消費者訂閱信息保持一致。

2 關於有序消息

消費者將鎖定每一個消息隊列,以確保他們被逐個消費,雖然這將會致使性能降低,可是當你關心消息順序的時候會頗有用。咱們不建議拋出異常,你能夠返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 做爲替代。

3 關於併發消費

顧名思義,消費者將併發消費這些消息,建議你使用它來得到良好性能,咱們不建議拋出異常,你能夠返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 做爲替代。

4 關於消費狀態Consume Status

對於併發的消費監聽器,你能夠返回 RECONSUME_LATER 來通知消費者如今不能消費這條消息,而且但願能夠稍後從新消費它。而後,你能夠繼續消費其餘消息。對於有序的消息監聽器,由於你關心它的順序,因此不能跳過消息,可是你能夠返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告訴消費者等待片刻。

5 關於Blocking

不建議阻塞監聽器,由於它會阻塞線程池,並最終可能會終止消費進程

6 關於線程數設置

消費者使用 ThreadPoolExecutor 在內部對消息進行消費,因此你能夠經過設置 setConsumeThreadMin 或 setConsumeThreadMax 來改變它。

7 關於消費位點

當創建一個新的消費者組時,須要決定是否須要消費已經存在於 Broker 中的歷史消息CONSUME_FROM_LAST_OFFSET 將會忽略歷史消息,並消費以後生成的任何消息。CONSUME_FROM_FIRST_OFFSET 將會消費每一個存在於 Broker 中的信息。你也可使用 CONSUME_FROM_TIMESTAMP 來消費在指定時間戳後產生的消息。

3 Broker

3.1 Broker 角色

​ Broker 角色分爲 ASYNC_MASTER(異步主機)、SYNC_MASTER(同步主機)以及SLAVE(從機)。若是對消息的可靠性要求比較嚴格,能夠採用 SYNC_MASTER加SLAVE的部署方式。若是對消息可靠性要求不高,能夠採用ASYNC_MASTER加SLAVE的部署方式。若是隻是測試方便,則能夠選擇僅ASYNC_MASTER或僅SYNC_MASTER的部署方式。

3.2 FlushDiskType

​ SYNC_FLUSH(同步刷新)相比於ASYNC_FLUSH(異步處理)會損失不少性能,可是也更可靠,因此須要根據實際的業務場景作好權衡。

3.3 Broker 配置

參數名 默認值 說明
listenPort 10911 接受客戶端鏈接的監聽端口
namesrvAddr null nameServer 地址
brokerIP1 網卡的 InetAddress 當前 broker 監聽的 IP
brokerIP2 跟 brokerIP1 同樣 存在主從 broker 時,若是在 broker 主節點上配置了 brokerIP2 屬性,broker 從節點會鏈接主節點配置的 brokerIP2 進行同步
brokerName null broker 的名稱
brokerClusterName DefaultCluster 本 broker 所屬的 Cluser 名稱
brokerId 0 broker id, 0 表示 master, 其餘的正整數表示 slave
storePathCommitLog $HOME/store/commitlog/ 存儲 commit log 的路徑
storePathConsumerQueue $HOME/store/consumequeue/ 存儲 consume queue 的路徑
mappedFileSizeCommitLog 1024 * 1024 * 1024(1G) commit log 的映射文件大小
deleteWhen 04 在天天的什麼時間刪除已經超過文件保留時間的 commit log
fileReservedTime 72 以小時計算的文件保留時間
brokerRole ASYNC_MASTER SYNC_MASTER/ASYNC_MASTER/SLAVE
flushDiskType ASYNC_FLUSH SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保證在收到確認生產者以前將消息刷盤。ASYNC_FLUSH 模式下的 broker 則利用刷盤一組消息的模式,能夠取得更好的性能。

4 NameServer

​RocketMQ 中,Name Servers 被設計用來作簡單的路由管理。其職責包括:

  • Brokers 按期向每一個名稱服務器註冊路由數據。
  • 名稱服務器爲客戶端,包括生產者,消費者和命令行客戶端提供最新的路由信息。

5 客戶端配置

​ 相對於RocketMQ的Broker集羣,生產者和消費者都是客戶端。本小節主要描述生產者和消費者公共的行爲配置。

5.1 客戶端尋址方式

RocketMQ能夠令客戶端找到Name Server, 而後經過Name Server再找到Broker。以下所示有多種配置方式,優先級由高到低,高優先級會覆蓋低優先級。

  • 代碼中指定Name Server地址,多個namesrv地址之間用分號分割
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");  

consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
  • Java啓動參數中指定Name Server地址
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
  • 環境變量指定Name Server地址
export   NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
  • HTTP靜態服務器尋址(默認)

客戶端啓動後,會定時訪問一個靜態HTTP服務器,地址以下:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,這個URL的返回內容以下:

192.168.0.1:9876;192.168.0.2:9876

客戶端默認每隔2分鐘訪問一次這個HTTP服務器,並更新本地的Name Server地址。URL已經在代碼中硬編碼,可經過修改/etc/hosts文件來改變要訪問的服務器,例如在/etc/hosts增長以下配置:

10.232.22.67    jmenv.taobao.net

推薦使用HTTP靜態服務器尋址方式,好處是客戶端部署簡單,且Name Server集羣能夠熱升級。

5.2 客戶端配置

DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都繼承於ClientConfig類,ClientConfig爲客戶端的公共配置類。客戶端的配置都是get、set形式,每一個參數均可以用spring來配置,也能夠在代碼中配置,例如namesrvAddr這個參數能夠這樣配置,producer.setNamesrvAddr(「192.168.0.1:9876」),其餘參數同理。

1 客戶端的公共配置

參數名 默認值 說明
namesrvAddr   Name Server地址列表,多個NameServer地址用分號隔開
clientIP 本機IP 客戶端本機IP地址,某些機器會發生沒法識別客戶端IP地址狀況,須要應用在代碼中強制指定
instanceName DEFAULT 客戶端實例名稱,客戶端建立的多個Producer、Consumer實際是共用一個內部實例(這個實例包含網絡鏈接、線程資源等)
clientCallbackExecutorThreads 4 通訊層異步回調線程數
pollNameServerInteval 30000 輪詢Name Server間隔時間,單位毫秒
heartbeatBrokerInterval 30000 向Broker發送心跳間隔時間,單位毫秒
persistConsumerOffsetInterval 5000 持久化Consumer消費進度間隔時間,單位毫秒

2 Producer配置

參數名 默認值 說明
producerGroup DEFAULT_PRODUCER Producer組名,多個Producer若是屬於一個應用,發送一樣的消息,則應該將它們歸爲同一組
createTopicKey TBW102 在發送消息時,自動建立服務器不存在的topic,須要指定Key,該Key可用於配置發送消息所在topic的默認路由。
defaultTopicQueueNums 4 在發送消息,自動建立服務器不存在的topic時,默認建立的隊列數
sendMsgTimeout 10000 發送消息超時時間,單位毫秒
compressMsgBodyOverHowmuch 4096 消息Body超過多大開始壓縮(Consumer收到消息會自動解壓縮),單位字節
retryAnotherBrokerWhenNotStoreOK FALSE 若是發送消息返回sendResult,可是sendStatus!=SEND_OK,是否重試發送
retryTimesWhenSendFailed 2 若是消息發送失敗,最大重試次數,該參數只對同步發送模式起做用
maxMessageSize 4MB 客戶端限制的消息大小,超過報錯,同時服務端也會限制,因此須要跟服務端配合使用。
transactionCheckListener   事務消息回查監聽器,若是發送事務消息,必須設置
checkThreadPoolMinSize 1 Broker回查Producer事務狀態時,線程池最小線程數
checkThreadPoolMaxSize 1 Broker回查Producer事務狀態時,線程池最大線程數
checkRequestHoldMax 2000 Broker回查Producer事務狀態時,Producer本地緩衝請求隊列大小
RPCHook null 該參數是在Producer建立時傳入的,包含消息發送前的預處理和消息響應後的處理兩個接口,用戶能夠在第一個接口中作一些安全控制或者其餘操做。

3 PushConsumer配置

參數名 默認值 說明
consumerGroup DEFAULT_CONSUMER Consumer組名,多個Consumer若是屬於一個應用,訂閱一樣的消息,且消費邏輯一致,則應該將它們歸爲同一組
messageModel CLUSTERING 消費模型支持集羣消費和廣播消費兩種
consumeFromWhere CONSUME_FROM_LAST_OFFSET Consumer啓動後,默認從上次消費的位置開始消費,這包含兩種狀況:一種是上次消費的位置未過時,則消費從上次停止的位置進行;一種是上次消費位置已通過期,則從當前隊列第一條消息開始消費
consumeTimestamp 半個小時前 只有當consumeFromWhere值爲CONSUME_FROM_TIMESTAMP時才起做用。
allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance算法實現策略
subscription   訂閱關係
messageListener   消息監聽器
offsetStore   消費進度存儲
consumeThreadMin 10 消費線程池最小線程數
consumeThreadMax 20 消費線程池最大線程數
consumeConcurrentlyMaxSpan 2000 單隊列並行消費容許的最大跨度
pullThresholdForQueue 1000 拉消息本地隊列緩存消息最大數
pullInterval 0 拉消息間隔,因爲是長輪詢,因此爲0,可是若是應用爲了流控,也能夠設置大於0的值,單位毫秒
consumeMessageBatchMaxSize 1 批量消費,一次消費多少條消息
pullBatchSize 32 批量拉消息,一次最多拉多少條

4 PullConsumer配置

參數名 默認值 說明
consumerGroup DEFAULT_CONSUMER Consumer組名,多個Consumer若是屬於一個應用,訂閱一樣的消息,且消費邏輯一致,則應該將它們歸爲同一組
brokerSuspendMaxTimeMillis 20000 長輪詢,Consumer拉消息請求在Broker掛起最長時間,單位毫秒
consumerTimeoutMillisWhenSuspend 30000 長輪詢,Consumer拉消息請求在Broker掛起超過指定時間,客戶端認爲超時,單位毫秒
consumerPullTimeoutMillis 10000 非長輪詢,拉消息超時時間,單位毫秒
messageModel BROADCASTING 消息支持兩種模式:集羣消費和廣播消費
messageQueueListener   監聽隊列變化
offsetStore   消費進度存儲
registerTopics   註冊的topic集合
allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance算法實現策略

5 Message數據結構

字段名 默認值 說明
Topic null 必填,消息所屬topic的名稱
Body null 必填,消息體
Tags null 選填,消息標籤,方便服務器過濾使用。目前只支持每一個消息設置一個tag
Keys null 選填,表明這條消息的業務關鍵詞,服務器會根據keys建立哈希索引,設置後,能夠在Console系統根據Topic、Keys來查詢消息,因爲是哈希索引,請儘量保證key惟一,例如訂單號,商品Id等。
Flag 0 選填,徹底由應用來設置,RocketMQ不作干預
DelayTimeLevel 0 選填,消息延時級別,0表示不延時,大於0會延時特定的時間纔會被消費
WaitStoreMsgOK TRUE 選填,表示消息是否在服務器落盤後才返回應答。

6 系統配置

本小節主要介紹系統(JVM/OS)相關的配置。

6.1 JVM選項

​ 推薦使用最新發布的JDK 1.8版本。經過設置相同的Xms和Xmx值來防止JVM調整堆大小以得到更好的性能。簡單的JVM配置以下所示:

​ ​-server -Xms8g -Xmx8g -Xmn4g ​


若是您不關心RocketMQ Broker的啓動時間,還有一種更好的選擇,就是經過「預觸摸」Java堆以確保在JVM初始化期間每一個頁面都將被分配。那些不關心啓動時間的人能夠啓用它:
​ -XX:+AlwaysPreTouch
禁用偏置鎖定可能會減小JVM暫停,
​ -XX:-UseBiasedLocking
至於垃圾回收,建議使用帶JDK 1.8的G1收集器。

-XX:+UseG1GC -XX:G1HeapRegionSize=16m   
-XX:G1ReservePercent=25 
-XX:InitiatingHeapOccupancyPercent=30

​ 這些GC選項看起來有點激進,但事實證實它在咱們的生產環境中具備良好的性能。另外不要把-XX:MaxGCPauseMillis的值設置過小,不然JVM將使用一個小的年輕代來實現這個目標,這將致使很是頻繁的minor GC,因此建議使用rolling GC日誌文件:

-XX:+UseGCLogFileRotation   
-XX:NumberOfGCLogFiles=5 
-XX:GCLogFileSize=30m

若是寫入GC文件會增長代理的延遲,能夠考慮將GC日誌文件重定向到內存文件系統:

-Xloggc:/dev/shm/mq_gc_%p.log123

6.2 Linux內核參數

​ os.sh腳本在bin文件夾中列出了許多內核參數,能夠進行微小的更改而後用於生產用途。下面的參數須要注意,更多細節請參考/proc/sys/vm/*的文檔

  • vm.extra_free_kbytes,告訴VM在後臺回收(kswapd)啓動的閾值與直接回收(經過分配進程)的閾值之間保留額外的可用內存。RocketMQ使用此參數來避免內存分配中的長延遲。(與具體內核版本相關)
  • vm.min_free_kbytes,若是將其設置爲低於1024KB,將會巧妙的將系統破壞,而且系統在高負載下容易出現死鎖。
  • vm.max_map_count,限制一個進程可能具備的最大內存映射區域數。RocketMQ將使用mmap加載CommitLog和ConsumeQueue,所以建議將爲此參數設置較大的值。(agressiveness --> aggressiveness)
  • vm.swappiness,定義內核交換內存頁面的積極程度。較高的值會增長***性,較低的值會減小交換量。建議將值設置爲10來避免交換延遲。
  • File descriptor limits,RocketMQ須要爲文件(CommitLog和ConsumeQueue)和網絡鏈接打開文件描述符。咱們建議設置文件描述符的值爲655350。
  • Disk scheduler,RocketMQ建議使用I/O截止時間調度器,它試圖爲請求提供有保證的延遲。
相關文章
相關標籤/搜索