跟我學RocketMQ之消息冪等

在上篇中,咱們瞭解了RocketMQ中的消息重試機制以及如何在Producer、Consumer兩端對重試消息進行處理。redis

RocketMQ會在消息消費時,按照必定規則推送消息到消費者端進行消息重試。這裏涉及到了消息冪等的概念。數據庫

首先咱們瞭解一下什麼是冪等,以及何爲消息冪等。apache

什麼是冪等

百度對 「冪等」 解釋以下緩存

設f爲一由X映射至X的一元運算,則f爲冪等的,當對於全部在X內的x,
f(f(x)) = f(x).
特別的是,恆等函數必定是冪等的,且任一常數函數也都是冪等的。網絡

這裏的關鍵是 f(f(x)) = f(x), 翻譯成通俗的解釋就是:併發

若是有一個操做,屢次執行與一次執行所產生的影響是相同的,咱們就稱這個操做是冪等的。異步

關於消息冪等

基於上述的概念,結合消息消費的場景,咱們可以很容易的總結出消息冪等的概念:分佈式

即:函數

若是消息重試屢次,消費者端對該重複消息消費屢次與消費一次的結果是相同的,而且屢次消費沒有對系統產生反作用,那麼咱們就稱這個過程是消息冪等的。高併發

例如:

支付場景下,消費者消費扣款消息,對一筆訂單進行扣款操做,該扣款操做須要扣除10元。

這個扣款操做重複屢次與執行一次的效果相同,只進行一次真實扣款,用戶的扣款記錄中對應該筆訂單的只有一條扣款流水。不會多扣。那麼咱們就說這個扣款操做是符合要求的,這個消費過程是消息冪等的。

須要進行消息冪等的場景

首先咱們回顧一下須要進行消息冪等的場景,也就是上一篇文章提到的消息重複的場景。

  1. 發送時重複:

生產者發送消息時,消息成功投遞到broker,但此時發生網絡閃斷或者生產者down掉,致使broker發送ACK失敗。此時生產者因爲未能收到消息發送響應,認爲發送失敗,所以嘗試從新發送消息到broker。當消息發送成功後,在broker中就會存在兩條相同內容的消息,最終消費者會拉取到兩條內容同樣而且Message ID也相同的消息。所以形成了消息的重複。

  1. 消費時重複:

消費消息時一樣會出現重複消費的狀況。當消費者在處理業務完成返回消費狀態給broker時,因爲網絡閃斷等異常狀況致使未能將消費完成的CONSUME_SUCCESS狀態返回給broker。broker爲了保證消息被至少消費一次的語義,會在網絡環境恢復以後再次投遞該條被處理的消息,最終形成消費者屢次收到內容同樣而且Message ID也相同的消息,形成了消息的重複。

能夠看到,不管是發送時重複仍是消費時重複,最終的效果均爲消費者消費時收到了重複的消息,那麼咱們就知道:只須要在消費者端統一進行冪等處理就可以實現消息冪等。

實現消息冪等

那麼如何才能實現消息冪等呢?

首先咱們要定義消息冪等的兩要素:

  1. 冪等令牌
  2. 處理惟一性的確保

咱們必須保證存在冪等令牌的狀況下保證業務處理結果的惟一性,才認爲冪等實現是成功的。

接下來分別解釋這兩個要素

冪等令牌

冪等令牌是生產者和消費者二者中的既定協議,在業務中一般是具有惟一業務標識的字符串,如:訂單號、流水號等。且通常由生產者端生成並傳遞給消費者端。

處理惟一性的確保

即服務端應當採用必定的策略保證同一個業務邏輯必定不會重複執行成功屢次。如:使用支付寶進行支付,買一個產品支付屢次只會成功一筆。

較爲經常使用的方式是採用緩存去重而且經過對業務標識添加數據庫的惟一索引實現冪等。

具體的思路爲:如支付場景下,支付的發起端生成了一個支付流水號,服務端處理該支付請求成功後,數據持久化成功。因爲表中對支付流水添加了惟一索引,所以當重複支付時會由於惟一索引的存在報錯 duplicate entry,服務端的業務邏輯捕獲該異常並返回調用側「重複支付」提示。這樣就不會重複扣款。

在上面場景的基礎上,咱們還能夠引入Redis等緩存組件實現去重:當支付請求打到服務端,首先去緩存進行判斷,根據key=「支付流水號」去get存儲的值,若是返回爲空,代表是首次進行支付操做同時將當前的支付流水號做爲key、value能夠爲任意字符串經過set(key,value,expireTime)存儲在redis中。

當重複的支付請求到來時,嘗試進行get(支付流水號)操做,這個操做會命中緩存,所以咱們能夠認爲該請求是重複的支付請求,服務端業務將重複支付的業務提示返回給請求方。

因爲咱們通常都會在緩存使用過程當中設置過時時間,緩存可能會失效從而致使請求穿透到持久化存儲中(如:MySQL)。所以不能由於引入緩存而放棄使用惟一索引,將兩者結合在一塊兒是一個比較好的方案。

RocketMQ場景下如何處理消息冪等

瞭解了兩個要素及典型案例以後,咱們回到消息消費的場景。

做爲一款高性能的消息中間件,RocketMQ可以保證消息不丟失但不保證消息不重複。若是在RocketMQ中實現消息去重實際也是能夠的,可是考慮到高可用以及高性能的需求,若是作了服務端的消息去重,RocketMQ就須要對消息作額外的rehash、排序等操做,這會花費較大的時間和空間等資源代價,收益並不明顯。RocketMQ考慮到正常狀況下出現重複消息的機率實際上是很小的,所以RocketMQ將消息冪等操做交給了業務方處理。

實際上上述問題的本質在於:網絡調用自己存在不肯定性,也就是既不成功也不失敗的第三種狀態,即所謂的 處理中 狀態,所以會有重複的狀況發生。這個問題是不少其餘的MQ產品一樣會遇到的,一般的方法就是要求消費方在消費消息時進行去重,也就是本文咱們說的消費冪等性。

對RocketMQ有必定使用經驗的讀者可能注意到,每條消息都有一個MessageID,那麼咱們可否使用該ID做爲去重依據,也就是上面提到的冪等令牌呢?

答案是否認的,由於MessageID可能出現衝突的狀況,所以不建議經過MessageID做爲處理依據而應當使用業務惟一標識如:訂單號、流水號等做爲冪等處理的關鍵依據。

上面也提到了,冪等依據應當由消息生產者生成,在發送消息時候,咱們可以經過消息的key設置該id,對應的API爲 org.apache.rocketmq.common.message.setKeys(String keys) 代碼以下:

Message sendMessage = new Message(
                    MessageProtocolConst.WALLET_PAY_TOPIC.getTopic(),
                    message.getBytes());複製代碼
sendMessage.setKeys("OD0000000001");複製代碼

當消息消費者收到該消息時,根據該消息的key作冪等處理,API爲 org.apache.rocketmq.common.message.getKeys() 代碼以下:

(msgs, context) -> {
        try {
            // 默認msgs只有一條消息
            for (MessageExt msg : msgs) {
                String key = msg.getKeys();
                return walletCharge(msg);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            LOGGER.error("錢包扣款消費異常,e={}", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }複製代碼

消費者經過getKeys()可以讀取到生產者設置的冪等依據(如:訂單號等),而後業務邏輯圍繞該id進行冪等處理便可。

若是你以爲每次都須要在生產者側setkey,在消費者側getkey,有點繁瑣。也能夠將該冪等依據設置在消息協議中,消費者接收到消息後解析該id進行冪等操做也是能夠的。只須要消息的生產者和消費者約定好如何解析id的協議便可。

具體的冪等邏輯視使用的場景而定,我在這裏嘗試從個人經驗進行一些總結。

消費端常見的冪等操做

  1. 業務操做以前進行狀態查詢

消費端開始執行業務操做時,經過冪等id首先進行業務狀態的查詢,如:修改訂單狀態環節,當訂單狀態爲成功/失敗則不須要再進行處理。那麼咱們只須要在消費邏輯執行以前經過訂單號進行訂單狀態查詢,一旦獲取到肯定的訂單狀態則對消息進行提交,通知broker消息狀態爲:ConsumeConcurrentlyStatus.CONSUME_SUCCESS

  1. 業務操做前進行數據的檢索

邏輯和第一點類似,即消費以前進行數據的檢索,若是可以經過業務惟一id查詢到對應的數據則不須要進行再後續的業務邏輯。如:下單環節中,在消費者執行異步下單以前首先經過訂單號查詢訂單是否已經存在,這裏能夠查庫也能夠查緩存。若是存在則直接返回消費成功,不然進行下單操做。

  1. 惟一性約束保證最後一道防線
上述第二點操做並不能保證必定不出現重複的數據,如:併發插入的場景下,若是沒有樂觀鎖、分佈式鎖做爲保證的前提下,頗有可能出現數據的重複插入操做,所以咱們務必要對冪等id添加惟一性索引,這樣就可以保證在併發場景下也能保證數據的惟一性。複製代碼
  1. 引入鎖機制
上述的第一點中,若是是併發更新的狀況,沒有使用悲觀鎖、樂觀鎖、分佈式鎖等機制的前提下,進行更新,極可能會出現屢次更新致使狀態的不許確。如:對訂單狀態的更新,業務要求訂單隻能從初始化->處理中,處理中->成功,處理中->失敗,不容許跨狀態更新。若是沒有鎖機制,極可能會將初始化的訂單更新爲成功,成功訂單更新爲失敗等異常的狀況。
    高併發下,建議經過狀態機的方式定義好業務狀態的變遷,經過樂觀鎖、分佈式鎖機制保證屢次更新的結果是肯定的,悲觀鎖在併發環境不利於業務吞吐量的提升所以不建議使用。複製代碼
  1. 消息記錄表
這種方案和業務層作的冪等操做相似,因爲咱們的消息id是惟一的,能夠藉助該id進行消息的去重操做,間接實現消費的冪等。
    首先準備一個消息記錄表,在消費成功的同時插入一條已經處理成功的消息id記錄到該表中,注意必定要 **與業務操做處於同一個事物** 中,當新的消息到達的時候,根據新消息的id在該表中查詢是否已經存在該id,若是存在則代表消息已經被消費過,那麼丟棄該消息再也不進行業務操做便可。
.....複製代碼

確定還有更多的場景我沒有涉及到,這裏說到的操做均是互相之間有關聯的,將他們配合使用更可以保證消費業務的冪等性。

不論怎樣,請牢記一個原則:緩存是不可靠的,查詢是不可靠的

在高併發的場景下,必定要經過持久化存儲的惟一索引以及引入鎖機制做爲共同保障數據準確性和完整性的最後一道防線!

總結

本文主要講解了何爲冪等及消息消費場景下如何傳遞惟一冪等id,並進一步分析瞭如何保證消息冪等的思路以及總結了常見的消息冪等處理方式。

套路是多變的,關鍵是掌握思路和方法,咱們的原則就是 無論執行多少次,業務表現出來的行爲是統一的 , 在這個前提下,咱們引入了操做前查庫、操做前查緩存、樂觀鎖/分佈式鎖機制、加入惟一索引等多重防重放策略,經過這些策略的綜合做用,最終達到了消息冪等的目的。

最後有句話分享,有道無術術可求。有術無道止於術。相信聰明的你必定會在技術的道路上結合實際場景將各類技術手段融會貫通,從而走的愈來愈遠。

相關文章
相關標籤/搜索