本文檔旨在描述RocketMQ使用過程當中的一些最佳實踐,建議用戶這樣作,可是非必須。java
一個應用盡量用一個Topic,消息子類型用tags來標識,tags能夠由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才能夠利用tags在broker作消息過濾。message.setTags(「TagA」);web
每一個消息在業務層面的惟一標識碼,要設置到keys字段,方便未來定位消息丟失問題。服務器會爲每一個消息建立索引(哈希索引),應用能夠經過topic,key來查詢這條消息內容,以及消息被誰消費。因爲是哈希索引,請務必保證key儘量惟一,這樣能夠避免潛在的哈希衝突。
// 訂單Id
String orderId = 「20034568923546」;
message.setKeys(orderId);sql
消息發送成功或者失敗,要打印消息日誌,務必要打印sendresult和key字段。數據庫
send消息方法,只要不拋異常,就表明發送成功。可是發送成功會有多個狀態,在sendResult裏定義。後端
對於精衛發送順序消息的應用,因爲順序消息的侷限性,可能會涉及到主備自動切換問題,因此若是sendresult中的status字段不等於SEND_OK,就應該嘗試重試。對於其餘應用,則沒有必要這樣。服務器
Producer的send方法自己支持內部重試,重試邏輯以下:網絡
以上策略仍然不能保證消息必定發送成功,爲保證消息必定成功,建議應用這樣作若是調用send同步方法發送失敗, 則嘗試將消息存儲到db,由後臺線程定時重試,保證消息必定到達Broker。運維
上述db重試方式爲何沒有集成到MQ客戶端內部作,而是要求應用本身去完成,咱們基於如下幾點考慮異步
綜上,建議重試過程交由應用來控制。socket
一個RPC調用,一般是這樣一個過程
如《RocketMQ 原理簡介》中所述,RocketMQ沒法避免消息重複,因此若是業務對消費重複很是敏感,務必要在業務層面去重,有如下幾種去重方式
絕大部分消息消費行爲屬於IO密集型,便可能是操做數據庫,或者調用RPC,這類消費行爲的消費速度在於後端數據庫或者外系統的吞吐量,經過增長消費並行度,能夠提升總的消費吞吐量,可是並行度增長到必定程度,反而會降低,如圖所示,呈現拋物線形式。因此應用必需要設置合理的並行度。CPU密集型應用除外。
某些業務流程若是支持批量方式消費,則能夠很大程度上提升消費吞吐量,例如訂單扣款類應用,一次處理一個訂單耗時1秒鐘,一次處理10個訂單可能也只耗時2秒鐘,這樣便可大幅度提升消費的吞吐量,經過設置consumer的consumeMessageBatchMaxSize這個參數,默認是1,即一次只消費一條消息,例如設置爲N,那麼每次消費的消息數小於等於N。
發生消息堆積時,若是消費速度一直追不上發送速度,能夠選擇丟棄不重要的消息
如何判斷消費發生了堆積?
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { longoffset = msgs.get(0).getQueueOffset(); String maxOffset = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET); longdiff = Long.parseLong(maxOffset) - offset; if (diff > 100000) { // TODO消息堆積狀況的特殊處理 returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; } // TODO正常消費過程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { longoffset = msgs.get(0).getQueueOffset(); String maxOffset = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET); longdiff = Long.parseLong(maxOffset) - offset; if (diff > 100000) { // TODO消息堆積狀況的特殊處理 returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; } // TODO正常消費過程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
如以上代碼所示,當某個隊列的消息數堆積到100000條以上,則嘗試丟棄部分或所有消息,這樣就能夠快速追上發送消息的速度。
舉例以下,某條消息的消費過程以下
1.根據消息從DB查詢數據1
2.根據消息從DB查詢數據2
3.複雜的業務計算
4.向DB插入數據3
5.向DB插入數據4
這條消息的消費過程與DB交互了4次,若是按照每次5ms計算,那麼總共耗時20ms,假設業務計算耗時5ms,那麼總過耗時25ms,若是能把4次DB交互優化爲2次,那麼總耗時就能夠優化到15ms,也就是說整體性能提升了40%。
對於Mysql等DB,若是部署在磁盤,那麼與DB進行交互,若是數據沒有命中cache,每次交互的RT會直線上升,若是採用SSD,則RT上升趨勢要明顯好於磁盤。個別應用可能會遇到這種狀況:在線下壓測消費過程當中,db表現很是好,每次RT都很短,可是上線運行一段時間,RT就會變長,消費吞吐量直線降低。
主要緣由是線下壓測時間太短,線上運行一段時間後,cache命中率降低,那麼RT就會增長。建議在線下壓測時,要測試足夠長時間,儘量模擬線上環境,壓測過程當中,數據的分佈也很重要,數據不一樣,可能cache的命中率也會徹底不一樣。3.4消費打印日誌若是消息量較少,建議在消費入口方法打印消息,方便後續排查問題。
若是消息量較少,建議在消費入口方法打印消息,方便後續排查問題。
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { log.info("RECEIVE_MSG_BEGIN: " + msgs.toString()); // TODO正常消費過程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} }
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { log.info("RECEIVE_MSG_BEGIN: " + msgs.toString()); // TODO正常消費過程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} }
若是能打印每條消息消費耗時,那麼在排查消費慢等線上問題時,會更方便。