分佈式事務框架seata落地實踐

前言

seata是阿里巴巴研發的一套開源分佈式事務框架,提供了AT、TCC、SAGA 和 XA 幾種事務模式。本文以精品課項目組的物流後臺服務爲例,介紹seata框架落地的過程,遇到的問題以及解決方案。java

做者/ 鄧新偉mysql

編輯/ 網易有道redis

有道精品課教務系統是基於springcloud的分佈式集羣服務。在實際業務中,存在許多分佈式事務場景。然而傳統的事務框架是沒法實現全局事務的。長期以來,咱們的分佈式場景的一致性,每每指的是放棄強一致性,保證最終一致性。spring

咱們從調研中發現,seata框架既能夠知足業務需求,靈活兼容多種事務模式,又能夠實現數據強一致性。sql

本文以物流業務爲例,記錄了在實際業務中落地seata框架落地的過程當中遇到的一些問題以及解決方案,供你們學習討論~歡迎你們在留言區討論交流數據庫

1. 基礎信息

  • seata版本:1.4
  • 微服務框架:springcloud
  • 註冊中心:consul

2.基本框架

2.1 基本組件

seata框架分爲3個組件:segmentfault

  • TC (Transaction Coordinator) -事務協調者 (即seata-server)

維護全局和分支事務的狀態,驅動全局事務提交或回滾。api

  • TM (Transaction Manager) -事務管理器 (在client上,發起事務的服務)

定義全局事務的範圍:開始全局事務、提交或回滾全局事務。架構

  • RM (Resource Manager) - 資源管理器 (在client)

管理分支事務處理的資源,與TC交談以註冊分支事務和報告分支事務的狀態,並驅動分支事務提交或回滾oracle

2.2. 部署seata-server(TC)

在官網下載 seata 服務端,解壓後執行bin/seata-server.sh便可啓動。

seata-server 有2個配置文件:registry.conf 與 file.conf。而 registry.conf 文件決定了 seata-server 使用的註冊中心配置和配置信息獲取方式。

咱們使用 consul 作註冊中心,所以須要在registry.conf文件中,須要修改如下配置:

registry {
  #file 、nacos 、eureka、redis、zk、consul、etcd三、sofa
  type = "consul" ## 這裏註冊中心填consul
  loadBalance = "RandomLoadBalance"
  loadBalanceVirtualNodes = 10
   ... ...
  consul {
    cluster = "seata-server"
    serverAddr = "***註冊中心地址***"
    #這裏的dc指的是datacenter,若consul爲多數據源配置須要在請求中加入dc參數。
    #dc與namespace並不是是seata框架自帶的,文章後面將會進一步解釋
    dc="bj-th"
    namespace="seata-courseop"
  }
  ... ...
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  ## 若是啓動時從註冊中心獲取基礎配置信息,填consul
  ## 不然從file.conf文件中獲取
  type = "consul"
  consul {
    serverAddr = "127.0.0.1:8500"
  }
... ...
}

其中須要注意的是,若是須要高可用部署,seata獲取配置信息的方式就必須是註冊中心,此時file.conf就沒用了。

(固然,須要事先把file.conf文件中的配置信息遷移到consul中)

store {
  ## store mode: file、db、redis
  mode = "db"

... ...
  ## database store property
  ## 若是使用數據庫模式,須要配置數據庫鏈接設置
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://***線上數據庫地址***/seata"
    user = "******"
    password = "******"
    minConn = 5
    maxConn = 100
    ## 這裏的三張表須要提早在數據庫建好
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }
... ...
}

service {
  #vgroup->rgroup
  vgroupMapping.tx-seata="seata-server"
  default.grouplist="127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}

其中,global_tablebranch_tablelock_table三張表須要提早在數據庫中建好。

2.3 配置client端(RM與TM)

每一個使用seata框架的服務都須要引入seata組件

dependencies {

    api 'com.alibaba:druid-spring-boot-starter:1.1.10'
    api 'mysql:mysql-connector-java:6.0.6'
    api('com.alibaba.cloud:spring-cloud-alibaba-seata:2.1.0.RELEASE') {
        exclude group:'io.seata', module:'seata-all'
    }
    api 'com.ecwid.consul:consul-api:1.4.5'
    api 'io.seata:seata-all:1.4.0'
}

每一個服務都一樣須要配置file.conf與registry.conf文件,放在resource目錄下。registry.conf與server的保持一致。在file.conf文件中,除了db配置外,還須要進行client參數的配置:

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    ## 這個undo_log也須要提早在mysql中建立
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

在application.yml文件中添加seata配置:

spring:
  cloud:
      seata: ## 注意tx-seata須要與服務端和客戶端的配置文件保持一致
        tx-service-group: tx-seata

另外,還須要替換項目的數據源,

@Primary
    @Bean("dataSource")
    public DataSource druidDataSource(){
        DruidDataSource druidDataSource = new DruidDataSource();
        druidDataSource.setUrl(url);
        druidDataSource.setUsername(username);
        druidDataSource.setPassword(password);
        druidDataSource.setDriverClassName(driverClassName);
        return new DataSourceProxy(druidDataSource);
    }

至此,client端的配置也已經完成了。

3. 功能演示

一個分佈式的全局事務,總體是兩階段提交的模型。

全局事務是由若干分支事務組成的,

分支事務要知足兩階段提交的模型要求,即須要每一個分支事務都具有本身的:

  • 一階段 prepare 行爲
  • 二階段commit 或 rollback 行爲

根據兩階段行爲模式的不一樣,咱們將分支事務劃分爲 Automatic (Branch) Transaction ModeTCC (Branch) Transaction Mode.

3.1 AT模式

AT 模式基於支持本地ACID事務的關係型數據庫:

  • 一階段 prepare 行爲:在本地事務中,一併提交業務數據更新和相應回滾日誌記錄。
  • 二階段 commit 行爲:立刻成功結束,自動異步批量清理回滾日誌。
  • 二階段 rollback 行爲:經過回滾日誌,自動生成補償操做,完成數據回滾
直接在須要添加全局事務的方法中加上註解@GlobalTransactional
@SneakyThrows
    @GlobalTransactional
    @Transactional(rollbackFor = Exception.class)
    public void buy(int id, int itemId){
        // 先生成訂單
        Order order = orderFeignDao.create(id, itemId);
        // 根據訂單扣減帳戶餘額
        accountFeignDao.draw(id, order.amount);
    }
注意:同@Transactional同樣,@GlobalTransactional若要生效也要知足:
  • 目標函數必須爲public類型
  • 同一類內方法調用時,調用目標函數的方法必須經過springBeanName.method的形式來調用,不能使用this直接調用內部方法

3.2TCC模式

TCC 模式是支持把自定義的分支事務歸入到全局事務的管理中。

  • 一階段 prepare 行爲:調用自定義的 prepare 邏輯。
  • 二階段 commit 行爲:調用自定義的 commit 邏輯。
  • 二階段 rollback 行爲:調用自定義的 rollback 邏輯。
首先編寫一個TCC服務接口:
@LocalTCC
public interface BusinessAction {
    @TwoPhaseBusinessAction(name = "doBusiness", commitMethod = "commit", rollbackMethod = "rollback")
    boolean doBusiness(BusinessActionContext businessActionContext,
                       @BusinessActionContextParameter(paramName = "message") String msg);

    boolean commit(BusinessActionContext businessActionContext);

    boolean rollback(BusinessActionContext businessActionContext);
}
其中,BusinessActionContext爲全局事務上下文,能夠今後對象中獲取全局事務相關信息(若是是發起全局事務方,傳入null後自動生成),而後實現該接口:
@Slf4j
@Service
public class BusinessActionImpl implements BusinessAction {

    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean doBusiness(BusinessActionContext businessActionContext, String msg) {
        log.info("準備do business:{}",msg);
        return true;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {
        log.info("business已經commit");
        return true;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {
        log.info("business已經rollback");
        return true;
    }
}
最後,開啓全局事務方法同AT模式。
@SneakyThrows
    @GlobalTransactional
    public void doBusiness(BusinessActionContext context, String msg){
        accountFeignDao.draw(3, new BigDecimal(100));
        businessAction.doBusiness(context, msg);
    }

4. 遇到的問題

4.1 client TM/RM 沒法註冊到TC

在部署seata項目時經常會遇到這樣的問題:在本地調試時一切正常,可是當試圖部署到線上時,老是在clinet端提示註冊TC端失敗。

  • 這是由於client須要先經過服務發現,找到註冊中內心seata-server的服務信息,而後再與seata-server創建鏈接。不過線上的consul採用了多數據中心模式,在調用consul api時,必須加上dc參數項,不然將沒法返回正確的服務信息;然而,seata提供的consul服務發現組件彷佛並不支持dc參數的配置。
  • 還有一個緣由也會致使client沒法鏈接到TC:seata的consul客戶端在調用服務狀態監控api時,使用了wait與index參數,從而使consul查詢進入了阻塞查詢模式。此時client對consul中要查詢的key作監聽,只有當key發生變化或者達到最大請求時間時,纔會返回結果。貌似因爲consul版本的問題,這個阻塞查詢並無監聽到key的變化,反而會讓服務發現的線程陷入無限等待之中,天然也就沒法讓client獲取到server的註冊信息了。

4.2 高可用部署

seata服務的高可用部署只支持註冊中心模式。所以,咱們須要想辦法將file.conf文件以鍵值對的形式存到consul中。

遺憾的是,consul並無顯式支持namespace,咱們只能在put請求中用「/」爲分隔符起到相似的效果。固然,seata框架也沒有考慮到這一點。因此咱們須要修改源碼中的Configuration接口與RegistryProvider接口的consul實現類,增長namespace屬性

4.3global_log與branch_log

TC在想mysql插入日誌數據時,偶爾會報:

Caused by: java.sql.SQLException: Incorrect string value:

application_data字段其實就是對業務數據的記錄。官方給出的建表語句是這樣的:

CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

顯然,VARCHAR(2000)的大小是不合適的, utf8的格式也是不合適的。因此咱們須要修改seata關於數據源鏈接的部分代碼:

// connectionInitSql設置
    protected Set<String> getConnectionInitSqls(){
        Set<String> set = new HashSet<>();
        String connectionInitSqls = CONFIG.getConfig(ConfigurationKeys.STORE_DB_CONNECTION_INIT_SQLS);
        if(StringUtils.isNotEmpty(connectionInitSqls)) {
            String[] strs = connectionInitSqls.split(",");
            for(String s:strs){
                set.add(s);
            }
        }
        // 默認支持utf8mb4
        set.add("set names utf8mb4");
        return set;
    }

5. 自定義開發

5.1 利用SPI機制編寫自定義組件

seata基於java的spi機制提供了自定義實現接口的功能,咱們只須要在本身的服務中,根據seata的接口寫好本身的實現類便可。

SPI(Service Provider Interface)是JDK內置的服務發現機制,用在不一樣模塊間經過接口調用服務,避免對具體服務服務接口具體實現類的耦合。好比JDBC的數據庫驅動模塊,不一樣數據庫鏈接驅動接口相同但實現類不一樣,在使用SPI機制之前調用驅動代碼須要直接在類裏採用Class.forName(具體實現類全名)的方式調用,這樣調用方依賴了具體的驅動實現,在替換驅動實現時要修改代碼。

ConsulRegistryProvider爲例:

  • ConsulRegistryServiceImpl

    // 增長DC和namespace
      private static String NAMESPACE;
      private static String DC;
    
      private ConsulConfiguration() {
          Config registryCongig = ConfigFactory.parseResources("registry.conf");
          NAMESPACE = registryCongig.getString("config.consul.namespace");
          DC = CommonSeataConfiguration.getDatacenter();
          consulNotifierExecutor = new ThreadPoolExecutor(THREAD_POOL_NUM, THREAD_POOL_NUM, Integer.MAX_VALUE,
                  TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
                  new NamedThreadFactory("consul-config-executor", THREAD_POOL_NUM));
      }
      ... ...
    // 同時在getHealthyServices中,刪除請求參數wait&index    
      /**
       * get healthy services
       *
       * @param service
       * @return
       */
      private Response<List<HealthService>> getHealthyServices(String service, long index, long watchTimeout) {
          return getConsulClient().getHealthServices(service, HealthServicesRequest.newBuilder()
                  .setTag(SERVICE_TAG)
                  .setDatacenter(DC)
                  .setPassing(true)
                  .build());
      }
  • ConsulRegistryProvider 注意order要大於seata包中的默認值1,seata類加載器會優先加載order更大的實現類

    @LoadLevel(name = "Consul" ,order = 2)
    public class ConsulRegistryProvider implements RegistryProvider {
     @Override
     public RegistryService provide() {
         return ConsulRegistryServiceImpl.getInstance();
     }
    }
  • 而後在META-INF 的services目錄下添加:io.seata.discovery.registry.RegistryProvider

    com.youdao.ke.courseop.common.seata.ConsulRegistryProvider

    這樣就能夠替換seata包中的實現了。

5.2 common-seata工具包

對於這些自定義實現類,以及一些公共client配置,咱們能夠統一封裝到一個工具包下:

圖1

這樣,其餘項目只須要引入這個工具包,就能夠無需繁瑣的配置,直接使用了。

gradle引入common包:
api 'com.youdao.ke.courseop.common:common-seata:0.0.+'

6. 落地實例

以一個物流場景爲例:
業務架構

  • logistics-server (物流服務)
  • logistics-k3c-server (物流-金蝶客戶端,封裝調用金蝶服務的api
  • elasticsearch

業務背景:logistics 執行領用單新增,在 elasticsearch 中更新數據,同時經過 rpc 調用 logistics-k3c 的金蝶出庫方法,生成金蝶單據,如圖2所示
圖2

問題:若是elasticsearch單據更新出現異常,金蝶單據將沒法回滾,形成數據不一致的問題。

在部署完seata線上服務後,只須要在logistics與logistics-k3c中分別引入common-seata工具包

logistics服務

// 使用全局事務註解開啓全局事務
    @GlobalTransactional
    @Transactional(rollbackFor = Exception.class)
    public void Scm經過(StaffOutStockDoc staffOutStock, String body) throws Exception {
        ... 一些業務處理...
         // 構建金蝶單據請求
        K3cApi.StaffoutstockReq req = new K3cApi.StaffoutstockReq();
        req.materialNums = materialNums;
        req.staffOutStockId = staffOutStock.id;
        ... 一些業務處理 ...
       // 調用logistics-k3c-api金蝶出庫
        k3cApi.staffoutstockAuditPass(req);

        staffOutStock.status = 待發貨;
        staffOutStock.scmAuditTime = new Date();
        staffOutStock.updateTime = new Date();
        staffOutStock.historyPush("scm經過");
        // 更新對象後存入elasticsearch
        es.set(staffOutStock);
    }

logistics-k3c

因爲咱們新增單據接口是調用金蝶的服務,因此這裏使用TCC模式構建事務接口

  • 首先建立StaffoutstockCreateAction接口

    @LocalTCC
    public interface StaffoutstockCreateAction {
      @TwoPhaseBusinessAction(name = "staffoutstockCreate")
      boolean create(BusinessActionContext businessActionContext,
                         @BusinessActionContextParameter(paramName = "staffOutStock") StaffOutStock staffOutStock,
                         @BusinessActionContextParameter(paramName = "materialNum") List<Triple<Integer, Integer, Integer>> materialNum);
    
      boolean commit(BusinessActionContext businessActionContext);
    
      boolean rollback(BusinessActionContext businessActionContext);
    
    }
  • 接口實現StaffoutstockCreateActionImpl

    @Slf4j
    @Service
    public class StaffoutstockCreateActionImpl implements StaffoutstockCreateAction {
    
      @Autowired
      private K3cAction4Staffoutstock k3cAction4Staffoutstock;
    
      @SneakyThrows
      @Transactional(rollbackFor = Exception.class)
      @Override
      public boolean create(BusinessActionContext businessActionContext, StaffOutStock staffOutStock, List<Triple<Integer, Integer, Integer>> materialNum) {
          //金蝶單據新增
          k3cAction4Staffoutstock.staffoutstockAuditPass(staffOutStock, materialNum);
          return true;
      }
    
      @SneakyThrows
      @Transactional(rollbackFor = Exception.class)
      @Override
      public boolean commit(BusinessActionContext businessActionContext) {
          Map<String, Object> context = businessActionContext.getActionContext();
          JSONObject staffOutStockJson = (JSONObject) context.get("staffOutStock");
          // 若是嘗試新增成功,commit不作任何事
          StaffOutStock staffOutStock = staffOutStockJson.toJavaObject(StaffOutStock.class);
          log.info("staffoutstock {} commit successfully!", staffOutStock.id);
          return true;
      }
    
      @SneakyThrows
      @Transactional(rollbackFor = Exception.class)
      @Override
      public boolean rollback(BusinessActionContext businessActionContext) {
          Map<String, Object> context = businessActionContext.getActionContext();
          JSONObject staffOutStockJson = (JSONObject) context.get("staffOutStock");
          StaffOutStock staffOutStock = staffOutStockJson.toJavaObject(StaffOutStock.class);
          // 這裏調用金蝶單據刪除接口進行回滾
          k3cAction4Staffoutstock.staffoutstockRollback(staffOutStock);
          log.info("staffoutstock {} rollback successfully!", staffOutStock.id);
          return true;
      }
    }
  • 封裝爲業務方法

    /**
       * 項目組領用&報廢的審覈經過:新增其餘出庫單
       * 該方法使用seata-TCC方案實現全局事務
       * @param staffOutStock
       * @param materialNum
       */
      
      @Transactional
      public void staffoutstockAuditPassWithTranscation(StaffOutStock staffOutStock,
                                                        List<Triple<Integer, Integer, Integer>> materialNum){
          staffoutstockCreateAction.create(null, staffOutStock, materialNum);
      }
  • k3c API實現類

    @SneakyThrows
      @Override
      public void staffoutstockAuditPass(StaffoutstockReq req) {
          ... 一些業務處理方法 ...
          //這裏調用了封裝好的事務方法
          k3cAction4Staffoutstock.staffoutstockAuditPassWithTranscation(staffOutStock, triples);
      }

這樣,一個基於 TCC 的全局事務鏈路就創建起來了。

當全局事務執行成功時,咱們能夠在 server 中看到打印的日誌(如圖3):
圖3

若是全局事務執行失敗,會進行回滾,此時會執行接口中的rollback,調用金蝶接口刪除生成的單據,如圖4。
圖4

7. 總結

本文以seata框架的部署與使用爲主線,記錄了seata 框架運用的一些關鍵步驟與技術細節,並針對項目落地時遇到的一些的技術問題提供瞭解決方案。

在後續的推文中,咱們還將繼續以 seata 框架的源碼解析爲主線,向你們介紹 seata 實現分佈式事務的核心原理與技術細節。
-END-

相關文章
相關標籤/搜索