只有發送消息設置了tags,消費方在訂閱消息時,才能夠利用tags在broker作消息過濾java
message.setTags("TagA");
● 少數生產者使用異步發送方式(3~5個就夠了)
● 經過setInstanceName方法,給每一個生產者設置一個實例名shell
sendStatus
類裏定義
● SEND_ OK : 消息發送成功
● FLUSH_ DISK_ _TIMEOUT: 消息發送成功, 可是服務器刷盤超時,消息已經進入
服務器隊列,只有此時服務器宕機,消息纔會丟失
● FLUSH_ SLAVE_ TIMEOUT: 消息發送成功,可是服務器同步到Slave時超時,
消息已經進入服務器隊列,只有此時服務器宕機,消息纔會丟失
● SLAVE_ NOT_ AVAILABLE: 消息發送成功, 可是此時slave不可用, 消息已經進入服務器隊列,只有此時服務器宕機,消息纔會丟失編程
● 若是狀態是FLUSH_ DISK_ TIMEOUT或FLUSH SLAVE_ _TIMEOUT,而且Broker正好關閉
此時,能夠丟棄這條消息,或者重發。但建議最好重發,由消費端去重segmentfault
● Producer向Broker發送請求會等待響應,但若是達到最大等待時間,未獲得響應,則客戶端將拋出RemotingTimeoutException
● 默認等待時間是3秒,若是使用send(msg, timeout),則能夠本身設定超時時間,
但超時時間不能設置過小,應爲Borker須要一些時間來刷新磁盤或與從屬設備同步
● 若是該值超過syncFlushTimeout,則該值可能影響不大,由於Broker可能會在超時以前返回FLUSH_ SLAVE_ TIMEOUT或FLUSH_ SLAVE_ TIMEOUT的響應bash
Producer的send方法自己支持內部重試:
● 至多重試3次
● 若是發送失敗,則輪轉到下一-個Broker
● 這個方法的總耗時時間不超過sendMsgTimeout設置的值,默認10s
因此,若是自己向broker發送消息產生超時異常,就不會再作重試服務器
以上策略仍然不能保證消息必定發送成功,爲保證消息必定成功,建議將消息存儲到db,由後臺線程定時重試,保證消息必定到達Broker
每一個消息在業務層面的惟一標識碼,要設置到keys字段,方便未來定位消息丟失問題dom
服務器會爲每一個消息建立索引(哈希索引),應用能夠經過topic, key來查詢這條消息內容,以及消息被誰消費異步
因爲是哈希索引,請務必保證key儘量惟一,這樣能夠避免潛在的哈希衝突maven
String orderld =「1250689524981"; message.setKeys(orderld);
console客戶端使GUI
分佈式
--server.port=8081 --rocketmq.config. namesrvAddr=192.168.1.17:9876
不一樣的消費羣體能夠獨立地消費一樣的主題,而且每一個消費者都有本身的消費偏移量(offsets) 。
確保同一組中的每一個消費者訂閱相同的主題
消費者將鎖定每一個MessageQueue,以確保每一個消息被一個按順序使用。
這將致使性能損失
若是關心消息的順序時,它就頗有用了。不建議拋出異常,能夠返回
ConsumeOrderlyStatus. SUSPEND_ CURRENT_ QUEUE_ A_ MOMENT代替
對於MessageListenerConcurrently,能夠返回RECONSUME_ LATER告訴消費者,當前不能消費它而且但願之後從新消費。而後能夠繼續使用其餘消息
對於MessageListenerOrderly, 若是關心順序,就不能跳過消息,能夠返回SUSPEND_ CURRENT_ QUEUE_ A_ MOMENT來告訴消費者等待片刻。
不建議阻塞Listener,由於它會阻塞線程池,最終可能會中止消費程序
DefaultMQPushConsumer
消費者使用一個ThreadPoolExecutor來處理內部的消費,所以能夠經過設
置
更改它
● 當創建一個新的Consumer Group時,須要決定是否須要消費Broker中已經
存在的歷史消息。
● CONSUME_ FROM LAST_ OFFSET將忽略歷史消息,並消費此後生成的任何
內容。
● CONSUME_ FROM_ FIRST_ OFFSET將消耗Broker中存在的全部消息。還可使用CONSUME_ FROM_ TIMESTAMP 來消費在指定的時間戳以後生成的消息。
RocketMQ沒法避免消息重複,若是業務對重複消費很是敏感,務必在業務層面作去重:
● 經過記錄消息惟一鍵進行去重
● 使用業務層面的狀態機制去重
在Apache RocketMQ中,NameServer用於協調分佈式系統的每一個組件,主要經過管理主題路由信息
來實現協調。
管理由兩部分組成:
所以,在啓動brokers和clients以前,咱們須要告訴他們如何經過給他們提
供的一個名稱服務器地址列表來訪問名稱服務器。
在Apache RocketMQ中,能夠用四種方式完成。
namesrvAddr=name-server-ip1:port;name-server-ip2:port
DefaultMQProducer producer = new DefaultMQProducer(" please_ rename_ unique_ group name"); producer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(" please_ rename_ unique_ _group_ name"); consumer.setNamesrvAddr(" name-server1-ip:port;name-server2-ip:port");
sh mqadmin command-name -n name-server-ip1:port;name-server-ip2:port -X OTHER-OPTION
sh mqadmin -n localhost:9876 clusterList
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(" please_ rename_ _unique_ group_ _name"); defaultMQAdminExt.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");
NameServer的地址列表也能夠經過java參數rocketmq.namesrv.addr
在啓動以前指定
能夠設置NAMESRV_ ADDR環境變量。若是設置了,Broker和clients將檢 查並使用其值
若是沒有使用前面提到的方法指定NameServer地址列表,Apache RocketMQ將每2分鐘發送一次HTTP請求,以獲取和更新NameServer地址列表,初始延遲10秒。
默認狀況下,訪問的HTTP地址是:
http://jmenv.tbsite.net:8080/rocketmq/nsaddr
經過Java參數rocketmq.namesrv.domain,能夠修改jmenv.tbsite.net
經過Java參數rocketmq.namesrv.domain.subgroup,能夠修改nsaddr
編程方式> Java參數>環境變量> HTTP方式
推薦使用JDK 1.8版本,使用服務器編譯器和8g堆。
設置相同的Xms和Xmx值,以防止JVM動態調整堆大小以得到更好的性能。
簡單的JVM配置以下所示:
-server -Xms8g -Xmx8g -Xmn4g
若是不關心Broker的啓動時間,能夠預先觸摸Java堆,以確保在JVM初始化期間分配頁是更好的選擇。
-XX:+AlwaysPreTouch
-XX: UseBiasedL ocking
-XX:+UseG1GC -XX:G1HeapRegionSize= 16m -XX:G lReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30
這些GC選項看起來有點激進,但事實證實它在生產環境中具備良好的性能。
-XX:MaxGCPauseMillis不要設置過小的值,不然JVM將使用一個小的新生代,這將致使很是頻繁的新生代GC。
-XX:+UseGCLogFileRotation -Xx:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m
-Xloggc:/dev/shm/mq_ gc. _%p.log
https://www.kernel.org/doc/Do...
本文由博客一文多發平臺 OpenWrite 發佈!