前幾天,在家裏研究了下阿里巴巴開源的分佈式事務中間件Seata
,並記錄了一下過程。java
SpringBoot+Dubbo+Seata分佈式事務實戰git
不過光有實戰不行,咱多少也得把原理搞搞清楚,否則出了問題不知咋解決豈不是很尷尬。github
首先,設想一個傳統的單體應用,經過 3 個 模塊,在同一個數據源上更新數據來完成一項業務。spring
很天然的,整個業務過程的數據一致性由本地事務來保證。sql
隨着業務需求和架構的變化,單體應用被拆分爲微服務。原來的3個模塊被拆分爲3個獨立的服務,分別使用獨立的數據。數據庫
業務過程將經過RPC的服務調用來完成。springboot
那麼這個時候,每個服務內部的數據一致性仍由本地事務來保證。bash
而整個業務層面的全局數據一致性和完整性要如何保障呢?這就是微服務架構下面臨的,典型的分佈式事務需求。服務器
Seata
把一個分佈式事務理解成一個包含了若干 分支事務 的 全局事務 。全局事務 的職責是協調其下管轄的 分支事務 達成一致,要麼一塊兒成功提交,要麼一塊兒失敗回滾。此外,一般 分支事務 自己就是一個知足 ACID 的 本地事務。架構
Seata
定義了3個組件來協議分佈式事務的處理過程。
一個典型的分佈式事務過程:
Seata有4種分佈式事務解決方案,分別是 AT 模式、TCC 模式、Saga 模式和 XA 模式。<後兩種實現還在官方計劃版本中>
咱們的示例項目中,所用到的就是AT模式。在 AT 模式下,用戶只需關注本身的「業務 SQL」,用戶的 「業務 SQL」 做爲一階段,Seata 框架會自動生成事務的二階段提交和回滾操做。
在一階段,Seata 會攔截「業務 SQL」,首先解析 SQL 語義,找到「業務 SQL」要更新的業務數據,在業務數據被更新前,將其保存成「before image」,而後執行「業務 SQL」更新業務數據,在業務數據更新以後,再將其保存成「after image」,最後生成行鎖。以上操做所有在一個數據庫事務內完成,這樣保證了一階段操做的原子性。
二階段若是是提交的話,由於「業務 SQL」在一階段已經提交至數據庫,因此Seata 框架只需將一階段保存的快照數據和行鎖刪掉,完成數據清理便可。
二階段若是是回滾的話,Seata 就須要回滾一階段已經執行的「業務 SQL」,還原業務數據。回滾方式即是用「before image」還原業務數據。
下面咱們從源碼中來看看這整個流程是怎麼串起來的。
爲了方便看源碼,首先就得把調試環境搞起,方便Debug。
Seata 源碼:github.com/seata/seata 。
目前的版本是0.7.0-SNAPSHOT
,而後經過mvn install
將項目打包到本地。
咱們的SpringBoot+Seata
測試項目就能夠引入這個依賴。
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.7.0-SNAPSHOT</version>
</dependency>
複製代碼
爲啥要這樣幹呢?由於Seata
不一樣組件之間的通訊都是Netty
來完成的,在調試的時候,每每會由於超時而斷開鏈接。
引入了本地版本,咱們就能夠把心跳檢測時間加長或者索性去掉,隨便搞~
找到io.seata.server.Server
,直接運行main
方法,就啓動了Seata服務,so easy~
咱們上面說Seata
定義了三個組件,其中有一個叫TC的事務協調器,就是指這個服務端。
咱們看看它具體幹了些啥。
public class Server {
public static void main(String[] args) throws IOException {
//初始化參數解析器
ParameterParser parameterParser = new ParameterParser(args);
//初始化RpcServer ,設置服務器參數
RpcServer rpcServer = new RpcServer(WORKING_THREADS);
rpcServer.setHost(parameterParser.getHost());
rpcServer.setListenPort(parameterParser.getPort());
UUIDGenerator.init(1);
//從文件或者數據庫中加載Session
SessionHolder.init(parameterParser.getStoreMode());
//初始化默認的協調器
DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer);
coordinator.init();
rpcServer.setHandler(coordinator);
//註冊鉤子程序 清理協調器相關資源
ShutdownHook.getInstance().addDisposable(coordinator);
//127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(rpcServer.getListenPort());
//啓動RPC服務
rpcServer.init();
System.exit(0);
}
}
複製代碼
這裏的RpcServer
是經過Netty實現的一個RPC服務端,用來接收並處理TM和RM的消息。本文的重點不在服務端,因此咱們先有一個大體的印象便可。
在項目中,咱們配置了SeataConfiguration
,其中的重點是配置全局事務掃描器和數據源代理。因此,咱們先來看看爲啥要配置它們,它們具體又作了什麼事。
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
return new GlobalTransactionScanner("springboot-order", "my_test_tx_group");
}
複製代碼
按照規矩,咱們看一個類,先看它的結構。好比它是誰的兒子,從哪裏來,欲往何處去?
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements InitializingBean, ApplicationContextAware,DisposableBean {
}
複製代碼
這裏咱們看到它是AbstractAutoProxyCreator
的子類,又實現了InitializingBean
接口。
這倆哥們都是Spring你們族的成員,一個用於Spring AOP生成代理;一個用於調用Bean的初始化方法。
Bean的初始化方法有三種方式,按照前後順序是,@PostConstruct、afterPropertiesSet、init-method
。
在這裏,它的初始化方法中,主要就幹了三件事。
private void initClient() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
//init TM 初始化事務管理器
TMClient.init(applicationId, txServiceGroup);
//init RM 初始化資源管理器
RMClient.init(applicationId, txServiceGroup);
//註冊鉤子程序,用於TM、RM的資源清理
registerSpringShutdownHook();
}
複製代碼
到目前爲止,Seata定義的三個組件都已經浮出水面了。
TMClient.init
主要是初始化事務管理器的客戶端,創建與RPC服務端的鏈接,同時向事務協調器註冊。
RMClient.init
也是同樣過程,初始化資源管理器,創建與RPC服務端的鏈接,同時向事務協調器註冊。
同時,它們都是經過定時任務來完成鏈接的,因此斷線以後能夠自動重連。
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, 5, 5, TimeUnit.SECONDS);
複製代碼
最後,註冊鉤子程序,用於清理這兩個組件中的資源。
它其實是一個Bean的後置處理器,在Bean初始化以後,調用postProcessAfterInitialization
方法。
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
if (bean != null) {
Object cacheKey = this.getCacheKey(bean.getClass(), beanName);
if (this.earlyProxyReferences.remove(cacheKey) != bean) {
return this.wrapIfNecessary(bean, beanName, cacheKey);
}
}
return bean;
}
複製代碼
而後在GlobalTransactionScanner.wrapIfNecessary()
裏它幹了些什麼呢?
就是檢查Bean的方法上是否包含GlobalTransactional
和GlobalLock
註解,而後生成代理類。
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey){
if (disableGlobalTransaction) {
return bean;
}
//已經生成了代理,直接返回
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//檢查是否是TCC的代理
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
//判斷類方法上是否包含GlobalTransactional註解和GlobalLock註解
if (!existsAnnotation(new Class[] {serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
//建立攔截器
if (interceptor == null) {
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
}
}
//若是不是AOP代理,則建立代理;若是是代理,則將攔截器加入到Advisor
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
複製代碼
至此,咱們已經肯定了一件事。咱們ServiceImpl
實現類上帶有GlobalTransactional
註解的方法,會生成一個代理類。
在調用方法時,實際會調用的就是代理類的攔截器方法invoke()
。
public class GlobalTransactionalInterceptor implements MethodInterceptor {
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
//獲取目標類
Class<?> targetClass = AopUtils.getTargetClass(methodInvocation.getThis());
//獲取調用的方法
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
//獲取方法上的註解
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
//處理全局事務
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}
}
複製代碼
能夠看到,這裏是開始處理全局事務的地方。這裏咱們先不深究,接着往下看。
除了上面建立方法的代理,還要建立數據源的代理;而後把這個代理對象設置到SqlSessionFactory
。
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setTransactionFactory(new JdbcTransactionFactory());
return sqlSessionFactoryBean.getObject();
}
複製代碼
這裏的重點是建立了DataSourceProxy
,並把它設置到Mybatis
中的SqlSessionFactory
。
咱們知道,在Mybatis
執行方法的時候,最終要建立PreparedStatement
對象,而後執行ps.execute()
返回SQL結果。
這裏有兩點咱們須要注意:
PreparedStatement
對象是從Connection
對象建立而來的,也許咱們都寫過:
PreparedStatement pstmt = conn.prepareStatement(insert ........)
複製代碼
Connection
又是從哪裏來的呢?這個咱們沒必要遲疑,固然從數據源中才能拿到一個鏈接。
不過咱們已經把數據源DataSource
對象已經被替換成了Seata
中的DataSourceProxy
對象。
因此,Connection和PreparedStatement
在建立的時候,都被搞成了Seata
中的代理對象。
不信你看嘛:
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}
}
複製代碼
而後調用AbstractDataSourceProxy
來建立PreparedStatement
。
public abstract class AbstractConnectionProxy implements Connection {
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql);
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}
}
複製代碼
看到這裏,咱們應該明白一件事。
在執行ps.execute()
的時候,則會調用到PreparedStatementProxy.execute()
。
理清了配置文件後面的邏輯,也許就掌握了它的脈絡,再看代碼的時候,能夠知道從哪裏下手。
上面已經說到,ServiceImpl
已是一個代理類,因此咱們直接看GlobalTransactionalInterceptor.invoke()
。
它會調用到TransactionalTemplate.execute()
,TransactionalTemplate
是業務邏輯和全局事務的模板。
public class TransactionalTemplate {
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. 建立一個全局事務
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.1 獲取事務的屬性 好比超時時間、事務名稱
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {
// 2. 開始事務
beginTransaction(txInfo, tx);
Object rs = null;
try {
// 執行業務邏輯
rs = business.execute();
} catch (Throwable ex) {
// 3.回滾
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}
// 4. 提交
commitTransaction(tx);
return rs;
} finally {
//5. 清理資源
triggerAfterCompletion();
cleanUp();
}
}
}
複製代碼
這裏的代碼很清晰,事務的流程也一目瞭然。
下面咱們看看具體它是怎麼作的。
從客戶端的角度來看,開啓事務就是告訴服務器說:我要開啓一個全局事務了,請事務協調器TC先生分配一個全局事務ID給我。
TC先生會根據應用名稱、事務分組、事務名稱等建立全局Session,並生成一個全局事務XID。
而後客戶端記錄當前的事務狀態爲Begin
,並將XID綁定到當前線程。
開啓事務以後,開始執行咱們本身的業務邏輯。這就涉及到了數據庫操做,上面咱們說到Seata
已經將PreparedStatement
對象作了代理。因此在執行的時候將會調用到PreparedStatementProxy.execute()
。
public class PreparedStatementProxy{
public boolean execute() throws SQLException {
return ExecuteTemplate.execute(this, new StatementCallback<Boolean, PreparedStatement>() {
@Override
public Boolean execute(PreparedStatement statement, Object... args) throws SQLException {
return statement.execute();
}
});
}
}
複製代碼
在這裏它會先根據SQL的類型生成不一樣的執行器。好比是一個INSERT INTO
語句,那麼就是InsertExecutor
執行器。
而後判斷是否是自動提交的,執行相應方法。那麼接着看executeAutoCommitFalse()
public abstract class AbstractDMLBaseExecutor{
protected T executeAutoCommitFalse(Object[] args) throws Throwable {
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}
}
複製代碼
這裏就是AT模式一階段所作的事,攔截業務SQL,在數據保存前將其保存爲beforeImage
;而後執行業務SQL,在數據更新後再將其保存爲afterImage
。這些操做所有在一個本地事務中完成,保證了一階段操做的原子性。
咱們以INSERT INTO
爲例,看看它是怎麼作的。
因爲是新增操做,因此在執行以前,這條記錄尚未,beforeImage只是一個空表記錄。
執行原有的SQL語句,好比INSERT INTO ORDER(ID,NAME)VALUE(?,?)
它要作的事就是,把剛剛添加的那條記錄從數據庫中再查出來。
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
//查找主鍵ID的值
List<Object> pkValues = containsPK() ? getPkValuesByColumn() : getPkValuesByAuto();
//根據主鍵ID查找記錄
TableRecords afterImage = getTableRecords(pkValues);
return afterImage;
}
複製代碼
而後將beforeImage
和afterImage
構建成UndoLog
對象,保存到數據庫。重要的是,這些操做都是在同一個本地事務中進行的。咱們看它的sqlList也能看出來。
最後,咱們看一下UndoLog
在數據庫中的記錄是長這樣的:
{
"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
"xid": "192.168.216.1:8091:2016493467",
"branchId": 2016493468,
"sqlUndoLogs": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType": "INSERT",
"tableName": "t_order",
"beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords",
"tableName": "t_order",
"rows": ["java.util.ArrayList", []]
},
"afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "t_order",
"rows": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PrimaryKey",
"type": 4,
"value": 116
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "order_no",
"keyType": "NULL",
"type": 12,
"value": "c233d8fb-5e71-4fc1-bc95-6f3d86312db6"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "user_id",
"keyType": "NULL",
"type": 12,
"value": "200548"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "commodity_code",
"keyType": "NULL",
"type": 12,
"value": "HYD5620"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "count",
"keyType": "NULL",
"type": 4,
"value": 10
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "amount",
"keyType": "NULL",
"type": 8,
"value": 5000.0
}]]
}]]
}
}]]
}
複製代碼
若是執行業務沒有異常,就進入二階段提交。客戶端向服務器發送Commit事件,同時將XID解綁。
服務器端回覆確認提交後,客戶端將本地UndoLog數據清除。
這裏重要在AsyncWorker.init()
方法,它會啓動一個定時任務來執行doBranchCommits
,來清除Log數據。
若是發生異常,則進行二階段回滾。
先經過xid和branchId 找到UnDoLog這條記錄,而後在解析裏面的數據生成反向SQL,將剛纔的執行結果給撤銷。
這塊代碼較長,你們自行參考UndoLogManager.undo()
和AbstractUndoExecutor.executeOn()
方法。
只有一個事務管理器TM纔會開啓全局事務,那麼其餘服務參與者是如何自動歸入到全局事務中去的呢?
首先,Seata
給Dubbo搞了個Filter過濾器叫作TransactionPropagationFilter
。
它會在Dubbo RPC
上下文中設置XID,這樣在其餘服務中也能夠獲取這個XID。
而後,咱們知道,Seata已經代理了PreparedStatement
。在執行數據操做的時候,就有個判斷。
if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
//若是不包含XID,就執行原始方法
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
複製代碼
這裏的意思就是,若是當前線程不包含XID,就執行原始方法;若是包含呢,就繼續往下執行事務方法。
本文大概闡述了Seata TA模式下,客戶端的工做原理。還有一部分Seata服務端的邏輯,本文並無深刻涉及。
緣由在於筆者尚未徹底的吃透這部份內容,沒辦法通俗的寫出來,等之後再補~
如若文中有不許確的地方,也但願朋友們不吝賜教,謝謝。