SpringBoot+Mybatis配置多數據源及事務方案

前言

可能因爲業務上的某些需求,咱們的系統中有時每每要鏈接多個數據庫,這就產生了多數據源問題。html

多數據源的狀況下,通常咱們要作到能夠自動切換,此時會涉及到事務註解 Transactional 不生效問題和分佈式事務問題。java

關於多數據源方案,筆者在網上看過一些例子,然而大部分都是錯誤示例,根本跑不通,或者沒辦法兼容事務。mysql

今天,咱們就一點點來分析這些問題產生的根源和相應的解決方法。git

1、多數據源

爲了劇情的順利開展,咱們模擬的業務是建立訂單和扣減庫存。github

因此,咱們先建立訂單表和庫存表。注意,把他們分別放到兩個數據庫中。面試

CREATE TABLE `t_storage` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `commodity_code` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT '0',
  PRIMARY KEY (`id`),
  UNIQUE KEY `commodity_code` (`commodity_code`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

CREATE TABLE `t_order` (
  `id` bigint(16) NOT NULL,
  `commodity_code` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT '0',
  `amount` double(14,2) DEFAULT '0.00',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
複製代碼

一、數據庫鏈接

經過YML文件先把兩個數據庫都配置一下。spring

spring:
  datasource:
    ds1:
      jdbc_url: jdbc:mysql://127.0.0.1:3306/db1
      username: root
      password: root
    ds2:
      jdbc_url: jdbc:mysql://127.0.0.1:3306/db2
      username: root
      password: root
複製代碼

二、配置DataSource

咱們知道,Mybatis執行一條SQL語句的時候,須要先獲取一個Connection。這時候,就交由Spring管理器到DataSource中獲取鏈接。sql

Spring中有個具備路由功能的DataSource,它能夠經過查找鍵調用不一樣的數據源,這就是AbstractRoutingDataSource數據庫

public abstract class AbstractRoutingDataSource{
    //數據源的集合
    @Nullable
    private Map<Object, Object> targetDataSources;
    //默認的數據源
    @Nullable
    private Object defaultTargetDataSource;
	
    //返回當前的路由鍵,根據該值返回不一樣的數據源
    @Nullable
    protected abstract Object determineCurrentLookupKey();
    
    //肯定一個數據源
    protected DataSource determineTargetDataSource() {
        //抽象方法 返回一個路由鍵
        Object lookupKey = determineCurrentLookupKey();
        DataSource dataSource = this.targetDataSources.get(lookupKey);
        return dataSource;
    }
}
複製代碼

能夠看到,該抽象類的核心就是先設置多個數據源到Map集合中,而後根據Key能夠獲取不一樣的數據源。安全

那麼,咱們就能夠重寫這個determineCurrentLookupKey方法,它返回的是一個數據源的名稱。

public class DynamicDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        DataSourceType.DataBaseType dataBaseType = DataSourceType.getDataBaseType();
        return dataBaseType;
    }
}
複製代碼

而後還須要一個工具類,來保存當前線程的數據源類型。

public class DataSourceType {

    public enum DataBaseType {
        ds1, ds2
    }
    // 使用ThreadLocal保證線程安全
    private static final ThreadLocal<DataBaseType> TYPE = new ThreadLocal<DataBaseType>();
    // 往當前線程裏設置數據源類型
    public static void setDataBaseType(DataBaseType dataBaseType) {
        if (dataBaseType == null) {
            throw new NullPointerException();
        }
        TYPE.set(dataBaseType);
    }
    // 獲取數據源類型
    public static DataBaseType getDataBaseType() {
        DataBaseType dataBaseType = TYPE.get() == null ? DataBaseType.ds1 : TYPE.get();
        return dataBaseType;
    }
}
複製代碼

這些都搞定以後,咱們還須要把這個DataSource配置到Spring容器中去。下面這個配置類的做用以下:

  • 建立多個數據源DataSource,ds1 和 ds2;
  • 將ds1 和 ds2 數據源放入動態數據源DynamicDataSource;
  • 將DynamicDataSource注入到SqlSessionFactory。
@Configuration
public class DataSourceConfig {

    /**
     * 建立多個數據源 ds1 和 ds2
     * 此處的Primary,是設置一個Bean的優先級
     * @return
     */
    @Primary
    @Bean(name = "ds1")
    @ConfigurationProperties(prefix = "spring.datasource.ds1")
    public DataSource getDateSource1() {
        return DataSourceBuilder.create().build();
    }
    @Bean(name = "ds2")
    @ConfigurationProperties(prefix = "spring.datasource.ds2")
    public DataSource getDateSource2() {
        return DataSourceBuilder.create().build();
    }


    /**
     * 將多個數據源注入到DynamicDataSource
     * @param dataSource1
     * @param dataSource2
     * @return
     */
    @Bean(name = "dynamicDataSource")
    public DynamicDataSource DataSource(@Qualifier("ds1") DataSource dataSource1,
                                        @Qualifier("ds2") DataSource dataSource2) {
        Map<Object, Object> targetDataSource = new HashMap<>();
        targetDataSource.put(DataSourceType.DataBaseType.ds1, dataSource1);
        targetDataSource.put(DataSourceType.DataBaseType.ds2, dataSource2);
        DynamicDataSource dataSource = new DynamicDataSource();
        dataSource.setTargetDataSources(targetDataSource);
        dataSource.setDefaultTargetDataSource(dataSource1);
        return dataSource;
    }
    
    
    /**
     * 將動態數據源注入到SqlSessionFactory
     * @param dynamicDataSource
     * @return
     * @throws Exception
     */
    @Bean(name = "SqlSessionFactory")
    public SqlSessionFactory getSqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dynamicDataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dynamicDataSource);
        bean.setMapperLocations(
                new PathMatchingResourcePatternResolver().getResources("classpath*:mapping/*.xml"));
        bean.setTypeAliasesPackage("cn.youyouxunyin.multipledb2.entity");
        return bean.getObject();
    }
}
複製代碼

三、設置路由鍵

上面的配置都完成以後,咱們還須要想辦法動態的改變數據源的鍵值,這個就跟系統的業務相關了。

好比在這裏,咱們有兩個Mapper接口,建立訂單和扣減庫存。

public interface OrderMapper {
    void createOrder(Order order);
}
public interface StorageMapper {
    void decreaseStorage(Order order);
}
複製代碼

那麼,咱們就能夠搞一個切面,在執行訂單的操做時,切到數據源ds1,執行庫存操做時,切到數據源ds2。

@Component
@Aspect
public class DataSourceAop {
    @Before("execution(* cn.youyouxunyin.multipledb2.mapper.OrderMapper.*(..))")
    public void setDataSource1() {
        DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds1);
    }
    @Before("execution(* cn.youyouxunyin.multipledb2.mapper.StorageMapper.*(..))")
    public void setDataSource2() {
        DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds2);
    }
}
複製代碼

四、測試

如今就能夠寫一個Service方法,經過REST接口測試一下啦。

public class OrderServiceImpl implements OrderService {
    @Override
    public void createOrder(Order order) {
        storageMapper.decreaseStorage(order);
        logger.info("庫存已扣減,商品代碼:{},購買數量:{}。建立訂單中...",order.getCommodityCode(),order.getCount());
        orderMapper.createOrder(order);
    }
}
複製代碼

不出意外的話,業務執行完成後,兩個數據庫的表都已經有了變化。

但此時,咱們會想到,這兩個操做是須要保證原子性的。因此,咱們須要依賴事務,在Service方法上標註Transactional。

若是咱們在createOrder方法上添加了Transactional註解,而後在運行代碼,就會拋出異常。

### Cause: java.sql.SQLSyntaxErrorException: Table 'db2.t_order' doesn't exist
; bad SQL grammar []; nested exception is java.sql.SQLSyntaxErrorException: 
    Table 'db2.t_order' doesn't exist] with root cause 複製代碼

這就說明,若是加上了 Spring 的事務,咱們的數據源切換不過去了。這又是咋回事呢?

2、事務模式,爲啥不能切換數據源

要想搞清楚緣由,咱們就得來分析分析若是加上了Spring事務,它又幹了哪些事情呢 ?

咱們知道,Spring的自動事務是基於AOP實現的。在調用包含事務的方法時,會進入一個攔截器。

public class TransactionInterceptor{
    public Object invoke(MethodInvocation invocation) throws Throwable {
        //獲取目標類
        Class<?> targetClass = AopUtils.getTargetClass(invocation.getThis());
        //事務調用
        return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
    }
}
複製代碼

一、建立事務

在這裏面呢,首先就是開始建立一個事務。

protected Object doGetTransaction() {
    //DataSource的事務對象
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    //設置事務自動保存
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    //給事務對象設置ConnectionHolder
    ConnectionHolder conHolder = TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}
複製代碼

在這一步,重點是給事務對象設置了ConnectionHolder屬性,不過此時仍是爲空。

二、開啓事務

接下來,就是開啓一個事務,這裏主要是經過ThreadLocal將資源和當前的事務對象綁定,而後設置一些事務狀態。

protected void doBegin(Object txObject, TransactionDefinition definition) {
    
    Connection con = null;
    //從數據源中獲取一個鏈接
    Connection newCon = obtainDataSource().getConnection();
    //從新設置事務對象中的connectionHolder,此時已經引用了一個鏈接
    txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
    //將這個connectionHolder標記爲與事務同步
    txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
    con = txObject.getConnectionHolder().getConnection();
    con.setAutoCommit(false);
    //激活事務活動狀態
    txObject.getConnectionHolder().setTransactionActive(true);
    //將connection holder綁定到當前線程,經過threadlocal
    if (txObject.isNewConnectionHolder()) {
        TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
    }
    //事務管理器,激活事務同步狀態
    TransactionSynchronizationManager.initSynchronization();
}
複製代碼

三、執行Mapper接口

開啓事務以後,就開始執行目標類真實方法。在這裏,就會開始進入Mybatis的代理對象。。哈哈,框架嘛,就各類代理。

咱們知道,Mybatis在執行SQL的以前,須要先獲取到SqlSession對象。

public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,
                PersistenceExceptionTranslator exceptionTranslator) {

    //從ThreadLocal中獲取SqlSessionHolder,第一次獲取不到爲空
    SqlSessionHolder holder = TransactionSynchronizationManager.getResource(sessionFactory);
    
    //若是SqlSessionHolder爲空,那也確定獲取不到SqlSession;
    //若是SqlSessionHolder不爲空,直接經過它來拿到SqlSession
    SqlSession session = sessionHolder(executorType, holder);
    if (session != null) {
        return session;
    }
    //建立一個新的SqlSession
    session = sessionFactory.openSession(executorType);
    //若是當前線程的事務處於激活狀態,就將SqlSessionHolder綁定到ThreadLocal
    registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
    return session;
}
複製代碼

拿到SqlSession以後,就開始調用Mybatis的執行器,準備執行SQL語句。在執行SQL以前呢,固然須要先拿到Connection鏈接。

public Connection getConnection() throws SQLException {
    //經過數據源獲取鏈接
    //好比咱們配置了多數據源,此時還會正常切換
    if (this.connection == null) {
        openConnection();
    }
    return this.connection;
}
複製代碼

咱們看openConnection方法,它的做用就是從數據源中獲取一個Connection鏈接。若是咱們配置了多數據源,此時是能夠正常切換的。若是加了事務,之因此沒有切換數據源,是由於第二次調用時,this.connection != null,返回的仍是上一次的鏈接。

這是由於,在第二次獲取SqlSession的時候,當前線程是從ThreadLocal中拿到的,因此不會重複獲取Connection鏈接。

至此,在多數據源狀況下,若是加了Spring事務,不能動態切換數據源的緣由,咱們應該都明白了。

在這裏,筆者插播一道面試題:

  • Spring是如何保證事務的?

那就是將多個業務操做,放到同一個數據庫鏈接中,一塊兒提交或回滾。

  • 怎麼作到,都在一個鏈接中呢?

這裏就是各類ThreadlLocal的運用,想辦法將數據庫資源和當前事務綁定到一塊兒。

3、事務模式,怎麼支持切換數據源

上面咱們已經把緣由搞清楚了,接下來就看怎麼支持它動態切換數據源。

其餘配置都不變的狀況下,咱們須要建立兩個不一樣的sqlSessionFactory。

@Bean(name = "sqlSessionFactory1")
public SqlSessionFactory sqlSessionFactory1(@Qualifier("ds1") DataSource dataSource){
    return createSqlSessionFactory(dataSource);
}

@Bean(name = "sqlSessionFactory2")
public SqlSessionFactory sqlSessionFactory2(@Qualifier("ds2") DataSource dataSource){
    return createSqlSessionFactory(dataSource);
}
複製代碼

而後自定義一個CustomSqlSessionTemplate,來代替Mybatis中原有的sqlSessionTemplate,把上面定義的兩個SqlSessionFactory注入進去。

@Bean(name = "sqlSessionTemplate")
public CustomSqlSessionTemplate sqlSessionTemplate(){
    Map<Object,SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
    sqlSessionFactoryMap.put("ds1",factory1);
    sqlSessionFactoryMap.put("ds2",factory2);
    CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(factory1);
    customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap);
    customSqlSessionTemplate.setDefaultTargetSqlSessionFactory(factory1);
    return customSqlSessionTemplate;
}
複製代碼

在定義的CustomSqlSessionTemplate中,其餘都同樣,主要看獲取SqlSessionFactory的方法。

public class CustomSqlSessionTemplate extends SqlSessionTemplate {
    @Override
    public SqlSessionFactory getSqlSessionFactory() {
        //當前數據源的名稱
        String currentDsName = DataSourceType.getDataBaseType().name();
        SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys.get(currentDsName);
        if (targetSqlSessionFactory != null) {
            return targetSqlSessionFactory;
        } else if (defaultTargetSqlSessionFactory != null) {
            return defaultTargetSqlSessionFactory;
        }
        return this.sqlSessionFactory;
    }
}
複製代碼

在這裏,重點就是咱們能夠根據不一樣的數據源獲取不一樣的SqlSessionFactory。若是SqlSessionFactory不同,那麼在獲取SqlSession的時候,就不會在ThreadLocal中拿到,從而每次都是新的SqlSession對象。

既然SqlSession也不同,那麼在獲取Connection鏈接的時候,每次都會去動態數據源中去獲取。

原理就是這麼個原理,咱們來走一把。

修改完配置以後,咱們把Service方法加上事務的註解,此時數據也是能夠正常更新的。

@Transactional
@Override
public void createOrder(Order order) {
    storageMapper.decreaseStorage(order);
    orderMapper.createOrder(order);
}
複製代碼

能夠切換數據源只是第一步,咱們須要的保證能夠保證事務操做。假如在上面的代碼中,庫存扣減完成,可是建立訂單失敗,庫存是不會回滾的。由於它們分別屬於不一樣的數據源,根本不是同一個鏈接。

4、XA協議分佈式事務

要解決上面那個問題,咱們只能考慮XA協議。

關於XA協議是啥,筆者再也不過多的描述。咱們只需知道,MySQL InnoDB存儲引擎是支持XA事務的。

那麼XA協議的實現,在Java中叫作Java Transaction Manager,簡稱JTA。

如何實現JTA呢?咱們藉助Atomikos框架,先引入它的依賴。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>
複製代碼

而後,只需把DataSource對象改爲AtomikosDataSourceBean。

public DataSource getDataSource(Environment env, String prefix, String dataSourceName){
    Properties prop = build(env,prefix);
    AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
    ds.setXaDataSourceClassName(MysqlXADataSource.class.getName());
    ds.setUniqueResourceName(dataSourceName);
    ds.setXaProperties(prop);
    return ds;
}
複製代碼

這樣配完以後,獲取Connection鏈接的時候,拿到的實際上是MysqlXAConnection對象。在提交或者回滾的時候,走的就是MySQL的XA協議了。

public void commit(Xid xid, boolean onePhase) throws XAException {
    //封裝 XA COMMIT 請求
    StringBuilder commandBuf = new StringBuilder(300);
    commandBuf.append("XA COMMIT ");
    appendXid(commandBuf, xid);
    try {
        //交給MySQL執行XA事務操做
        dispatchCommand(commandBuf.toString());
    } finally {
        this.underlyingConnection.setInGlobalTx(false);
    }
}
複製代碼

經過引入Atomikos和修改DataSource,在多數據源狀況下,即使業務操做中間發生錯誤,多個數據庫也是能夠正常回滾的。

另一個問題,是否應該使用XA協議?

XA協議看起來看起來比較簡單,但它也有一些缺點。好比:

  • 性能問題,全部參與者在事務提交階段處於同步阻塞狀態,佔用系統資源,容易致使性能瓶頸,沒法知足高併發場景;
  • 若是協調者存在單點故障問題,若是協調者出現故障,參與者將一直處於鎖定狀態;
  • 主從複製可能產生事務狀態不一致。

在MySQL官方文檔中也列舉了一些XA協議的限制項:

https://dev.mysql.com/doc/refman/8.0/en/xa-restrictions.html

另外,筆者在實際的項目裏,其實也沒有用過,經過這樣的方式來解決分佈式事務問題,此例僅作可行性方案探討。

總結

本文經過引入SpringBoot+Mybatis的多數據源場景,分析了以下問題:

  • 多數據源的配置和實現;
  • Spring事務模式,多數據源不生效的緣由和解決方法;
  • 多數據源,基於XA協議的分佈式事務實現。

因爲篇幅有限,本文示例不包含全部的代碼。若有須要,請到GitHub自取。

https://github.com/taoxun/multipledb2.git

原創不易,客官們點個贊再走嘛,這將是筆者持續寫做的動力~

相關文章
相關標籤/搜索