tcc-transaction 分析

  tcc-transaction是TCC型事務java實現,具體項目地址  點我。本文經過tcc-transaction和Springcloud,分析下tcc-transaction的原理。java

  要了解一個東西首先就要先會用它,tcc-transaction自己有多個模塊,因爲咱們是和springcloud結合,因此咱們只須要引入如下四個模塊。git

此次demo咱們就兩個服務,分別是Trade(交易)和account(積分)。交易在完成的同時,給當前用戶增長指定的積分。各個項目只須要引入一個tcc配置類github

@Configuration
@ImportResource(locations = "classpath:tcc-transaction.xml")
public class TccConfig {

    @Autowired
    private TccDataSourceProperties properties;

    @Bean
    public TransactionRepository transactionRepository(ObjectSerializer<?> serializer) {
        SpringJdbcTransactionRepository repository = new SpringJdbcTransactionRepository();
        repository.setDataSource(druidDataSource()); repository.setDomain("TRADE"); repository.setTbSuffix("_TRADE");
        repository.setSerializer(serializer);
        return repository;
    }

    /**
     * 設置恢復策略
     * @return
     */
    @Bean
    public DefaultRecoverConfig defaultRecoverConfig(){
        DefaultRecoverConfig defaultRecoverConfig=new DefaultRecoverConfig();
        defaultRecoverConfig.setCronExpression("0 */1 * * * ?");
        defaultRecoverConfig.setMaxRetryCount(120);
        defaultRecoverConfig.setMaxRetryCount(30);
        return defaultRecoverConfig;
    }

    public DataSource druidDataSource(){
        DruidDataSource datasource = new DruidDataSource();
        datasource.setUrl(properties.getUrl());
        datasource.setDriverClassName(properties.getDriverClassName());
        datasource.setUsername(properties.getUsername());
        datasource.setPassword(properties.getPassword());
        datasource.setInitialSize(10);
        datasource.setMinIdle(1);
        datasource.setMaxWait(6000);
        datasource.setMaxActive(10);
        datasource.setMinEvictableIdleTimeMillis(1800000);
        return datasource;
    }

    @Bean
    public ObjectSerializer<?> objectSerializer() {
        return new KryoTransactionSerializer();
    }

}

 TransactionRepository的subffix要和表名一致,例如,trade表的建表語句以下spring

CREATE TABLE `TCC_TRANSACTION_TRADE` (
  `TRANSACTION_ID` int(11) NOT NULL AUTO_INCREMENT,
  `DOMAIN` varchar(100) DEFAULT NULL,
  `GLOBAL_TX_ID` varbinary(32) NOT NULL,
  `BRANCH_QUALIFIER` varbinary(32) NOT NULL,
  `CONTENT` varbinary(8000) DEFAULT NULL,
  `STATUS` int(11) DEFAULT NULL,
  `TRANSACTION_TYPE` int(11) DEFAULT NULL,
  `RETRIED_COUNT` int(11) DEFAULT NULL,
  `CREATE_TIME` datetime DEFAULT NULL,
  `LAST_UPDATE_TIME` datetime DEFAULT NULL,
  `VERSION` int(11) DEFAULT NULL,
  PRIMARY KEY (`TRANSACTION_ID`),
  UNIQUE KEY `UX_TX_BQ` (`GLOBAL_TX_ID`,`BRANCH_QUALIFIER`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

 

 引入以後咱們即可以正式的使用了,首先咱們先發布account的tcc服務數據庫

@Compensable(confirmMethod = "confirmAddPoint",cancelMethod = "cancleAddPoint") public boolean addPoint(TransactionContext transactionContext, Integer point, String userId) {
        Account account = accountService.
                selectOne(new EntityWrapper<Account>().eq("user_id", userId));
        CheckUtil.notNull(account,"account not null");
        return true;
    }

    @Override
    public void confirmAddPoint(TransactionContext transactionContext, Integer point, String userId) {
        //建議用悲觀鎖先鎖定資源再去更新
        logger.info("肯定 新增積分");
        throw new RuntimeException("模擬confirm階段異常拋出");
//        Account account = accountService.
//                selectOne(new EntityWrapper<Account>().eq("user_id", userId));
//        Long point1 = account.getPoint();
//        long l = point1 + point;
//        account.setPoint(l);
//        boolean update = accountService.updateById(account);
//        CheckUtil.notFalse(update,"account update fail");
    }

    @Override
    public void cancleAddPoint(TransactionContext transactionContext, Integer point, String userId) {
        //trying階段沒作任何操做
    }

 

 發佈tcc服務有4個約束api

  1. 在服務方法上加上@Compensable註解
  2. 服務方法第一個入參類型爲org.mengyun.tcctransaction.api.TransactionContext
  3. 服務方法的入參都須能序列化(實現Serializable接口)
  4. try方法、confirm方法和cancel方法入參類型須同樣

咱們再去調用account的tcc服務數組

@Compensable(confirmMethod = "confirmPay",cancelMethod = "canselPay") public boolean pay(Long tradeId,String userId) {
        Trade trade = tradeService.selectById(tradeId);
        CheckUtil.notNull(trade,"trade not null");
        CheckUtil.notNull(trade.getPrice(),"price not null");
        CheckUtil.notFalse(trade.getStatus().compareTo(TradeStatusEnum.CONFIRMINF.getValue())==0,"trade is finish");
        Result result = accountRouteService.addPoint(null,trade.getPrice().intValue(), userId);
        CheckUtil.notFalse(result.isSuccess(),"積分增長失敗");
        return true;
    }

    @Override
    public void confirmPay(Long tradeId,String userId) {
        logger.info("開始confirm pay");
        Trade trade = tradeService.selectById(tradeId);
        //訂單已完結不做任何處理
        if (trade.getStatus().compareTo(TradeStatusEnum.CONFIRMINF.getValue())!=0){
            return;
        }
        Trade trade1=new Trade();
        trade1.setId(trade.getId());
        trade1.setStatus(TradeStatusEnum.SUCCESS.getValue());
        boolean update = tradeService.updateById(trade1);
        CheckUtil.notFalse(update,"trade fail to success");
    }

    @Override
    public void canselPay(Long tradeId,String userId) {
        Trade trade = tradeService.selectById(tradeId);
        //訂單已完結不做任何處理
        if (trade.getStatus().compareTo(TradeStatusEnum.CONFIRMINF.getValue())!=0){
            return;
        }
        Trade trade1=new Trade();
        trade1.setId(trade.getId());
        trade1.setStatus(TradeStatusEnum.FAIL.getValue());
        boolean update = tradeService.updateById(trade1);
        CheckUtil.notFalse(update,"trade fail to failed");
    }

 

 與發佈一個Tcc服務不一樣,本地Tcc服務方法有三個約束:緩存

  1. 在服務方法上加上@Compensable註解
  2. 服務方法的入參都須能序列化(實現Serializable接口)
  3. try方法、confirm方法和cancel方法入參類型須同樣

即與發佈Tcc服務不一樣的是本地Tcc服務無需聲明服務方法第一個入參類型爲org.mengyun.tcctransaction.api.TransactionContext。app

更細節的使用咱們仍是要參看官網,下面咱們啓動服務,故意在confirm階段拋異常,就會發現trade和account會一直在confirm階段不斷重試,而後咱們將confirm階段異常去掉,兩個服務都會同時達到成功,就像處於同一個數據庫事務同樣。分佈式

tcc主要依靠aop攔截來實現。咱們能夠先看下tcc-transaction.xml配置文件

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd   http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- 開啓Spring對@AspectJ風格切面的支持(由於下面用到自定義的TCC補償切面類) -->
    <!-- @Aspect註解不能被Spring自動識別並註冊爲Bean,所以要經過xml的bean配置,或經過@Compenent註解標識其爲Spring管理Bean -->
    <aop:aspectj-autoproxy/>

    <context:component-scan base-package="org.mengyun.tcctransaction.spring"/>

    <bean id="springBeanFactory" class="org.mengyun.tcctransaction.spring.support.SpringBeanFactory"/>


    <!-- TCC事務配置器 -->
    <bean id="tccTransactionConfigurator" class="org.mengyun.tcctransaction.spring.support.TccTransactionConfigurator">
    </bean>

    <!-- 事務恢復 -->
    <bean id="transactionRecovery" class="org.mengyun.tcctransaction.recover.TransactionRecovery">
        <property name="transactionConfigurator" ref="tccTransactionConfigurator"/>
    </bean>

    <!-- 可補償事務攔截器 -->
    <bean id="compensableTransactionInterceptor"
          class="org.mengyun.tcctransaction.interceptor.CompensableTransactionInterceptor">
        <property name="transactionConfigurator" ref="tccTransactionConfigurator"/>
    </bean>

    <!-- 資源協調攔截器 -->
    <bean id="resourceCoordinatorInterceptor"
          class="org.mengyun.tcctransaction.interceptor.ResourceCoordinatorInterceptor">
        <property name="transactionConfigurator" ref="tccTransactionConfigurator"/>
    </bean>

    <!-- TCC補償切面 -->
    <bean id="tccCompensableAspect" class="org.mengyun.tcctransaction.spring.TccCompensableAspect">
        <property name="compensableTransactionInterceptor" ref="compensableTransactionInterceptor"/>
    </bean>

    <!-- TCC事務上下文切面 -->
    <bean id="transactionContextAspect" class="org.mengyun.tcctransaction.spring.TccTransactionContextAspect">
        <property name="resourceCoordinatorInterceptor" ref="resourceCoordinatorInterceptor"/>
    </bean>

    <!-- 啓用定時任務註解 -->
    <task:annotation-driven/>

    <!-- 事務恢復任務調度器 -->
    <bean id="recoverScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"/>

    <!-- 事務恢復調度任務,初始化方法:init -->
    <bean id="recoverScheduledJob" class="org.mengyun.tcctransaction.spring.recover.RecoverScheduledJob" init-method="init">
        <property name="transactionConfigurator" ref="tccTransactionConfigurator"/>
        <property name="transactionRecovery" ref="transactionRecovery"/>
        <property name="scheduler" ref="recoverScheduler"/>
    </bean>

</beans>

 TccTransactionConfigurator配置了tcc的數據庫鏈接和事務管理

public class TccTransactionConfigurator implements TransactionConfigurator {

    /**
     * 事務庫
     */
    @Autowired
    private TransactionRepository transactionRepository;

    /**
     * 事務恢復配置
     */
    @Autowired(required = false)
    private RecoverConfig recoverConfig = DefaultRecoverConfig.INSTANCE;

    .....
}

 

 TransactionRepository就是咱們在TccConfig中配置的

@Bean
    public TransactionRepository transactionRepository(ObjectSerializer<?> serializer) {
        SpringJdbcTransactionRepository repository = new SpringJdbcTransactionRepository();
        repository.setDataSource(druidDataSource());
        repository.setDomain("ACCOUNT");
        repository.setTbSuffix("_ACCOUNT");
        repository.setSerializer(serializer);
        return repository;
    }

 

接下來就是兩個攔截器和切面

兩個攔截器的切點以下

TccCompensableAspect @Pointcut("@annotation(org.mengyun.tcctransaction.Compensable)")
TccTransactionContextAspect @Pointcut("execution(public * *(org.mengyun.tcctransaction.api.TransactionContext,..))||@annotation(org.mengyun.tcctransaction.Compensable)")

兩個攔截器分別攔截  Compensable 註解和方法參數第一位是是TransactionContext 的方法。當咱們調用如下方法時,攔截就會開始

@Compensable(confirmMethod = "confirmPay",cancelMethod = "canselPay") public boolean pay(Long tradeId,String userId) {
        ......
        return true;
    }

 

首先進入的切面是 org.mengyun.tcctransaction.interceptor.CompensableTransactionInterceptor#interceptCompensableMethod

public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {

        // 從攔截方法的參數中獲取事務上下文
        TransactionContext transactionContext = CompensableMethodUtils.getTransactionContextFromArgs(pjp.getArgs());
        
        // 計算可補償事務方法類型
        MethodType methodType = CompensableMethodUtils.calculateMethodType(transactionContext, true);
        
        logger.debug("==>interceptCompensableMethod methodType:" + methodType.toString());

        switch (methodType) {
            case ROOT:
            // 主事務方法的處理
                return rootMethodProceed(pjp); 
            case PROVIDER:
            // 服務提供者事務方法處理
                return providerMethodProceed(pjp, transactionContext); 
            default:
                return pjp.proceed(); // 其餘的方法都是直接執行
        }
    }

 

切面首先會獲取到TransactionContext,可是在服務的發起方TransactionContext是null,在下一步的計算方法類型的步驟中,切面經過是否含有註解和TransactionContext來計算類型。

private Object rootMethodProceed(ProceedingJoinPoint pjp) throws Throwable {
        transactionConfigurator.getTransactionManager().begin(); // 事務開始(建立事務日誌記錄,並在當前線程緩存該事務日誌記錄)
        Object returnValue = null; // 返回值
        try {
            logger.debug("==>rootMethodProceed try begin");
            returnValue = pjp.proceed();  // Try (開始執行被攔截的方法)
            logger.debug("==>rootMethodProceed try end");
        } catch (OptimisticLockException e) {
            logger.warn("==>compensable transaction trying exception.", e);
            throw e; //do not rollback, waiting for recovery job
        } catch (Throwable tryingException) {
            logger.warn("compensable transaction trying failed.", tryingException);
            transactionConfigurator.getTransactionManager().rollback(); throw tryingException;
        }
        logger.info("===>rootMethodProceed begin commit()");
        transactionConfigurator.getTransactionManager().commit(); // Try檢驗正常後提交(事務管理器在控制提交)
        return returnValue;
    }

 

在rootMethodProceed中,首先新建一個全局事務,保存到數據庫

public void begin() {
        LOG.debug("==>begin()");
        Transaction transaction = new Transaction(TransactionType.ROOT); // 事務類型爲ROOT:1
        LOG.debug("==>TransactionType:" + transaction.getTransactionType().toString() + ", Transaction Status:" + transaction.getStatus().toString());
        TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
        transactionRepository.create(transaction); // 建立事務記錄,寫入事務日誌庫
        threadLocalTransaction.set(transaction); // 將該事務日誌記錄存入當前線程的事務局部變量中
    }

 

接着會執行pjp.proceed(); 還記得上面講的兩個切面麼,這個時候會被第二個切面了攔截

org.mengyun.tcctransaction.interceptor.ResourceCoordinatorInterceptor#interceptTransactionContextMethod

public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
        LOG.debug("==>interceptTransactionContextMethod(ProceedingJoinPoint pjp)");
        // 獲取當前事務
        Transaction transaction = transactionConfigurator.getTransactionManager().getCurrentTransaction();
        // Trying(判斷是否Try階段的事務)
        if (transaction != null && transaction.getStatus().equals(TransactionStatus.TRYING)) {
            LOG.debug("==>TransactionStatus:" + transaction.getStatus().toString());
            // 從參數獲取事務上下文
            TransactionContext transactionContext = CompensableMethodUtils.getTransactionContextFromArgs(pjp.getArgs());
            // 獲取事務補償註解
            Compensable compensable = getCompensable(pjp);
            // 計算方法類型
            MethodType methodType = CompensableMethodUtils.calculateMethodType(transactionContext, compensable != null ? true : false);
            LOG.debug("==>methodType:" + methodType.toString());
            switch (methodType) {
                case ROOT:
                    generateAndEnlistRootParticipant(pjp); // 生成和登記根參與者
                    break;
                case CONSUMER:
                    generateAndEnlistConsumerParticipant(pjp); // 生成並登記消費者的參與者
                    break;
                case PROVIDER:
                    generateAndEnlistProviderParticipant(pjp); // 生成並登記服務提供者的參與者
                    break;
            }
        }
        LOG.debug("==>pjp.proceed(pjp.getArgs())");
        return pjp.proceed(pjp.getArgs());
    }

 

這裏計算出來的類型是root,咱們直接看 generateAndEnlistRootParticipant 

private Participant generateAndEnlistRootParticipant(ProceedingJoinPoint pjp) {
        LOG.debug("==>generateAndEnlistRootParticipant(ProceedingJoinPoint pjp)");
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        Compensable compensable = getCompensable(pjp);
        String confirmMethodName = compensable.confirmMethod(); // 確認方法
        String cancelMethodName = compensable.cancelMethod(); // 取消方法
        // 獲取當前事務
        Transaction transaction = transactionConfigurator.getTransactionManager().getCurrentTransaction(); 
        // 獲取事務Xid
        TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId()); 
        LOG.debug("==>TransactionXid:" + TransactionXid.byteArrayToUUID(xid.getGlobalTransactionId()).toString()
                + "|" + TransactionXid.byteArrayToUUID(xid.getBranchQualifier()).toString());

        Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());

        // 構建確認方法的提交上下文
        InvocationContext confirmInvocation = new InvocationContext(targetClass,
                confirmMethodName,
                method.getParameterTypes(), pjp.getArgs());        
        // 構建取消方法的提交上下文
        InvocationContext cancelInvocation = new InvocationContext(targetClass,
                cancelMethodName,
                method.getParameterTypes(), pjp.getArgs());
        // 構建參與者對像
        Participant participant =
                new Participant(
                        xid,
                        new Terminator(confirmInvocation, cancelInvocation));
        // 加入參與者
        transaction.enlistParticipant(participant); 
        TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
        // 更新事務
        transactionRepository.update(transaction); 
        return participant;
    }

 generateAndEnlistRootParticipant主要是構建Compensable註解上的兩個參數,confirmMethod 和 cancelMethod,並構建兩個方法的上下文,序列化保存到數據庫中。

如今終於能夠執行咱們真正的邏輯了。當咱們執行到調用遠程服務時。由於該方法的第一個參數是TransactionContext。因此又要被第二個切面攔截了。

Result result = accountRouteService.addPoint(null,trade.getPrice().intValue(), userId);
@PostMapping(value = "tccaccount/account/addPoint")
Result addPoint(@RequestBody TransactionContext context, @RequestParam(value = "point") Integer point,
@RequestParam(value = "userId") String userId);

 

public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
        LOG.debug("==>interceptTransactionContextMethod(ProceedingJoinPoint pjp)");
        // 獲取當前事務
        Transaction transaction = transactionConfigurator.getTransactionManager().getCurrentTransaction();
        
        // Trying(判斷是否Try階段的事務)
        if (transaction != null && transaction.getStatus().equals(TransactionStatus.TRYING)) {
            LOG.debug("==>TransactionStatus:" + transaction.getStatus().toString());
            // 從參數獲取事務上下文
            TransactionContext transactionContext = CompensableMethodUtils.getTransactionContextFromArgs(pjp.getArgs());
            // 獲取事務補償註解
            Compensable compensable = getCompensable(pjp);
            // 計算方法類型
            MethodType methodType = CompensableMethodUtils.calculateMethodType(transactionContext, compensable != null ? true : false);
            LOG.debug("==>methodType:" + methodType.toString());
            
            switch (methodType) {
                case ROOT:
                    generateAndEnlistRootParticipant(pjp); // 生成和登記根參與者
                    break;
                case CONSUMER:
                    generateAndEnlistConsumerParticipant(pjp); // 生成並登記消費者的參與者
                    break;
                case PROVIDER:
                    generateAndEnlistProviderParticipant(pjp); // 生成並登記服務提供者的參與者
                    break;
            }
        }

 

這裏methodType計算出來是CONSUMER

private Participant generateAndEnlistConsumerParticipant(ProceedingJoinPoint pjp) {
        LOG.debug("==>generateAndEnlistConsumerParticipant(ProceedingJoinPoint pjp)");
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        Transaction transaction = transactionConfigurator.getTransactionManager().getCurrentTransaction(); // 獲取當前事務
        TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId()); // 獲取事務Xid
        LOG.debug("==>TransactionXid:" + TransactionXid.byteArrayToUUID(xid.getGlobalTransactionId()).toString()
                + "|" + TransactionXid.byteArrayToUUID(xid.getBranchQualifier()).toString());
        
        // 獲取事務上下文參數的位置
        int position = CompensableMethodUtils.getTransactionContextParamPosition(((MethodSignature) pjp.getSignature()).getParameterTypes());
        
        // 給服務接口的TransactionContext參數設值
        pjp.getArgs()[position] = new TransactionContext(xid, transaction.getStatus().getId()); // 構建事務上下文

        Object[] tryArgs = pjp.getArgs(); // 獲取服務接口參數
        Object[] confirmArgs = new Object[tryArgs.length]; // 確認提交參數
        Object[] cancelArgs = new Object[tryArgs.length]; // 取消提交參數

        System.arraycopy(tryArgs, 0, confirmArgs, 0, tryArgs.length); // 數組拷貝
        confirmArgs[position] = new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()); // 構建事務確認上下文
        System.arraycopy(tryArgs, 0, cancelArgs, 0, tryArgs.length); // 數組拷貝
        cancelArgs[position] = new TransactionContext(xid, TransactionStatus.CANCELLING.getId()); // 構建事務取消上下文
        Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());
        
        // 構建確認方法的提交上下文
        InvocationContext confirmInvocation = new InvocationContext(targetClass, method.getName(), method.getParameterTypes(), confirmArgs);
        // 構建取消方法的提交上下文
        InvocationContext cancelInvocation = new InvocationContext(targetClass, method.getName(), method.getParameterTypes(), cancelArgs);

        // 構建參與者對像
        Participant participant =
                new Participant(
                        xid,
                        new Terminator(confirmInvocation, cancelInvocation));

        transaction.enlistParticipant(participant); // 加入到參與者
        TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
        transactionRepository.update(transaction); // 更新事務

        return participant;
    }

這個generateAndEnlistConsumerParticipant 方法幹了什麼呢。上文咱們調用遠程服務時,第一個入參的TransactionContext是null,因此這裏咱們咱們構建了一個 TransactionContext 。這裏的全局事務id和以前構造的TransactionContext 都是一致的,而後咱們再構建 確認與取消方法保存到數據庫中。如今咱們在trade的切面攔截終於結束了。調用遠程方法到account服務時,咱們又遇到了新的一次攔截

@Compensable(confirmMethod = "confirmAddPoint",cancelMethod = "cancleAddPoint")
    public boolean addPoint(TransactionContext transactionContext, Integer point, String userId) {
        Account account = accountService.
                selectOne(new EntityWrapper<Account>().eq("user_id", userId));
        CheckUtil.notNull(account,"account not null");
        return true;
    }

 

此次的攔截和在trade遇到的類似,可是此次算出來的是 PROVIDER 類型

public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {

        // 從攔截方法的參數中獲取事務上下文
        TransactionContext transactionContext = CompensableMethodUtils.getTransactionContextFromArgs(pjp.getArgs());
        
        // 計算可補償事務方法類型
        MethodType methodType = CompensableMethodUtils.calculateMethodType(transactionContext, true);
        
        logger.debug("==>interceptCompensableMethod methodType:" + methodType.toString());

        switch (methodType) {
            case ROOT:
                return rootMethodProceed(pjp); // 主事務方法的處理
            case PROVIDER:
                return providerMethodProceed(pjp, transactionContext); // 服務提供者事務方法處理
            default:
                return pjp.proceed(); // 其餘的方法都是直接執行
        }
    }

 

如今咱們是trying階段,因此如今第一個切面和第二個切面只須要在本地數據庫保存下事務的二進制流,校驗try階段的邏輯便可。

private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext) throws Throwable {
        logger.debug("==>providerMethodProceed transactionStatus:" + TransactionStatus.valueOf(transactionContext.getStatus()).toString());
        switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
            case TRYING:
                logger.debug("==>providerMethodProceed try begin");
                // 基於全局事務ID擴展建立新的分支事務,並存於當前線程的事務局部變量中.
                transactionConfigurator.getTransactionManager().propagationNewBegin(transactionContext);
                logger.debug("==>providerMethodProceed try end");
                return pjp.proceed();
            case CONFIRMING: try {
                    logger.debug("==>providerMethodProceed confirm begin");
                    // 找出存在的事務並處理.
                    transactionConfigurator.getTransactionManager().propagationExistBegin(transactionContext);
                    transactionConfigurator.getTransactionManager().commit(); // 提交
                    logger.debug("==>providerMethodProceed confirm end");
                } catch (NoExistedTransactionException excepton) {
                    //the transaction has been commit,ignore it.
                }
                break;
            case CANCELLING: try {
                    logger.debug("==>providerMethodProceed cancel begin");
                    transactionConfigurator.getTransactionManager().propagationExistBegin(transactionContext);
                    transactionConfigurator.getTransactionManager().rollback(); // 回滾
                    logger.debug("==>providerMethodProceed cancel end");
                } catch (NoExistedTransactionException exception) {
                    //the transaction has been rollback,ignore it.
                }
                break;
        }
        Method method = ((MethodSignature) (pjp.getSignature())).getMethod();
        return ReflectionUtils.getNullValue(method.getReturnType());
    }

 

當account服務執行成功以後,咱們回到trade服務  如今returnValue = pjp.proceed(); 這一行終於執行完了。也就是說各個事務的準備階段都完成了。只有兩種狀況,成功和非成功(失敗和超時)。咱們樂觀點,看當作功後是怎麼提交所有事物的。

private Object rootMethodProceed(ProceedingJoinPoint pjp) throws Throwable {
        logger.debug("==>rootMethodProceed");

        transactionConfigurator.getTransactionManager().begin(); // 事務開始(建立事務日誌記錄,並在當前線程緩存該事務日誌記錄)

        Object returnValue = null; // 返回值
        try {
            
            logger.debug("==>rootMethodProceed try begin");
            returnValue = pjp.proceed();  // Try (開始執行被攔截的方法)
            logger.debug("==>rootMethodProceed try end");
            
        } catch (OptimisticLockException e) {
            logger.warn("==>compensable transaction trying exception.", e);
            throw e; //do not rollback, waiting for recovery job
        } catch (Throwable tryingException) {
            logger.warn("compensable transaction trying failed.", tryingException);
            transactionConfigurator.getTransactionManager().rollback(); throw tryingException;
        }

        logger.info("===>rootMethodProceed begin commit()");
        transactionConfigurator.getTransactionManager().commit(); // Try檢驗正常後提交(事務管理器在控制提交)

        return returnValue;
    }

 

事務提交總共分爲兩部分,

1.更改狀態保存到數據庫。作持久化保存,這樣宕機了也能恢復。

2.反射調用切面攔截保存的參與者

public void commit() {
        LOG.debug("==>TransactionManager commit()");
        Transaction transaction = getCurrentTransaction();

        transaction.changeStatus(TransactionStatus.CONFIRMING);
        LOG.debug("==>update transaction status to CONFIRMING");
        //更改狀態爲CONFIRMING,數據庫持久化保存
        transactionConfigurator.getTransactionRepository().update(transaction);

        try {
            LOG.info("==>transaction begin commit()");
            //事務提交
 transaction.commit();
            transactionConfigurator.getTransactionRepository().delete(transaction);
        } catch (Throwable commitException) {
            LOG.error("compensable transaction confirm failed.", commitException);
            throw new ConfirmingException(commitException);
        }
    }

    public void commit() {
        LOG.debug("==>Transaction.commit()");
        for (Participant participant : participants) {
            participant.commit();
        }
    }

    public void commit() {
        LOG.debug("==>Participant.rollback()");
        terminator.commit();
    }

    public void commit() {
        LOG.debug("==>Terminator commit invoke");
        invoke(confirmInvocationContext);
    }

    private Object invoke(InvocationContext invocationContext) {

        if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {
            
            LOG.debug("==>Terminator invoke " + invocationContext.getTargetClass().getName() + "." + invocationContext.getMethodName());

            try {
                Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();

                // 找到要調用的目標方法
                Method method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());

                // 調用服務方法,被再次被TccTransactionContextAspect和ResourceCoordinatorInterceptor攔截,但由於事務狀態已經再也不是TRYING了,因此直接執行遠程服務
                return method.invoke(target, invocationContext.getArgs()); // 調用服務方法

            } catch (Exception e) {
                throw new SystemException(e);
            }
        }
        return null;
    }

本地的反射調用時很好實現的,可是如何遠程反射調用其餘服務的confirm方法呢。

tcc-transaction根據transactionContext的狀態字段,經過切面攔截 當transactionContext變成CONFIRMING時,就會反射調用confirmInvocationContext

case TRYING:
                logger.debug("==>providerMethodProceed try begin");
                // 基於全局事務ID擴展建立新的分支事務,並存於當前線程的事務局部變量中.
                transactionConfigurator.getTransactionManager().propagationNewBegin(transactionContext);
                logger.debug("==>providerMethodProceed try end");
                return pjp.proceed();
            case CONFIRMING: try {
                    logger.debug("==>providerMethodProceed confirm begin");
                    // 找出存在的事務並處理.
 transactionConfigurator.getTransactionManager().propagationExistBegin(transactionContext); transactionConfigurator.getTransactionManager().commit(); // 提交
                    logger.debug("==>providerMethodProceed confirm end");
                } catch (NoExistedTransactionException excepton) {
                    //the transaction has been commit,ignore it.
                }
                break;

到這裏,一個tcc分佈式理論上是已經完成了。可是,咱們考慮下一種場景,若是在confirm階段出現異常怎麼辦呢?

tcc-transaction還提供了恢復功能。他能從數據庫找到還未完成的事物,間隔的去執行,咱們也能夠配置相關的策略

/**
     * 設置恢復策略
     * @return
     */
    @Bean
    public DefaultRecoverConfig defaultRecoverConfig(){
        DefaultRecoverConfig defaultRecoverConfig=new DefaultRecoverConfig();
        defaultRecoverConfig.setCronExpression("0 */1 * * * ?");
        defaultRecoverConfig.setMaxRetryCount(120);
        return defaultRecoverConfig;
    }

 

 

以上就是tcc-transaction的分析

相關文章
相關標籤/搜索