Spring Cloud Alibaba 實戰(八) - 審覈業務的分佈式事務處理實現

Github博客地址html

本文主要講解RabbitMQ的介紹和安裝,Spring Cloud Stream核心概念,Spring Cloud Alibaba RocketMQ學習,異步消息推送與消費java

1 審覈業務的實現

  • com/javaedge/contentcenter/service/content/ShareService.java

假設添加積分操做很耗時,咱們的主要操做是審覈,而不關心積分,因此能夠將其異步化react

1.1 Spring實現異步的方法

◆ AsyncRestTemplategit

  • 參考文檔
    Spring 的異步HTTP請求AsyncRestTemplate

◆ @ Async註解github

  • 參考文檔
    https://spring.io/guides/gs/async-method/

◆ WebClient ( Spring 5.0引入 ,爲取代AsyncRestTemplate)web

  • 參考文檔
    https://docs.spring.io/spring/docs5.1. RELEASE/spring-framework-reference/web-reactive.html#webflux-client

◆ MQ咱們採用此法spring

2 引入MQ後的架構演進

3 MQ適用場景

  • 異步處理
  • 流量削峯填谷
  • 解耦微服務

4 MQ的選擇

流行的MQ那麼多,如何選擇?apache

  • Kafka、RabbitMQ、 RocketMQ、 ActiveMQ...

5 搭建RocketMQ

6 搭建RocketMQ控制檯

  • 修改pom.xml版本
  • 修改代碼

7 Spring消息編程模型

  • 推薦Maven依賴版本分析插件

7.1 編寫生產者

content-center編程

開始拿出三板斧:網絡

  • 引入依賴
  • 添加註解
  • 寫配置
  • 服務類添加模板類

7.2 編寫消費者

user-center

  • 依賴
  • 配置
  • com.javaedge.contentcenter.rocketmq.AddBonusTransactionListener

小結

  • RocketMQ : RocketMQMessageListener
  • ActiveMQ/Artemis : JmsListener
  • RabbitMQ : RabbitListener
  • Kafka : KafkaListener

8 分佈式事務

流程剖析、概念術語、如何實現事務呢,咱們知道Spring有事務註解,那麼直接就添加@Transaction註解吧!可這樣是萬無一失了嗎?顯然不行,由於消息已經發出,無法撤回了那麼看看RocketMQ是怎麼解決分佈式事務問題呢

8.1 實現分佈式事務流程

業務流程圖

  1. 半消息,雖然被存儲到MQserver,但會被標記爲暫時不能投遞,因此消費者不會接受到該消息
  2. 半消息發送成功,開始3
  3. 開始執行本地事務
  4. 生產者根據本地事務,發送二次確認請求
    MQServer若是從4中接收到的是
    • commit,就把消息置爲可投遞,這樣消費者就可消費該消息了
    • rollback:將該消息刪除
  5. MQServer未收到4中的二次確認消息,就會回查
  6. 生產者檢查本地事務的執行結果
  7. 根據本地事務執行結果,發送commit/rollback消息

整體來講,就是生產者把消息發送到MQ,但MQ只是將其標記,不讓消費者消費而後生產者就執行本地事務,執行完後就知道究竟是該投遞仍是丟棄該消息了!這其實就是典型的二次確認消費回查就是防止二次確認消息發送異常的容錯處理

8.2 關鍵概念

◆ 半消息( Half(Prepare) Message )暫時沒法消費的消息。生產者將消息發送到了MQ server ,但這個消息會被標記爲"暫不能投遞"狀態,先存儲起來;消費者不會去消費這條消息。並非消息的狀態,只是一種特殊的消息而已◆ 消息回查(Message Status Check )網絡斷開或生產者重啓可能致使丟失事務消息的第二次確認。當MQ Server發現消息長時間處於半消息狀態時,將向消息生產者發送請求,詢問該消息的最終狀態(提交或回滾)。

8.3 事務消息三狀態

◆ Commit提交事務消息,消費者能夠消費此消息◆ Rollback回滾事務消息, broker會刪除該消息,消費者不能消費.◆UNKNOWNbroker須要回查確認該消息的狀態

9 分佈式事務 - 編碼實現

  • 在內容中心新增事務日誌表rocketmq_transaction_log

    對照上一小節流程圖,開始code!

9.1/2 發半消息

改造接口

  • 將原先以下代碼刪除
  • 改成以下接方法

9.3 執行本地事務

  • 新建rocketmq包,並在其中建立一個新類AddBonusTransactionListener
  • 必定要在該類上加@RocketMQTransactionListener註解
    其中的txProducerGroup必定要對應哦

注意這裏的

  • msg參數即對應
  • args參數即對應
  • 回查本地事務執行結果,即經過查詢日誌記錄表,該表在執行完本地事務後更新
  • 這便可回查啦

參考

相關文章
相關標籤/搜索