分佈式事務

  • JTA/XA二段式提交

普通事務管理的過程html

  1. do
  2. commit/rollback

外部(全局)事務-JTAjava

  1. 外部事務管理器提供事務管理
  2. 經過Spring事務接口,調用外部管理器。
  3. 使用JNDI等方式獲取外部事務管理器的實例
  4. 外部事務管理器通常由應用服務器提供,如JBOSS,WebLogic,不過咱們經常使用的Tomcat是不提供的。
  5. 外部事務管理器提供JTA事務管理
  6. JTA事務管理器能夠管理多個數據資源
  7. 經過2階段提交實現多數據源的事務。

JTA事務管理的過程mysql

  1. do
  2. prepare / rollback
  3. commit / rollback

使用應用服務器web

不使用應用服務器(通常使用的是Atomikos)spring

XA與JTAsql

  1. Transaction Manage
  2. XA Resource
  3. 兩階段提交

XA規範的JAVA實現-JTA數據庫

上圖中JTA是事務管理器在Java中的實現,它的全稱爲Java Transaction API.XAResource是Java中對Resource規範的實現。服務器

JTA網絡

  1. TransactionManager
  2. XAResouce
  3. XID

咱們來看一下TransactionManager接口mybatis

public interface TransactionManager {

    /**
     * 開啓一個事務
     *
     */
    public void begin() throws NotSupportedException, SystemException;

    /**
     * 提交一個事務
     *
     */
    public void commit() throws RollbackException,
   HeuristicMixedException, HeuristicRollbackException, SecurityException,
   IllegalStateException, SystemException;

    /**
     * 獲取事務狀態
     *
     */
    public int getStatus() throws SystemException;

    /**
     * 開啓一個事務
     *
     */
    public Transaction getTransaction() throws SystemException;

    /**
     * 繼續掛起的事務
     */
    public void resume(Transaction tobj)
            throws InvalidTransactionException, IllegalStateException,
            SystemException;

    /**
     * 回滾
     *
     */
    public void rollback() throws IllegalStateException, SecurityException,
                            SystemException;

    /**
     * 設置回滾只讀
     *
     */
    public void setRollbackOnly() throws IllegalStateException, SystemException;

    /**
     * 設置事務的超時時間
     *
     */
    public void setTransactionTimeout(int seconds) throws SystemException;

    /**
     * 掛起一個事務
     *
     */
    public Transaction suspend() throws SystemException;
}

XAResource接口

public interface XAResource {
    int TMENDRSCAN = 8388608;
    int TMFAIL = 536870912;
    int TMJOIN = 2097152;
    int TMNOFLAGS = 0;
    int TMONEPHASE = 1073741824;
    int TMRESUME = 134217728;
    int TMSTARTRSCAN = 16777216;
    int TMSUCCESS = 67108864;
    int TMSUSPEND = 33554432;
    int XA_RDONLY = 3;
    int XA_OK = 0;
    //控制某個id的事務進行第幾階段的提交
    void commit(Xid var1, boolean var2) throws XAException;
    
    void end(Xid var1, int var2) throws XAException;

    void forget(Xid var1) throws XAException;
    //獲取事務的超時時間
    int getTransactionTimeout() throws XAException;
    //是否在同一個ResourceManager裏面呢
    boolean isSameRM(XAResource var1) throws XAException;
    //準備一個全局的事務
    int prepare(Xid var1) throws XAException;
    //恢復一個全局事務
    Xid[] recover(int var1) throws XAException;
    
    void rollback(Xid var1) throws XAException;

    boolean setTransactionTimeout(int var1) throws XAException;

    void start(Xid var1, int var2) throws XAException;
}

XID接口

public interface Xid {
    int MAXGTRIDSIZE = 64;
    int MAXBQUALSIZE = 64;

    int getFormatId();

    byte[] getGlobalTransactionId();

    byte[] getBranchQualifier();
}

JTA事務管理的弊端

  1. 兩階段提交
  2. 事務時間太長、鎖數據的時間太長
  3. 低性能、吞吐量低

如今咱們用一個樣例來講明JTA事務管理,咱們先在不一樣的數據庫中添加兩張表

pom

<dependencies>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jdbc</artifactId>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
   <dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      <version>2.1.1</version>
   </dependency>
   <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <scope>8.0.11</scope>
   </dependency>
   <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.0.29</version>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jta-atomikos</artifactId>
   </dependency>
   <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-configuration-processor</artifactId>
      <optional>true</optional>
   </dependency>
</dependencies>

配置文件

logging:
  level:
    root: info
    com.guanjiann: debug
  file: logs/${spring.application.name}.log
server:
  port: 8080
spring:
  application:
    name: Twocommit
  datasource:
    test1:
      driver-class-name: com.mysql.cj.jdbc.Driver
      jdbcurl: jdbc:mysql://xxx.xxx.xxx.xxx:3306/cloud_resource?useSSL=FALSE&serverTimezone=GMT%2B8
      username: root
      password: abcd123
      type: com.alibaba.druid.pool.DruidDataSource
      filters: stat
      maxActive: 20
      initialSize: 1
      maxWait: 60000
      minIdle: 1
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: select 'x'
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxOpenPreparedStatements: 20
      minPoolSize: 3
      maxPoolSize: 25
      maxLifetime: 20000
      borrowConnectionTimeout: 30
      loginTimeout: 30
      maintenanceInterval: 60
      maxIdleTime: 60
    test2:
      driver-class-name: com.mysql.cj.jdbc.Driver
      jdbcurl: jdbc:mysql://xxx.xxx.xxx.xxx:3306/cloud_resource_base?useSSL=FALSE&serverTimezone=GMT%2B8
      username: root
      password: abcd123
      type: com.alibaba.druid.pool.DruidDataSource
      filters: stat
      maxActive: 20
      initialSize: 1
      maxWait: 60000
      minIdle: 1
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: select 'x'
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxOpenPreparedStatements: 20
      minPoolSize: 3
      maxPoolSize: 25
      maxLifetime: 20000
      borrowConnectionTimeout: 30
      loginTimeout: 30
      maintenanceInterval: 60
      maxIdleTime: 60
mybatis:
  type-aliases-package: com.guanjian.twocommit.domain
  mapper-locations: classpath:/mybatis-mappers/*
  configuration:
    mapUnderscoreToCamelCase: true

SpringBoot啓動類

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}, scanBasePackages = {"com.guanjian.twocommit"})
@EnableTransactionManagement
@EnableConfigurationProperties(value = {DBConfig1.class, DBConfig2.class})
public class TwocommitApplication extends SpringBootServletInitializer {

   @Override
   protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
      return application.sources(TwocommitApplication.class);
   }

   public static void main(String[] args) {
      SpringApplication.run(TwocommitApplication.class, args);
   }

}

配置文件讀取類

@Data
@ConfigurationProperties(prefix = "spring.datasource.test1")
public class DBConfig1 {
    private String jdbcurl;
    private String username;
    private String password;
    private int minPoolSize;
    private int maxPoolSize;
    private int maxLifetime;
    private int borrowConnectionTimeout;
    private int loginTimeout;
    private int maintenanceInterval;
    private int maxIdleTime;
    private String testQuery;
}
@Data
@ConfigurationProperties(prefix = "spring.datasource.test2")
public class DBConfig2 {
    private String jdbcurl;
    private String username;
    private String password;
    private int minPoolSize;
    private int maxPoolSize;
    private int maxLifetime;
    private int borrowConnectionTimeout;
    private int loginTimeout;
    private int maintenanceInterval;
    private int maxIdleTime;
    private String testQuery;
}

Mybatis整合atomikos全局事務管理類

@Configuration
@MapperScan(basePackages = "com.guanjian.twocommit.dao", sqlSessionTemplateRef = "test1SqlSessionTemplate")
public class MyBatisConfig1 {
    @Bean(name = "test1DataSource")  //test1DataSource
    public DataSource testDataSource(DBConfig1 testConfig) throws SQLException {
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        //mysqlXaDataSource.setUrl(testConfig.getUrl());
        mysqlXaDataSource.setUrl(testConfig.getJdbcurl());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
        mysqlXaDataSource.setPassword(testConfig.getPassword());
        mysqlXaDataSource.setUser(testConfig.getUsername());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

        // 將本地事務註冊到創 Atomikos全局事務
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("test1DataSource");

        xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
        xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
        xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
        xaDataSource.setTestQuery(testConfig.getTestQuery());
        return xaDataSource;
    }

    @Bean(name = "test1SqlSessionFactory")
    public SqlSessionFactory testSqlSessionFactory(@Qualifier("test1DataSource") DataSource dataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

    @Bean(name = "test1SqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(
            @Qualifier("test1SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
@Configuration
@MapperScan(basePackages = "com.guanjian.twocommit.dao2", sqlSessionTemplateRef = "test2SqlSessionTemplate")
public class MyBatisConfig2 {
    @Bean(name = "test2DataSource")  //test1DataSource
    public DataSource testDataSource(DBConfig2 testConfig) throws SQLException {
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        //mysqlXaDataSource.setUrl(testConfig.getUrl());
        mysqlXaDataSource.setUrl(testConfig.getJdbcurl());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
        mysqlXaDataSource.setPassword(testConfig.getPassword());
        mysqlXaDataSource.setUser(testConfig.getUsername());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

        // 將本地事務註冊到創 Atomikos全局事務
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("test2DataSource");

        xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
        xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
        xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
        xaDataSource.setTestQuery(testConfig.getTestQuery());
        return xaDataSource;
    }

    @Bean(name = "test2SqlSessionFactory")
    public SqlSessionFactory testSqlSessionFactory(@Qualifier("test2DataSource") DataSource dataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

    @Bean(name = "test2SqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(
            @Qualifier("test2SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

實體類

@Data
public class User {
    private int id;
    private String name;
}
@Data
public class Account {
    private int id;
    private String name;
}
@Data
public class UADTO {
    private User user;
    private Account account;
}

dao接口

public interface UserDao {
    @Select("select * from user where name=#{name}")
    User findUserByName(String name);

    @Options(useGeneratedKeys = true, keyProperty = "id")
    @Insert("insert into user (name) values (#{name})")
    int addUser(User user);
}
public interface AccountDao {
    @Select("select * from account where name=#{name}")
    Account findAccountByName(String name);

    @Options(useGeneratedKeys = true, keyProperty = "id")
    @Insert("insert into account (name) values (#{name})")
    int addAccount(Account account);
}

service接口

public interface UADTOService {
    int addUserAndAccount(UADTO uadto);
}

打標籤的全局事務實現類

@Service
public class UADTOSerciceImpl implements UADTOService {
    @Autowired
    private UserDao userDao;
    @Autowired
    private AccountDao accountDao;

    @Override
    @Transactional
    public int addUserAndAccount(UADTO uadto) {
        userDao.addUser(uadto.getUser());
        accountDao.addAccount(uadto.getAccount());
        return 1;
    }
}

代碼事務實現類

@Service
@Primary
public class UADTOServiceXAImpl implements UADTOService {
    @Autowired
    private UserDao userDao;
    @Autowired
    private AccountDao accountDao;
    @Autowired
    private PlatformTransactionManager transactionManager;

    @Override
    public int addUserAndAccount(UADTO uadto) {
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        def.setTimeout(15);
        TransactionStatus status = transactionManager.getTransaction(def);
        try {
            userDao.addUser(uadto.getUser());
            accountDao.addAccount(uadto.getAccount());
            transactionManager.commit(status);
            return 1;
        }catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
        }
    }
}

以上這兩個實現類的全局事務是等價的,不管是標籤仍是代碼,其實都使用到了JTA的分佈式事務的二階段提交。

Controller

@RestController
@Slf4j
public class AllController {
    @Autowired
    private UADTOService uadtoService;

    @PostMapping("/addall")
    public int addUserAndAccount(@RequestBody UADTO uadto) {
        log.info(uadto.toString());
        return uadtoService.addUserAndAccount(uadto);
    }
}

這裏的二階段提交事務,對不一樣數據庫的表插入要麼是同時成功,要麼是同時失敗的。

  • TCC模式的事務管理(柔性事務)
  1. Try
  2. Commit(Confirm) / Cancel

TCC模式實現思路

每一個須要實現事務的接口,都須要3個接口,分別是:

  1. tryXX():業務檢查,預留資源
  2. confirmXX():執行業務、使用資源
  3. cancelXX():回滾業務、釋放資源

比方說在上圖的下單的操做中,Order服務接收了下單的請求,扣費的時候,Order服務去調用User服務的時候,它去調用User服務的tryCharge()方法。調用完以後就把這一次的調用註冊到協調器裏面,當咱們的下單服務的所有事務完成以後,協調器服務會調用confirmCharge()方法,去完成這個扣費的操做。若是在下單的過程當中,出了任何的錯誤,協調器服務會幫咱們去調用User服務的cancelCharge()方法,去把扣費操做去取消。

TCC模式協調器的功能

  1. 接管事務的管理,相似JTA的獨立事務管理器(非兩階段提交)
  2. 保存每一個資源上的事務記錄:跟蹤狀態、檢查超時
  3. 保證每一個資源上的事務性
  4. 處理各類錯誤:超時、重試、網絡異常、服務不可用

TCC模式實現分佈式事務

  1. 借鑑XA的統一資源管理,又不是兩階段提交
  2. 不一樣資源之間沒有鎖,事務過程數據沒有鎖、沒有隔離
  3. 出錯時可能屢次調用Confirm/Cancel方法、以及順序沒法保證
  4. Confirm/Cancel方法需知足冪等性,即重複調用時結果一致

如今咱們以一個實際業務場景來加以說明

TCC 實現階段一:Try

在上圖的預處理中,那個訂單服務先把本身的狀態修改成:OrderStatus.UPDATING。這個狀態是個沒有任何含義的這麼一個狀態,表明有人正在修改這個狀態罷了。庫存服務別直接扣減庫存,而是凍結掉庫存。舉個例子,原本你的庫存數量是 100,你別直接 100 - 2 = 98,扣減這個庫存!你能夠把可銷售的庫存:100 - 2 = 98,設置爲 98 沒問題,而後在一個單獨的凍結庫存的字段裏,設置一個 2。也就是說,有 2 個庫存是給凍結了。

積分服務也是同理,別直接給用戶增長會員積分。你能夠先在積分表裏的一個預增長積分字段加入積分。好比:用戶積分本來是 1190,如今要增長 10 個積分,別直接 1190 + 10 = 1200 個積分!你能夠保持積分爲 1190 不變,在一個預增長字段裏,好比說 prepare_add_credit 字段,設置一個 10,表示有 10 個積分準備增長。

倉儲服務也是同理啊,你能夠先建立一個銷售出庫單,可是這個銷售出庫單的狀態是「UNKNOWN」。也就是說,剛剛建立這個銷售出庫單,此時還不肯定它的狀態是什麼。

這個操做,通常都是鎖定某個資源,設置一個預備類的狀態,凍結部分數據,等等,大概都是這類操做。

TCC 實現階段二:Confirm

而後就分紅兩種狀況了,第一種狀況是比較理想的,那就是各個服務執行本身的那個 Try 操做,都執行成功了!此時,TCC 分佈式事務框架會控制進入 TCC 下一個階段,第一個 C 階段,也就是 Confirm 階段。爲了實現這個階段,你須要在各個服務裏再加入一些代碼。好比說,訂單服務裏,你能夠加入一個 Confirm 的邏輯,就是正式把訂單的狀態設置爲「已支付」了。

庫存服務也是相似的,將以前凍結庫存字段的 2 個庫存扣掉變爲 0。這樣的話,可銷售庫存以前就已經變爲 98 了,如今凍結的 2 個庫存也沒了,那就正式完成了庫存的扣減。

積分服務也是相似的,就是將預增長字段的 10 個積分扣掉,而後加入實際的會員積分字段中,從 1190 變爲 1120。

倉儲服務也是相似,將銷售出庫單的狀態正式修改成「已建立」,能夠供倉儲管理人員查看和使用,而不是停留在以前的中間狀態「UNKNOWN」了。

上面各類服務的 Confirm 的邏輯都實現好了,一旦訂單服務裏面的 TCC 分佈式事務框架感知到各個服務的 Try 階段都成功了之後,就會執行各個服務的 Confirm 邏輯。

TCC 實現階段三:Cancel

在 Try 階段,好比積分服務吧,它執行出錯了,此時會怎麼樣?那訂單服務內的 TCC 事務框架是能夠感知到的,而後它會決定對整個 TCC 分佈式事務進行回滾。也就是說,會執行各個服務的第二個 C 階段,Cancel 階段。一樣,爲了實現這個 Cancel 階段,各個服務還得加一些代碼。

首先訂單服務,就是能夠將訂單的狀態設置爲「CANCELED」,也就是這個訂單的狀態是已取消。

庫存服務也是同理,就是將凍結庫存扣減掉 2,加回到可銷售庫存裏去,98 + 2 = 100。

積分服務也須要將預增長積分字段的 10 個積分扣減掉。

倉儲服務也須要將銷售出庫單的狀態修改成「CANCELED」設置爲已取消。

而後這個時候,訂單服務的 TCC 分佈式事務框架只要感知到了任何一個服務的 Try 邏輯失敗了,就會跟各個服務內的 TCC 分佈式事務框架進行通訊,而後調用各個服務的 Cancel 邏輯。

總結與思考

先來 Try 一下,不要把業務邏輯完成,先試試看,看各個服務能不能基本正常運轉,能不能先凍結我須要的資源。

若是 Try 都 OK,也就是說,底層的數據庫、Redis、Elasticsearch、MQ 都是能夠寫入數據的,而且你保留好了須要使用的一些資源(好比凍結了一部分庫存)。

接着,再執行各個服務的 Confirm 邏輯,基本上 Confirm 就能夠很大機率保證一個分佈式事務的完成了。

那若是 Try 階段某個服務就失敗了,好比說底層的數據庫掛了,或者 Redis 掛了,等等。

此時就自動執行各個服務的 Cancel 邏輯,把以前的 Try 邏輯都回滾,全部服務都不要執行任何設計的業務邏輯。保證你們要麼一塊兒成功,要麼一塊兒失敗。

等一等,你有沒有想到一個問題?若是有一些意外的狀況發生了,好比說訂單服務忽然掛了,而後再次重啓,TCC 分佈式事務框架是如何保證以前沒執行完的分佈式事務繼續執行的呢?

因此,TCC 事務框架都是要記錄一些分佈式事務的活動日誌的,能夠在磁盤上的日誌文件裏記錄,也能夠在數據庫裏記錄。保存下來分佈式事務運行的各個階段和狀態。

問題還沒完,萬一某個服務的 Cancel 或者 Confirm 邏輯執行一直失敗怎麼辦呢?

那也很簡單,TCC 事務框架會經過活動日誌記錄各個服務的狀態。舉個例子,好比發現某個服務的 Cancel 或者 Confirm 一直沒成功,會不停的重試調用它的 Cancel 或者 Confirm 邏輯,務必要它成功!

  • 分佈式事務框架Seata

Seata 是一款開源的分佈式事務解決方案,致力於提供高性能和簡單易用的分佈式事務服務。Seata 將爲用戶提供了 AT、TCC、SAGA 和 XA 事務模式,爲用戶打造一站式的分佈式解決方案。

有關於Seata的說明能夠參考https://seata.io/zh-cn/docs/overview/what-is-seata.html

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息