[SpringCloud] Spring Cloud Alibaba: Seata Distributed Transaction Middleware (Part 36)

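What is the distributed transaction problem?

Monolithic application

  In a monolithic application, a single business operation calls three modules, and data consistency is guaranteed by one local transaction.

Microservice application

  As business requirements change, the monolith is split into microservices: the three modules become three independent applications, each with its own data source, and one business operation now has to call three services. Each service still guarantees the consistency of its own data with a local transaction, but nothing guarantees consistency across all three.

Summary

  The distributed transaction problem is the loss of global data consistency in a microservice architecture. Put simply, it arises whenever one business operation touches multiple data sources or makes remote calls.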
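  The problem can be illustrated with a small in-memory sketch. The three maps below are hypothetical stand-ins for three independent data sources, and every write stands for a local transaction that has already committed; the class and method names are made up for illustration and nothing here uses Seata.

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration: each "service" commits to its own store, so a failure
// in a later step cannot undo the steps that already committed.
final class LocalOnlyDemo {
    // Hypothetical stand-ins for three independent data sources.
    final Map<Long, Integer> orderDb = new HashMap<>();
    final Map<Long, Integer> storageDb = new HashMap<>();
    final Map<Long, Integer> accountDb = new HashMap<>();

    LocalOnlyDemo() {
        storageDb.put(1L, 100);   // product 1 has 100 units in stock
        accountDb.put(1L, 1000);  // user 1 has a balance of 1000
    }

    // Places an order. When failAtAccount is true the account step throws
    // after the order and storage steps have already "committed" locally.
    void placeOrder(long orderId, int count, int money, boolean failAtAccount) {
        orderDb.put(orderId, 0);                    // local tx 1: create order, status 0
        storageDb.merge(1L, -count, Integer::sum);  // local tx 2: deduct stock
        if (failAtAccount) {
            throw new IllegalStateException("account service failed");
        }
        accountDb.merge(1L, -money, Integer::sum);  // local tx 3: deduct balance
        orderDb.put(orderId, 1);                    // local tx 4: mark order finished
    }

    public static void main(String[] args) {
        LocalOnlyDemo demo = new LocalOnlyDemo();
        try {
            demo.placeOrder(1L, 10, 100, true);
        } catch (IllegalStateException e) {
            // Stock was deducted and the order row exists, but nothing was charged.
            System.out.println("stock=" + demo.storageDb.get(1L)
                    + " balance=" + demo.accountDb.get(1L)
                    + " orderStatus=" + demo.orderDb.get(1L));
        }
    }
}
```

  After the failure the stock has been reduced and an unfinished order exists, while the balance is untouched. That inconsistent state is what a global transaction is meant to prevent.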

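What is Seata?

  Seata is an open-source distributed transaction solution that aims to provide high-performance, easy-to-use distributed transaction services. It offers the AT, TCC, SAGA and XA transaction modes as a one-stop solution for distributed transactions.

  Official site: http://seata.io/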

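Seata components

Transaction ID (XID)

  A globally unique transaction ID.

Three components

  Transaction Coordinator (TC): maintains the running state of global transactions, and coordinates and drives their commit or rollback.

  Transaction Manager (TM): defines the boundary of a global transaction; it begins the global transaction and finally issues the decision to commit or roll it back globally.

  Resource Manager (RM): controls branch transactions; it registers branches, reports their status, and receives instructions from the TC to commit or roll back the branch (local) transactions.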

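How Seata processes a distributed transaction

  Flow diagram: (image not available)

  Steps:

  1. The TM asks the TC to begin a global transaction; the global transaction is created and a globally unique XID is generated.

  2. The XID is propagated through the context of the microservice call chain.

  3. Each RM registers its branch transaction with the TC, placing it under the global transaction identified by the XID.

  4. The TM issues the global commit or rollback decision for the XID to the TC.

  5. The TC drives all branch transactions under the XID to complete the commit or rollback.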
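  The five steps can be mimicked with a toy coordinator. This is only a sketch of the roles, not Seata's implementation: `ToyCoordinator`, `Branch` and the method names are hypothetical, and a branch is reduced to a pair of callbacks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Toy model of the TC; it only tracks which branches belong to which XID.
final class ToyCoordinator {
    // A branch is modelled as two callbacks supplied by an RM.
    static final class Branch {
        final Runnable commit;
        final Runnable rollback;

        Branch(Runnable commit, Runnable rollback) {
            this.commit = commit;
            this.rollback = rollback;
        }
    }

    private final Map<String, List<Branch>> branchesByXid = new HashMap<>();

    // Step 1: the TM asks to begin a global transaction and receives an XID.
    String begin() {
        String xid = UUID.randomUUID().toString();
        branchesByXid.put(xid, new ArrayList<>());
        return xid;
    }

    // Step 3: an RM registers a branch under the XID it received from its caller (step 2).
    void registerBranch(String xid, Branch branch) {
        List<Branch> branches = branchesByXid.get(xid);
        if (branches == null) {
            throw new IllegalArgumentException("unknown XID: " + xid);
        }
        branches.add(branch);
    }

    // Steps 4 and 5: the TM reports its decision and the coordinator drives every branch.
    void end(String xid, boolean commit) {
        List<Branch> branches = branchesByXid.remove(xid);
        if (branches == null) {
            throw new IllegalArgumentException("unknown XID: " + xid);
        }
        for (Branch branch : branches) {
            (commit ? branch.commit : branch.rollback).run();
        }
    }

    public static void main(String[] args) {
        ToyCoordinator tc = new ToyCoordinator();
        List<String> events = new ArrayList<>();
        String xid = tc.begin();
        tc.registerBranch(xid, new Branch(
                () -> events.add("storage commit"), () -> events.add("storage rollback")));
        tc.registerBranch(xid, new Branch(
                () -> events.add("account commit"), () -> events.add("account rollback")));
        tc.end(xid, false);  // the TM decided to roll back
        System.out.println(events);
    }
}
```

  The point of the sketch is the division of labour: the TM only begins the transaction and decides its outcome, while the coordinator is the one that reaches every registered branch.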

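Deploying Seata

  Seata has three roles: TC, TM and RM. The TC (server side) is deployed as a standalone server; the TM and RM (client side) are integrated into the business systems.

  Project structure: (image not available)

Deploying the Seata server (TC)

  1. Download the server archive from https://github.com/seata/seata/releases

    This example uses seata-server-1.2.0.tar.gz; download and extract it.

  2. Switch the transaction log store mode to db and set the database connection details by editing file.conf in the conf directory, as follows: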

## transaction log store, only used in seata-server
store {
  ## store mode: file, db
  mode = "db"

  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://localhost:3306/seata"
    user = "admin"
    password = "123456"
    minConn = 5
    maxConn = 30
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }
}

    Because the transaction log is stored in db mode, a database named seata must be created (matching the JDBC URL above). The table-creation SQL is mysql.sql in the Seata GitHub repository,

    under https://github.com/seata/seata/tree/1.2.0/script/server/db

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(96),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

    

  3. Switch the registry to Nacos by editing registry.conf in the conf directory, as follows:

registry {
  # file, nacos, eureka, redis, zk, consul, etcd3, sofa
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "localhost:8848"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
}

  4. Start the Nacos server (see: [SpringCloud] Spring Cloud Alibaba: Nacos Registry (Part 27)), then start the Seata server.

    Seata server start command: sh ./bin/seata-server.sh
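    Before starting the clients it can help to confirm that the server is accepting connections. The sketch below is a plain TCP probe; it assumes the server listens on 127.0.0.1:8091, the address that appears as default.grouplist in the client file.conf later in this article. `PortProbe` and `isListening` are hypothetical names.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortProbe {
    // Returns true when a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed address: the default.grouplist value used in this article.
        System.out.println("seata-server reachable: " + isListening("127.0.0.1", 8091, 1000));
    }
}
```

    A successful connection only shows that something is listening on the port; the registration messages in the server console remain the real confirmation.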

    

Deploying the Seata clients (TM and RM)

Preparing the business databases

  Order database (seata_order)

CREATE DATABASE seata_order;

USE seata_order;

CREATE TABLE `order` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `user_id` bigint(11) DEFAULT NULL COMMENT 'user id',
  `product_id` bigint(11) DEFAULT NULL COMMENT 'product id',
  `count` int(11) DEFAULT NULL COMMENT 'quantity',
  `money` decimal(11,0) DEFAULT NULL COMMENT 'amount',
  `status` int(1) DEFAULT NULL COMMENT 'order status: 0 = creating; 1 = finished',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

  Storage database (seata_storage)

CREATE DATABASE seata_storage;

USE seata_storage;

CREATE TABLE `storage` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `product_id` bigint(11) DEFAULT NULL COMMENT 'product id',
  `total` int(11) DEFAULT NULL COMMENT 'total stock',
  `used` int(11) DEFAULT NULL COMMENT 'used stock',
  `residue` int(11) DEFAULT NULL COMMENT 'remaining stock',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

INSERT INTO `seata_storage`.`storage` (`id`, `product_id`, `total`, `used`, `residue`) VALUES ('1', '1', '100', '0', '100');

  Account database (seata_account)

CREATE DATABASE seata_account;

USE seata_account;

CREATE TABLE `account` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `user_id` bigint(11) DEFAULT NULL COMMENT 'user id',
  `total` decimal(10,0) DEFAULT NULL COMMENT 'total credit',
  `used` decimal(10,0) DEFAULT NULL COMMENT 'used balance',
  `residue` decimal(10,0) DEFAULT '0' COMMENT 'remaining available credit',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

INSERT INTO `seata_account`.`account` (`id`, `user_id`, `total`, `used`, `residue`) VALUES ('1', '1', '1000', '0', '1000');

  Create the undo_log table in all three databases. The SQL is at https://github.com/seata/seata/blob/develop/script/client/at/db/mysql.sql

-- In AT mode this script must be run in each business database; the Seata server does not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
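  The role of the undo_log table can be pictured with a toy model: before a row is changed, its previous value is recorded under the XID, and a rollback writes those recorded values back. The sketch below is a simplification under that assumption; `ToyUndoLog` and its members are hypothetical, a single integer stands in for a row, and real AT mode stores serialized row images in rollback_info and performs additional checks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ToyUndoLog {
    // One undo record: which row was changed and what it held before the change.
    static final class UndoRecord {
        final String xid;
        final long rowId;
        final int beforeImage;

        UndoRecord(String xid, long rowId, int beforeImage) {
            this.xid = xid;
            this.rowId = rowId;
            this.beforeImage = beforeImage;
        }
    }

    final Map<Long, Integer> table = new HashMap<>();    // business table: id -> value
    final List<UndoRecord> undoLog = new ArrayList<>();  // stands in for undo_log

    // Records the previous value, then applies the update.
    void update(String xid, long rowId, int newValue) {
        Integer before = table.get(rowId);
        if (before == null) {
            throw new IllegalArgumentException("no such row: " + rowId);
        }
        undoLog.add(new UndoRecord(xid, rowId, before));
        table.put(rowId, newValue);
    }

    // Restores every row touched under the XID, newest record first, and drops its records.
    void rollback(String xid) {
        for (int i = undoLog.size() - 1; i >= 0; i--) {
            UndoRecord record = undoLog.get(i);
            if (record.xid.equals(xid)) {
                table.put(record.rowId, record.beforeImage);
                undoLog.remove(i);
            }
        }
    }

    // On commit the data is already in place, so only the undo records are dropped.
    void commit(String xid) {
        undoLog.removeIf(record -> record.xid.equals(xid));
    }

    public static void main(String[] args) {
        ToyUndoLog storage = new ToyUndoLog();
        storage.table.put(1L, 100);
        storage.update("xid-1", 1L, 90);
        storage.rollback("xid-1");
        System.out.println("residue after rollback: " + storage.table.get(1L));
    }
}
```

  This is why each business database needs its own undo_log table: the undo record has to live next to the data it describes.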

  The three databases: (screenshot not available)

Order service

  1. Create the order module (springcloud-seata-order9011)

  2. Edit the pom file. The complete pom is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>test-springcloud</artifactId>
        <groupId>com.test</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>springcloud-seata-order9011</artifactId>

    <dependencies>

        <!-- seata -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-seata</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-all</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
            <version>1.2.0</version>
        </dependency>

        <!-- alibaba nacos discovery -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>

        <!-- spring boot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
        </dependency>

        <!-- mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>

    Note that the seata-all version is 1.2.0, the same as the server version.

  3. Edit the application.yml properties file:

# port
server:
  port: 9011

spring:
  application:
    name: seata-order-service
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
    alibaba:
      seata:
        # this name must match the vgroupMapping key configured in file.conf
        tx-service-group: my_order_tx_group
  # basic data source settings
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_order?allowPublicKeyRetrieval=true&useSSL=true
    username: admin
    password: 123456
    hikari:
      connection-test-query: SELECT 1 FROM DUAL
      minimum-idle: 1
      maximum-pool-size: 10
      pool-name: ${spring.application.name}-CP
      idle-timeout: 10000
      cachePrepStmts: true
      prepStmtCacheSize: 250
      prepStmtCacheSqlLimit: 2048
      leakDetectionThreshold: 40000

ribbon:
  ReadTimeout: 600000
  ConnectTimeout: 600000
  MaxAutoRetries: 0
  MaxAutoRetriesNextServer: 1

  # eager-load the Ribbon clients used by Feign so the first request does not time out
  eager-load:
    enabled: true
    clients: seata-storage-service, seata-account-service

mybatis:
  mapperLocations: classpath:mapper/*Mapper.xml
  # package containing the entity classes to alias
  type-aliases-package: com.test.springcloud.entities

logging:
  level:
    #    root: debug
    com.test.springcloud: debug

  4. Add a registry.conf file to the resources directory; it can be copied from the server's registry.conf. Its content is as follows:

registry {
  # file, nacos, eureka, redis, zk, consul, etcd3, sofa
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "localhost:8848"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file, nacos, apollo, zk, consul, etcd3
  type = "file"

  nacos {
    serverAddr = "localhost:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

  5. Add a file.conf file to the resources directory with the following content:

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}

service {
  #transaction service group mapping
  vgroupMapping.my_order_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

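    Note the setting vgroupMapping.my_order_tx_group = "default": the part of the key after "vgroupMapping." must equal the tx-service-group value in application.yml.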
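    The relationship between the two settings can be modelled as a simple key lookup. The sketch below is not Seata's code; `VgroupLookup` and `clusterFor` are hypothetical, and a `Properties` object stands in for the parsed file.conf, in which the mapping sits inside the service block.

```java
import java.util.Properties;

final class VgroupLookup {
    // Resolves a transaction service group to a TC cluster name by reading
    // the "service.vgroupMapping.<group>" key of the (modelled) configuration.
    static String clusterFor(Properties config, String txServiceGroup) {
        String cluster = config.getProperty("service.vgroupMapping." + txServiceGroup);
        if (cluster == null) {
            throw new IllegalStateException("no vgroupMapping entry for " + txServiceGroup);
        }
        return cluster;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("service.vgroupMapping.my_order_tx_group", "default");

        // Matches tx-service-group in application.yml, so a cluster is found.
        System.out.println(clusterFor(config, "my_order_tx_group"));

        // A mismatched group name has no entry and fails.
        try {
            clusterFor(config, "my_test_tx_group");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

    The resolved cluster name, "default" here, is the same value the server's registry.conf declares as cluster in its nacos block, which is how a client finds the matching TC instances.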

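  6. Edit the main application class:

import io.seata.spring.annotation.datasource.EnableAutoDataSourceProxy;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

@EnableFeignClients
// proxy the data source automatically
@EnableAutoDataSourceProxy
@EnableDiscoveryClient
@SpringBootApplication
public class SeataOrder9011 {
    public static void main(String[] args) {
        SpringApplication.run(SeataOrder9011.class, args);
    }
}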

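  7. Implement the business classes

    7.1 The controller:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class OrderController {
    @Autowired
    private OrderService orderService;

    @GetMapping("/order/create")
    public CommonResult<Void> create(Order order) {
        orderService.create(order);
        return new CommonResult<>(1, "success");
    }
}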

    7.2 The service interface:

public interface OrderService {

    // create a new order
    void create(Order order);

}

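    7.3 The service implementation

      The business flow is: create the order -> call the storage service to deduct stock -> call the account service to deduct the balance -> update the order status.

      The @GlobalTransactional annotation begins the global transaction.

import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderDao orderDao;

    @Autowired
    private StorageService storageService;

    @Autowired
    private AccountService accountService;

    /**
     * Create the order -> deduct stock via the storage service
     * -> deduct the balance via the account service -> update the order status.
     */
    @Override
    @GlobalTransactional(name = "my_test_tx_group", rollbackFor = Exception.class)
    public void create(Order order) {
        log.info("------->order creation started");
        // create the order in this application
        orderDao.insert(order);

        // remote call: deduct stock in the storage service
        log.info("------->order-service: deducting stock started");
        storageService.decrease(order.getProductId(), order.getCount());
        log.info("------->order-service: deducting stock finished: {}", order.getId());

        // remote call: deduct the balance in the account service
        log.info("------->order-service: deducting balance started");
        accountService.decrease(order.getUserId(), order.getMoney());
        log.info("------->order-service: deducting balance finished");

        // mark the order as finished (status 0 -> 1)
        log.info("------->order-service: updating order status started");
        orderDao.update(order.getId(), order.getUserId(), 0);
        log.info("------->order-service: updating order status finished");

        log.info("------->order creation finished");
    }

}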

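    7.5 The FeignClient for StorageService

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;

@FeignClient(value = "seata-storage-service")
public interface StorageService {

    @PostMapping(value = "/storage/decrease")
    CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}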

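    7.6 The FeignClient for AccountService

import java.math.BigDecimal;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;

@FeignClient(value = "seata-account-service")
public interface AccountService {

    @PostMapping(value = "/account/decrease")
    void decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);

}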

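    7.7 OrderDao

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

@Mapper
public interface OrderDao {
    // create a new order
    int insert(Order order);

    // update the order status from 0 to 1; an UPDATE statement returns no Order row, so nothing is returned
    void update(@Param("id") Long id, @Param("userId") Long userId, @Param("status") Integer status);
}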

    7.8 The CommonResult entity class

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommonResult<T> {

    private int code;
    private String msg;
    private T data;

    public CommonResult(int code, String msg) {
        this.code = code;
        this.msg = msg;
    }
}

    7.9 The Order entity class

import java.math.BigDecimal;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    private Long id;
    private Long userId;
    private Long productId;
    private Integer count;
    private BigDecimal money;
    private Integer status;

}

    7.10 Add the mapping file OrderMapper.xml under resources/mapper

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="com.test.springcloud.dao.OrderDao">

    <resultMap id="BaseResultMap" type="com.test.springcloud.entities.Order" >
        <id property="id" jdbcType="BIGINT" column="id" />
        <result property="userId" jdbcType="BIGINT" column="user_id" />
        <result property="productId" jdbcType="BIGINT" column="product_id" />
        <result property="count" jdbcType="INTEGER" column="count" />
        <result property="money" jdbcType="DECIMAL" column="money" />
        <result property="status" jdbcType="INTEGER" column="status" />
    </resultMap>

    <insert id="insert" parameterType="com.test.springcloud.entities.Order" useGeneratedKeys="true"
            keyProperty="id">
        INSERT INTO `order` (id, user_id, product_id, count, money, status )
        values(null, #{userId}, #{productId}, #{count}, #{money}, 0)
    </insert>

    <select id="update" >
        UPDATE `order`  set status = 1
        WHERE id = #{id} and status = #{status} and user_id = #{userId}
    </select>
</mapper>

Storage service

  1. Create the storage module (springcloud-seata-storage9012)

  2. Edit the pom file; same as above.

  3. Edit the application.yml properties file; it follows the same pattern as above:

# port
server:
  port: 9012

spring:
  application:
    name: seata-storage-service
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
    alibaba:
      seata:
        tx-service-group: my_storage_tx_group
  # basic data source settings
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_storage?allowPublicKeyRetrieval=true&useSSL=true
    username: admin
    password: 123456
    hikari:
      connection-test-query: SELECT 1 FROM DUAL
      minimum-idle: 1
      maximum-pool-size: 10
      pool-name: ${spring.application.name}-CP
      idle-timeout: 10000
      cachePrepStmts: true
      prepStmtCacheSize: 250
      prepStmtCacheSqlLimit: 2048
      leakDetectionThreshold: 40000

feign.hystrix.enabled: true
hystrix:
  command:
    default:
      circuitBreaker:
        sleepWindowInMilliseconds: 30000
        requestVolumeThreshold: 10
      execution:
        isolation:
          strategy: SEMAPHORE
          thread:
            timeoutInMilliseconds: 100000

mybatis:
  mapperLocations: classpath:mapper/*Mapper.xml
  # package containing the entity classes to alias
  type-aliases-package: com.test.springcloud.entities

logging:
  level:
#    root: debug
    com.test.springcloud: debug

  4. Add a registry.conf file to the resources directory; same as above.

  5. Add a file.conf file to the resources directory; same as above except for the vgroupMapping key:

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}

service {
  #transaction service group mapping
  vgroupMapping.my_storage_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

    Note the setting vgroupMapping.my_storage_tx_group = "default"; the key must match this service's tx-service-group.

  7. Implement the business classes

    They mirror the order service. The mapper.xml file is as follows:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="com.test.springcloud.dao.StorageDao">

    <resultMap id="BaseResultMap" type="com.test.springcloud.entities.Storage" >
        <id property="id" jdbcType="BIGINT" column="id" />
        <result property="productId" jdbcType="BIGINT" column="product_id" />
        <result property="total" jdbcType="INTEGER" column="total" />
        <result property="used" jdbcType="INTEGER" column="used" />
        <result property="residue" jdbcType="INTEGER" column="residue" />
    </resultMap>

    <select id="decrease" >
        UPDATE storage
            set used = used + #{count},
            residue = residue - #{count}
        WHERE product_id = #{productId}
    </select>
</mapper>

Account service

  1. Create the account module (set up in the same way as springcloud-seata-storage9012)

  2. Edit the pom file; same as above.

  3. Edit the application.yml properties file; same pattern as above.

  4. Add a registry.conf file to the resources directory; same as above.

  5. Add a file.conf file to the resources directory; same as above.

    Note the setting vgroupMapping.my_account_tx_group = "default"; the key must match this service's tx-service-group.

  7. Implement the business classes

    They mirror the order service. The mapper.xml file is as follows:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="com.test.springcloud.dao.AccountDao">

    <resultMap id="BaseResultMap" type="com.test.springcloud.entities.Account" >
        <id property="id" jdbcType="BIGINT" column="id" />
        <result property="userId" jdbcType="BIGINT" column="user_id" />
        <result property="total" jdbcType="DECIMAL" column="total" />
        <result property="used" jdbcType="DECIMAL" column="used" />
        <result property="residue" jdbcType="DECIMAL" column="residue" />
    </resultMap>

    <select id="decrease" >
        UPDATE account
            set used = used + #{money},
            residue = residue - #{money}
        WHERE user_id = #{userId}
    </select>
</mapper>

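Verifying the distributed transaction

  1. Start Nacos, then start the Seata server.

  2. Start the order, storage and account services.

  3. Check the Seata server console output:

    Each of the three services registers an RM and a TM, and each holds a channel connection to the server. (screenshot not available)

  4. Open http://localhost:9011/order/create?userId=1&productId=1&count=10&money=100 in a browser to create an order and start the business flow.

  5. Check how the data in the order, storage and account databases has changed:

    order table, storage table, account table (screenshots not available)

    The data in all three tables is correct and consistent with the business logic.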
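    Since the screenshots are missing, the expected values can be worked out from the seed rows and the request parameters (count=10, money=100). The sketch below simply applies the arithmetic of the two decrease statements; `ExpectedState` is a hypothetical helper, not part of the project.

```java
final class ExpectedState {
    // Mirrors "used = used + n, residue = residue - n" from the mapper SQL.
    // Returns {used, residue} after the change.
    static long[] decrease(long used, long residue, long amount) {
        return new long[] { used + amount, residue - amount };
    }

    public static void main(String[] args) {
        // Seed rows: storage(total=100, used=0, residue=100), account(total=1000, used=0, residue=1000).
        long[] storage = decrease(0, 100, 10);
        long[] account = decrease(0, 1000, 100);
        System.out.println("storage used=" + storage[0] + " residue=" + storage[1]);
        System.out.println("account used=" + account[0] + " residue=" + account[1]);
    }
}
```

    For one successful request against the seed data this gives storage used=10 and residue=90, and account used=100 and residue=900; the order table should also hold one new row with status 1.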

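  6. In the Account service, add an exception to simulate a business failure:

import java.math.BigDecimal;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
    @Autowired
    private AccountDao accountDao;

    public void decrease(Long userId, BigDecimal money) {
        log.info("------->account-service: deducting account balance started");
        // simulate a business exception so that the global transaction rolls back
        int n = 10 / 0;
        accountDao.decrease(userId, money);
        log.info("------->account-service: deducting account balance finished");
    }
}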

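  7. Restart the Account service and request http://localhost:9011/order/create?userId=1&productId=1&count=10&money=100 again to create an order.

  8. The request fails and the data in all three databases is unchanged, as expected, which confirms that Seata's distributed transaction management is in effect.

  9. As a further check, remove the @GlobalTransactional annotation from the Order service and restart it.

  10. Request http://localhost:9011/order/create?userId=1&productId=1&count=10&money=100 to create an order.

    When the service reports the error, the three databases are left inconsistent: the changes made before the failure are not rolled back, which violates the business logic.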
