Spring Boot 集成 Seata 解決分佈式事務問題

seata 簡介

Seata 是 阿里巴巴2019年開源的分佈式事務解決方案,致力於在微服務架構下提供高性能和簡單易用的分佈式事務服務。在 Seata 開源以前,Seata 對應的內部版本在阿里內部一直扮演着分佈式一致性中間件的角色,幫助阿里度過歷年的雙11,對各業務進行了有力的支撐。通過多年沉澱與積累,2019.1 Seata 正式宣佈對外開源 。目前 Seata 1.0 已經 GA。html

微服務中的分佈式事務問題

讓咱們想象一下傳統的單片應用程序,它的業務由3個模塊組成,他們使用單個本地數據源。天然,本地事務將保證數據的一致性。java

微服務架構已發生了變化。上面提到的3個模塊被設計爲3種服務。本地事務天然能夠保證每一個服務中的數據一致性。可是整個業務邏輯範圍如何?mysql

Seata怎麼辦?git

咱們說,分佈式事務是由一批分支事務組成的全局事務,一般分支事務只是本地事務。github

Seata有3個基本組成部分:spring

  • 事務協調器(TC):維護全局事務和分支事務的狀態,驅動全局提交或回滾。
  • 事務管理器TM:定義全局事務的範圍:開始全局事務,提交或回滾全局事務。
  • 資源管理器(RM):管理正在處理的分支事務的資源,與TC對話以註冊分支事務並報告分支事務的狀態,並驅動分支事務的提交或回滾。

Seata管理的分佈式事務的典型生命週期:sql

  1. TM要求TC開始一項新的全局事務。TC生成表明全局事務的XID。
  2. XID經過微服務的調用鏈傳播。
  3. RM將本地事務註冊爲XID到TC的相應全局事務的分支。
  4. TM要求TC提交或回退相應的XID全局事務。
  5. TC驅動XID的相應全局事務下的全部分支事務以完成分支提交或回滾。

快速開始

用例

用戶購買商品的業務邏輯。整個業務邏輯由3個微服務提供支持:docker

  • 倉儲服務:對給定的商品扣除倉儲數量。
  • 訂單服務:根據採購需求建立訂單。
  • 賬戶服務:從用戶賬戶中扣除餘額。

環境準備

步驟 1:創建數據庫

# db_seata
DROP SCHEMA IF EXISTS db_seata;
CREATE SCHEMA db_seata;
USE db_seata;

# Account
CREATE TABLE `account_tbl` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `user_id` VARCHAR(255) DEFAULT NULL,
  `money` INT(11) DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8;

INSERT INTO account_tbl (id, user_id, money)
VALUES (1, '1001', 10000);
INSERT INTO account_tbl (id, user_id, money)
VALUES (2, '1002', 10000);

# Order
CREATE TABLE `order_tbl`
(
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `user_id` VARCHAR(255) DEFAULT NULL,
  `commodity_code` VARCHAR(255) DEFAULT NULL,
  `count` INT(11) DEFAULT '0',
  `money` INT(11) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8;

# Storage
CREATE TABLE `storage_tbl` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `commodity_code` VARCHAR(255) DEFAULT NULL,
  `count` INT(11) DEFAULT '0',
  PRIMARY KEY (`id`),
  UNIQUE KEY `commodity_code` (`commodity_code`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8;


INSERT INTO storage_tbl (id, commodity_code, count)
VALUES (1, '2001', 1000);

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

seata AT 模式須要 undo_log 表,另外三張是業務表。數據庫

步驟 2: 啓動 Seata Server

Server端存儲模式(store.mode)現有file、db兩種(後續將引入raft),file模式無需改動,直接啓動便可。db模式須要導入用於存儲全局事務回話信息的三張表。api

*注:file模式爲單機模式,全局事務會話信息內存中讀寫並持久化本地文件root.data,性能較高;
db模式爲高可用模式,全局事務會話信息經過db共享,相應性能差些*

能夠直接經過bash 腳本啓動 Seata Server,也能夠經過 Docker 鏡像啓動,可是 Docker 方式目前只支持使用 file 模式,不支持將 Seata-Server 註冊到 Eureka 或 Nacos 等註冊中心。

經過腳本啓動

https://github.com/seata/seat... 下載相應版本的 Seata Server,解壓後執行如下命令啓動,這裏使用 file 配置

經過 Docker 啓動
docker run --name seata-server -p 8091:8091 seataio/seata-server:latest

項目介紹

項目名 地址 說明
sbm-account-service 127.0.0.1:8081 帳戶服務
sbm-order-service 127.0.0.1:8082 訂單服務
sbm-storage-service 127.0.0.1:8083 倉儲服務
sbm-business-service 127.0.0.1:8084 主業務
seata-server 172.16.2.101:8091 seata-server

核心代碼

爲了避免讓篇幅太長,這裏只給出部分代碼,詳細代碼文末會給出源碼地址

maven 引入 seata 的依賴 eata-spring-boot-starter

<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.0.0</version>
</dependency>

倉儲服務

application.properties
spring.application.name=account-service
server.port=8081
spring.datasource.url=jdbc:mysql://172.16.2.101:3306/db_seata?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
seata.tx-service-group=my_test_tx_group
mybatis.mapper-locations=classpath*:mapper/*Mapper.xml
seata.service.grouplist=172.16.2.101:8091
logging.level.io.seata=info
logging.level.io.seata.samples.account.persistence.AccountMapper=debug
StorageService
public interface StorageService {

    /**
     * 扣除存儲數量
     */
    void deduct(String commodityCode, int count);
}

訂單服務

public interface OrderService {

    /**
     * 建立訂單
     */
    Order create(String userId, String commodityCode, int orderCount);
}

賬戶服務

public interface AccountService {

    /**
     * 從用戶帳戶中借出
     */
    void debit(String userId, int money);
}

主要業務邏輯

只須要使用一個 @GlobalTransactional 註解在業務方法上。

@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
    LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
    storageClient.deduct(commodityCode, orderCount);
    orderClient.create(userId, commodityCode, orderCount);
}

XID 的傳遞

全局事務ID的跨服務傳遞,須要咱們本身實現,這裏經過攔截器的方式。每一個服務都須要添加下面兩個類。

SeataFilter
@Component
public class SeataFilter implements Filter {
    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
    }

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest req = (HttpServletRequest) servletRequest;
        String xid = req.getHeader(RootContext.KEY_XID.toLowerCase());
        boolean isBind = false;
        if (StringUtils.isNotBlank(xid)) {
            RootContext.bind(xid);
            isBind = true;
        }
        try {
            filterChain.doFilter(servletRequest, servletResponse);
        } finally {
            if (isBind) {
                RootContext.unbind();
            }
        }
    }

    @Override
    public void destroy() {
    }
}
SeataRestTemplateAutoConfiguration
@Configuration
public class SeataRestTemplateAutoConfiguration {
    @Autowired(
            required = false
    )
    private Collection<RestTemplate> restTemplates;
    @Autowired
    private SeataRestTemplateInterceptor seataRestTemplateInterceptor;

    public SeataRestTemplateAutoConfiguration() {
    }

    @Bean
    public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
        return new SeataRestTemplateInterceptor();
    }

    @PostConstruct
    public void init() {
        if (this.restTemplates != null) {
            Iterator var1 = this.restTemplates.iterator();

            while (var1.hasNext()) {
                RestTemplate restTemplate = (RestTemplate) var1.next();
                List<ClientHttpRequestInterceptor> interceptors = new ArrayList(restTemplate.getInterceptors());
                interceptors.add(this.seataRestTemplateInterceptor);
                restTemplate.setInterceptors(interceptors);
            }
        }

    }
}

測試

測試成功場景:

curl -X POST http://127.0.0.1:8084/api/business/purchase/commit

此時返回結果爲:true

測試失敗場景:

UserId 爲1002 的用戶下單,sbm-account-service會拋出異常,事務會回滾

http://127.0.0.1:8084/api/business/purchase/rollback

此時返回結果爲:false

查看 undo_log 的日誌或者主鍵,能夠看到在執行過程當中有保存數據。
如查看主鍵自增的值,在執行先後的值會發生變化,在執行前是 1,執行後是 7 。

源碼地址

https://github.com/gf-huanchu...

參考

http://seata.io/zh-cn/docs/ov...

關注我

相關文章
相關標籤/搜索