前幾天,在家裏研究了下阿里巴巴開源的分佈式事務中間件Seata
,並記錄了一下過程。java
SpringBoot+Dubbo+Seata分佈式事務實戰git
不過光有實戰不行,咱多少也得把原理搞搞清楚,否則出了問題不知咋解決豈不是很尷尬。github
首先,設想一個傳統的單體應用,經過 3 個 模塊,在同一個數據源上更新數據來完成一項業務。spring
很天然的,整個業務過程的數據一致性由本地事務來保證。sql
隨着業務需求和架構的變化,單體應用被拆分爲微服務。原來的3個模塊被拆分爲3個獨立的服務,分別使用獨立的數據。數據庫
業務過程將經過RPC的服務調用來完成。springboot
那麼這個時候,每個服務內部的數據一致性仍由本地事務來保證。bash
而整個業務層面的全局數據一致性和完整性要如何保障呢?這就是微服務架構下面臨的,典型的分佈式事務需求。服務器
Seata
把一個分佈式事務理解成一個包含了若干 分支事務 的 全局事務 。全局事務 的職責是協調其下管轄的 分支事務 達成一致,要麼一塊兒成功提交,要麼一塊兒失敗回滾。此外,一般 分支事務 自己就是一個知足 ACID 的 本地事務。markdown
Seata
定義了3個組件來協議分佈式事務的處理過程。
一個典型的分佈式事務過程:
Seata有4種分佈式事務解決方案,分別是 AT 模式、TCC 模式、Saga 模式和 XA 模式。<後兩種實現還在官方計劃版本中>
咱們的示例項目中,所用到的就是AT模式。在 AT 模式下,用戶只需關注本身的「業務 SQL」,用戶的 「業務 SQL」 做爲一階段,Seata 框架會自動生成事務的二階段提交和回滾操做。
在一階段,Seata 會攔截「業務 SQL」,首先解析 SQL 語義,找到「業務 SQL」要更新的業務數據,在業務數據被更新前,將其保存成「before image」,而後執行「業務 SQL」更新業務數據,在業務數據更新以後,再將其保存成「after image」,最後生成行鎖。以上操做所有在一個數據庫事務內完成,這樣保證了一階段操做的原子性。
二階段若是是提交的話,由於「業務 SQL」在一階段已經提交至數據庫,因此Seata 框架只需將一階段保存的快照數據和行鎖刪掉,完成數據清理便可。
二階段若是是回滾的話,Seata 就須要回滾一階段已經執行的「業務 SQL」,還原業務數據。回滾方式即是用「before image」還原業務數據。
下面咱們從源碼中來看看這整個流程是怎麼串起來的。
爲了方便看源碼,首先就得把調試環境搞起,方便Debug。
Seata 源碼:github.com/seata/seata 。
目前的版本是0.7.0-SNAPSHOT
,而後經過mvn install
將項目打包到本地。
咱們的SpringBoot+Seata
測試項目就能夠引入這個依賴。
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.7.0-SNAPSHOT</version>
</dependency>
複製代碼
爲啥要這樣幹呢?由於Seata
不一樣組件之間的通訊都是Netty
來完成的,在調試的時候,每每會由於超時而斷開鏈接。
引入了本地版本,咱們就能夠把心跳檢測時間加長或者索性去掉,隨便搞~
找到io.seata.server.Server
,直接運行main
方法,就啓動了Seata服務,so easy~
咱們上面說Seata
定義了三個組件,其中有一個叫TC的事務協調器,就是指這個服務端。
咱們看看它具體幹了些啥。
public class Server { public static void main(String[] args) throws IOException { //初始化參數解析器 ParameterParser parameterParser = new ParameterParser(args); //初始化RpcServer ,設置服務器參數 RpcServer rpcServer = new RpcServer(WORKING_THREADS); rpcServer.setHost(parameterParser.getHost()); rpcServer.setListenPort(parameterParser.getPort()); UUIDGenerator.init(1); //從文件或者數據庫中加載Session SessionHolder.init(parameterParser.getStoreMode()); //初始化默認的協調器 DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer); coordinator.init(); rpcServer.setHandler(coordinator); //註冊鉤子程序 清理協調器相關資源 ShutdownHook.getInstance().addDisposable(coordinator); //127.0.0.1 and 0.0.0.0 are not valid here. if (NetUtil.isValidIp(parameterParser.getHost(), false)) { XID.setIpAddress(parameterParser.getHost()); } else { XID.setIpAddress(NetUtil.getLocalIp()); } XID.setPort(rpcServer.getListenPort()); //啓動RPC服務 rpcServer.init(); System.exit(0); } } 複製代碼
這裏的RpcServer
是經過Netty實現的一個RPC服務端,用來接收並處理TM和RM的消息。本文的重點不在服務端,因此咱們先有一個大體的印象便可。
在項目中,咱們配置了SeataConfiguration
,其中的重點是配置全局事務掃描器和數據源代理。因此,咱們先來看看爲啥要配置它們,它們具體又作了什麼事。
@Bean public GlobalTransactionScanner globalTransactionScanner() { return new GlobalTransactionScanner("springboot-order", "my_test_tx_group"); } 複製代碼
按照規矩,咱們看一個類,先看它的結構。好比它是誰的兒子,從哪裏來,欲往何處去?
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements InitializingBean, ApplicationContextAware,DisposableBean {
}
複製代碼
這裏咱們看到它是AbstractAutoProxyCreator
的子類,又實現了InitializingBean
接口。
這倆哥們都是Spring你們族的成員,一個用於Spring AOP生成代理;一個用於調用Bean的初始化方法。
Bean的初始化方法有三種方式,按照前後順序是,@PostConstruct、afterPropertiesSet、init-method
。
在這裏,它的初始化方法中,主要就幹了三件事。
private void initClient() { if (LOGGER.isInfoEnabled()) { LOGGER.info("Initializing Global Transaction Clients ... "); } //init TM 初始化事務管理器 TMClient.init(applicationId, txServiceGroup); //init RM 初始化資源管理器 RMClient.init(applicationId, txServiceGroup); //註冊鉤子程序,用於TM、RM的資源清理 registerSpringShutdownHook(); } 複製代碼
到目前爲止,Seata定義的三個組件都已經浮出水面了。
TMClient.init
主要是初始化事務管理器的客戶端,創建與RPC服務端的鏈接,同時向事務協調器註冊。
RMClient.init
也是同樣過程,初始化資源管理器,創建與RPC服務端的鏈接,同時向事務協調器註冊。
同時,它們都是經過定時任務來完成鏈接的,因此斷線以後能夠自動重連。
timerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { clientChannelManager.reconnect(getTransactionServiceGroup()); } }, 5, 5, TimeUnit.SECONDS); 複製代碼
最後,註冊鉤子程序,用於清理這兩個組件中的資源。
它其實是一個Bean的後置處理器,在Bean初始化以後,調用postProcessAfterInitialization
方法。
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) { if (bean != null) { Object cacheKey = this.getCacheKey(bean.getClass(), beanName); if (this.earlyProxyReferences.remove(cacheKey) != bean) { return this.wrapIfNecessary(bean, beanName, cacheKey); } } return bean; } 複製代碼
而後在GlobalTransactionScanner.wrapIfNecessary()
裏它幹了些什麼呢?
就是檢查Bean的方法上是否包含GlobalTransactional
和GlobalLock
註解,而後生成代理類。
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey){ if (disableGlobalTransaction) { return bean; } //已經生成了代理,直接返回 if (PROXYED_SET.contains(beanName)) { return bean; } interceptor = null; //檢查是否是TCC的代理 if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); } else { Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean); Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean); //判斷類方法上是否包含GlobalTransactional註解和GlobalLock註解 if (!existsAnnotation(new Class[] {serviceInterface}) && !existsAnnotation(interfacesIfJdk)) { return bean; } //建立攔截器 if (interceptor == null) { interceptor = new GlobalTransactionalInterceptor(failureHandlerHook); } } //若是不是AOP代理,則建立代理;若是是代理,則將攔截器加入到Advisor if (!AopUtils.isAopProxy(bean)) { bean = super.wrapIfNecessary(bean, beanName, cacheKey); } else { AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean); Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null)); for (Advisor avr : advisor) { advised.addAdvisor(0, avr); } } PROXYED_SET.add(beanName); return bean; } 複製代碼
至此,咱們已經肯定了一件事。咱們ServiceImpl
實現類上帶有GlobalTransactional
註解的方法,會生成一個代理類。
在調用方法時,實際會調用的就是代理類的攔截器方法invoke()
。
public class GlobalTransactionalInterceptor implements MethodInterceptor { @Override public Object invoke(final MethodInvocation methodInvocation) throws Throwable { //獲取目標類 Class<?> targetClass = AopUtils.getTargetClass(methodInvocation.getThis()); //獲取調用的方法 Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); //獲取方法上的註解 final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class); final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class); //處理全局事務 if (globalTransactionalAnnotation != null) { return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation); } else if (globalLockAnnotation != null) { return handleGlobalLock(methodInvocation); } else { return methodInvocation.proceed(); } } } 複製代碼
能夠看到,這裏是開始處理全局事務的地方。這裏咱們先不深究,接着往下看。
除了上面建立方法的代理,還要建立數據源的代理;而後把這個代理對象設置到SqlSessionFactory
。
@Bean public DataSourceProxy dataSourceProxy(DataSource dataSource) { return new DataSourceProxy(dataSource); } @Bean public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSourceProxy); sqlSessionFactoryBean.setTransactionFactory(new JdbcTransactionFactory()); return sqlSessionFactoryBean.getObject(); } 複製代碼
這裏的重點是建立了DataSourceProxy
,並把它設置到Mybatis
中的SqlSessionFactory
。
咱們知道,在Mybatis
執行方法的時候,最終要建立PreparedStatement
對象,而後執行ps.execute()
返回SQL結果。
這裏有兩點咱們須要注意:
PreparedStatement
對象是從Connection
對象建立而來的,也許咱們都寫過:
PreparedStatement pstmt = conn.prepareStatement(insert ........)
複製代碼
Connection
又是從哪裏來的呢?這個咱們沒必要遲疑,固然從數據源中才能拿到一個鏈接。
不過咱們已經把數據源DataSource
對象已經被替換成了Seata
中的DataSourceProxy
對象。
因此,Connection和PreparedStatement
在建立的時候,都被搞成了Seata
中的代理對象。
不信你看嘛:
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource { public ConnectionProxy getConnection() throws SQLException { Connection targetConnection = targetDataSource.getConnection(); return new ConnectionProxy(this, targetConnection); } } 複製代碼
而後調用AbstractDataSourceProxy
來建立PreparedStatement
。
public abstract class AbstractConnectionProxy implements Connection { @Override public PreparedStatement prepareStatement(String sql) throws SQLException { PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql); return new PreparedStatementProxy(this, targetPreparedStatement, sql); } } 複製代碼
看到這裏,咱們應該明白一件事。
在執行ps.execute()
的時候,則會調用到PreparedStatementProxy.execute()
。
理清了配置文件後面的邏輯,也許就掌握了它的脈絡,再看代碼的時候,能夠知道從哪裏下手。
上面已經說到,ServiceImpl
已是一個代理類,因此咱們直接看GlobalTransactionalInterceptor.invoke()
。
它會調用到TransactionalTemplate.execute()
,TransactionalTemplate
是業務邏輯和全局事務的模板。
public class TransactionalTemplate { public Object execute(TransactionalExecutor business) throws Throwable { // 1. 建立一個全局事務 GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); // 1.1 獲取事務的屬性 好比超時時間、事務名稱 TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } try { // 2. 開始事務 beginTransaction(txInfo, tx); Object rs = null; try { // 執行業務邏輯 rs = business.execute(); } catch (Throwable ex) { // 3.回滾 completeTransactionAfterThrowing(txInfo,tx,ex); throw ex; } // 4. 提交 commitTransaction(tx); return rs; } finally { //5. 清理資源 triggerAfterCompletion(); cleanUp(); } } } 複製代碼
這裏的代碼很清晰,事務的流程也一目瞭然。
下面咱們看看具體它是怎麼作的。
從客戶端的角度來看,開啓事務就是告訴服務器說:我要開啓一個全局事務了,請事務協調器TC先生分配一個全局事務ID給我。
TC先生會根據應用名稱、事務分組、事務名稱等建立全局Session,並生成一個全局事務XID。
而後客戶端記錄當前的事務狀態爲Begin
,並將XID綁定到當前線程。
開啓事務以後,開始執行咱們本身的業務邏輯。這就涉及到了數據庫操做,上面咱們說到Seata
已經將PreparedStatement
對象作了代理。因此在執行的時候將會調用到PreparedStatementProxy.execute()
。
public class PreparedStatementProxy{ public boolean execute() throws SQLException { return ExecuteTemplate.execute(this, new StatementCallback<Boolean, PreparedStatement>() { @Override public Boolean execute(PreparedStatement statement, Object... args) throws SQLException { return statement.execute(); } }); } } 複製代碼
在這裏它會先根據SQL的類型生成不一樣的執行器。好比是一個INSERT INTO
語句,那麼就是InsertExecutor
執行器。
而後判斷是否是自動提交的,執行相應方法。那麼接着看executeAutoCommitFalse()
public abstract class AbstractDMLBaseExecutor{ protected T executeAutoCommitFalse(Object[] args) throws Throwable { TableRecords beforeImage = beforeImage(); T result = statementCallback.execute(statementProxy.getTargetStatement(), args); TableRecords afterImage = afterImage(beforeImage); prepareUndoLog(beforeImage, afterImage); return result; } } 複製代碼
這裏就是AT模式一階段所作的事,攔截業務SQL,在數據保存前將其保存爲beforeImage
;而後執行業務SQL,在數據更新後再將其保存爲afterImage
。這些操做所有在一個本地事務中完成,保證了一階段操做的原子性。
咱們以INSERT INTO
爲例,看看它是怎麼作的。
因爲是新增操做,因此在執行以前,這條記錄尚未,beforeImage只是一個空表記錄。
執行原有的SQL語句,好比INSERT INTO ORDER(ID,NAME)VALUE(?,?)
它要作的事就是,把剛剛添加的那條記錄從數據庫中再查出來。
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException { //查找主鍵ID的值 List<Object> pkValues = containsPK() ? getPkValuesByColumn() : getPkValuesByAuto(); //根據主鍵ID查找記錄 TableRecords afterImage = getTableRecords(pkValues); return afterImage; } 複製代碼
而後將beforeImage
和afterImage
構建成UndoLog
對象,保存到數據庫。重要的是,這些操做都是在同一個本地事務中進行的。咱們看它的sqlList也能看出來。
最後,咱們看一下UndoLog
在數據庫中的記錄是長這樣的:
{ "@class": "io.seata.rm.datasource.undo.BranchUndoLog", "xid": "192.168.216.1:8091:2016493467", "branchId": 2016493468, "sqlUndoLogs": ["java.util.ArrayList", [{ "@class": "io.seata.rm.datasource.undo.SQLUndoLog", "sqlType": "INSERT", "tableName": "t_order", "beforeImage": { "@class": "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords", "tableName": "t_order", "rows": ["java.util.ArrayList", []] }, "afterImage": { "@class": "io.seata.rm.datasource.sql.struct.TableRecords", "tableName": "t_order", "rows": ["java.util.ArrayList", [{ "@class": "io.seata.rm.datasource.sql.struct.Row", "fields": ["java.util.ArrayList", [{ "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "id", "keyType": "PrimaryKey", "type": 4, "value": 116 }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "order_no", "keyType": "NULL", "type": 12, "value": "c233d8fb-5e71-4fc1-bc95-6f3d86312db6" }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "user_id", "keyType": "NULL", "type": 12, "value": "200548" }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "commodity_code", "keyType": "NULL", "type": 12, "value": "HYD5620" }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "count", "keyType": "NULL", "type": 4, "value": 10 }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "amount", "keyType": "NULL", "type": 8, "value": 5000.0 }]] }]] } }]] } 複製代碼
若是執行業務沒有異常,就進入二階段提交。客戶端向服務器發送Commit事件,同時將XID解綁。
服務器端回覆確認提交後,客戶端將本地UndoLog數據清除。
這裏重要在AsyncWorker.init()
方法,它會啓動一個定時任務來執行doBranchCommits
,來清除Log數據。
若是發生異常,則進行二階段回滾。
先經過xid和branchId 找到UnDoLog這條記錄,而後在解析裏面的數據生成反向SQL,將剛纔的執行結果給撤銷。
這塊代碼較長,你們自行參考UndoLogManager.undo()
和AbstractUndoExecutor.executeOn()
方法。
只有一個事務管理器TM纔會開啓全局事務,那麼其餘服務參與者是如何自動歸入到全局事務中去的呢?
首先,Seata
給Dubbo搞了個Filter過濾器叫作TransactionPropagationFilter
。
它會在Dubbo RPC
上下文中設置XID,這樣在其餘服務中也能夠獲取這個XID。
而後,咱們知道,Seata已經代理了PreparedStatement
。在執行數據操做的時候,就有個判斷。
if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) { //若是不包含XID,就執行原始方法 return statementCallback.execute(statementProxy.getTargetStatement(), args); } 複製代碼
這裏的意思就是,若是當前線程不包含XID,就執行原始方法;若是包含呢,就繼續往下執行事務方法。
本文大概闡述了Seata TA模式下,客戶端的工做原理。還有一部分Seata服務端的邏輯,本文並無深刻涉及。
緣由在於筆者尚未徹底的吃透這部份內容,沒辦法通俗的寫出來,等之後再補~
如若文中有不許確的地方,也但願朋友們不吝賜教,謝謝。