Spring Alibaba Cloud使用Seata實現分佈式事務,Nacos做爲配置中心(一)

本文目錄:java

  • 需求
  • 什麼是分佈式事務
  • 分佈式事務解決方案
  • Seata 是什麼?
  • 準備工做
  • 代碼實戰演示
  • 啓動服務功能演示
  • Seata 事務分組說明
  • Seata 分佈式事務原理解釋
  • 項目源碼地址

後端工具和環境node

  • IDE:IDEA
  • 註冊中心:nacos 1.1.3
  • Spring Cloud:Greenwich.SR3
  • Speing Alibaba Cloud:2.1.1.RELEASE
  • Seata:0.9.0
  • MybatisPlus:3.2.0

一. 需求

在開發個人開源項目 prex 時,加入工做流,解決工做流用戶與當前系統用戶同步問題時,涉及到遠程調用操做兩個數據庫所產生的事務問題,好比系統用戶在增長用戶同步工做流用戶時,系統用戶添加成功,工做流用戶沒有添加成功,則形成數據不一致問題,本地事務沒法回滾,那麼則使用分佈式事務解決方案。
開源項目:gitee.com/kaiyuantuan…mysql

二. 什麼是分佈式事務?

指一次大的操做由不一樣的小操做組成的,這些小的操做分佈在不一樣的服務器上,分佈式事務須要保證這些小操做要麼所有成功,要麼所有失敗。從本質上來講,分佈式事務就是爲了保證不一樣數據庫的數據一致性。
通俗一點說就是單體應用被拆分紅微服務應用,原來的一個模塊被拆分紅三個獨立的應用,分別使用獨立的數據源,業務操做須要調用三個服務來完成。git

三. 分佈式事務解決方案

分佈式事務做爲微服務應用中的大難題,在現有的解決方案中,我的認爲 Seata 是目前最輕量的解決方案github

四. Seata 是什麼?

Seata 是一款開源的分佈式事務解決方案,致力於提供高性能和簡單易用的分佈式事務服務。Seata 將爲用戶提供了 AT、TCC、SAGA 和 XA 事務模式,爲用戶打造一站式的分佈式解決方案。redis

AT 模式

前提

  • 基於支持本地 ACID 事務的關係型數據庫。
  • Java 應用,經過 JDBC 訪問數據庫。

總體機制

兩階段提交協議的演變:spring

  • 一階段:業務數據和回滾日誌記錄在同一個本地事務中提交,釋放本地鎖和鏈接資源。
  • 二階段:
    • 提交異步化,很是快速地完成。
    • 回滾經過一階段的回滾日誌進行反向補償。

寫隔離

  • 一階段本地事務提交前,須要確保先拿到全局鎖 。
  • 拿不到 全局鎖 ,不能提交本地事務。
  • 拿 全局鎖 的嘗試被限制在必定範圍內,超出範圍將放棄,並回滾本地事務,釋放本地鎖。

以一個示例來講明:
兩個全局事務 tx1 和 tx2,分別對 a 表的 m 字段進行更新操做,m 的初始值 1000。sql

tx1 先開始,開啓本地事務,拿到本地鎖,更新操做 m = 1000 - 100 = 900。本地事務提交前,先拿到該記錄的 全局鎖 ,本地提交釋放本地鎖。 tx2 後開始,開啓本地事務,拿到本地鎖,更新操做 m = 900 - 100 = 800。本地事務提交前,嘗試拿該記錄的 全局鎖 ,tx1 全局提交前,該記錄的全局鎖被 tx1 持有,tx2 須要重試等待 全局鎖 。數據庫

file

tx1 二階段全局提交,釋放 全局鎖 。tx2 拿到 全局鎖 提交本地事務。後端

file

若是 tx1 的二階段全局回滾,則 tx1 須要從新獲取該數據的本地鎖,進行反向補償的更新操做,實現分支的回滾。

此時,若是 tx2 仍在等待該數據的 全局鎖,同時持有本地鎖,則 tx1 的分支回滾會失敗。分支的回滾會一直重試,直到 tx2 的 全局鎖 等鎖超時,放棄 全局鎖 並回滾本地事務釋放本地鎖,tx1 的分支回滾最終成功。

由於整個過程 全局鎖 在 tx1 結束前一直是被 tx1 持有的,因此不會發生 髒寫 的問題。

讀隔離

在數據庫本地事務隔離級別 讀已提交(Read Committed) 或以上的基礎上,Seata(AT 模式)的默認全局隔離級別是 讀未提交(Read Uncommitted) 。

若是應用在特定場景下,必須要求全局的 讀已提交 ,目前 Seata 的方式是經過 SELECT FOR UPDATE 語句的代理。

file

SELECT FOR UPDATE 語句的執行會申請 全局鎖 ,若是 全局鎖 被其餘事務持有,則釋放本地鎖(回滾 SELECT FOR UPDATE 語句的本地執行)並重試。這個過程當中,查詢是被 block 住的,直到 全局鎖 拿到,即讀取的相關數據是 已提交 的,才返回。

出於整體性能上的考慮,Seata 目前的方案並無對全部 SELECT 語句都進行代理,僅針對 FOR UPDATE 的 SELECT 語句。

工做機制

以一個示例來講明整個 AT 分支的工做過程。

業務表:product

Field Type Key
id bigint(20) PRI
name varchar(100)
since varchar(100)

AT 分支事務的業務邏輯:

update product set name = 'GTS' where name = 'TXC';

一階段

過程:

  1. 解析 SQL:獲得 SQL 的類型(UPDATE),表(product),條件(where name = 'TXC')等相關的信息。
  2. 查詢前鏡像:根據解析獲得的條件信息,生成查詢語句,定位數據。
select id, name, since from product where name = 'TXC';
複製代碼

獲得前鏡像:

id name since 1 TXC 2014

  1. 執行業務 SQL:更新這條記錄的 name 爲 'GTS'。
  2. 查詢後鏡像:根據前鏡像的結果,經過 主鍵 定位數據。
select id, name, since from product where id = 1`;
複製代碼

獲得後鏡像:

id name since 1 GTS 2014

  1. 插入回滾日誌:把先後鏡像數據以及業務 SQL 相關的信息組成一條回滾日誌記錄,插入到 UNDO_LOG 表中。
{
	"branchId": 641789253,
	"undoItems": [{
		"afterImage": {
			"rows": [{
				"fields": [{
					"name": "id",
					"type": 4,
					"value": 1
				}, {
					"name": "name",
					"type": 12,
					"value": "GTS"
				}, {
					"name": "since",
					"type": 12,
					"value": "2014"
				}]
			}],
			"tableName": "product"
		},
		"beforeImage": {
			"rows": [{
				"fields": [{
					"name": "id",
					"type": 4,
					"value": 1
				}, {
					"name": "name",
					"type": 12,
					"value": "TXC"
				}, {
					"name": "since",
					"type": 12,
					"value": "2014"
				}]
			}],
			"tableName": "product"
		},
		"sqlType": "UPDATE"
	}],
	"xid": "xid:xxx"
}
複製代碼
  1. 提交前,向 TC 註冊分支:申請 product 表中,主鍵值等於 1 的記錄的 全局鎖 。
  2. 本地事務提交:業務數據的更新和前面步驟中生成的 UNDO LOG 一併提交。
  3. 將本地事務提交的結果上報給 TC。

二階段-回滾

  1. 收到 TC 的分支回滾請求,開啓一個本地事務,執行以下操做。
  2. 經過 XID 和 Branch ID 查找到相應的 UNDO LOG 記錄。
  3. 數據校驗:拿 UNDO LOG 中的後鏡與當前數據進行比較,若是有不一樣,說明數據被當前全局事務以外的動做作了修改。這種狀況,4. 須要根據配置策略來作處理,詳細的說明在另外的文檔中介紹。 根據 UNDO LOG 中的前鏡像和業務 SQL 的相關信息生成並執行回滾的語句:
update product set name = 'TXC' where id = 1;
複製代碼
  1. 提交本地事務。並把本地事務的執行結果(即分支事務回滾的結果)上報給 TC。

二階段-提交

  1. 收到 TC 的分支提交請求,把請求放入一個異步任務的隊列中,立刻返回提交成功的結果給 TC。
  2. 異步任務階段的分支提交請求將異步和批量地刪除相應 UNDO LOG 記錄。

附錄

回滾日誌表

UNDO_LOG Table:不一樣數據庫在類型上會略有差異。

以 MySQL 爲例:

Field Type
branch_id bigint PK
xid varchar(100)
context varchar(128)
rollback_info longblob
log_status tinyint
log_created datetime
log_modified datetime
-- 注意此處0.7.0+ 增長字段 context
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
複製代碼

TCC 模式

回顧總覽中的描述:一個分佈式的全局事務,總體是 兩階段提交 的模型。全局事務是由若干分支事務組成的,分支事務要知足 兩階段提交 的模型要求,即須要每一個分支事務都具有本身的:

  • 一階段 prepare 行爲
  • 二階段 commit 或 rollback 行爲

file

根據兩階段行爲模式的不一樣,咱們將分支事務劃分爲 Automatic (Branch) Transaction Mode 和 Manual (Branch) Transaction Mode.

AT 模式(參考連接 TBD)基於 支持本地 ACID 事務 的 關係型數據庫:

  • 一階段 prepare 行爲:在本地事務中,一併提交業務數據更新和相應回滾日誌記錄。
  • 二階段 commit 行爲:立刻成功結束,自動 異步批量清理回滾日誌。
  • 二階段 rollback 行爲:經過回滾日誌,自動 生成補償操做,完成數據回滾。

相應的,TCC 模式,不依賴於底層數據資源的事務支持:

  • 一階段 prepare 行爲:調用 自定義 的 prepare 邏輯。
  • 二階段 commit 行爲:調用 自定義 的 commit 邏輯。
  • 二階段 rollback 行爲:調用 自定義 的 rollback 邏輯。 所謂 TCC 模式,是指支持把 自定義 的分支事務歸入到全局事務的管理中

Saga 模式

Saga 模式是 SEATA 提供的長事務解決方案,在 Saga 模式中,業務流程中每一個參與者都提交本地事務,當出現某一個參與者失敗則補償前面已經成功的參與者,一階段正向服務和二階段補償服務都由業務開發實現。

file

理論基礎:Hector & Kenneth 發表論⽂ Sagas (1987)

適用場景:

  • 業務流程長、業務流程多
  • 參與者包含其它公司或遺留系統服務,沒法提供 - TCC 模式要求的三個接口

優點:

  • 一階段提交本地事務,無鎖,高性能
  • 事件驅動架構,參與者可異步執行,高吞吐
  • 補償服務易於實現

缺點:

  • 不保證隔離性(應對方案見用戶文檔)

五. 準備工做

  • 這裏咱們使用 Nacos 做爲註冊中心,Nacos 的安裝及使用能夠參考
  • 咱們從官網下載 seata-server,這裏下載的是 seata-server-0.9.0.zip,下載地址:github.com/seata/seata…
    github 地址下載速度很慢,能夠在公衆號後臺回覆seata安裝包快速獲取百度雲下載連接
  • 下載完成後解壓 seata-server 安裝包到指定目錄

解壓完成後咱們獲得了幾個文件夾

file

  • bin
    存放各個系統的 seata server 啓動腳本
  • conf
    存在 seata server 啓動時所須要的配置信息、數據庫模式下所須要的建表語句
  • lib
    運行 seata server 所須要的依賴包列表

配置 Seata Server

seata server全部的配置都在 conf 文件夾內,該文件夾內有兩個文件咱們必需要詳細介紹下。

seata server默認使用 file(文件方式)進行存儲事務日誌、事務運行信息,咱們能夠經過-m db 腳本參數的形式來指定,目前僅支持 file、db 這兩種方式。

  • file.conf
    該文件用於配置存儲方式、透傳事務信息的 NIO 等信息,默認對應 registry.conf 文件內的 file 方式配置
  • registry.conf
    seata server 核心配置文件,能夠經過該文件配置服務註冊方式、配置讀取方式。
    註冊方式目前支持 file 、nacos 、eureka、redis、zk、consul、etcd三、sofa 等方式,默認爲 file,對應讀取 file.conf 內的註冊方式信息。
    讀取配置信息的方式支持 file、nacos 、apollo、zk、consul、etcd3 等方式,默認爲 file,對應讀取 file.conf 文件內的配置。

修改 conf 目錄下的 file.conf 配置文件,主要修改自定義事務組名稱,事務日誌存儲模式及數據庫鏈接信息

transport {
  ...省略
}
service {
  #vgroup->rgroup
  vgroup_mapping.prex_tx_group = "default" #修改事務組名稱爲:prex_tx_group,和客戶端自定義的名稱對應
  #only support single node
  default.grouplist = "127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}

## transaction log store
store {
  ## store mode: file、db
  mode = "db" #修改此處將事務信息存儲到db數據庫中

  ## database store
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "druid"
    ## mysql/oracle/h2/oceanbase etc.
    db-type = "mysql"
    driver-class-name = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://localhost:3306/seat" #修改數據庫鏈接地址
    user = "root" #修改數據庫用戶名
    password = "root" #修改數據庫密碼
    min-conn = 1
    max-conn = 3
    global.table = "global_table"
    branch.table = "branch_table"
    lock-table = "lock_table"
    query-limit = 100
  }
}
複製代碼

說明:

  • 存儲事務日誌可使用 file 文件和 db 數據庫兩種方式
  • 因爲咱們使用了 db 模式存儲事務日誌,因此咱們須要建立一個 seat 數據庫,建表 sql 在 seata-server 的/conf/db_store.sql 中

file

  • 修改 conf 目錄下的registry.conf配置文件,指明註冊中心爲 nacos,及修改 nacos 鏈接信息便可;
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd三、sofa
  type = "nacos"

  nacos {
    serverAddr = "localhost:8848"
    namespace = ""
    cluster = "default"
  }
  ... 省略
}
}

複製代碼

配置完成後啓動 Seata

啓動 seata server 的腳本位於 bin 文件內,Linux/Mac 環境使用 seata-server.sh 腳本啓動,Windows 環境使用 seata-server.bat 腳本啓動。

Linux/Mac啓動方式示例以下所示:

nohup sh seata-server.sh -p 8091 -h 127.0.0.1 -m db &> seata.log &
複製代碼

經過 nohup 命令讓 seata server 在系統後臺運行。

腳本參數:

  • -p
    指定啓動 seata server 的端口號。
  • -h
    指定 seata server 所綁定的主機,這裏配置要注意指定的主機 IP 要與業務服務內的配置文件保持一致,如:-h 192.168.1.10,業務服務配置文件內應該配置 192.168.1.10,即便在同一臺主機上也要保持一致。
  • -m
    事務日誌、事務執行信息存儲的方式,目前支持 file(文件方式)、db(數據庫方式,建表語句請查看 config/db_store.sql、config/db_undo_log.sql)

查看啓動日誌

file

當咱們看到-Server started 時並未發現其餘錯誤信息,咱們的 seata server 已經啓動成功

六. 實戰演示

讓咱們從一個微服務示例開始 用戶購買商品的業務邏輯。整個業務邏輯由 3 個微服務提供支持:

  • 倉儲服務:對給定的商品扣除倉儲數量。
  • 訂單服務:根據採購需求建立訂單。
  • 賬戶服務:從用戶賬戶中扣除餘額。

架構圖

file

數據庫

建立業務數據庫

db-order:存儲訂單的數據庫
db-storage:存儲庫存的數據庫
db-account:存儲帳戶信息的數據庫

order 訂單表:

DROP TABLE IF EXISTS `order`;
CREATE TABLE `order` (
  `id` int(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵Id',
  `user_id` int(20) DEFAULT NULL COMMENT '用戶Id',
  `pay_money` decimal(11,0) DEFAULT NULL COMMENT '付款金額',
  `product_id` int(20) DEFAULT NULL COMMENT '商品Id',
  `status` int(11) DEFAULT NULL COMMENT '狀態',
  `count` int(11) DEFAULT NULL COMMENT '商品數量',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC COMMENT='訂單表';

SET FOREIGN_KEY_CHECKS = 1;
複製代碼

product 商品表:

DROP TABLE IF EXISTS `product`;
CREATE TABLE `product` (
  `id` int(20) NOT NULL COMMENT '主鍵',
  `product_id` int(11) DEFAULT NULL COMMENT '商品Id',
  `price` decimal(11,0) DEFAULT NULL COMMENT '價格',
  `count` int(11) DEFAULT NULL COMMENT '庫存數量',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC COMMENT='倉儲服務';

-- ----------------------------
-- Records of product
-- ----------------------------
BEGIN;
INSERT INTO `product` VALUES (1, 1, 50, 100);
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;
複製代碼

account 帳戶表:

DROP TABLE IF EXISTS `account`;
CREATE TABLE `account` (
  `id` int(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵Id',
  `user_id` int(20) DEFAULT NULL COMMENT '用戶Id',
  `balance` decimal(11,0) DEFAULT NULL COMMENT '餘額',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC;

-- ----------------------------
-- Records of account
-- ----------------------------
BEGIN;
INSERT INTO `account` VALUES (1, 1, 100);
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;
複製代碼

建立日誌回滾表

須要在每一個數據庫中建立日誌回滾表,建表 sql 在 seata-server 的/conf/db_undo_log.sql 中。

分佈式事務問題產生

三個服務,一個訂單服務,一個倉儲服務,一個帳戶服務。當用戶下單時,會在訂單服務中建立一個訂單,而後經過遠程調用庫存服務來扣減下單商品的庫存,再經過遠程調用帳戶服務來扣減用戶帳戶裏面的餘額,最後在訂單服務中修改訂單狀態爲已完成。該操做跨越三個數據庫,有兩次遠程調用,很明顯會有分佈式事務問題
複製代碼

工程結構

file

nacos-seata-account-server 帳戶服務
nacos-seata-order-server 訂單服務
nacos-seata-storage-server 倉儲服務

客戶端配置

  • 對 nacos-seata-account-server、nacos-seata-order-server 和 nacos-seata-storage-server 三個 seata 的客戶端進行配置,它們配置大體相同,咱們下面以 nacos-seata-account-server 的配置爲例;

  • 修改 application.yml 文件,自定義事務組的名稱

spring:
  cloud:
    alibaba:
      seata:
        tx-service-group: prex_tx_group #自定義事務組名稱須要與seata-server中的對應
複製代碼
  • 添加並修改 file.conf 配置文件,主要是修改自定義事務組名稱
service {
  #vgroup->rgroup
  vgroup_mapping.prex_tx_group = "default" #修改自定義事務組名稱
  #only support single node
  default.grouplist = "127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
  disableGlobalTransaction = false
}
複製代碼

添加並修改 registry.conf 配置文件,主要是將註冊中心改成 nacos

registry {
  # file 、nacos 、eureka、redis、zk
  type = "nacos" #修改成nacos

  nacos {
    serverAddr = "localhost:8848" #修改成nacos的鏈接地址
    namespace = ""
    cluster = "default"
  }
}
複製代碼

代碼只展現核心代碼 具體代碼文章尾部連接

  • 在啓動類中取消數據源的自動建立
@EnableDiscoveryClient
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@MapperScan("com.xd.example.seata.mapper")
public class NacosSeataAccountServerApplication {

	public static void main(String[] args) {
		SpringApplication.run(NacosSeataAccountServerApplication.class, args);
	}

}
複製代碼
  • 配置 MybatisPlus 使用 Seata 對數據源進行代理

MyBatisPlusConfig:

/**
 * @Classname MyBatisPlusConfig
 * @Description 配置MybatisPlus使用Seata對數據源進行代理
 * @Author Created by Lihaodong (alias:小東啊) im.lihaodong@gmail.com
 * @Date 2019-11-25 11:21
 * @Version 1.0
 */
@Configuration
public class MyBatisPlusConfig {

    @Value("${mybatis-plus.mapper-locations}")
    private String mapperLocations;

    /**
     * @param sqlSessionFactory SqlSessionFactory
     * @return SqlSessionTemplate
     */
    @Bean
    public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

    /**
     * 從配置文件獲取屬性構造datasource,注意前綴,這裏用的是druid,根據本身狀況配置,
     * 原生datasource前綴取"spring.datasource"
     *
     * @return
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.hikari")
    public DataSource hikariDataSource() {
        return new HikariDataSource();
    }

    /**
     * 構造datasource代理對象,替換原來的datasource
     *
     * @param hikariDataSource
     * @return
     */
    @Primary
    @Bean("dataSource")
    public DataSourceProxy dataSourceProxy(DataSource hikariDataSource) {
        return new DataSourceProxy(hikariDataSource);
    }

    @Bean(name = "sqlSessionFactory")
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
        MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
        bean.setDataSource(dataSourceProxy);
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        bean.setMapperLocations(resolver.getResources(mapperLocations));

        SqlSessionFactory factory = null;
        try {
            factory = bean.getObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return factory;
    }

    /**
     * MP 自帶分頁插件
     *
     * @return
     */
    @Bean
    public PaginationInterceptor paginationInterceptor() {
        PaginationInterceptor page = new PaginationInterceptor();
        page.setDialectType("mysql");
        return page;
    }
}

複製代碼
  • 使用@GlobalTransactional 註解開啓分佈式事務
package com.xd.example.seata.service.impl;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.xd.example.seata.domain.Order;
import com.xd.example.seata.mapper.OrderMapper;
import com.xd.example.seata.service.IOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.xd.example.seata.service.RemoteAccountService;
import com.xd.example.seata.service.RemoteStorageService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * <p>
 * 訂單表 服務實現類
 * </p>
 *
 * @author lihaodong
 * @since 2019-11-25
 */
@Slf4j
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {

    @Autowired
    private RemoteStorageService remoteStorageService;

    @Autowired
    private RemoteAccountService remoteAccountService;

    @GlobalTransactional(rollbackFor = Exception.class)
    @Override
    public void createOrder(Order order) {
        log.info("下單開始,用戶:{},商品:{},數量:{},金額:{}", order.getUserId(), order.getProductId(), order.getCount(), order.getPayMoney());
        //建立訂單
        order.setStatus(0);
        boolean save = save(order);
        log.info("保存訂單{}", save ? "成功" : "失敗");
        log.info("當前 XID: {}", RootContext.getXID());
        //遠程調用庫存服務扣減庫存
        log.info("扣減庫存開始");
        remoteStorageService.decrease(order.getProductId(), order.getCount());
        log.info("扣減庫存結束");

        //遠程調用帳戶服務扣減餘額
        log.info("扣減餘額開始");
        remoteAccountService.decrease(order.getUserId(), order.getPayMoney());
        log.info("扣減餘額結束");

        //修改訂單狀態爲已完成
        log.info("修改訂單狀態開始");
        update(Wrappers.<Order>lambdaUpdate().set(Order::getStatus, 1).eq(Order::getUserId, order.getUserId()));
        log.info("修改訂單狀態結束");

        log.info("下單結束");
    }
}

複製代碼

七. 啓動服務功能演示

  1. 分別運行 nacos-seata-order-server、nacos-seata-storage-server 和 nacos-seata-account-server 三個服務

    file
    能夠看到 seata 註冊成功

  2. 查詢數據庫初始數據信息

    file
    file
    file

  3. 打開瀏覽器/Postman 調用接口進行下單操做:http://localhost:8081/order/create?userId=1&productId=1&count=1&payMoney=50
    結果:

    file
    查看控制檯打印: 訂單服務:

file

倉儲服務:

file

帳戶服務:

file

  1. 再次數據庫查詢
    file
    file
    file
  2. 咱們在 nacos-seata-account-server 中製造一個超時異常後(其餘異常也行),調用下單接口
package com.xd.example.seata.service.impl;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.xd.example.seata.domain.Account;
import com.xd.example.seata.mapper.AccountMapper;
import com.xd.example.seata.service.IAccountService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.util.Optional;

/**
 * <p>
 * 服務實現類
 * </p>
 *
 * @author lihaodong
 * @since 2019-11-25
 */
@Slf4j
@Service
public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> implements IAccountService {

    @Override
    public boolean reduceBalance(Integer userId, BigDecimal balance) throws Exception {

        log.info("當前 XID: {}", RootContext.getXID());
        checkBalance(userId, balance);

        log.info("開始扣減用戶 {} 餘額", userId);
        //模擬超時異常
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Integer record = baseMapper.reduceBalance(userId, balance);
        log.info("結束扣減用戶 {} 餘額結果:{}", userId, record > 0 ? "操做成功" : "扣減餘額失敗");
        return record > 0;
    }

    private void checkBalance(Integer userId, BigDecimal price) throws Exception {
        log.info("檢查用戶 {} 餘額", userId);

        Optional<Account> account = Optional.ofNullable(baseMapper.selectOne(Wrappers.<Account>lambdaQuery().eq(Account::getUserId, userId)));
        if (account.isPresent()) {
            BigDecimal balance = account.get().getBalance();
            if (balance.compareTo(price) == -1) {
                log.warn("用戶 {} 餘額不足,當前餘額:{}", userId, balance);
                throw new Exception("餘額不足");
            }
        }
    }
}
複製代碼

修改完會重啓帳戶服務,再次發送請求

file
訂單服務控制檯:
file

能夠看到訂單正常,扣除庫存正常,帳戶服務讀取超時異常

  1. 發現下單後數據庫數據並無任何改變

    file

  2. 咱們在 seata-order-service 中註釋掉@GlobalTransactional 來看看會發生什麼

//    @GlobalTransactional(name = "prex-create-order",rollbackFor = Exception.class)
    @Override
    public void createOrder(Order order) {
        log.info("當前 XID: {}", RootContext.getXID());
        log.info("下單開始,用戶:{},商品:{},數量:{},金額:{}", order.getUserId(), order.getProductId(), order.getCount(), order.getPayMoney());
        //建立訂單
        order.setStatus(0);
        boolean save = save(order);
        log.info("保存訂單{}", save ? "成功" : "失敗");

		... 省略代碼
}
複製代碼

保存重啓訂單服務,再次請求接口 因爲 nacos-seata-account-server 的超時會致使當庫存和帳戶金額扣減後訂單狀態並無設置爲已經完成

file

八. Seata 事務分組

下一篇更新

Seata 分佈式事務原理解釋

下一篇更新

項目源碼地址

gitee.com/li_haodong/…

參考資料:
seata.io/zh-cn/

相關文章
相關標籤/搜索