Spring Cloud異步場景分佈式事務怎樣作?試試RocketMQ

mark

1、背景

在微服務架構中,咱們經常使用異步化的手段來提高系統的 吞吐量解耦 上下游,而構建異步架構最經常使用的手段就是使用 消息隊列(MQ),那異步架構怎樣才能實現數據一致性呢?本文主要介紹如何使用RocketMQ事務消息來解決一致性問題。git

RocketMQ 是阿里巴巴開源的分佈式消息中間件,目前已成爲 Apache 的頂級項目。歷經屢次天貓雙十一海量消息考驗,具備高性能、低延時和高可靠等特性

PS:同步場景怎樣保證一致性?請看文章《Spring Cloud同步場景分佈式事務怎樣作?試試Seataspring

 

2、MQ選型

能夠看到在 業務處理 方面來講 RocketMQ 優於其餘對手,並且原生支持 事務消息segmentfault

mark

PS:業務系統用的是其餘 MQ 產品可是又須要 事務消息 怎麼辦?學習原理本身開發實現!服務器

 

3、什麼是事務消息

例以下圖的場景:生成訂單記錄 -> MQ -> 增長積分網絡

mark

咱們是應該先 建立訂單記錄,仍是先 發送MQ消息 呢?架構

  1. 先發送MQ消息:這個明顯是不行的,由於若是消息發送成功,而訂單建立失敗的話是沒辦法把消息收回來的
  2. 先建立訂單記錄:若是訂單建立成功後MQ消息發送失敗 拋出異常,由於兩個操做都在本地事務中因此訂單數據是能夠 回滾

上面的 方式二 看似沒問題,可是 網絡是不可靠的!若是 MQ 的響應由於網絡緣由沒有收到,因此在面對不肯定的結果只好進行回滾;可是 MQ 端又確實是收到了這條消息的,只是回給客戶端的 響應丟失 了!
 
因此 事務消息 就是用來保證 本地事務MQ消息發送 的原子性!併發

 

4、RocketMQ事務消息原理

mark

主要的邏輯分爲兩個流程:框架

  • 事務消息發送及提交運維

    1. 發送 half消息
    2. MQ服務端 響應消息寫入結果
    3. 根據發送結果執行 本地事務(若是寫入失敗,此時half消息對業務 不可見,本地邏輯不執行)
    4. 根據本地事務狀態執行 Commit 或者 Rollback(Commit操做生成消息索引,消息對消費者 可見

 

  • 回查流程異步

    1. 對於長時間沒有 Commit/Rollback 的事務消息(pending 狀態的消息),從服務端發起一次 回查
    2. Producer 收到回查消息,檢查回查消息對應的 本地事務狀態
    3. 根據本地事務狀態,從新 Commit 或者 Rollback

 
邏輯時序圖

mark

 

5、異步架構一致性實現思路

從上面的原理能夠發現 事務消息 僅僅只是保證本地事務和MQ消息發送造成總體的 原子性,而投遞到MQ服務器後,並沒有法保證消費者必定能消費成功!
 
若是 消費端消費失敗 後的處理方式,建議是記錄異常信息而後 人工處理,並不建議回滾上游服務的數據(由於二者是 解耦 的,並且 複雜度 過高)
 
咱們能夠利用 MQ 的兩個特性 重試死信隊列 來協助消費端處理:

  1. 消費失敗後進行必定次數的 重試
  2. 重試後也失敗的話該消息丟進 死信隊列
  3. 另外起一個線程監聽消費 死信隊列 裏的消息,記錄日誌而且預警!

由於有 重試 因此消費者須要實現 冪等性

 

6、分佈式事務場景樣例

下面就用剛剛提到的場景:生成訂單記錄 -> MQ -> 增長積分;來簡單講一下 Spring Cloud 中應該怎麼作,詳細代碼請 下載demo 查看。
PS:怎樣安裝部署RocketMQ能夠參考《Apache RocketMQ 消息隊列部署與可視化界面安裝

6.1. 引入依賴

使用 spring-cloud-stream 框架來訪問 RocketMQ

mark

Spring Cloud Stream 是一個構建消息驅動的框架,經過抽象的定義實現應用與MQ消息隊列之間的解耦,目前支持 RabbitMQkafkaRocketMQ
mark

 

6.2. 開啓事務消息

消息生產者須要添加 transactional: true 開啓 事務消息

mark

 

6.3. 訂單服務發送half消息

mark

由於開啓了 事務消息 因此這裏發送的是 half消息 對於消費端是 不可見

 

6.4. 訂單服務監聽half消息

使用 @RocketMQTransactionListener 註解監聽 半消息,並實現 RocketMQLocalTransactionListener 接口,該接口有兩個方法

  • executeLocalTransaction:用於提交本地事務
  • checkLocalTransaction:用於事務回查

mark

若是提交事務消息失敗,需等待約1分鐘左右 事務回查 方法纔會被調用

 

6.5. 積分服務消費消息

mark

注意:由於有 重試,這裏若是是真實的業務須要自行實現 冪等性

 

6.6. 消費死信隊列預警

mark

監聽並消費死信隊列中的消息,用於記錄錯誤日誌,而且預警通知運維人員等

 

6.7. 測試用例

demo中提供了3個接口分別測試不一樣的場景:

  • 事務成功
    http://localhost:11002/success
    流程以下:

    1. 訂單建立 成功
    2. 提交事務消息 成功
    3. 消費消息增長積分 成功
  • 訂單建立成功但提交事務消息失敗
    http://localhost:11002/produceError
    流程以下:

    1. 訂單建立 成功
    2. 提交事務消息 失敗
    3. 事務回查(等待1分鐘左右) 成功
    4. 提交事務消息 成功
    5. 消費消息增長積分 成功
  • 消費消息失敗
    http://localhost:11002/consumeError
    流程以下:

    1. 訂單建立 成功
    2. 提交事務消息 成功
    3. 消費消息增長積分 失敗
    4. 重試消費消息 失敗
    5. 進入死信隊列 成功
    6. 消費死信隊列的消息 成功
    7. 記錄日誌併發出預警 成功

 

7、demo下載地址

https://gitee.com/zlt2000/microservices-platform/tree/master/zlt-demo/rocketmq-demo/rocketmq-transactional

 

推薦閱讀

 
掃碼關注有驚喜!

file

相關文章
相關標籤/搜索