可能因爲業務上的某些需求,咱們的系統中有時每每要鏈接多個數據庫,這就產生了多數據源問題。html
多數據源的狀況下,通常咱們要作到能夠自動切換,此時會涉及到事務註解 Transactional 不生效問題和分佈式事務問題。java
關於多數據源方案,筆者在網上看過一些例子,然而大部分都是錯誤示例,根本跑不通,或者沒辦法兼容事務。mysql
今天,咱們就一點點來分析這些問題產生的根源和相應的解決方法。git
爲了劇情的順利開展,咱們模擬的業務是建立訂單和扣減庫存。github
因此,咱們先建立訂單表和庫存表。注意,把他們分別放到兩個數據庫中。面試
CREATE TABLE `t_storage` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT '0',
PRIMARY KEY (`id`),
UNIQUE KEY `commodity_code` (`commodity_code`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
CREATE TABLE `t_order` (
`id` bigint(16) NOT NULL,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT '0',
`amount` double(14,2) DEFAULT '0.00',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
複製代碼
經過YML文件先把兩個數據庫都配置一下。spring
spring:
datasource:
ds1:
jdbc_url: jdbc:mysql://127.0.0.1:3306/db1
username: root
password: root
ds2:
jdbc_url: jdbc:mysql://127.0.0.1:3306/db2
username: root
password: root
複製代碼
咱們知道,Mybatis執行一條SQL語句的時候,須要先獲取一個Connection。這時候,就交由Spring管理器到DataSource中獲取鏈接。sql
Spring中有個具備路由功能的DataSource,它能夠經過查找鍵調用不一樣的數據源,這就是AbstractRoutingDataSource
。數據庫
public abstract class AbstractRoutingDataSource{
//數據源的集合
@Nullable
private Map<Object, Object> targetDataSources;
//默認的數據源
@Nullable
private Object defaultTargetDataSource;
//返回當前的路由鍵,根據該值返回不一樣的數據源
@Nullable
protected abstract Object determineCurrentLookupKey();
//肯定一個數據源
protected DataSource determineTargetDataSource() {
//抽象方法 返回一個路由鍵
Object lookupKey = determineCurrentLookupKey();
DataSource dataSource = this.targetDataSources.get(lookupKey);
return dataSource;
}
}
複製代碼
能夠看到,該抽象類的核心就是先設置多個數據源到Map集合中,而後根據Key能夠獲取不一樣的數據源。安全
那麼,咱們就能夠重寫這個determineCurrentLookupKey方法,它返回的是一個數據源的名稱。
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
DataSourceType.DataBaseType dataBaseType = DataSourceType.getDataBaseType();
return dataBaseType;
}
}
複製代碼
而後還須要一個工具類,來保存當前線程的數據源類型。
public class DataSourceType {
public enum DataBaseType {
ds1, ds2
}
// 使用ThreadLocal保證線程安全
private static final ThreadLocal<DataBaseType> TYPE = new ThreadLocal<DataBaseType>();
// 往當前線程裏設置數據源類型
public static void setDataBaseType(DataBaseType dataBaseType) {
if (dataBaseType == null) {
throw new NullPointerException();
}
TYPE.set(dataBaseType);
}
// 獲取數據源類型
public static DataBaseType getDataBaseType() {
DataBaseType dataBaseType = TYPE.get() == null ? DataBaseType.ds1 : TYPE.get();
return dataBaseType;
}
}
複製代碼
這些都搞定以後,咱們還須要把這個DataSource配置到Spring容器中去。下面這個配置類的做用以下:
@Configuration
public class DataSourceConfig {
/**
* 建立多個數據源 ds1 和 ds2
* 此處的Primary,是設置一個Bean的優先級
* @return
*/
@Primary
@Bean(name = "ds1")
@ConfigurationProperties(prefix = "spring.datasource.ds1")
public DataSource getDateSource1() {
return DataSourceBuilder.create().build();
}
@Bean(name = "ds2")
@ConfigurationProperties(prefix = "spring.datasource.ds2")
public DataSource getDateSource2() {
return DataSourceBuilder.create().build();
}
/**
* 將多個數據源注入到DynamicDataSource
* @param dataSource1
* @param dataSource2
* @return
*/
@Bean(name = "dynamicDataSource")
public DynamicDataSource DataSource(@Qualifier("ds1") DataSource dataSource1,
@Qualifier("ds2") DataSource dataSource2) {
Map<Object, Object> targetDataSource = new HashMap<>();
targetDataSource.put(DataSourceType.DataBaseType.ds1, dataSource1);
targetDataSource.put(DataSourceType.DataBaseType.ds2, dataSource2);
DynamicDataSource dataSource = new DynamicDataSource();
dataSource.setTargetDataSources(targetDataSource);
dataSource.setDefaultTargetDataSource(dataSource1);
return dataSource;
}
/**
* 將動態數據源注入到SqlSessionFactory
* @param dynamicDataSource
* @return
* @throws Exception
*/
@Bean(name = "SqlSessionFactory")
public SqlSessionFactory getSqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dynamicDataSource)
throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dynamicDataSource);
bean.setMapperLocations(
new PathMatchingResourcePatternResolver().getResources("classpath*:mapping/*.xml"));
bean.setTypeAliasesPackage("cn.youyouxunyin.multipledb2.entity");
return bean.getObject();
}
}
複製代碼
上面的配置都完成以後,咱們還須要想辦法動態的改變數據源的鍵值,這個就跟系統的業務相關了。
好比在這裏,咱們有兩個Mapper接口,建立訂單和扣減庫存。
public interface OrderMapper {
void createOrder(Order order);
}
public interface StorageMapper {
void decreaseStorage(Order order);
}
複製代碼
那麼,咱們就能夠搞一個切面,在執行訂單的操做時,切到數據源ds1,執行庫存操做時,切到數據源ds2。
@Component
@Aspect
public class DataSourceAop {
@Before("execution(* cn.youyouxunyin.multipledb2.mapper.OrderMapper.*(..))")
public void setDataSource1() {
DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds1);
}
@Before("execution(* cn.youyouxunyin.multipledb2.mapper.StorageMapper.*(..))")
public void setDataSource2() {
DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds2);
}
}
複製代碼
如今就能夠寫一個Service方法,經過REST接口測試一下啦。
public class OrderServiceImpl implements OrderService {
@Override
public void createOrder(Order order) {
storageMapper.decreaseStorage(order);
logger.info("庫存已扣減,商品代碼:{},購買數量:{}。建立訂單中...",order.getCommodityCode(),order.getCount());
orderMapper.createOrder(order);
}
}
複製代碼
不出意外的話,業務執行完成後,兩個數據庫的表都已經有了變化。
但此時,咱們會想到,這兩個操做是須要保證原子性的。因此,咱們須要依賴事務,在Service方法上標註Transactional。
若是咱們在createOrder方法上添加了Transactional註解,而後在運行代碼,就會拋出異常。
### Cause: java.sql.SQLSyntaxErrorException: Table 'db2.t_order' doesn't exist
; bad SQL grammar []; nested exception is java.sql.SQLSyntaxErrorException:
Table 'db2.t_order' doesn't exist] with root cause 複製代碼
這就說明,若是加上了 Spring 的事務,咱們的數據源切換不過去了。這又是咋回事呢?
要想搞清楚緣由,咱們就得來分析分析若是加上了Spring事務,它又幹了哪些事情呢 ?
咱們知道,Spring的自動事務是基於AOP實現的。在調用包含事務的方法時,會進入一個攔截器。
public class TransactionInterceptor{
public Object invoke(MethodInvocation invocation) throws Throwable {
//獲取目標類
Class<?> targetClass = AopUtils.getTargetClass(invocation.getThis());
//事務調用
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}
複製代碼
在這裏面呢,首先就是開始建立一個事務。
protected Object doGetTransaction() {
//DataSource的事務對象
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
//設置事務自動保存
txObject.setSavepointAllowed(isNestedTransactionAllowed());
//給事務對象設置ConnectionHolder
ConnectionHolder conHolder = TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
複製代碼
在這一步,重點是給事務對象設置了ConnectionHolder屬性,不過此時仍是爲空。
接下來,就是開啓一個事務,這裏主要是經過ThreadLocal將資源和當前的事務對象綁定,而後設置一些事務狀態。
protected void doBegin(Object txObject, TransactionDefinition definition) {
Connection con = null;
//從數據源中獲取一個鏈接
Connection newCon = obtainDataSource().getConnection();
//從新設置事務對象中的connectionHolder,此時已經引用了一個鏈接
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
//將這個connectionHolder標記爲與事務同步
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
con.setAutoCommit(false);
//激活事務活動狀態
txObject.getConnectionHolder().setTransactionActive(true);
//將connection holder綁定到當前線程,經過threadlocal
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
//事務管理器,激活事務同步狀態
TransactionSynchronizationManager.initSynchronization();
}
複製代碼
開啓事務以後,就開始執行目標類真實方法。在這裏,就會開始進入Mybatis的代理對象。。哈哈,框架嘛,就各類代理。
咱們知道,Mybatis在執行SQL的以前,須要先獲取到SqlSession對象。
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator) {
//從ThreadLocal中獲取SqlSessionHolder,第一次獲取不到爲空
SqlSessionHolder holder = TransactionSynchronizationManager.getResource(sessionFactory);
//若是SqlSessionHolder爲空,那也確定獲取不到SqlSession;
//若是SqlSessionHolder不爲空,直接經過它來拿到SqlSession
SqlSession session = sessionHolder(executorType, holder);
if (session != null) {
return session;
}
//建立一個新的SqlSession
session = sessionFactory.openSession(executorType);
//若是當前線程的事務處於激活狀態,就將SqlSessionHolder綁定到ThreadLocal
registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
return session;
}
複製代碼
拿到SqlSession以後,就開始調用Mybatis的執行器,準備執行SQL語句。在執行SQL以前呢,固然須要先拿到Connection鏈接。
public Connection getConnection() throws SQLException {
//經過數據源獲取鏈接
//好比咱們配置了多數據源,此時還會正常切換
if (this.connection == null) {
openConnection();
}
return this.connection;
}
複製代碼
咱們看openConnection方法,它的做用就是從數據源中獲取一個Connection鏈接。若是咱們配置了多數據源,此時是能夠正常切換的。若是加了事務,之因此沒有切換數據源,是由於第二次調用時,this.connection != null
,返回的仍是上一次的鏈接。
這是由於,在第二次獲取SqlSession的時候,當前線程是從ThreadLocal中拿到的,因此不會重複獲取Connection鏈接。
至此,在多數據源狀況下,若是加了Spring事務,不能動態切換數據源的緣由,咱們應該都明白了。
在這裏,筆者插播一道面試題:
那就是將多個業務操做,放到同一個數據庫鏈接中,一塊兒提交或回滾。
這裏就是各類ThreadlLocal的運用,想辦法將數據庫資源和當前事務綁定到一塊兒。
上面咱們已經把緣由搞清楚了,接下來就看怎麼支持它動態切換數據源。
其餘配置都不變的狀況下,咱們須要建立兩個不一樣的sqlSessionFactory。
@Bean(name = "sqlSessionFactory1")
public SqlSessionFactory sqlSessionFactory1(@Qualifier("ds1") DataSource dataSource){
return createSqlSessionFactory(dataSource);
}
@Bean(name = "sqlSessionFactory2")
public SqlSessionFactory sqlSessionFactory2(@Qualifier("ds2") DataSource dataSource){
return createSqlSessionFactory(dataSource);
}
複製代碼
而後自定義一個CustomSqlSessionTemplate,來代替Mybatis中原有的sqlSessionTemplate,把上面定義的兩個SqlSessionFactory注入進去。
@Bean(name = "sqlSessionTemplate")
public CustomSqlSessionTemplate sqlSessionTemplate(){
Map<Object,SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
sqlSessionFactoryMap.put("ds1",factory1);
sqlSessionFactoryMap.put("ds2",factory2);
CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(factory1);
customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap);
customSqlSessionTemplate.setDefaultTargetSqlSessionFactory(factory1);
return customSqlSessionTemplate;
}
複製代碼
在定義的CustomSqlSessionTemplate中,其餘都同樣,主要看獲取SqlSessionFactory的方法。
public class CustomSqlSessionTemplate extends SqlSessionTemplate {
@Override
public SqlSessionFactory getSqlSessionFactory() {
//當前數據源的名稱
String currentDsName = DataSourceType.getDataBaseType().name();
SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys.get(currentDsName);
if (targetSqlSessionFactory != null) {
return targetSqlSessionFactory;
} else if (defaultTargetSqlSessionFactory != null) {
return defaultTargetSqlSessionFactory;
}
return this.sqlSessionFactory;
}
}
複製代碼
在這裏,重點就是咱們能夠根據不一樣的數據源獲取不一樣的SqlSessionFactory。若是SqlSessionFactory不同,那麼在獲取SqlSession的時候,就不會在ThreadLocal中拿到,從而每次都是新的SqlSession對象。
既然SqlSession也不同,那麼在獲取Connection鏈接的時候,每次都會去動態數據源中去獲取。
原理就是這麼個原理,咱們來走一把。
修改完配置以後,咱們把Service方法加上事務的註解,此時數據也是能夠正常更新的。
@Transactional
@Override
public void createOrder(Order order) {
storageMapper.decreaseStorage(order);
orderMapper.createOrder(order);
}
複製代碼
能夠切換數據源只是第一步,咱們須要的保證能夠保證事務操做。假如在上面的代碼中,庫存扣減完成,可是建立訂單失敗,庫存是不會回滾的。由於它們分別屬於不一樣的數據源,根本不是同一個鏈接。
要解決上面那個問題,咱們只能考慮XA協議。
關於XA協議是啥,筆者再也不過多的描述。咱們只需知道,MySQL InnoDB存儲引擎是支持XA事務的。
那麼XA協議的實現,在Java中叫作Java Transaction Manager,簡稱JTA。
如何實現JTA呢?咱們藉助Atomikos框架,先引入它的依賴。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
複製代碼
而後,只需把DataSource對象改爲AtomikosDataSourceBean。
public DataSource getDataSource(Environment env, String prefix, String dataSourceName){
Properties prop = build(env,prefix);
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setXaDataSourceClassName(MysqlXADataSource.class.getName());
ds.setUniqueResourceName(dataSourceName);
ds.setXaProperties(prop);
return ds;
}
複製代碼
這樣配完以後,獲取Connection鏈接的時候,拿到的實際上是MysqlXAConnection對象。在提交或者回滾的時候,走的就是MySQL的XA協議了。
public void commit(Xid xid, boolean onePhase) throws XAException {
//封裝 XA COMMIT 請求
StringBuilder commandBuf = new StringBuilder(300);
commandBuf.append("XA COMMIT ");
appendXid(commandBuf, xid);
try {
//交給MySQL執行XA事務操做
dispatchCommand(commandBuf.toString());
} finally {
this.underlyingConnection.setInGlobalTx(false);
}
}
複製代碼
經過引入Atomikos和修改DataSource,在多數據源狀況下,即使業務操做中間發生錯誤,多個數據庫也是能夠正常回滾的。
另一個問題,是否應該使用XA協議?
XA協議看起來看起來比較簡單,但它也有一些缺點。好比:
在MySQL官方文檔中也列舉了一些XA協議的限制項:
https://dev.mysql.com/doc/refman/8.0/en/xa-restrictions.html
另外,筆者在實際的項目裏,其實也沒有用過,經過這樣的方式來解決分佈式事務問題,此例僅作可行性方案探討。
本文經過引入SpringBoot+Mybatis的多數據源場景,分析了以下問題:
因爲篇幅有限,本文示例不包含全部的代碼。若有須要,請到GitHub自取。
https://github.com/taoxun/multipledb2.git
原創不易,客官們點個贊再走嘛,這將是筆者持續寫做的動力~