Dubbo學習系列之十四(Seata分佈式事務方案AT模式)

一直說寫有關最新技術的文章,但前面彷佛都有點偏了,只能說算主流技術,今天這個主題,我以爲應該名副其實。分佈式微服務的深水區並非單個微服務的設計,而是服務間的數據一致性問題!解決了這個問題,纔算是把分佈式正式收編了!但分佈式事務解決方案並無統一的標準,只能說根據業務特色來適配,有實時的,非實時的,同步或異步的,以前已經實現了異步MQ的分佈式事務方案,今天來看看Seata方案,自19年初才推出,還幾易其名,目前還不算特別完善,但其光環太耀眼,做爲一名IT人,仍是有必要來瞧一瞧的。單說Seata,就有AT、TCC、Saga和XA模式,看來是盤大菜。java

**工具:**node

Idea201902/JDK11/Gradle5.6.2/Mysql8.0.11/Lombok0.26/Postman7.5.0/SpringBoot2.1.9/Nacos1.1.3/Seata0.8.1/SeataServer0.8.1/Dubbo2.7.3linux

**難度:**
新手--戰士--老兵--大師git

**目標:**github

1.多模塊微服務Dubbo框架整合Seata實現分佈式事務的AT模式spring

2.使用Seata實現訂單模塊與其餘模塊的關聯型事務的TCC模式
***sql

**步驟:**數據庫

**爲了更好的遇到各類問題,同時保持時效性,我儘可能使用最新的軟件版本。代碼地址:其中的day17,https://github.com/xiexiaobiao/dubbo-project.git**apache

文中圖片有些顯示不全,是圖片很大,我擔憂縮放會看不清,因此部分顯示不全的,能夠下載圖片再看。api

1.先照搬來點背景材料,分佈式事務典型場景以下圖,一個business主事務發起多個分支事務,並須要保證一致的commit或rollback:

 

 

Seata框架,有三個模塊,分別是

  • - TM-TransactionManager事務管理器:定義全局事務的範圍,開啓、提交或回滾全局事務;
  • - RM -ResourceManager資源管理器:管理分支事務的資源,註冊分支事務到TC,與TC通訊反饋分支事務狀態,驅動分支事務提交或回滾;
  • - TC-TransactionCoordinator事務協調器:維護全局和分支事務,驅動全局提交或回滾;

 分佈式事務流程:

I. TM 請求TC 發起一個全局事務,同時TC生成一個 XID做爲全局事務ID.

II. XID將分發給事務調用鏈上的全部微服務.

III. RM響應全局事務XID向TC註冊本地分支事務.

IV. TM向TC發出提交或回滾全局事務XID的請求.

V. TC響應全局事務XID,驅動全部分支事務提交或 回滾本地分支事務.

其中 TM 和 RM 是做爲 Seata 的客戶端與業務系統集成在一塊兒,TC 做爲 Seata 的服務端獨立部署。

再說seata的AT模式:AT 模式是一種無侵入的分佈式事務解決方案。在 AT 模式下,用戶只需關注本身的「業務 SQL」,用戶的 「業務 SQL」 做爲一階段,Seata 框架會自動生成事務的二階段提交和回滾操做。

  • - 在一階段,Seata 會攔截「業務 SQL」,首先解析 SQL 語義,找到「業務 SQL」要更新的業務數據,在業務數據被更新前,將其保存成「before image」,而後執行「業務 SQL」更新業務數據,在業務數據更新以後,再將其保存成「after image」,最後生成行鎖。以上操做所有在一個數據庫事務內完成,這樣保證了一階段操做的原子性。

  

  • - 二階段若是是提交的話,由於「業務 SQL」在一階段已經提交至數據庫, 因此 Seata 框架只需將一階段保存的快照數據和行鎖刪掉,完成數據清理便可。

  

  • - 二階段若是是回滾的話,Seata 就須要回滾一階段已經執行的「業務 SQL」,還原業務數據。回滾方式即是用「before image」還原業務數據;但在還原前要首先要校驗髒寫,對比「數據庫當前業務數據」和 「after image」,若是兩份數據徹底一致就說明沒有髒寫,能夠還原業務數據,若是不一致就說明有髒寫,出現髒寫就須要轉人工處理。

 

 2.爲了單一化技術點,我直接新建一個gradle項目,以官方例子爲原型作抽取製做,模擬電商業務,總體架構爲多模塊微服務Dubbo框架,創建5個module,common爲公共模塊,account爲用戶帳戶處理,order爲訂單處理,storage爲庫存處理,business爲業務處理,總體的處理邏輯爲第一圖。

 

3.在build.gradle中引入依賴,強烈建議邊寫代碼邊逐步引入,好比使用到druid才加入druid的依賴,這樣才能知道每一個依賴的做用和用法。

 

4.建表,項目文件中已有SQL.script,幾個業務模塊的對應的表,比較簡單,略。重點關注下undo_log,此表爲MQ存儲事務執行先後的日誌表,爲**AT模式所必須**,用於事務提交和回滾,其中最關鍵字段即xid(全局事務ID)和branch_id(分支事務ID)。另外,我將各模塊DB獨立,是爲了模擬分佈式DB環境。

 

5.使用common模塊的mbg快速生成各模塊的Entity、Service、Impl、Mapper、Dao和Controller,可參考往期文章《》。注意每次生成時,需修改配置。

 

6.common模塊:放公共的對象,如全局Enum,Exception,Dto等,還有Dubbo的接口。

 

7.storage模塊:`com.biao.mall.storage.conf.SeataAutoConfig`進行Seata配置:

  • - 先經過SpringBoot自動取得DataSourceProperties,並獲取JDBC的鏈接信息;
  • - 注入Druid鏈接池對象,並對DruidDataSource作屬性設置,如線程池參數,超時參數等;
  • - 注入RM的DataSourceProxy代理,來代理DruidDataSource;
  • - 初始化Mybatis的SqlSessionFactory,這裏使用的是DataSourceProxy實參,並將Mapper文件加入,映射Entity和Table;
  • - 分支經過GlobalTransactionScanner來掃描XID,並註冊本地事務;
@Configuration
public class SeataAutoConfig {

    private DataSourceProperties dataSourceProperties;

    @Autowired
    public SeataAutoConfig(DataSourceProperties dataSourceProperties){
        this.dataSourceProperties = dataSourceProperties;
    }

    /**
     * init durid datasource
     * @Return: druidDataSource  datasource instance
     */
    @Bean
    @Primary
    public DruidDataSource druidDataSource(){
        DruidDataSource druidDataSource = new DruidDataSource();
        druidDataSource.setUrl(dataSourceProperties.getUrl());
        druidDataSource.setUsername(dataSourceProperties.getUsername());
        druidDataSource.setPassword(dataSourceProperties.getPassword());
        druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
        druidDataSource.setInitialSize(0);
        druidDataSource.setMaxActive(180);
        druidDataSource.setMaxWait(60000);
        druidDataSource.setMinIdle(0);
        druidDataSource.setValidationQuery("Select 1 from DUAL");
        druidDataSource.setTestOnBorrow(false);
        druidDataSource.setTestOnReturn(false);
        druidDataSource.setTestWhileIdle(true);
        druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
        druidDataSource.setMinEvictableIdleTimeMillis(25200000);
        druidDataSource.setRemoveAbandoned(true);
        druidDataSource.setRemoveAbandonedTimeout(1800);
        druidDataSource.setLogAbandoned(true);
        return druidDataSource;
    }

    /**
     * init datasource proxy
     * @Param: druidDataSource  datasource bean instance
     * @Return: DataSourceProxy  datasource proxy
     */
    @Bean
    public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource){
        return new DataSourceProxy(druidDataSource);
    }

    /**
     * init mybatis sqlSessionFactory
     * @Param: dataSourceProxy  datasource proxy
     * @Return: DataSourceProxy  datasource proxy
     */
    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSourceProxy);
        factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath:/mapper/*Mapper.xml"));
        factoryBean.setTransactionFactory(new JdbcTransactionFactory());
        return factoryBean.getObject();
    }

    /**
     * init global transaction scanner
     * @Return: GlobalTransactionScanner
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner(){
        return new GlobalTransactionScanner("${spring.application.name}", "my_test_tx_group");
    }
}

 

com.biao.mall.storage.dubbo.StorageDubboServiceImpl:Dubbo微服務storage的具體實現,@Service註解爲com.apache.dubbo.config.annotation.Service,將該服務註冊到註冊中心,本項目註冊中心使用Nacos,不是ZK。

@Service(version = "1.0.0",protocol = "${dubbo.protocol.id}",
        application = "${dubbo.application.id}",registry = "${dubbo.registry.id}")
public class StorageDubboServiceImpl implements StorageDubboService {

    @Autowired
    private ProductService  productService;

    @Override
    public ObjectResponse decreaseStorage(CommodityDTO commodityDTO) {
        System.out.println("全局事務id :" + RootContext.getXID());
        return productService.decreaseStorage(commodityDTO);
    }
}

另外注意, `com.biao.mall.storage.impl.ProductServiceImpl`中,這裏的本地方法,並不須要@Transactional註解。

 

8.account模塊和order模塊和storage模塊相似,只是order模塊中`com.biao.mall.order.impl.OrdersServiceImpl`多了一個經過@Reference調用account服務的註解,其餘,略。

 

9.business模塊:SeataAutoConfig中因無本地事務,只需一個GlobalTransactionScanner,BusinessServiceImpl中:

  • - 類註解@Service是Spring的註解;
  • - 經過@Reference從註冊中心獲取storage和order服務;
  • - handleBusiness方法上經過@GlobalTransactional發起全局事務,方法內就是具體使用storage和order服務;
@Service
public class BusinessServiceImpl implements BusinessService {

    @Reference(version = "1.0.0")
    private StorageDubboService storageDubboService;

    @Reference(version = "1.0.0")
    private OrderDubboService orderDubboService;

    private boolean flag;

    @Override
    @GlobalTransactional(timeoutMills = 30000,name = "dubbo-seata-at-springboot")
    public ObjectResponse handleBusiness(BusinessDTO businessDTO) {
        System.out.println("開始全局事務,XID = " + RootContext.getXID());
        ObjectResponse<Object> objectResponse = new ObjectResponse<>();
        //1,減庫存
        CommodityDTO commodityDTO = new CommodityDTO();
        commodityDTO.setCommodityCode(businessDTO.getCommodityCode());
        commodityDTO.setCount(businessDTO.getCount());
        ObjectResponse storageResponse = storageDubboService.decreaseStorage(commodityDTO);
        //2,建立訂單
        OrderDTO orderDTO = new OrderDTO();
        orderDTO.setUserId(businessDTO.getUserId());
        orderDTO.setCommodityCode(businessDTO.getCommodityCode());
        orderDTO.setOrderCount(businessDTO.getCount());
        orderDTO.setOrderAmount(businessDTO.getAmount());
        ObjectResponse<OrderDTO> response = orderDubboService.createOrder(orderDTO);

        //打開註釋測試事務發生異常後,全局回滾功能
//        if (!flag) {
//            throw new RuntimeException("測試拋異常後,分佈式事務回滾!");
//        }
        if (storageResponse.getStatus() != 200 || response.getStatus() != 200) {
            throw new DefaultException(RspStatusEnum.FAIL);
        }

        objectResponse.setStatus(RspStatusEnum.SUCCESS.getCode());
        objectResponse.setMessage(RspStatusEnum.SUCCESS.getMessage());
        objectResponse.setData(response.getData());
        return objectResponse;
    }
} 

 

10.寫個BusinessController的方法,用於測試:

    @PostMapping("/buy")
    ObjectResponse handleBusiness(@RequestBody BusinessDTO businessDTO){
        LOGGER.info("請求參數:{}",businessDTO.toString());
        return businessService.handleBusiness(businessDTO);
    }

 

11.下載安裝TC ,即 Seata 的服務端,須要獨立部署運行,下載地址:https://github.com/seata/seata/releases,解壓,支持window和linux下直接啓動運行,以下linux命令,運行參數將指定port、host和imageFile的存儲方式:

sh seata-server.sh -p 8091 -h 127.0.0.1 -m file

 

12.測試,按順序啓動:Nacos-->SeataServer-->account-->order-->storage-->business ,啓動後的效果。

Nacos註冊的服務信息,注意Dubbo是區分provider和consumer的,這是不一樣於SpringCloud的地方,因此同一服務不一樣身份就有兩個了:

能夠看到各RM向TC註冊的信息:

 

Postman提交至Controller:


提交運行後,一階段更新DB,二階段只需釋放鎖:

 

數據庫狀況:

 

13.回滾測試:將`com.biao.mall.business.service.BusinessServiceImpl`中回滾測試代碼註釋去掉!手動拋出異常,再次Postman提交,可見:

  • - business先開啓了全局事務,並傳播了XID,二階段向TC提交rollback狀態;
  • - order中,能夠看到分支事務是一階段提交了,異常後,二階段根據XID作了rollback;
  • - order有SQL信息,是application.yml中設置了logging.level爲debug;

- 數據庫信息不變,貼圖,略;

 

14.測試undo_log表用途:
`com.biao.mall.business.service.BusinessServiceImpl`加個斷點:

 

其餘模塊正常啓動,postman提交:

 

看undo_log表,這裏只是個臨時的數據,二階段後會刪除:

  

***
覆盤記: 

1.Seata只能支持RPC模式的事務,對MQ模式的分佈式事務不能實施,比較好的搭配是Dubbo+Seata。

2.啓動應用向SeataServer註冊,不必定能一次成功,有時要嘗試屢次,可見穩定性通常!

3.依賴衝突問題:報錯提示:`Class path contains multiple SLF4J bindings`,因其來自於如下兩個jar,
`logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class`
,`slf4j-nop-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class`
因爲logback是主流,不排除,直接去掉`slf4j-nop`依賴,問題解決!

4.報錯:`NoSuchMethodError:org.yaml.snakeyaml.nodes.ScalarNode.getScalarStyle`,**特別注意**這種狀況不少時候也是依賴衝突,而不是缺乏類,處理方法:

  a.先百度,須要加入snakeyaml依賴,結果仍是報錯,

  b.再先全局搜索,雙擊shift鍵,查找`ScalarNode`類,發現出如今兩個地方,估計衝突了,

  c.在Idea中使用依賴分析命令,`order`爲module名,`snakeyaml`爲依賴名:

  `gradle :order:dependencyInsight --dependency snakeyaml`

發現有多方引入的狀況,結果是dubbo自己也使用了snakeyaml,直接在dubbo依賴中使用exclude語法排除,問題解決!

5.報錯:`NoSuchBeanDefinitionException: No qualifying bean of type 'com.biao.mall.order.dao.OrdersDao' available`:
表面上看是Mapper類無Bean實例,肯定加了@Mapper和@Repository註解,仍是錯誤!想到既然是缺乏注入的Bean,多是缺乏mybatis-plus依賴致使,添加`mybatis-plus-boot-starter`,問題解決!

6.報錯:`io.seata.common.exception.NotSupportYetException: not support register type: null`,需添加 registry.conf 和 file.conf。

7.seata server安裝和啓動方法:
https://github.com/seata/seata/wiki/Quick-Start

8.報錯:`com.alibaba.nacos.api.exception.NacosException: java.lang.ClassNotFoundException`,添加Nacos相關依賴 dubbo-registry-nacos/spring-context-support/nacos-api/nacos-client。

9.dubbo的service是明顯區分consumer和provider的,若是使用Nacos作註冊中心,能夠經過detail查看其服務角色,還有其提供的方法。

10.`com.biao.mall.storage.conf.SeataAutoConfig`中設置Mapper路徑,需使用`getResources("classpath:/mapper/*Mapper.xml"))`;不可以使用`getResources("${mybatis.mapper-locations}"))`配置方式,
會告警:`Property 'mapperLocations' was specified but matching resources are not found`,最後致使Mapper文件沒法加載,Dao方法讀取失敗,應用運行會異常,我估計是Bean加載順序問題,但沒有驗證,sorry。

11.本文參考文章地址:https://www.sofastack.tech/blog/seata-distributed-transaction-deep-dive/

***
推薦閱讀:

 Dubbo學習系列之十三(Mycat數據庫代理)

相關文章
相關標籤/搜索