深度剖析一站式分佈式事務方案Seata-Client

1.背景

在以前的文章中已經介紹過Seata的整體介紹,如何使用以及Seata-Server的原理分析,有興趣的能夠閱讀下面的文章:spring

這篇文章會介紹Seata中另外兩個重要的角色TM(事務管理器)和RM(資源管理器),首先仍是來看看下面這張圖:sql

上一個文章對於TC的原理已經作了詳細介紹,對於TM和RM咱們看見在圖中都是屬於client的角色,他們分別的功能以下:數據庫

  • TM(事務管理器):用來控制整個分佈式事務的管理,發起全局事務的Begin/Commit/Rollback
  • RM(資源管理器):用來註冊本身的分支事務,接受TCCommit或者Rollback請求.

2.Seata-Spring

首先咱們來介紹一些Seata-clientSpring模塊,Seata經過這個模塊對本身的TMRM進行初始化以及掃描AT模式和TCC模式的註解並初始化這些模式須要的資源。 在Seata的項目中有一個spring模塊,裏面包含了咱們和spring相關的邏輯,GlobalTransactionScanner是其中的核心類:mybatis

public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean,ApplicationContextAware,
        DisposableBean

上面代碼是類的定義,首先它繼承了AbstractAutoProxyCreator實現了wrapIfNecessary方法實現咱們的方法的切面代理,實現了InitializingBean接口用於初始化咱們的客戶端,實現了ApplicationContextAware用於保存咱們的spring容器,實現了DisposableBean用於優雅關閉。app

首先來看繼承AbstractAutoProxyCreator實現的wrapIfNecessary框架

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        if (PROXYED_SET.contains(beanName)) {
            return bean;
        }
        interceptor = null;
        //check TCC proxy
        if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
            //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
            interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
        } else {
            Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
            Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
            if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                return bean;
            }
            if (interceptor == null) {
                interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
            }
        }
        if (!AopUtils.isAopProxy(bean)) {
            bean = super.wrapIfNecessary(bean, beanName, cacheKey);
        } else {
            AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
            Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
            for (Advisor avr : advisor) {
                advised.addAdvisor(0, avr);
            }
        }
        PROXYED_SET.add(beanName);   
        return bean;
    }
  • Step1:檢查當前beanName是否已經處理過 若是處理過本次就不處理。
  • Step2:根據註解,找到對應模式的Inteceptor,這裏有三種狀況第一個TCC,第二個是全局事務管理TM的攔截器,第三個是沒有註解,若是沒有那麼直接返回便可。
  • Step3:將對應的interceptor添加進入當前Bean

而後再看從InitializingBean中實現的afterPropertiesSet,也就是對Seata的初始化:異步

public void afterPropertiesSet() {
        initClient();

    }
    private void initClient() {
     
        //init TM
        TMClient.init(applicationId, txServiceGroup);
        //init RM
        RMClient.init(applicationId, txServiceGroup);
        registerSpringShutdownHook();
    }
    private void registerSpringShutdownHook() {
        if (applicationContext instanceof ConfigurableApplicationContext) {
            ((ConfigurableApplicationContext) applicationContext).registerShutdownHook();
            ShutdownHook.removeRuntimeShutdownHook();
        }
        ShutdownHook.getInstance().addDisposable(TmRpcClient.getInstance(applicationId, txServiceGroup));
        ShutdownHook.getInstance().addDisposable(RmRpcClient.getInstance(applicationId, txServiceGroup));
    }

上面的代碼邏輯比較清楚:async

  • Step1:初始化TM客戶端,這裏會向Server註冊該TM
  • Step2:初始化RM客戶端,這裏會向Server註冊該RM
  • Step3:註冊ShutdownHook,後續將TMRM優雅關閉。

注意這裏初始化的時候會初始化兩個客戶端,分別是TM客戶端和RM客戶端,不少人認爲TMRM是用的同一個客戶端,這裏須要注意一下。分佈式

2.1 Interceptor

再上面的第一部分邏輯中咱們看到咱們有兩個業務核心Interceptor,一個是GlobalTransactionalInterceptor用來處理全局事務的管理(開啓,提交,回滾),另一個是TccActionInterceptor用來處理TCC模式。熟悉Seata的朋友會問AT模式呢,爲何只有TCC模式,這裏AT模式表明着就是自動處理事務,咱們不須要有切面ide

2.1.1 GlobalTransactionalInterceptor

首先來看看GlobalTransactionalInterceptor#invoke:

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        if (globalTransactionalAnnotation != null) {
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (globalLockAnnotation != null) {
            return handleGlobalLock(methodInvocation);
        } else {
            return methodInvocation.proceed();
        }
    }
  • Step1:從代理類中獲取到原始的Method
  • Step2: 獲取Method中的註解
  • Step3: 若是有@GlobalTransactional註解執行handleGlobalTransaction切面邏輯,這個也是咱們全局事務的邏輯。
  • Step4: 若是有@GlobalLock註解,則執行handleGlobalLock切面邏輯,這個註解是用於一些非AT模式的數據庫加鎖,加上這個註解以後再執行Sql語句以前會查詢對應的數據是否加鎖,可是他不會加入全局事務。

handleGlobalTransaction邏輯以下:

private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                           final GlobalTransactional globalTrxAnno) throws Throwable {

        return transactionalTemplate.execute(new TransactionalExecutor() {

            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }

        });
    }
    TransactionalTemplate#execute
        public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
        // 1.1 get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        try {
            // 2. begin transaction
            beginTransaction(txInfo, tx);
            Object rs = null;
            try {
                // Do Your Business
                rs = business.execute();

            } catch (Throwable ex) {
                // 3.the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo,tx,ex);
                throw ex;
            }
            // 4. everything is fine, commit.
            commitTransaction(tx);
            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    }

handleGlobalTransaction中將具體的實現交給了TransactionalTemplate#execute去作了,其中具體的步驟以下:

  • Step1:獲取當前的全局事務,若是沒有則建立。
  • Step2:獲取業務中的事務信息包含超時時間等。
  • Step3:開啓全局事務
  • Step4:若是有異常拋出處理異常,rollback。
  • Step5:若是沒有異常那麼commit全局事務。
  • Step6:清除當前事務上下文信息。

2.1.2 TccActionInterceptor

咱們先看看TccActionInterceptor是如何使用:

@TwoPhaseBusinessAction(name = "TccActionOne" , commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext, int a);

    public boolean commit(BusinessActionContext actionContext);
    
    public boolean rollback(BusinessActionContext actionContext);

通常來講會定義三個方法一個是階段的try方法,另一個是二階段的commit和rollback,每一個方法的第一個參數是咱們事務上下文,這裏咱們不須要關心他在咱們切面中會自行填充處理。

接下來咱們再看看TCC相關的攔截器是如何處理的:

public Object invoke(final MethodInvocation invocation) throws Throwable {
		Method method = getActionInterfaceMethod(invocation);
		TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);	
		//try method
	    if(businessAction != null) {
			if(StringUtils.isBlank(RootContext.getXID())){
				//not in distribute transaction
				return invocation.proceed();
			}
	    	Object[] methodArgs = invocation.getArguments();
	    	//Handler the TCC Aspect
			Map<String, Object> ret = actionInterceptorHandler.proceed(method, methodArgs, businessAction, new Callback<Object>(){
				@Override
				public Object execute() throws Throwable {
					return invocation.proceed();
				}
	    	});
	    	//return the final result
	    	return ret.get(Constants.TCC_METHOD_RESULT);
	    }
		return invocation.proceed();
	}
  • Step1:獲取原始Method
  • Step2:判斷是否再全局事務中,也就是整個邏輯服務最外層是否執行了GlobalTransactionalInterceptor。若是再也不直接執行便可。
  • Step3:執行TCC切面,核心邏輯在actionInterceptorHandler#proceed中。

再來看看actionInterceptorHandler#proceed這個方法:

public Map<String, Object> proceed(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, Callback<Object> targetCallback) throws Throwable {
		Map<String, Object> ret = new HashMap<String, Object>(16);
		
		//TCC name
        String actionName = businessAction.name();
        String xid = RootContext.getXID();
        BusinessActionContext actionContext = new BusinessActionContext();
        actionContext.setXid(xid);
        //set action anme
        actionContext.setActionName(actionName)

        //Creating Branch Record
        String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
        actionContext.setBranchId(branchId);
        
        //set the parameter whose type is BusinessActionContext
        Class<?>[] types = method.getParameterTypes();
        int argIndex = 0;
        for (Class<?> cls : types) {
            if (cls.getName().equals(BusinessActionContext.class.getName())) {
            	arguments[argIndex] = actionContext;
                break;
            }
            argIndex++;
        }
        //the final parameters of the try method
        ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
        //the final result
        ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
        return ret;
	}
  • Step1:獲取一些事務信息,好比TCC名字,本次事務XID等。
  • Step2:建立Branch事務,一個是在本地的context上下文中將它的commitrollback信息保存起來,另外一個是向咱們的Seata-Server註冊分支事務,用於後續的管理。
  • Step3:填充方法參數,也就是咱們的BusinessActionContext

2.2 小結

Spring的幾個總要的內容已經剖析完畢,核心類主要是三個,一個Scanner,兩個Interceptor。總體來講比較簡單,Spring作的基本上也是咱們客戶端一些初始化的事,接下來咱們深刻了解一下TM這個角色。

3. TM 事務管理器

在上面章節中咱們講了GlobalTransactionalInterceptor這個切面攔截器,咱們知道了這個攔截器中作了咱們TM應該作的事,事務的開啓,事務的提交,事務的回滾。這裏只是咱們總體邏輯的發起點,其中具體的客戶端邏輯在咱們的DefaultTransactionManager中,這個類中的代碼以下所示:

public class DefaultTransactionManager implements TransactionManager {

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setXid(xid);
        GlobalRollbackResponse response = (GlobalRollbackResponse)syncCall(globalRollback);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
        queryGlobalStatus.setXid(xid);
        GlobalStatusResponse response = (GlobalStatusResponse)syncCall(queryGlobalStatus);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse)TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TransactionException(TransactionExceptionCode.IO, toe);
        }
    }
}

DefaultTransactionManager中總體邏輯比較簡單有四個方法:

  • begin:向Server發起GlobalBeginRequest請求,用於開啓全局事務。
  • commit:向Server發起GlobalCommitRequest請求,用於提交全局事務。
  • rollback:向Server發起GlobalRollbackRequest請求,用於回滾全局事務。
  • getStatus:向Server發起GlobalStatusRequest請求,用於查詢全局事務狀態信息。

4. RM 資源管理器

Seata中目前管理RM有兩種模式:一種是AT模式,須要事務性數據庫支持,會自動記錄修改前快照和修改後的快照,用於提交和回滾;還有一種是TCC模式,也能夠看做是MT模式,用於AT模式不支持的狀況,手動進行提交和回滾。接下來將會深刻剖析一下這兩種模式的實現原理。

4.1 AT 資源管理

AT模式下須要使用Seata提供的數據源代理,其總體實現邏輯以下圖所示:

在咱們的程序中執行一個sql語句,不管你是使用mybatis,仍是直接使用jdbcTemplate,都會遵循下面的步驟:

  • Step 1:從數據源中獲取數據庫鏈接。
  • Step 2: 從鏈接中獲取Statement
  • Step 3: 經過Statement執行咱們的sql語句

因此咱們能夠將DataSourceConnectionStatement代理起來而後執行咱們的一些特殊的邏輯,完成咱們的AT模式。

4.1.1 DataSourceProxy

在DataSourceProxy中沒有太多的業務邏輯,只是簡單的將獲取Connection用咱們的ConnectionProxy代理類進行了封裝,代碼以下:

public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }

首先經過咱們代理以前的DataSource獲取鏈接,而後用ConnectionProxy將其代理起來。

4.1.2 ConnectionProxy

ConnectionProxy主要作三件事,第一個是生成代理的Statement,第二個是保存咱們的鏈接上下文:加鎖的Key,undoLog等,第三個是代理執行咱們的本地事務的commitrollback

首先來看看代理生成的Statement

@Override
    public Statement createStatement() throws SQLException {
        Statement targetStatement = getTargetConnection().createStatement();
        return new StatementProxy(this, targetStatement);
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }

這裏也是經過咱們原來的鏈接直接生成Statement,而後將其進行代理。

接下來看看對咱們上下文的管理,你們都知道咱們的一個事務其實對應的是一個數據庫鏈接,在這個事務中的全部sqlundologlockKey都會在鏈接的上下文中記錄。以下面代碼所示:

/**
     * append sqlUndoLog
     *
     * @param sqlUndoLog the sql undo log
     */
    public void appendUndoLog(SQLUndoLog sqlUndoLog) {
        context.appendUndoItem(sqlUndoLog);
    }

    /**
     * append lockKey
     *
     * @param lockKey the lock key
     */
    public void appendLockKey(String lockKey) {
        context.appendLockKey(lockKey);
    }

這裏的代碼很簡單,lockKeyundolog都是用list保存,直接add便可。

當咱們的本地事務完成的時候,須要調用Connectioncommitrollback來進行事務的提交或回滾。這裏咱們也須要代理這兩個方法來完成咱們對分支事務的處理,先來看看commit方法。

public void commit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }
    private void processGlobalTransactionCommit() throws SQLException {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }
        try {
            if (context.hasUndoLog()) {
                UndoLogManager.flushUndoLogs(this);
            }
            targetConnection.commit();
        } catch (Throwable ex) {
            report(false);
            if (ex instanceof SQLException) {
                throw new SQLException(ex);
            }
        }
        report(true);
        context.reset();
    }
  • Step 1:判斷context是否再全局事務中,若是在則進行提交,到Step2。
  • Step 2: 註冊分支事務並加上全局鎖,若是全局鎖加鎖失敗則拋出異常。
  • Step 3: 若是context中有undolog,那麼將Unlog刷至數據庫。
  • Step 4: 提交本地事務。
  • Step 5:報告本地事務狀態,若是出現異常則報告失敗,若是沒有問題則報告正常。

上面介紹了提交事務的流程,當context在全局鎖的流程中,會進行全局鎖的查詢,這裏比較簡單就不作贅述,若是context都沒有在上述的狀況中那麼會直接進行事務提交。

對於咱們rollback來講代碼比較簡單:

public void rollback() throws SQLException {
        targetConnection.rollback();
        if (context.inGlobalTransaction()) {
            if (context.isBranchRegistered()) {
                report(false);
            }
        }
        context.reset();
    }
  • Step 1:首先提交本地事務。
  • Step 2:判斷是否在全局事務中。
  • Step 3:若是在則判斷分支事務是否已經註冊。
  • Step 4: 若是已經註冊那麼直接向客戶端報告該事務失敗異常。

細心的小夥伴可能發現若是咱們的本地事務提交或者回滾以後失敗,那咱們的分佈式事務運行結果還能正確嗎?這裏徹底不用擔憂,再咱們的服務端有完善的超時檢測,重試等機制,來幫助咱們應對這些特殊狀況。

4.1.3 StatementProxy

咱們通常用statement會調用executeXXX方法來執行咱們的sql語句,因此在咱們的Proxy中能夠利用這個方法,再執行sql的時候作一些咱們須要作的邏輯,下面看看execute方法的代碼:

public boolean execute(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<Boolean, T>() {
            @Override
            public Boolean execute(T statement, Object... args) throws SQLException {
                return statement.execute((String) args[0]);
            }
        }, sql);
    }

這裏直接將邏輯交給咱們的ExecuteTemplate去執行,有以下代碼:

public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {

        if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }

        if (sqlRecognizer == null) {
            sqlRecognizer = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    statementProxy.getConnectionProxy().getDbType());
        }
        Executor<T> executor = null;
        if (sqlRecognizer == null) {
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
        } else {
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                    break;
            }
        }
        T rs = null;
        try {
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException)ex;
        }
        return rs;
    }
}

這裏是咱們代理執行sql的核心邏輯,步驟以下:

  • Step 1:若是不在全局事務且不須要查詢全局鎖,那麼就直接執行原始的Statement
  • Step 2: 若是沒有傳入sql識別器,那麼咱們須要生成sql識別器,這裏咱們會借用Druid中對sql的解析,咱們獲取sql的識別器,咱們經過這個識別器能夠獲取到不一樣類型的sql語句的一些條件,好比說SQLUpdateRecognizer是用於updatesql識別器,咱們能夠直接獲取到表名,條件語句,更新的字段,更新字段的值等。
  • Step 3:根據sql識別器的類型,來生成咱們不一樣類型的執行器。
  • Step 4:經過第三步中的執行器來執行咱們的sql語句。

這裏有五種Executor:INSERT,UPDATE,DELETE的執行器會進行undolog記錄而且記錄全局鎖,SELECT_FOR_UPDATE只會進行查詢全局鎖,有一個默認的表明咱們如今還不支持,什麼都不會作直接執行咱們的sql語句。

對於INSERT,UPDATE,DELETE的執行器會繼承咱們的AbstractDMLBaseExecutor

protected T executeAutoCommitFalse(Object[] args) throws Throwable {
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

    protected abstract TableRecords beforeImage() throws SQLException;


    protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;

AbstractDMLBaseExecutor中執行邏輯在executeAutoCommitFalse這個方法,步驟以下:

  • Step 1:獲取執行當前sql以前所受影響行的快照,這裏beforeImage會被不一樣類型的sql語句從新實現。
  • Step 2:執行當前sql語句,並獲取結果。
  • Step 3:獲取執行sql以後的快照,這裏的afterIamge也會被不一樣類型的sql語句從新實現。
  • Step 4:將undolog準備好,這裏會保存到咱們的ConnectionContext中。
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        if (beforeImage.getRows().size() == 0 && afterImage.getRows().size() == 0) {
            return;
        }

        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();

        TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
        String lockKeys = buildLockKey(lockKeyRecords);
        connectionProxy.appendLockKey(lockKeys);

        SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
        connectionProxy.appendUndoLog(sqlUndoLog);
    }

準備UndoLog的時候會獲取咱們的ConnectionProxy,將咱們的UndologLockKey保存起來,給後面的本地事務commitrollback使用,上面已經講過。

4.1.4 分支事務的提交和回滾

上面的4.1.1-4.1.3都是說的是咱們分佈式事務的第一階段,也就是將咱們的分支事務註冊到Server,而第二階段分支提交和分支回滾都在咱們的DataSourceManager中,對於分支事務提交有以下代碼:

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
    }
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
            LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
        }
        return BranchStatus.PhaseTwo_Committed;
    }

這裏將咱們的分支事務提交的信息,放到一個隊列中,異步去處理,也就是異步刪除咱們的undolog數據,由於提交以後undolog數據沒用了。

這裏有人可能會問若是當你將這個信息異步提交到隊列中的時候,機器宕機,那麼就不會執行異步刪除undolog的邏輯,那麼這條undolog是否是就會成爲永久的髒數據呢?這裏Seata爲了防止這種事出現,會定時掃描某些較老的undolog數據而後進行刪除,不會污染咱們的數據。

對於咱們的分支事務回滾有以下代碼:

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            UndoLogManager.undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;

    }

這裏會先獲取到咱們的數據源,接下來調用咱們的重作日誌管理器的undo方法進行日誌重作,undo方法較長這裏就不貼上來了,其核心邏輯是查找到咱們的undolog而後將裏面的快照在咱們數據庫進行重作。

4.2 TCC 資源管理

TCC沒有AT模式資源管理這麼複雜,部分核心邏輯在以前的Interceptor中已經講解過了,好比二階段方法的保存等。這裏主要看看TCC的分支事務提交和分支事務回滾,在TCCResourceManager中有:

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
									 String applicationData) throws TransactionException {
		TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
		if (tccResource == null) {
			throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
		}
		Object targetTCCBean = tccResource.getTargetBean();
		Method commitMethod = tccResource.getCommitMethod();
		if (targetTCCBean == null || commitMethod == null) {
			throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);
		}
		boolean result = false;
		//BusinessActionContext
		BusinessActionContext businessActionContext =
				getBusinessActionContext(xid, branchId, resourceId, applicationData);
		Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
		LOGGER.info("TCC resource commit result :" + ret + ", xid:" + xid + ", branchId:" + branchId + ", resourceId:" +
				resourceId);
		if (ret != null && ret instanceof TwoPhaseResult) {
			result = ((TwoPhaseResult) ret).isSuccess();
		} else {
			result = (boolean) ret;
		}
		return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
	}

步驟以下:

  • Step 1:首先查找當前服務是否有該TCC資源,若是沒有拋出異常。
  • Step 2:而後找到咱們的TCC對象和對應的commit方法。
  • Step 3:而後執行咱們的commit方法。
  • Step 4:最後將結果返回給咱們的Server,由Server決定是否重試。

這裏的branchRollback方法也比較簡單,這裏就不作過多分析了。

總結

經過上面分析咱們知道,Seata的初始化是依賴Spring去進行,咱們的全局事務的開啓/提交/回滾都是依賴咱們的TM事務管理器,而咱們的分支事務的管理是依靠咱們的RM,其中提供了兩個模式ATTCCAT模式必須使用數據庫,其核心實現是實現數據源的代理,將咱們本身的邏輯注入進去。而咱們的TCC能彌補咱們沒有使用數據庫的狀況,將提交和回滾都交由咱們本身實現,其核心實現邏輯是依賴將一個資源的二階段的方法和咱們的目標對象在咱們的資源上下文中保存下來,方便咱們後續使用。

最後若是你們對分佈式事務感興趣,歡迎你們使用並閱讀Seata的代碼,並給咱們提出建議。

若是你們以爲這篇文章對你有幫助,你的關注和轉發是對我最大的支持,O(∩_∩)O:

相關文章
相關標籤/搜索