JPA多數據源分佈式事務處理-兩種事務方案

前言

多數據源的事務處理是個老生常談的話題,跨兩個數據源的事務管理也算是分佈式事務的範疇,在同一個JVM裏處理多數據源的事務,比較經典的處理方案是JTA(基於XA協議建模的java標準事務抽象)+XA(XA事務協議),常見的JTA實現框架有Atomikos、Bitronix、Narayana,Spring對這些框架都有組件封裝,基本能夠作到開箱即用程度。本文除了分享XA事務方案外,提供了一種新的多數據源事務解決思路和視角。html

問題背景

在解決mysql字段脫敏處理時,結合sharding-jdbc的脫敏組件功能,爲了sql兼容和最小化應用改造,博主給出了一個多數據源融合的字段脫敏解決方案(只把包含脫敏字段表的操做走sharding-jdbc脫敏代理數據源)。這個方案解決了問題的同時,帶來了一個新的問題,數據源的事務是獨立的,正如我文中所述《JPA項目多數據源模式整合sharding-jdbc實現數據脫敏》,在spring上下文中,每一個數據源對應一個獨立的事務管理器,默認的事務管理器的數據源就用業務自己的數據源,因此須要加密的業務使用時,須要指定@Transactional註解裏的事務管理器名稱爲脫敏對應的事務管理器名稱。簡單的業務場景這樣用也就沒有問題了,可是通常的業務場景總有一個事務覆蓋兩個數據源的操做,這個時候單指定哪一個事務管理器都不行,so,這裏須要一種多數據源的事務管理器。java

XA事務方案

XA協議採用2PC(兩階段提交)的方式來管理分佈式事務。XA接口提供資源管理器與事務管理器之間進行通訊的標準接口。在JDBC的XA事務相關api抽象裏,相關接口定義以下mysql

XADataSource,XA協議數據源spring

public interface XADataSource extends CommonDataSource {
  /**
   * 嘗試創建物理數據庫鏈接,使用給定的用戶名和密碼。返回的鏈接能夠在分佈式事務中使用
   */
  XAConnection getXAConnection() throws SQLException;
   //省略getLogWriter等非關鍵方法
 }

XAConnectionsql

public interface XAConnection extends PooledConnection {

    /**
     * 檢索一個{@code XAResource}對象,事務管理器將使用該對象管理該{@code XAConnection}對象在分佈式事務中的事務行爲
     */
    javax.transaction.xa.XAResource getXAResource() throws SQLException;
}

XAResource數據庫

public interface XAResource {
    /**
     * 提交xid指定的全局事務
     */
    void commit(Xid xid, boolean onePhase) throws XAException;

    /**
     * 結束表明事務分支執行的工做。資源管理器從指定的事務分支中分離XA資源,並讓事務完成。
     */
    void end(Xid xid, int flags) throws XAException;

    /**
     * 通知事務管理器忽略此xid事務分支
     */
    void forget(Xid xid) throws XAException;

    /**
     * 判斷是否同一個資源管理器
     */
    boolean isSameRM(XAResource xares) throws XAException;

    /**
     * 指定xid事務準備階段
     */
    int prepare(Xid xid) throws XAException;

    /**
     * 從資源管理器獲取準備好的事務分支的列表。事務管理器在恢復期間調用此方法,
     * 以獲取當前處於準備狀態或初步完成狀態的事務分支的列表。
     */
    Xid[] recover(int flag) throws XAException;

    /**
     * 通知資源管理器回滾表明事務分支完成的工做。
     */
    void rollback(Xid xid) throws XAException;

    /**
     * 表明xid中指定的事務分支開始工做。
     */
    void start(Xid xid, int flags) throws XAException;

    //省略非關鍵方法
}

相比較普通的事務管理,JDBC的XA協議管理多了一個XAResource資源管理器,XA事務相關的行爲(開啓、準備、提交、回滾、結束)都由這個資源管理器來控制,這些都是框架內部的行爲,體如今開發層面提供的數據源也變成了XADataSource。而JTA的抽象裏,定義了UserTransaction、TransactionManager。想要使用JTA事務,必須先實現這兩個接口。因此,若是咱們要使用JTA+XA控制多數據源的事務,在sprign boot裏以Atomikos爲例,api

引入Atomikos依賴app

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

spring boot已經幫咱們把XA事務管理器自動裝載類定義好了,如:框架

建立JTA事務管理器分佈式

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties({ AtomikosProperties.class, JtaProperties.class })
@ConditionalOnClass({ JtaTransactionManager.class, UserTransactionManager.class })
@ConditionalOnMissingBean(PlatformTransactionManager.class)
class AtomikosJtaConfiguration {

    @Bean(initMethod = "init", destroyMethod = "shutdownWait")
    @ConditionalOnMissingBean(UserTransactionService.class)
    UserTransactionServiceImp userTransactionService(AtomikosProperties atomikosProperties,
            JtaProperties jtaProperties) {
        Properties properties = new Properties();
        if (StringUtils.hasText(jtaProperties.getTransactionManagerId())) {
            properties.setProperty("com.atomikos.icatch.tm_unique_name", jtaProperties.getTransactionManagerId());
        }
        properties.setProperty("com.atomikos.icatch.log_base_dir", getLogBaseDir(jtaProperties));
        properties.putAll(atomikosProperties.asProperties());
        return new UserTransactionServiceImp(properties);
    }
    @Bean(initMethod = "init", destroyMethod = "close")
    @ConditionalOnMissingBean(TransactionManager.class)
    UserTransactionManager atomikosTransactionManager(UserTransactionService userTransactionService) throws Exception {
        UserTransactionManager manager = new UserTransactionManager();
        manager.setStartupTransactionService(false);
        manager.setForceShutdown(true);
        return manager;
    }
    @Bean
    @ConditionalOnMissingBean(XADataSourceWrapper.class)
    AtomikosXADataSourceWrapper xaDataSourceWrapper() {
        return new AtomikosXADataSourceWrapper();
    }
    @Bean
    JtaTransactionManager transactionManager(UserTransaction userTransaction, TransactionManager transactionManager,
            ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransaction, transactionManager);
        transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(jtaTransactionManager));
        return jtaTransactionManager;
    }
    、、、、、、、、、、
}

顯然,想要使用XA事務,除了須要提供UserTransaction、TransactionManager的實現。還必需要有一個XADataSource,而sharding-jdbc代理的數據源是DataSource的,咱們須要將XADataSource包裝成普通的DataSource,spring已經提供了一個AtomikosXADataSourceWrapper的XA數據源包裝器,並且在AtomikosJtaConfiguration裏已經註冊到Spring上下文中,因此咱們在自定義數據源時能夠直接注入包裝器實例,而後,由於是JPA環境,因此在建立EntityManagerFactory實例時,須要指定JPA的事務管理類型爲JTA,綜上,普通的業務默認數據源配置以下:

/**
 * @author: kl @kailing.pub
 * @date: 2020/5/18
 */
@Configuration
@EnableConfigurationProperties({JpaProperties.class, DataSourceProperties.class})
public class DataSourceConfiguration{

    @Primary
    @Bean
    public DataSource dataSource(AtomikosXADataSourceWrapper wrapper, DataSourceProperties dataSourceProperties) throws Exception {
        MysqlXADataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(MysqlXADataSource.class).build();
        return wrapper.wrapDataSource(dataSource);
    }

    @Primary
    @Bean(initMethod = "afterPropertiesSet")
    public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) {
        return factoryBuilder.dataSource(dataSource)
                .packages(Constants.BASE_PACKAGES)
                .properties(jpaProperties.getProperties())
                .persistenceUnit("default")
                .jta(true)
                .build();
    }

    @Bean
    @Primary
    public EntityManager entityManager(EntityManagerFactory entityManagerFactory){
        //必須使用SharedEntityManagerCreator建立SharedEntityManager實例,不然SimpleJpaRepository中的事務不生效
        return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
    }
}

sharding-jdbc加密數據源和普通業務數據源實際上是同一個數據源,只是走加解密邏輯的數據源須要被sharding-jdbc的加密組件代理一層,加上了加解密的處理邏輯。因此配置以下:

/**
 * @author: kl @kailing.pub
 * @date: 2020/5/18
 */
@Configuration
@EnableConfigurationProperties({JpaProperties.class,SpringBootEncryptRuleConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class})
public class EncryptDataSourceConfiguration {

    @Bean
    public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException {
        return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps());
    }

    @Bean(initMethod = "afterPropertiesSet")
    public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory(@Qualifier("encryptDataSource") DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException {
        return factoryBuilder.dataSource(dataSource)
                .packages(Constants.BASE_PACKAGES)
                .properties(jpaProperties.getProperties())
                .persistenceUnit("encryptPersistenceUnit")
                .jta(true)
                .build();
    }

    @Bean
    public EntityManager encryptEntityManager(@Qualifier("encryptEntityManagerFactory") EntityManagerFactory entityManagerFactory){
        //必須使用SharedEntityManagerCreator建立SharedEntityManager實例,不然SimpleJpaRepository中的事務不生效
        return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
    }
}
  • 遇到問題一、:Connection pool exhausted - try increasing 'maxPoolSize' and/or 'borrowConnectionTimeout' on the DataSourceBean.
  • 解決問題:默認AtomikosXADataSourceWrapper包裝器初始化的數據源鏈接池最大爲1,因此須要添加配置參數如:
spring.jta.atomikos.datasource.max-pool-size=20
  • 遇到問題二、: XAER_INVAL: Invalid arguments (or unsupported command)
  • 解決問題:這個是mysql實現XA的bug,僅當您在同一事務中屢次訪問同一MySQL數據庫時,纔會發生此問題,在mysql鏈接url加上以下參數便可,如:
spring.datasource.url = jdbc:mysql://127.0.0.1:3306/xxx?pinGlobalTxToPhysicalConnection=true

Mysql XA事務行爲

在這個場景中,雖然是多數據源,可是底層連接的是同一個mysql數據庫,因此XA事務行爲爲,從第一個執行的sql開始(並非JTA事務begin階段),生成xid並XA START事務,而後XA END。第二個數據源的sql執行時會判斷是否同一個mysql資源,若是是同一個則用剛生成的xid從新XA START RESUME,而後XA END,最終雖然在應用層是兩個DataSource,其實最後只會調用XA COMMIT一次。mysql驅動實現的XAResource的start以下:

public void start(Xid xid, int flags) throws XAException {
        StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);
        commandBuf.append("XA START ");
        appendXid(commandBuf, xid);

        switch (flags) {
            case TMJOIN:
                commandBuf.append(" JOIN");
                break;
            case TMRESUME:
                commandBuf.append(" RESUME");
                break;
            case TMNOFLAGS:
                // no-op
                break;
            default:
                throw new XAException(XAException.XAER_INVAL);
        }
        dispatchCommand(commandBuf.toString());
        this.underlyingConnection.setInGlobalTx(true);
    }

第一次sql執行時,flags=0,走的TMNOFLAGS邏輯,第二次sql執行時,flags=134217728,走的TMRESUME,從新開啓事務的邏輯。以上是Mysql XA的真實事務邏輯,可是博主研究下來發現,msyql xa並不支持XA START RESUME這種語句,並且有不少限制《Mysql XA交易限制》,因此在mysql數據庫使用XA事務時,最好了解下mysql xa的缺陷

鏈式事務方案

鏈式事務不是我獨創的叫法,在spring-data-common項目的Transaction包下,已經有一個默認實現ChainedTransactionManager,前文中《深刻理解spring的@Transactional工做原理》已經分析了Spring的事務抽象,由PlatformTransactionManager(事務管理器)、TransactionStatus(事務狀態)、TransactionDefinition(事務定義)等形態組成,ChainedTransactionManager也是實現了PlatformTransactionManager和TransactionStatus。實現原理也很簡單,在ChainedTransactionManager內部維護了事務管理器的集合,經過代理編排真實的事務管理器,在事務開啓、提交、回滾時,都分別操做集合裏的事務。以達到對多個事務的統一管理。這個方案比較簡陋,並且有缺陷,在提交階段,若是異常不是發生在第一個數據源,那麼會存在以前的提交不會回滾,因此在使用ChainedTransactionManager時,儘可能把出問題可能性比較大的事務管理器放鏈的後面(開啓事務、提交事務順序相反)。這裏只是拋出了一種新的多數據源事務管理的思路,能用XA儘可能用XA管理。

普通的業務默認數據源配置以下:

/**
 * @author: kl @kailing.pub
 * @date: 2020/5/18
 */
@Configuration
@EnableConfigurationProperties({JpaProperties.class, DataSourceProperties.class})
public class DataSourceConfiguration{

    @Primary
    @Bean
    public DataSource dataSource(DataSourceProperties dataSourceProperties){
       return dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
    }

    @Primary
    @Bean(initMethod = "afterPropertiesSet")
    public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) {
        return factoryBuilder.dataSource(dataSource)
                .packages(Constants.BASE_PACKAGES)
                .properties(jpaProperties.getProperties())
                .persistenceUnit("default")
                .build();
    }

    @Bean
    @Primary
    public EntityManager entityManager(EntityManagerFactory entityManagerFactory){
        //必須使用SharedEntityManagerCreator建立SharedEntityManager實例,不然SimpleJpaRepository中的事務不生效
        return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
    }

    @Primary
    @Bean
    public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory){
        JpaTransactionManager txManager = new JpaTransactionManager();
        txManager.setEntityManagerFactory(entityManagerFactory);
        return txManager;
    }
}

sharding-jdbc加密數據源配置以下:

/**
 * @author: kl @kailing.pub
 * @date: 2020/5/18
 */
@Configuration
@EnableConfigurationProperties({JpaProperties.class,SpringBootEncryptRuleConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class})
public class EncryptDataSourceConfiguration {

    @Bean
    public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException {
        return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps());
    }

    @Bean(initMethod = "afterPropertiesSet")
    public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory(@Qualifier("encryptDataSource") DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException {
        return factoryBuilder.dataSource(dataSource)
                .packages(Constants.BASE_PACKAGES)
                .properties(jpaProperties.getProperties())
                .persistenceUnit("encryptPersistenceUnit")
                .build();
    }

    @Bean
    public EntityManager encryptEntityManager(@Qualifier("encryptEntityManagerFactory") EntityManagerFactory entityManagerFactory){
        //必須使用SharedEntityManagerCreator建立SharedEntityManager實例,不然SimpleJpaRepository中的事務不生效
        return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
    }

    @Bean
    public PlatformTransactionManager chainedTransactionManager(PlatformTransactionManager transactionManager) throws SQLException {
        JpaTransactionManager encryptTransactionManager = new JpaTransactionManager();
        encryptTransactionManager.setEntityManagerFactory(encryptEntityManagerFactory());
        //使用鏈式事務管理器包裝真正的transactionManager、txManager事務
        ChainedTransactionManager chainedTransactionManager = new ChainedTransactionManager(encryptTransactionManager,transactionManager);
        return chainedTransactionManager;
    }
}

使用這種方案,在涉及到多數據源的業務時,須要指定使用哪一個事務管理器,如:

@PersistenceContext(unitName = "encryptPersistenceUnit")
    private EntityManager entityManager;

    @PersistenceContext
    private EntityManager manager;

    @Transactional(transactionManager = "chainedTransactionManager")
    public AccountModel  save(AccountDTO dto){
        AccountModel accountModel = AccountMapper.INSTANCE.dtoTo(dto);

        entityManager.persist(accountModel);
        entityManager.flush();
        AccountModel accountMode2 = AccountMapper.INSTANCE.dtoTo(dto);

        manager.persist(accountMode2);
        manager.flush();

        return accountModel;
    }

結語

綜上,對於JPA的多數據源分佈式事務處理,JTA的事務管理器通過spring boot的封裝已經能夠開箱即用了。重點在JPA環境下,須要指定EntityManagerFactory的事務使用JTA事務。另本文分享了一種鏈式事務編排的方式也能夠應用在這種場景,可是特殊的場景下不能保證事務的完整性,因此博主推薦使用JtaTransactionManager,有符合的場景也能夠試試ChainedTransactionManager。

做者簡介:

獨立博客KL博客(http://www.kailing.pub)博主。

相關文章
相關標籤/搜索