RocketMQ最佳實踐

1 Producer

  1. 一個應用盡量用一個Topic,消息子類型用tags來標識,tags能夠由應用自由設置

只有發送消息設置了tags,消費方在訂閱消息時,才能夠利用tags在broker作消息過濾java

message.setTags("TagA");
  1. 若有可靠性須要,消息發送成功或者失敗,要打印消息日誌(sendresult和key信 息)
  2. 若是相同性質的消息量大,使用批量消息,能夠提高性能
  3. 建議消息大小不超過512KB
  4. send(msg)會阻塞,若是有性能要求,可使用異步的方式: send(msg, callback)
  5. 若是在一個JVM中,有多個生產者進行大數據處理,建議:

● 少數生產者使用異步發送方式(3~5個就夠了)
● 經過setInstanceName方法,給每一個生產者設置一個實例名shell

  1. send消息方法,只要不拋異常,就表明發送成功 , 可是發送成功會有多個狀態, 在sendStatus類裏定義


● SEND_ OK : 消息發送成功
● FLUSH_ DISK_ _TIMEOUT: 消息發送成功, 可是服務器刷盤超時,消息已經進入
服務器隊列,只有此時服務器宕機,消息纔會丟失
● FLUSH_ SLAVE_ TIMEOUT: 消息發送成功,可是服務器同步到Slave時超時,
消息已經進入服務器隊列,只有此時服務器宕機,消息纔會丟失
● SLAVE_ NOT_ AVAILABLE: 消息發送成功, 可是此時slave不可用, 消息已經進入服務器隊列,只有此時服務器宕機,消息纔會丟失編程

● 若是狀態是FLUSH_ DISK_ TIMEOUT或FLUSH SLAVE_ _TIMEOUT,而且Broker正好關閉
此時,能夠丟棄這條消息,或者重發。但建議最好重發,由消費端去重segmentfault

● Producer向Broker發送請求會等待響應,但若是達到最大等待時間,未獲得響應,則客戶端將拋出RemotingTimeoutException
● 默認等待時間是3秒,若是使用send(msg, timeout),則能夠本身設定超時時間,
但超時時間不能設置過小,應爲Borker須要一些時間來刷新磁盤或與從屬設備同步
● 若是該值超過syncFlushTimeout,則該值可能影響不大,由於Broker可能會在超時以前返回FLUSH_ SLAVE_ TIMEOUT或FLUSH_ SLAVE_ TIMEOUT的響應bash

  1. 對於消息不可丟失應用,務必要有消息重發機制

Producer的send方法自己支持內部重試:
● 至多重試3次
● 若是發送失敗,則輪轉到下一-個Broker
● 這個方法的總耗時時間不超過sendMsgTimeout設置的值,默認10s
因此,若是自己向broker發送消息產生超時異常,就不會再作重試服務器

以上策略仍然不能保證消息必定發送成功,爲保證消息必定成功,建議將消息存儲到db,由後臺線程定時重試,保證消息必定到達Broker

2 Consumer

每一個消息在業務層面的惟一標識碼,要設置到keys字段,方便未來定位消息丟失問題dom

服務器會爲每一個消息建立索引(哈希索引),應用能夠經過topic, key來查詢這條消息內容,以及消息被誰消費異步

因爲是哈希索引,請務必保證key儘量惟一,這樣能夠避免潛在的哈希衝突maven

String orderld =「1250689524981";
message.setKeys(orderld);

console客戶端使GUI
分佈式

  • mvn clean package -Dmaven.test.skip=true
--server.port=8081 --rocketmq.config. namesrvAddr=192.168.1.17:9876

2.1 消費者組和訂閱

不一樣的消費羣體能夠獨立地消費一樣的主題,而且每一個消費者都有本身的消費偏移量(offsets) 。

確保同一組中的每一個消費者訂閱相同的主題

2.2 消息監聽器(MessageListener)

2.2.1 順序 (Orderly)

消費者將鎖定每一個MessageQueue,以確保每一個消息被一個按順序使用。
這將致使性能損失

若是關心消息的順序時,它就頗有用了。不建議拋出異常,能夠返回
ConsumeOrderlyStatus. SUSPEND_ CURRENT_ QUEUE_ A_ MOMENT代替

2.2.2 消費狀態(Consume Status)


對於MessageListenerConcurrently,能夠返回RECONSUME_ LATER告訴消費者,當前不能消費它而且但願之後從新消費。而後能夠繼續使用其餘消息


對於MessageListenerOrderly, 若是關心順序,就不能跳過消息,能夠返回SUSPEND_ CURRENT_ QUEUE_ A_ MOMENT來告訴消費者等待片刻。

阻塞(Blocking)

不建議阻塞Listener,由於它會阻塞線程池,最終可能會中止消費程序

線程數

DefaultMQPushConsumer
消費者使用一個ThreadPoolExecutor來處理內部的消費,所以能夠經過設

更改它

從何處開始消費

● 當創建一個新的Consumer Group時,須要決定是否須要消費Broker中已經
存在的歷史消息。
● CONSUME_ FROM LAST_ OFFSET將忽略歷史消息,並消費此後生成的任何
內容。
● CONSUME_ FROM_ FIRST_ OFFSET將消耗Broker中存在的全部消息。還可使用CONSUME_ FROM_ TIMESTAMP 來消費在指定的時間戳以後生成的消息。

重複(冪等性)

RocketMQ沒法避免消息重複,若是業務對重複消費很是敏感,務必在業務層面作去重:
● 經過記錄消息惟一鍵進行去重
● 使用業務層面的狀態機制去重

3 最佳實踐之 NameServer

在Apache RocketMQ中,NameServer用於協調分佈式系統的每一個組件,主要經過管理主題路由信息來實現協調。

管理由兩部分組成:

  1. Brokers按期更新保存在每一個名稱服務器中的元數據
  2. 名稱服務器是爲客戶端提供最新的路由信息服務的,包括生產者、消費者和命令行客戶端。

所以,在啓動brokers和clients以前,咱們須要告訴他們如何經過給他們提
供的一個名稱服務器地址列表來訪問名稱服務器。

在Apache RocketMQ中,能夠用四種方式完成。

3.1 編程方式

  • 對於brokers,咱們能夠在broker的配置文件中指定
namesrvAddr=name-server-ip1:port;name-server-ip2:port
  • 對於生產者和消費者,咱們能夠給他們提供姓名服務器地址列表以下:
DefaultMQProducer producer = new DefaultMQProducer(" please_ rename_ unique_ group name");
producer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port"); 
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(" please_ rename_ unique_ _group_ name");
consumer.setNamesrvAddr(" name-server1-ip:port;name-server2-ip:port");
  • 若是從shell中使用管理命令行,也能夠這樣指定:
sh mqadmin command-name -n name-server-ip1:port;name-server-ip2:port -X OTHER-OPTION
  • 一個簡單的例子,在NameServer節點上查詢集羣信息:
sh mqadmin -n localhost:9876 clusterList
  • 若是將管理工具集成到本身的項目中,能夠這樣
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(" please_ rename_ _unique_ group_ _name");
defaultMQAdminExt.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");

3.2 Java參數

NameServer的地址列表也能夠經過java參數rocketmq.namesrv.addr
在啓動以前指定

3.3 環境變量

能夠設置NAMESRV_ ADDR環境變量。若是設置了,Broker和clients將檢 查並使用其值

3.4 HTTP端點(HTTP Endpoint)

若是沒有使用前面提到的方法指定NameServer地址列表,Apache RocketMQ將每2分鐘發送一次HTTP請求,以獲取和更新NameServer地址列表,初始延遲10秒。

默認狀況下,訪問的HTTP地址是:

http://jmenv.tbsite.net:8080/rocketmq/nsaddr

經過Java參數rocketmq.namesrv.domain,能夠修改jmenv.tbsite.net
經過Java參數rocketmq.namesrv.domain.subgroup,能夠修改nsaddr

3.5 優先級

編程方式> Java參數>環境變量> HTTP方式

4 JVM與Linux內核配置

4.1 JVM配置

推薦使用JDK 1.8版本,使用服務器編譯器和8g堆。
設置相同的Xms和Xmx值,以防止JVM動態調整堆大小以得到更好的性能。

簡單的JVM配置以下所示:

-server -Xms8g -Xmx8g -Xmn4g

若是不關心Broker的啓動時間,能夠預先觸摸Java堆,以確保在JVM初始化期間分配頁是更好的選擇。

-XX:+AlwaysPreTouch
  • 禁用偏置鎖定可能會減小JVM暫停:
-XX: UseBiasedL ocking
  • 對於垃圾回收,建議使用G1收集器:
-XX:+UseG1GC -XX:G1HeapRegionSize= 16m -XX:G lReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30

這些GC選項看起來有點激進,但事實證實它在生產環境中具備良好的性能。

-XX:MaxGCPauseMillis不要設置過小的值,不然JVM將使用一個小的新生代,這將致使很是頻繁的新生代GC。

  • 推薦使用滾動GC日誌文件:
-XX:+UseGCLogFileRotation -Xx:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m
  • 若是寫入GC文件會增長代理的延遲,請將重定向GC日誌文件考慮在內存文件系統中:
-Xloggc:/dev/shm/mq_ gc. _%p.log

4.2 Linux內核配置

  • 在bin目錄中,有一個os.sh腳本列出了許多內核參數,只須要稍微的修改,就能夠用於生產環境。
  • 如下參數須要注意,詳細信息請參考

https://www.kernel.org/doc/Do...

本文由博客一文多發平臺 OpenWrite 發佈!
相關文章
相關標籤/搜索