Seata分佈式事務TA模式源碼解讀

前言

前幾天,在家裏研究了下阿里巴巴開源的分佈式事務中間件Seata,並記錄了一下過程。java

SpringBoot+Dubbo+Seata分佈式事務實戰git

不過光有實戰不行,咱多少也得把原理搞搞清楚,否則出了問題不知咋解決豈不是很尷尬。github

1、原理

首先,設想一個傳統的單體應用,經過 3 個 模塊,在同一個數據源上更新數據來完成一項業務。spring

很天然的,整個業務過程的數據一致性由本地事務來保證。sql

隨着業務需求和架構的變化,單體應用被拆分爲微服務。原來的3個模塊被拆分爲3個獨立的服務,分別使用獨立的數據。數據庫

業務過程將經過RPC的服務調用來完成。springboot

那麼這個時候,每個服務內部的數據一致性仍由本地事務來保證。bash

而整個業務層面的全局數據一致性和完整性要如何保障呢?這就是微服務架構下面臨的,典型的分佈式事務需求。服務器

一、原理和設計

Seata把一個分佈式事務理解成一個包含了若干 分支事務全局事務全局事務 的職責是協調其下管轄的 分支事務 達成一致,要麼一塊兒成功提交,要麼一塊兒失敗回滾。此外,一般 分支事務 自己就是一個知足 ACID 的 本地事務。markdown

Seata定義了3個組件來協議分佈式事務的處理過程。

  • Transaction Coordinator (TC): 事務協調器,維護全局事務的運行狀態,負責協調並驅動全局事務的提交或回滾。
  • Transaction Manager (TM): 控制全局事務的邊界,負責開啓一個全局事務,並最終發起全局提交或全局回滾的決議。
  • Resource Manager (RM): 控制分支事務,負責分支註冊、狀態彙報,並接收事務協調器的指令,驅動分支(本地)事務的提交和回滾。

一個典型的分佈式事務過程:

  1. TM 向 TC 申請開啓一個全局事務,全局事務建立成功並生成一個全局惟一的 XID。
  2. XID 在微服務調用鏈路的上下文中傳播。
  3. RM 向 TC 註冊分支事務,將其歸入 XID 對應全局事務的管轄。
  4. TM 向 TC 發起針對 XID 的全局提交或回滾決議。
  5. TC 調度 XID 下管轄的所有分支事務完成提交或回滾請求。

二、AT模式

Seata有4種分佈式事務解決方案,分別是 AT 模式、TCC 模式、Saga 模式和 XA 模式。<後兩種實現還在官方計劃版本中>

咱們的示例項目中,所用到的就是AT模式。在 AT 模式下,用戶只需關注本身的「業務 SQL」,用戶的 「業務 SQL」 做爲一階段,Seata 框架會自動生成事務的二階段提交和回滾操做。

  • 一階段:

在一階段,Seata 會攔截「業務 SQL」,首先解析 SQL 語義,找到「業務 SQL」要更新的業務數據,在業務數據被更新前,將其保存成「before image」,而後執行「業務 SQL」更新業務數據,在業務數據更新以後,再將其保存成「after image」,最後生成行鎖。以上操做所有在一個數據庫事務內完成,這樣保證了一階段操做的原子性。

  • 二階段提交:

二階段若是是提交的話,由於「業務 SQL」在一階段已經提交至數據庫,因此Seata 框架只需將一階段保存的快照數據和行鎖刪掉,完成數據清理便可。

  • 二階段回滾:

二階段若是是回滾的話,Seata 就須要回滾一階段已經執行的「業務 SQL」,還原業務數據。回滾方式即是用「before image」還原業務數據。

下面咱們從源碼中來看看這整個流程是怎麼串起來的。

2、本地環境搭建

爲了方便看源碼,首先就得把調試環境搞起,方便Debug。

Seata 源碼:github.com/seata/seata

目前的版本是0.7.0-SNAPSHOT,而後經過mvn install將項目打包到本地。

咱們的SpringBoot+Seata測試項目就能夠引入這個依賴。

<dependency>
	<groupId>io.seata</groupId>
	<artifactId>seata-all</artifactId>
	<version>0.7.0-SNAPSHOT</version>
</dependency>
複製代碼

爲啥要這樣幹呢?由於Seata不一樣組件之間的通訊都是Netty來完成的,在調試的時候,每每會由於超時而斷開鏈接。

引入了本地版本,咱們就能夠把心跳檢測時間加長或者索性去掉,隨便搞~

一、服務端啓動

找到io.seata.server.Server,直接運行main方法,就啓動了Seata服務,so easy~

咱們上面說Seata定義了三個組件,其中有一個叫TC的事務協調器,就是指這個服務端。

咱們看看它具體幹了些啥。

public class Server {
	
    public static void main(String[] args) throws IOException {
	
	//初始化參數解析器 
	ParameterParser parameterParser = new ParameterParser(args);
	
	//初始化RpcServer ,設置服務器參數
	RpcServer rpcServer = new RpcServer(WORKING_THREADS);
	rpcServer.setHost(parameterParser.getHost());
	rpcServer.setListenPort(parameterParser.getPort());
	UUIDGenerator.init(1);
	
	//從文件或者數據庫中加載Session 
	SessionHolder.init(parameterParser.getStoreMode());
	
	//初始化默認的協調器
	DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer);
	coordinator.init();
	rpcServer.setHandler(coordinator);
	
	//註冊鉤子程序 清理協調器相關資源
	ShutdownHook.getInstance().addDisposable(coordinator);

	//127.0.0.1 and 0.0.0.0 are not valid here.
	if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
	    XID.setIpAddress(parameterParser.getHost());
	} else {
	    XID.setIpAddress(NetUtil.getLocalIp());
	}
	XID.setPort(rpcServer.getListenPort());
	//啓動RPC服務
	rpcServer.init();
	System.exit(0);
    }
}
複製代碼

這裏的RpcServer是經過Netty實現的一個RPC服務端,用來接收並處理TM和RM的消息。本文的重點不在服務端,因此咱們先有一個大體的印象便可。

3、客戶端配置

在項目中,咱們配置了SeataConfiguration,其中的重點是配置全局事務掃描器和數據源代理。因此,咱們先來看看爲啥要配置它們,它們具體又作了什麼事。

一、事務掃描器

@Bean
public GlobalTransactionScanner globalTransactionScanner() {
    return new GlobalTransactionScanner("springboot-order", "my_test_tx_group");
}
複製代碼

按照規矩,咱們看一個類,先看它的結構。好比它是誰的兒子,從哪裏來,欲往何處去?

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements InitializingBean, ApplicationContextAware,DisposableBean {
}
複製代碼

這裏咱們看到它是AbstractAutoProxyCreator的子類,又實現了InitializingBean接口。

這倆哥們都是Spring你們族的成員,一個用於Spring AOP生成代理;一個用於調用Bean的初始化方法。

  • InitializingBean

Bean的初始化方法有三種方式,按照前後順序是,@PostConstruct、afterPropertiesSet、init-method

在這裏,它的初始化方法中,主要就幹了三件事。

private void initClient() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Initializing Global Transaction Clients ... ");
    }
    //init TM 初始化事務管理器
    TMClient.init(applicationId, txServiceGroup);
    //init RM 初始化資源管理器
    RMClient.init(applicationId, txServiceGroup);
    //註冊鉤子程序,用於TM、RM的資源清理
    registerSpringShutdownHook();
}
複製代碼

到目前爲止,Seata定義的三個組件都已經浮出水面了。

TMClient.init主要是初始化事務管理器的客戶端,創建與RPC服務端的鏈接,同時向事務協調器註冊。

RMClient.init也是同樣過程,初始化資源管理器,創建與RPC服務端的鏈接,同時向事務協調器註冊。

同時,它們都是經過定時任務來完成鏈接的,因此斷線以後能夠自動重連。

timerExecutor.scheduleAtFixedRate(new Runnable() {
	@Override
	public void run() {
	    clientChannelManager.reconnect(getTransactionServiceGroup());
	}
}, 5, 5, TimeUnit.SECONDS);
複製代碼

最後,註冊鉤子程序,用於清理這兩個組件中的資源。

  • AbstractAutoProxyCreator

它其實是一個Bean的後置處理器,在Bean初始化以後,調用postProcessAfterInitialization方法。

public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
    if (bean != null) {
    	Object cacheKey = this.getCacheKey(bean.getClass(), beanName);
    	if (this.earlyProxyReferences.remove(cacheKey) != bean) {
    	    return this.wrapIfNecessary(bean, beanName, cacheKey);
    	}
    }
    return bean;
}
複製代碼

而後在GlobalTransactionScanner.wrapIfNecessary()裏它幹了些什麼呢?

就是檢查Bean的方法上是否包含GlobalTransactionalGlobalLock註解,而後生成代理類。

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey){
    if (disableGlobalTransaction) {
    	return bean;
    }
    //已經生成了代理,直接返回
    if (PROXYED_SET.contains(beanName)) {
	    return bean;
    }
    interceptor = null;
	
    //檢查是否是TCC的代理
    if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
    	//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
    	interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
    } else {
    	Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
    	Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
    	//判斷類方法上是否包含GlobalTransactional註解和GlobalLock註解
    	if (!existsAnnotation(new Class[] {serviceInterface})
    		&& !existsAnnotation(interfacesIfJdk)) {
    		return bean;
    	}
    	//建立攔截器
    	if (interceptor == null) {
    	    interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
    	}
    }
    //若是不是AOP代理,則建立代理;若是是代理,則將攔截器加入到Advisor
    if (!AopUtils.isAopProxy(bean)) {
    	bean = super.wrapIfNecessary(bean, beanName, cacheKey);
    } else {
    	AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
    	Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
    	for (Advisor avr : advisor) {
    		advised.addAdvisor(0, avr);
    	}
    }
    PROXYED_SET.add(beanName);
    return bean;
}
複製代碼

至此,咱們已經肯定了一件事。咱們ServiceImpl實現類上帶有GlobalTransactional註解的方法,會生成一個代理類。

在調用方法時,實際會調用的就是代理類的攔截器方法invoke()

public class GlobalTransactionalInterceptor implements MethodInterceptor {
	
    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    	
    	//獲取目標類
    	Class<?> targetClass = AopUtils.getTargetClass(methodInvocation.getThis());
    	//獲取調用的方法
    	Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    	final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
    	//獲取方法上的註解
    	final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
    	final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
    	//處理全局事務
    	if (globalTransactionalAnnotation != null) {
    	    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
    	} else if (globalLockAnnotation != null) {
    	    return handleGlobalLock(methodInvocation);
    	} else {
    	    return methodInvocation.proceed();
    	}
    }
}
複製代碼

能夠看到,這裏是開始處理全局事務的地方。這裏咱們先不深究,接着往下看。

二、數據源代理

除了上面建立方法的代理,還要建立數據源的代理;而後把這個代理對象設置到SqlSessionFactory

@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
    return new DataSourceProxy(dataSource);
}

@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
    SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    sqlSessionFactoryBean.setDataSource(dataSourceProxy);
    sqlSessionFactoryBean.setTransactionFactory(new JdbcTransactionFactory());
    return sqlSessionFactoryBean.getObject();
}
複製代碼

這裏的重點是建立了DataSourceProxy,並把它設置到Mybatis中的SqlSessionFactory

咱們知道,在Mybatis執行方法的時候,最終要建立PreparedStatement對象,而後執行ps.execute()返回SQL結果。

這裏有兩點咱們須要注意:

  • PreparedStatement的建立

PreparedStatement對象是從Connection對象建立而來的,也許咱們都寫過:

PreparedStatement pstmt = conn.prepareStatement(insert ........)
複製代碼
  • Connection的建立

Connection又是從哪裏來的呢?這個咱們沒必要遲疑,固然從數據源中才能拿到一個鏈接。

不過咱們已經把數據源DataSource對象已經被替換成了Seata中的DataSourceProxy對象。

因此,Connection和PreparedStatement在建立的時候,都被搞成了Seata中的代理對象。

不信你看嘛:

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    public ConnectionProxy getConnection() throws SQLException {
    	Connection targetConnection = targetDataSource.getConnection();
    	return new ConnectionProxy(this, targetConnection);
    }
}
複製代碼

而後調用AbstractDataSourceProxy來建立PreparedStatement

public abstract class AbstractConnectionProxy implements Connection {
	
    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }
}
複製代碼

看到這裏,咱們應該明白一件事。

在執行ps.execute()的時候,則會調用到PreparedStatementProxy.execute()

理清了配置文件後面的邏輯,也許就掌握了它的脈絡,再看代碼的時候,能夠知道從哪裏下手。

4、方法的執行

上面已經說到,ServiceImpl已是一個代理類,因此咱們直接看GlobalTransactionalInterceptor.invoke()

它會調用到TransactionalTemplate.execute()TransactionalTemplate是業務邏輯和全局事務的模板。

public class TransactionalTemplate {

    public Object execute(TransactionalExecutor business) throws Throwable {
	
    	// 1. 建立一個全局事務
    	GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    	// 1.1 獲取事務的屬性  好比超時時間、事務名稱
    	TransactionInfo txInfo = business.getTransactionInfo();
    	if (txInfo == null) {
    	    throw new ShouldNeverHappenException("transactionInfo does not exist");
    	}
    	try {
    	    // 2. 開始事務
    	    beginTransaction(txInfo, tx);
    	    Object rs = null;
    	    try {
    	    	// 執行業務邏輯
    	    	rs = business.execute();
    	    } catch (Throwable ex) {
    	    	// 3.回滾
    	    	completeTransactionAfterThrowing(txInfo,tx,ex);
    	    	throw ex;
    	    }
    	    // 4. 提交
    	    commitTransaction(tx);
    	    return rs;
    	} finally {
    	    //5. 清理資源
    	    triggerAfterCompletion();
    	    cleanUp();
    	}
    }
}
複製代碼

這裏的代碼很清晰,事務的流程也一目瞭然。

  1. 建立全局事務,並設置事務屬性
  2. 開啓一個事務
  3. 執行業務邏輯
  4. 若是發生異常,則回滾事務;不然提交事務
  5. 清理資源

下面咱們看看具體它是怎麼作的。

一、開啓事務

從客戶端的角度來看,開啓事務就是告訴服務器說:我要開啓一個全局事務了,請事務協調器TC先生分配一個全局事務ID給我。

TC先生會根據應用名稱、事務分組、事務名稱等建立全局Session,並生成一個全局事務XID。

而後客戶端記錄當前的事務狀態爲Begin ,並將XID綁定到當前線程。

二、執行業務邏輯

開啓事務以後,開始執行咱們本身的業務邏輯。這就涉及到了數據庫操做,上面咱們說到Seata已經將PreparedStatement對象作了代理。因此在執行的時候將會調用到PreparedStatementProxy.execute()

public class PreparedStatementProxy{
	
    public boolean execute() throws SQLException {
        return ExecuteTemplate.execute(this, new StatementCallback<Boolean, PreparedStatement>() {
            @Override
            public Boolean execute(PreparedStatement statement, Object... args) throws SQLException {
                return statement.execute();
            }
        });
    }
}
複製代碼

在這裏它會先根據SQL的類型生成不一樣的執行器。好比是一個INSERT INTO語句,那麼就是InsertExecutor執行器。

而後判斷是否是自動提交的,執行相應方法。那麼接着看executeAutoCommitFalse()

public abstract class AbstractDMLBaseExecutor{
	
    protected T executeAutoCommitFalse(Object[] args) throws Throwable {
    	TableRecords beforeImage = beforeImage();
    	T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    	TableRecords afterImage = afterImage(beforeImage);
    	prepareUndoLog(beforeImage, afterImage);
    	return result;
    }
}
複製代碼

這裏就是AT模式一階段所作的事,攔截業務SQL,在數據保存前將其保存爲beforeImage;而後執行業務SQL,在數據更新後再將其保存爲afterImage。這些操做所有在一個本地事務中完成,保證了一階段操做的原子性。

咱們以INSERT INTO爲例,看看它是怎麼作的。

  • beforeImage

因爲是新增操做,因此在執行以前,這條記錄尚未,beforeImage只是一個空表記錄。

  • 業務SQL

執行原有的SQL語句,好比INSERT INTO ORDER(ID,NAME)VALUE(?,?)

  • afterImage

它要作的事就是,把剛剛添加的那條記錄從數據庫中再查出來。

protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
	
    //查找主鍵ID的值
    List<Object> pkValues = containsPK() ? getPkValuesByColumn() : getPkValuesByAuto();
    //根據主鍵ID查找記錄
    TableRecords afterImage = getTableRecords(pkValues);
    return afterImage;
}
複製代碼

而後將beforeImageafterImage構建成UndoLog對象,保存到數據庫。重要的是,這些操做都是在同一個本地事務中進行的。咱們看它的sqlList也能看出來。

最後,咱們看一下UndoLog在數據庫中的記錄是長這樣的:

{
	"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
	"xid": "192.168.216.1:8091:2016493467",
	"branchId": 2016493468,
	"sqlUndoLogs": ["java.util.ArrayList", [{
		"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
		"sqlType": "INSERT",
		"tableName": "t_order",
		"beforeImage": {
			"@class": "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords",
			"tableName": "t_order",
			"rows": ["java.util.ArrayList", []]
		},
		"afterImage": {
			"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
			"tableName": "t_order",
			"rows": ["java.util.ArrayList", [{
				"@class": "io.seata.rm.datasource.sql.struct.Row",
				"fields": ["java.util.ArrayList", [{
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "id",
					"keyType": "PrimaryKey",
					"type": 4,
					"value": 116
				}, {
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "order_no",
					"keyType": "NULL",
					"type": 12,
					"value": "c233d8fb-5e71-4fc1-bc95-6f3d86312db6"
				}, {
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "user_id",
					"keyType": "NULL",
					"type": 12,
					"value": "200548"
				}, {
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "commodity_code",
					"keyType": "NULL",
					"type": 12,
					"value": "HYD5620"
				}, {
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "count",
					"keyType": "NULL",
					"type": 4,
					"value": 10
				}, {
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "amount",
					"keyType": "NULL",
					"type": 8,
					"value": 5000.0
				}]]
			}]]
		}
	}]]
}
複製代碼

三、提交

若是執行業務沒有異常,就進入二階段提交。客戶端向服務器發送Commit事件,同時將XID解綁。

服務器端回覆確認提交後,客戶端將本地UndoLog數據清除。

這裏重要在AsyncWorker.init()方法,它會啓動一個定時任務來執行doBranchCommits,來清除Log數據。

四、回滾

若是發生異常,則進行二階段回滾。

先經過xid和branchId 找到UnDoLog這條記錄,而後在解析裏面的數據生成反向SQL,將剛纔的執行結果給撤銷。

這塊代碼較長,你們自行參考UndoLogManager.undo()AbstractUndoExecutor.executeOn()方法。

五、如何關聯Dubbo

只有一個事務管理器TM纔會開啓全局事務,那麼其餘服務參與者是如何自動歸入到全局事務中去的呢?

首先,Seata給Dubbo搞了個Filter過濾器叫作TransactionPropagationFilter

它會在Dubbo RPC上下文中設置XID,這樣在其餘服務中也能夠獲取這個XID。

而後,咱們知道,Seata已經代理了PreparedStatement。在執行數據操做的時候,就有個判斷。

if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
    //若是不包含XID,就執行原始方法
    return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
複製代碼

這裏的意思就是,若是當前線程不包含XID,就執行原始方法;若是包含呢,就繼續往下執行事務方法。

5、總結

本文大概闡述了Seata TA模式下,客戶端的工做原理。還有一部分Seata服務端的邏輯,本文並無深刻涉及。

緣由在於筆者尚未徹底的吃透這部份內容,沒辦法通俗的寫出來,等之後再補~

如若文中有不許確的地方,也但願朋友們不吝賜教,謝謝。

相關文章
相關標籤/搜索