metaq最佳實踐

1. 前言

本文檔旨在描述RocketMQ使用過程當中的一些最佳實踐,建議用戶這樣作,可是非必須。java

2. Producer最佳實踐

2.1 發送消息注意事項

  1. 一個應用盡量用一個Topic,消息子類型用tags來標識,tags能夠由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才能夠利用tags在broker作消息過濾。message.setTags(「TagA」);web

  2. 每一個消息在業務層面的惟一標識碼,要設置到keys字段,方便未來定位消息丟失問題。服務器會爲每一個消息建立索引(哈希索引),應用能夠經過topic,key來查詢這條消息內容,以及消息被誰消費。因爲是哈希索引,請務必保證key儘量惟一,這樣能夠避免潛在的哈希衝突。
    // 訂單Id
    String orderId = 「20034568923546」;
    message.setKeys(orderId);sql

  3. 消息發送成功或者失敗,要打印消息日誌,務必要打印sendresult和key字段。數據庫

  4. send消息方法,只要不拋異常,就表明發送成功。可是發送成功會有多個狀態,在sendResult裏定義。後端

  • SEND_OK
    消息發送成功
  • FLUSH_DISK_TIMEOUT
    消息發送成功,可是服務器刷盤超時, 消息已經進入服務器隊列,只有此時服務器宕機,消息纔會丟失
  • FLUSH_SLAVE_TIMEOUT
    消息發送成功,可是服務器同步到Slave時超時, 消息已經進入服務器隊列,只有此時服務器宕機,消息纔會丟失
  • SLAVE_NOT_AVAILABLE
    消息發送成功,可是此時slave不可用, 消息已經進入服務器隊列,只有此時服務器宕機,消息纔會丟失

對於精衛發送順序消息的應用,因爲順序消息的侷限性,可能會涉及到主備自動切換問題,因此若是sendresult中的status字段不等於SEND_OK,就應該嘗試重試。對於其餘應用,則沒有必要這樣。服務器

  1. 對於消息不可丟失應用,務必要有消息重發機制
    例如若是消息發送失敗,存儲到數據庫,能有定時程序嘗試重發,或者人工觸發重發。

2.2消息發送失敗如何處理

Producer的send方法自己支持內部重試,重試邏輯以下:網絡

  1. 至多重試3次。
  2. 若是發送失敗,則輪轉到下一個Broker。
  3. 這個方法的總耗時時間不超過sendMsgTimeout設置的值,默認10s。因此,若是自己向broker發送消息產生超時異常,就不會再作重試。

以上策略仍然不能保證消息必定發送成功,爲保證消息必定成功,建議應用這樣作若是調用send同步方法發送失敗, 則嘗試將消息存儲到db,由後臺線程定時重試,保證消息必定到達Broker。運維

上述db重試方式爲何沒有集成到MQ客戶端內部作,而是要求應用本身去完成,咱們基於如下幾點考慮異步

  1. MQ的客戶端設計爲無狀態模式,方便任意的水平擴展,且對機器資源的消耗僅僅是cpu、內存、網絡。
  2. 若是MQ客戶端內部集成一個KV存儲模塊,那麼數據只有同步落盤才能較可靠,而同步落盤自己性能開銷較大,因此一般會採用異步落盤,又因爲應用關閉過程不受MQ運維人員控制,可能常常會發生kill -9這樣暴力方式關閉,形成數據沒有及時落盤而丟失。
  3. Producer所在機器的可靠性較低,通常爲虛擬機,不適合存儲重要數據。

綜上,建議重試過程交由應用來控制。socket

2.3選擇oneway形式發送

一個RPC調用,一般是這樣一個過程

  1. 客戶端發送請求到服務器
  2. 服務器處理該請求
  3. 服務器向客戶端返回應答因此一個RPC的耗時時間是上述三個步驟的總和,而某些場景要求耗時很是短,可是對可靠性要求並不高,例如日誌收集類應用,此類應用能夠採用oneway形式調用,oneway形式只發送請求不等待應答,而發送請求在客戶端實現層面僅僅是一個os系統調用的開銷,即將數據寫入客戶端的socket緩衝區,此過程耗時一般在微秒級。

3 Consumer最佳實踐

3.1消費過程要作到冪等(即消費端去重)

如《RocketMQ 原理簡介》中所述,RocketMQ沒法避免消息重複,因此若是業務對消費重複很是敏感,務必要在業務層面去重,有如下幾種去重方式

  1. 將消息的惟一鍵,能夠是msgId,也能夠是消息內容中的惟一標識字段,例如訂單Id等,消費以前判斷是否在Db或Tair(全局KV存儲)中存在,若是不存在則插入,並消費,不然跳過。(實際過程要考慮原子性問題,判斷是否存在能夠嘗試插入,若是報主鍵衝突,則插入失敗,直接跳過)
    msgId必定是全局惟一標識符,可是可能會存在一樣的消息有兩個不一樣msgId的狀況(有多種緣由),這種狀況可能會使業務上重複消費,建議最好使用消息內容中的惟一標識字段去重。
  2. 使用業務層面的狀態機去重

3.3消費速度慢處理方式

3.3.1提升消費並行度

絕大部分消息消費行爲屬於IO密集型,便可能是操做數據庫,或者調用RPC,這類消費行爲的消費速度在於後端數據庫或者外系統的吞吐量,經過增長消費並行度,能夠提升總的消費吞吐量,可是並行度增長到必定程度,反而會降低,如圖所示,呈現拋物線形式。因此應用必需要設置合理的並行度。CPU密集型應用除外。

3.3.2批量方式消費

某些業務流程若是支持批量方式消費,則能夠很大程度上提升消費吞吐量,例如訂單扣款類應用,一次處理一個訂單耗時1秒鐘,一次處理10個訂單可能也只耗時2秒鐘,這樣便可大幅度提升消費的吞吐量,經過設置consumer的consumeMessageBatchMaxSize這個參數,默認是1,即一次只消費一條消息,例如設置爲N,那麼每次消費的消息數小於等於N。

3.3.3跳過非重要消息

發生消息堆積時,若是消費速度一直追不上發送速度,能夠選擇丟棄不重要的消息
如何判斷消費發生了堆積?

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { longoffset = msgs.get(0).getQueueOffset(); String maxOffset = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET); longdiff = Long.parseLong(maxOffset) - offset; if (diff > 100000) { // TODO消息堆積狀況的特殊處理 returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; } // TODO正常消費過程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { longoffset = msgs.get(0).getQueueOffset(); String maxOffset = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET); longdiff = Long.parseLong(maxOffset) - offset; if (diff > 100000) { // TODO消息堆積狀況的特殊處理 returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; } // TODO正常消費過程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }

如以上代碼所示,當某個隊列的消息數堆積到100000條以上,則嘗試丟棄部分或所有消息,這樣就能夠快速追上發送消息的速度。

3.3.4優化每條消息消費過程

舉例以下,某條消息的消費過程以下
1.根據消息從DB查詢數據1
2.根據消息從DB查詢數據2
3.複雜的業務計算
4.向DB插入數據3
5.向DB插入數據4
這條消息的消費過程與DB交互了4次,若是按照每次5ms計算,那麼總共耗時20ms,假設業務計算耗時5ms,那麼總過耗時25ms,若是能把4次DB交互優化爲2次,那麼總耗時就能夠優化到15ms,也就是說整體性能提升了40%。

對於Mysql等DB,若是部署在磁盤,那麼與DB進行交互,若是數據沒有命中cache,每次交互的RT會直線上升,若是採用SSD,則RT上升趨勢要明顯好於磁盤。個別應用可能會遇到這種狀況:在線下壓測消費過程當中,db表現很是好,每次RT都很短,可是上線運行一段時間,RT就會變長,消費吞吐量直線降低。

主要緣由是線下壓測時間太短,線上運行一段時間後,cache命中率降低,那麼RT就會增長。建議在線下壓測時,要測試足夠長時間,儘量模擬線上環境,壓測過程當中,數據的分佈也很重要,數據不一樣,可能cache的命中率也會徹底不一樣。3.4消費打印日誌若是消息量較少,建議在消費入口方法打印消息,方便後續排查問題。

3.4 消費打印日誌

若是消息量較少,建議在消費入口方法打印消息,方便後續排查問題。

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { log.info("RECEIVE_MSG_BEGIN: " + msgs.toString()); // TODO正常消費過程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} } public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { log.info("RECEIVE_MSG_BEGIN: " + msgs.toString()); // TODO正常消費過程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} }

若是能打印每條消息消費耗時,那麼在排查消費慢等線上問題時,會更方便。

相關文章
相關標籤/搜索