這將是RocketMQ實戰系列的最後一篇文章,該系列的文章列表以下:併發
在上一篇博客中,已經知道RocketMQ 3.0.8是支持事務回查機制,可是在RocketMQ 3.2.6中取消了這個功能,下面咱們繼續以轉帳功能分析咱們本身如何解決這個問題。性能
在正常狀況下,固然沒有問題,若是第五步(向MQ發送確認消息)出現失敗,加上RocketMQ 3.2.6版本沒有事務回查機制,就會致使這條轉帳消息,在A銀行完成了操做,可是遲遲對B銀行系統不可見!ui
用戶U1從A銀行系統轉帳給B銀行系統的用戶U2的處理過程以下:spa
第一步:A銀行系統生成一條轉帳消息,以事務消息的方式寫入RocketMQ,此時B銀行系統不可見這條消息線程
第二步:寫入MQ成功後,回調A銀行系統,對T1,T2表進行操做(很顯然須要是一個事務)orm
咱們重點關注下T2表,這個表是用來幹嗎的呢?每條轉帳消息都會在T2表中,該表有2個特殊的字段:status,updatetime。(用途會在後文詳述)blog
第三步:完成第二步,接下來發送確認消息給MQ,若是這個確認消息發送成功,那麼這條轉帳消息,將對B銀行系統可見。而後B銀行系統,會在一個事務中完成對t3,t5的操做。
若是發送確認消息給MQ失敗的處理思路:
首先,B銀行系統,有一個定時任務(好比說每隔1MIN執行一次),掃描表t5,取得一段時間內的數據,發送給A銀行系統。要知道t5中的數據,必然是A銀行系統成功處理併發送確認消息成功的轉帳數據。爲何要發送給A銀行系統呢,其實就是爲了找到那些發送確認消息失敗的轉帳數據。那麼怎麼發給A銀行系統呢,這個方式比較多,能夠考慮在來一個Topic,也能夠考慮Netty等。發送給A銀行系統,其實就是爲了更新t2表的status,updatetime。
這裏有一個關鍵,如何「掃描表t5,取得一段時間內的數據」?這就是t4的做用,在t4中記錄一個time字段,每次定時任務啓動,先更新time(好比設定爲當前系統時間,設置前的的時間爲old),而後掃描出t5中大於這個old時間的轉帳數據,如此循環往復。
其次,A銀行系統,也有一個定時任務(能夠根據業務消費能力定,能夠大一些),掃描t2表(指定status及updatetime條件),將那些確認消息發送失敗的轉帳消息找出來,更新updatetime併發送給MQ。
這樣,咱們並無改動RocketMQ 3.2.6的源碼,而是在外圍解決了事務回查!
其實到這裏,你能夠發現RocketMQ的一個特色,就是將生產者和MQ綁定,而不須要特別處理消費者,這是爲何呢?由於消息只要發往RocketMQ成功,那麼就意味着成功,爲何這麼說?
前面,咱們說過,消費者端消費消息只會產生2種錯誤,第一:timeout,第二:exception。要知道RocketMQ對於超時,會不斷重試;對於消費異常,會根據消費端的返回碼,會有重試機制保證。也就是,RocketMQ必定會讓消息獲得消費,若是消費有問題,只能是消費者的問題,而不會是RocketMQ的問題!
在前面的博客已經提到,在RocketMQ中Consumer分爲2類:Push Consumer、Pull Consumer。之前的例子都是Push Consumer,接下來,爲你們介紹下Pull Consumer。
從表面意思上來看,好像Push是MQ推送給消費者,而Pull是消費者從MQ中拉取;其實本質上都是拉取模式PULL,即消費者從MQ中輪詢取得消息。
在Push模式下,Consumer把輪詢過程封裝了,並註冊了MessageListener監聽器,取到消息後,喚醒MessageListener監聽器中的consumeMessage()進行消費,因此給咱們形成了感受上好像是「推消息」。
在Pull模式下,須要特別注意的是,本質上是從一個Topic下的全部Queue進行拉取,並且每一個Queue都必須記錄拉取位置,不然會致使重複消費。還有拉取的時間間隔,拉取的大小等等。不過全部的這一切,MQPullConsumerScheduleService都替咱們考慮清楚了,提供updateConsumeOffset去更新消費的隊列的位置(默認5S同步一次),提供setPullNextDelayTimeMillis設置下次拉取的時間間隔(應該設置的大一些,至少大於5S)。
仔細回想下,對於Push方式的回調 和 Pull方式的回調,還有什麼關鍵區別麼?
對於Push而言,不管是基於MessageListenerConcurrently的,仍是基於MessageListenerOrderly的,都有返回值的;而Pull的doPullTask的返回值倒是void?
這意味,咱們須要在pull方式中,注意本身處理每條消息消費的異常狀況!
經過運行結果,能夠印證上面的觀點:爲何每次消費都是4條開始,4條結束呢?由於一個Topic下有4個Queue,並且上面的代碼實際上會針對每一個Queue開啓一個線程去消費!
對於ActiveMQ而言,咱們能夠經過JMS Selectors機制(就是相似於SQL的語法)來實現過濾,很easy。那麼和RocketMQ Filter組件有什麼區別呢?
雖然,2者都能實現過濾,可是RocketMQ Filter的性能要更高效些,由於RocketMQ是在broker上將過濾後的數據發往filter,而後消費者直接從filter上取得數據;而ActiveMQ是消費者直接在broker上進行過濾消費!(固然,對於RocketMQ而言,Tag機制已經足夠應付平常絕大數的過濾功能,除非你的業務對性能有特別高的要求)
具體怎麼作呢?這裏我就不演示了,網上有不少例子,這裏只說下大體的過程:
第一:broker-xxx.properties中指定filter個數
第二:上傳一段JAVA代碼,其實就是一個類