在微服務架構中,咱們經常使用異步化的手段來提高系統的 吞吐量 和 解耦 上下游,而構建異步架構最經常使用的手段就是使用 消息隊列(MQ)
,那異步架構怎樣才能實現數據一致性呢?本文主要介紹如何使用RocketMQ
的事務消息
來解決一致性問題。git
RocketMQ 是阿里巴巴開源的分佈式消息中間件,目前已成爲 Apache 的頂級項目。歷經屢次天貓雙十一海量消息考驗,具備高性能、低延時和高可靠等特性
PS:同步場景怎樣保證一致性?請看文章《Spring Cloud同步場景分佈式事務怎樣作?試試Seata》spring
能夠看到在 業務處理 方面來講 RocketMQ
優於其餘對手,並且原生支持 事務消息segmentfault
PS:業務系統用的是其餘 MQ
產品可是又須要 事務消息 怎麼辦?學習原理本身開發實現!服務器
例以下圖的場景:生成訂單記錄 -> MQ -> 增長積分網絡
咱們是應該先 建立訂單記錄,仍是先 發送MQ消息 呢?架構
上面的 方式二 看似沒問題,可是 網絡是不可靠的!若是 MQ
的響應由於網絡緣由沒有收到,因此在面對不肯定的結果只好進行回滾;可是 MQ
端又確實是收到了這條消息的,只是回給客戶端的 響應丟失 了!
因此 事務消息
就是用來保證 本地事務 與 MQ消息發送 的原子性!併發
主要的邏輯分爲兩個流程:框架
事務消息發送及提交:運維
half消息
MQ服務端
響應消息寫入結果本地事務
(若是寫入失敗,此時half消息對業務 不可見,本地邏輯不執行)Commit
或者 Rollback
(Commit操做生成消息索引,消息對消費者 可見)
回查流程:異步
Commit/Rollback
的事務消息(pending
狀態的消息),從服務端發起一次 回查 Producer
收到回查消息,檢查回查消息對應的 本地事務狀態
Commit
或者 Rollback
邏輯時序圖
從上面的原理能夠發現 事務消息
僅僅只是保證本地事務和MQ消息發送造成總體的 原子性
,而投遞到MQ服務器後,並沒有法保證消費者必定能消費成功!
若是 消費端消費失敗 後的處理方式,建議是記錄異常信息而後 人工處理,並不建議回滾上游服務的數據(由於二者是 解耦 的,並且 複雜度 過高)
咱們能夠利用 MQ
的兩個特性 重試
和 死信隊列
來協助消費端處理:
重試
死信隊列
裏死信隊列
裏的消息,記錄日誌而且預警!由於有 重試
因此消費者須要實現 冪等性
下面就用剛剛提到的場景:生成訂單記錄 -> MQ -> 增長積分;來簡單講一下 Spring Cloud
中應該怎麼作,詳細代碼請 下載demo 查看。
PS:怎樣安裝部署RocketMQ能夠參考《Apache RocketMQ 消息隊列部署與可視化界面安裝》
使用 spring-cloud-stream
框架來訪問 RocketMQ
Spring Cloud Stream 是一個構建消息驅動的框架,經過抽象的定義實現應用與MQ消息隊列之間的解耦,目前支持RabbitMQ
、kafka
和RocketMQ
![]()
消息生產者須要添加 transactional: true
開啓 事務消息
由於開啓了事務消息
因此這裏發送的是half消息
對於消費端是不可見
的
使用 @RocketMQTransactionListener
註解監聽 半消息,並實現 RocketMQLocalTransactionListener
接口,該接口有兩個方法
若是提交事務消息失敗,需等待約1分鐘左右 事務回查 方法纔會被調用
注意:由於有重試
,這裏若是是真實的業務須要自行實現冪等性
監聽並消費死信隊列中的消息,用於記錄錯誤日誌,而且預警通知運維人員等
demo中提供了3個接口分別測試不一樣的場景:
事務成功
http://localhost:11002/success
流程以下:
訂單建立成功但提交事務消息失敗
http://localhost:11002/produceError
流程以下:
消費消息失敗
http://localhost:11002/consumeError
流程以下:
推薦閱讀
掃碼關注有驚喜!