RocketMQ實戰(二)

在上一篇《RocketMQ實戰(一)》中已經爲你們初步介紹了下RocketMQ以及搭建了雙Master環境,接下來繼續爲你們介紹!網絡

Quick Start

寫一個簡單的生產者、消費者,帶你們快速體驗RocketMQ~app


Maven配置:負載均衡

wKioL1j0xAiTk3gjAAAS3SZK_Ow748.png


生產者:ide

wKiom1j0xJnDiVJfAAB1w6cONE4056.png

消費者:
ui


wKioL1j0xLWSH6jSAACcSdQYm7Y283.png

不管生產者、消費者都必須給出GroupName,並且具備惟一性!spa

生產到哪一個Topic的哪一個Tag下,消費者也是從Topic的哪一個Tag進行消費,可見這個Tag有點相似於JMS Selector機制,即實現消息的過濾。線程

生產者、消費者須要設置NameServer地址。3d

這裏,採用的是Consumer Push的方式,即設置Listener機制回調,至關於開啓了一個線程。之後爲你們介紹Consumer Pull的方式。日誌


咱們看一下運行結果:orm

wKioL1j0xPaCxhzcAABchYPgkjQ427.png

仔細看看生產者結果輸出,你會發現,有的消息發往broker-a,有的在broker-b上,自動實現了消息的負載均衡!

wKiom1j0xRiSK1UjAAAomhW9rLk688.png


這裏消費消息是沒有什麼順序的,之後咱們在來談消息的順序性。

咱們再來看一看管控臺:

wKioL1j0xUqyT_TYAABlcVWCvNc900.png

wKiom1j0xWawZ7e-AAB4z7SttCE144.png

在多Master模式中,若是某個Master進程掛了,顯然這臺broker將不可用,上面的消息也將沒法消費,要知道開源版本的RocketMQ是沒有提供切換程序,來自動恢復故障的,所以在實際開發中,咱們通常提供一個監聽程序,用於監控Master的狀態。

在ActiveMQ中,生產消息的時候會提供是否持久化的選擇,可是對於RocketMQ而言,消息是必定會被持久化的!

上面的消費者採用的是Push Consumer的方式,那麼監聽的Listener中的消息List究竟是多少條呢?雖然提供了API,如consumer.setConsumeMessageBatchMaxSize(10),實際上即便設置了批量的條數,可是注意了,是最大是10,並不意味着每次batch的都是10,只有在消息有擠壓的狀況下才有可能。並且Push Consumer的最佳實踐方式就是一條條的消費,若是須要batch,可使用Pull Consumer。

務必保證先啓動消費者進行Topic訂閱,而後在啓動生產者進行生產(不然極有可能致使消息的重複消費,重複消費,重複消費!重要的事情說三遍!關於消息的重複問題後續給你們介紹~)。並且在實際開發中,有時候不會批量的處理消息,而是原子性的,單線程的去一條一條的處理消息,這樣就是實時的在處理消息。(批量的處理海量的消息,能夠考慮Kafka)


初步瞭解消息失敗重試機制

消息失敗,無非涉及到2端:從生產者端發往MQ的失敗;消費者端從MQ消費消息的失敗;

生產者端的失敗重試

wKiom1j0xbmwIoXyAABfKnUa1Es539.png


生產者端的消息失敗:好比網絡抖動致使生產者發送消息到MQ失敗。

上圖代碼示例的處理手段是:若是該條消息在1S內沒有發送成功,那麼重試3次。


消費者端的失敗重試

消費者端的失敗,分爲2種狀況,一個是timeout,一個是exception

timeout,好比因爲網絡緣由致使消息壓根就沒有從MQ到消費者上,在RocketMQ內部會不斷的嘗試發送這條消息,直至發送成功爲止!(好比集羣中一個broker失敗,就嘗試另外一個broker)

exception,消息正常的到了消費者,結果消費者發生異常,處理失敗了。這裏涉及到一些問題,須要咱們思考下,好比,消費者消費消息的狀態有哪些定義?若是失敗,MQ將採起什麼策略進行重試?假設一次性批量PUSH了10條,其中某條數據消費異常,那麼消息重試是10條呢,仍是1條呢?並且在重試的過程當中,須要保證不重複消費嗎?

wKiom1j0xhizGpa1AAAh4Zdnhz8872.png


消息消費的狀態,有2種,一個是成功(CONSUME_SUCCESS),一個是失敗&稍後重試(RECONSUME_LATER)


wKioL1j0xlKgxHKiAAAsLq17Zts611.png


在啓動broker的過程當中,能夠觀察下日誌,你會發現RECONSUME_LATER的策略。

若是消費失敗,那麼1S後再次消費,若是失敗,那麼5S後,再次消費,......直至2H後若是消費還失敗,那麼該條消息就會終止發送給消費者了!

RocketMQ爲咱們提供了這麼屢次數的失敗重試,可是在實際中也許咱們並不須要這麼多重試,好比重試3次,尚未成功,咱們但願把這條消息存儲起來並採用另外一種方式處理,並且但願RocketMQ不要在重試呢,由於重試解決不了問題了!這該如何作呢?


咱們先來看一下一條消息MessageExt對象的輸出:

MessageExt [queueId=0, storeSize=137, queueOffset=0, sysFlag=0, bornTimestamp=1492213846916, bornHost=/192.168.99.219:50478, storeTimestamp=1492213846981, storeHost=/192.168.99.121:10911, msgId=C0A8637900002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest2, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=3, MIN_OFFSET=0}, body=16]]

注意到reconsumeTimes屬性,這個屬性就表明消息重試的次數!來看一段代碼:

wKioL1j0xozDthM0AACJ7SsgW48295.png


注意了,對於消費消息而言,存在2種指定的狀態(成功 OR 失敗重試),若是一條消息在消費端處理沒有返回這2個狀態,那麼至關於這條消息沒有達到消費者,勢必會再次發送給消費者!也便是消息的處理必須有返回值,不然就進行重發。


自然的消息負載均衡及高效的水平擴展機制

wKiom1j0xrbRP7mCAABGU6_EqJ0970.png


對於RocketMQ而言,經過ConsumeGroup的機制,實現了自然的消息負載均衡!通俗點來講,RocketMQ中的消息經過ConsumeGroup實現了將消息分發到C1/C2/C3/......的機制,這意味着咱們將很是方便的經過加機器來實現水平擴展!

咱們考慮一下這種狀況:好比C2發生了重啓,一條消息發往C3進行消費,可是這條消息的處理須要0.1S,而此時C2恰好完成重啓,那麼C2是否可能會收到這條消息呢?答案是確定的,也就是consume broker的重啓,或者水平擴容,或者不遵照先訂閱後生產消息,均可能致使消息的重複消費!關於去重的話題會在後續中予以介紹!

至於消息分發到C1/C2/C3,其實也是能夠設置策略的。


wKiom1j0xtvQMhDcAAA1kCxs4nM128.png


集羣消費 AND 廣播消費

RocketMQ的消費方式有2種,在默認狀況下,就是集羣消費,也就是上面說起的消息的負載均衡消費。另外一種消費模式,是廣播消費。廣播消費,相似於ActiveMQ中的發佈訂閱模式,消息會發給Consume Group中的每個消費者進行消費。

wKioL1j0xyey-URcAAARlvOeTPQ481.png


wKiom1j0x0LjPJi7AAAOo-VBsQ8858.png


OK,到這裏,本期的RocketMQ就結束了,我們下期見~

相關文章
相關標籤/搜索