Spring系列(9)-多數據源和2PC分佈式事務

1. 前言

本系列有寫過在spring boot中,普通數據庫事務的處理方式,主要是經過@Transactional的註解,可是卻不能知足於分佈式事務的需求。例如:跨多個多種數據庫的一致性事務,跨系統RPC調用的事務,等等。java

在分佈式領域基於CAP理論以及BASE理論,有人就提出了 柔性事務 的概念。CAP(一致性、可用性、分區容忍性)理論你們都理解不少次了,這裏再也不敘述。說一下BASE理論,它是在CAP理論的基礎之上的延伸。包括 基本可用(Basically Available)、柔性狀態(Soft State)、最終一致性(Eventual Consistency)。mysql

  • 基本可用 : 分佈式系統出現故障的時候,容許損失一部分可用性。好比,京東618大促的時候,對一些非核心鏈路的功能進行降級處理。核心高可用,非核心可降級。
  • 柔性狀態: 容許系統存在中間狀態,這個中間狀態又不會影響系統總體可用性。好比,數據庫讀寫分離,寫庫同步到讀庫(主庫同步到從庫)會有一個延時,這樣實際是一種柔性狀態。
  • 最終一致性: 數據庫主從複製的例子,通過數據同步延時以後,最終數據能達到一致。

針對柔性事務的解決方案,業界內有下面幾種:spring

  1. 兩階段提交(Two-phase Commit,2PC),經過引入協調者(Coordinator)來協調參與者的行爲,並最終決定這些參與者是否要真正執行事務。協調者包括支持事務的XA數據庫,Jms等等。
  2. 補償事務(Try - Confirm - Cancel,TCC),針對每一個操做,都要註冊一個與其對應的確認和補償(撤銷)操做。
  3. 異步確保,本地消息表與業務數據表處於同一個數據庫中,這樣就能利用本地事務來保證在對這兩個表的操做知足事務特性,而且使用了消息隊列來保證最終一致性。

本文專門講解2PC兩階段提交的這種解決方案,前面會講解若是在spring boot中配置多數據源,後續會經過引入Atomikos來實踐2PC的分佈式事務。sql

2. 多數據源

2.1. application.propreties

spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
##a數據源
spring.datasource.druid.a.url=
spring.datasource.druid.a.username=
spring.datasource.druid.a.password=
spring.datasource.druid.a.driver-class-name=oracle.jdbc.driver.OracleDriver
## b數據源
spring.datasource.druid.b.url=
spring.datasource.druid.b.username=
spring.datasource.druid.b.password=
spring.datasource.druid.b.driver-class-name=oracle.jdbc.driver.OracleDriver

2.2. 數據源配置類

ADataSourceConfig.java數據庫

/*
** @MapperScan:A 數據源dao層路徑
** @Primary:多數據源時,表示默認數據源的配置
*/
@Configuration
@MapperScan(basePackages = "pers.demo.transaction.transaction2pc.mapper.a",
        sqlSessionFactoryRef = "aSqlSessionFactory")
public class ADataSourceConfig {
//註冊數據源
    @Primary
    @Bean(name = "aDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.druid.a")
    public DataSource aDataSource() {
        return DruidDataSourceBuilder.create().build();
    }
//註冊事務管理器(很重要!!!)
    @Bean(name = "aTransactionManager")
    @Primary
    public DataSourceTransactionManager aTransactionManager() {
        return new DataSourceTransactionManager(aDataSource());
    }

    @Bean(name = "aSqlSessionFactory")
    @Primary
    public SqlSessionFactory aSqlSessionFactory(@Qualifier("aDataSource") DataSource dataSource) throws Exception {
        final SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);
        // sessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/a/*.xml"));
        return sessionFactoryBean.getObject();
    }
}

BDataSourceConfig.java服務器

/*
** B 數據源的配置,注意都沒有 @Primary 了
*/
@Configuration
@MapperScan(basePackages = "pers.demo.transaction.transaction2pc.mapper.b",
        sqlSessionFactoryRef = "bSqlSessionFactory")
public class BDataSourceConfig {

    @Bean(name = "bDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.druid.b")
    public DataSource bDataSource() {
        return DruidDataSourceBuilder.create().build();
    }

    @Bean(name = "bTransactionManager")
    public DataSourceTransactionManager bTransactionManager() {
        return new DataSourceTransactionManager(bDataSource());
    }

    @Bean(name = "bSqlSessionFactory")
    public SqlSessionFactory bSqlSessionFactory(@Qualifier("bDataSource") DataSource dataSource) throws Exception {
        final SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);
       // sessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/b/*.xml"));
        return sessionFactoryBean.getObject();
    }
}

總結

多數據源配置,核心的代碼只有上面這些。先是在配置文件中定義A、B兩個數據源的鏈接信息,而後分別構建不一樣數據源的配置類,而且指向對應的dao層路徑。由此:session

Dao層數據源pers.demo.transaction.transaction2pc.mapper.a.xxx.java ,dao層執行的方法,都是基於A數據源的;pers.demo.transaction.transaction2pc.mapper.b.xxx.java,dao層執行的方法,都是基於B數據源的。oracle

Service層事務:還記得 @Transactional 事務嗎?Service層中若是沒有指定事務管理器,默認會取值@Primary,即A數據源的事務管理器。若是想要使用B數據源的事務管理器,須要手動聲明。app

@Transactional(transactionManager = "bTransactionManager")

若是你勤于思考的話,這時就會有疑惑,當前的事務管理器都是基於單個數據源定義的,那麼分佈式事務該如何定義事務管理器呢?框架

3. XA 和 JTA

3.1. XA

你們對XA有印象嗎?我實在是印象深入。實習時第一天,就是經過ADF在本地電腦上運行WebLogic服務器,而後就是配置數據源。Oracle數據源的驅動有不少,就包括 oracle.jdbc.xa.client.OracleXADataSource ,我當時仍是對這個XA疑惑好久。

XA協議由Tuxedo首先提出的,並交給X/Open組織,做爲資源管理器(數據庫)與事務管理器的接口標準。XA協議採用兩階段提交方式來管理分佈式事務。

XA規範定義了:

  1. TransactionManager : 這個TransactionManager能夠經過管理多個ResourceManager來管理多個Resouce,也就是管理多個數據源。
  2. XAResource : 針對數據資源封裝的一個接口。
  3. 兩段式提交 : 多數據源事務提交的機制。

簡單來講,基於XA協議的數據庫,均可以採用兩階段提交方式來管理分佈式事務。所幸常見的關係型數據庫oracle、mysql、sql server都支持,可是一些不支持事務的nosql數據庫是不行的。另外,jms、rocketmq等也是支持XA協議的,一樣能夠經過2PC來管理分佈式事務。

3.2. JTA

JTA(Java Transaction Manager) : 是Java規範,是XA在Java上的實現.

  1. TransactionManager : 經常使用方法,能夠開啓,回滾,獲取事務. begin(),rollback()...
  2. XAResouce : 資源管理,經過Session來進行事務管理,commit(xid)...
  3. XID : 每個事務都分配一個特定的XID

JTA是如何實現多數據源的事務管理呢?

主要的原理是兩階段提交,以上面的請求業務爲例,當整個業務完成了以後只是第一階段提交,在第二階段提交以前會檢查其餘全部事務是否已經提交,若是前面出現了錯誤或是沒有提交,那麼第二階段就不會提交,而是直接rollback操做,這樣全部的事務都會作Rollback操做.

JTA的有點就是可以支持多數據庫事務同時事務管理,知足分佈式系統中的數據的一致性.可是也有對應的弊端:

  1. 兩階段提交
  2. 事務時間太長,鎖數據太長
  3. 低性能,低吞吐量

4. JTA事務管理

spring boot支持JTA的框架有不少,咱們此次使用Atomikos。咱們仍是基於以前配置多數據源的代碼。

pom.xml

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

4.1. 數據源配置類

ADataSourceConfig.java

@Configuration
@MapperScan(basePackages = "pers.demo.transaction.transaction2pc.mapper.a",
        sqlSessionFactoryRef = "aSqlSessionFactory")
@ConfigurationProperties(prefix = "spring.datasource.druid.a")
@Data
public class ADataSourceConfig {
    private String url;
    private String username;
    private String password;

    @Primary
    @Bean(name = "aDataSource")
    public DataSource aDataSource() {
        Properties properties = new Properties();
        properties.setProperty("URL", url);
        properties.setProperty("user", username);
        properties.setProperty("password", password);

        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setXaProperties(properties);
        ds.setUniqueResourceName("AOracleXADataSource");
        ds.setXaDataSourceClassName("oracle.jdbc.xa.client.OracleXADataSource");
        return ds;
    }

    @Bean(name = "aTransactionManager")
    @Primary
    public DataSourceTransactionManager aTransactionManager() {
        return new DataSourceTransactionManager(aDataSource());
    }

    @Bean(name = "aSqlSessionFactory")
    @Primary
    public SqlSessionFactory aSqlSessionFactory(@Qualifier("aDataSource") DataSource dataSource) throws Exception {
        final SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);
        return sessionFactoryBean.getObject();
    }

}

BDataSourceConfig.java

@Configuration
@MapperScan(basePackages = "pers.demo.transaction.transaction2pc.mapper.b",
        sqlSessionFactoryRef = "bSqlSessionFactory")
@ConfigurationProperties(prefix = "spring.datasource.druid.b")
@Data
public class BDataSourceConfig {
    private String url;
    private String username;
    private String password;

    @Bean(name = "bDataSource")
    public DataSource bDataSource() {
        Properties properties = new Properties();
        properties.setProperty("URL", url);
        properties.setProperty("user", username);
        properties.setProperty("password", password);

        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setXaProperties(properties);
        ds.setUniqueResourceName("BOracleXADataSource");
        ds.setXaDataSourceClassName("oracle.jdbc.xa.client.OracleXADataSource");
        return ds;
    }

    @Bean(name = "bTransactionManager")
    public DataSourceTransactionManager bTransactionManager() {
        return new DataSourceTransactionManager(bDataSource());
    }

    @Bean(name = "bSqlSessionFactory")
    public SqlSessionFactory bSqlSessionFactory(@Qualifier("bDataSource") DataSource dataSource) throws Exception {
        final SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);
        return sessionFactoryBean.getObject();
    }

}

4.2. 註冊JTA事務管理器

在配置類中註冊JTA的TransactionManager。

@Bean(name = "jtaTransactionManager")
    @Primary
    public JtaTransactionManager jtaTransactionManager () {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        UserTransaction userTransaction = new UserTransactionImp();
        return new JtaTransactionManager(userTransaction, userTransactionManager);
    }

4.3. Service事務驗證

DemoService.java

/**
     * 同時往 A和B 兩個數據庫中insert數據
     * @param jpaUserDO
     */
    @Transactional(transactionManager = "jtaTransactionManager")
    public void addJTAUser(JpaUserDO jpaUserDO){
        aUserMapper.addUsername(jpaUserDO.getUsername());
        bUserMapper.addUsername(jpaUserDO.getUsername());
        //int a=1/0;
    }
  • 當正常執行,沒有報錯,A和B兩個數據庫中都能成功插入數據。
  • 當將 int a=1/0; 註釋打開,報錯會致使A和B兩個數據庫中的事務一塊兒回滾,都不會插入數據。

經過以上驗證,2PC的分佈式事務試驗成功!

相關文章
相關標籤/搜索