結合Spring事務源碼分析TransactionSynchronizationAdapter之afterCommit回調方法事務回滾問題

問題提出

   在項目開發中咱們常常會在事務方法A處理一些與業務關聯性較低的邏輯C,對於邏輯C,一般會加入隊列或者利用Spring事務同步回調機制去處理。用Spring事務同步回調機制能夠保證在業務主邏輯執行成功並提交後,再去執行其餘關聯邏輯,如發送釘釘消息到業務方或者通知其餘業務模塊作相應邏輯等。可是假如邏輯C也是事務方法,那麼C方法執行過程當中拋了異常,A和C的事務會回滾嗎?若是事務C傳播行爲發生改變,C事務拋出異常,那麼A和C事務又是如何呢?這個值得探討,本文主要圍繞如下兩個問題進行探討。mysql

@Override
@Transactional(propagation = Propagation.REQUIRED)
public Result A() {
    // 業務邏輯
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            // C方法是另外一個事務方法,和A()方法不在同一個類中
            C();
        }
    });
    return Result.success(null);
}
複製代碼

問題1:

   回調方法中若是存在事務方法C(默認傳播行爲),那麼C若是報錯,會回滾A嗎,會回滾C報錯前的更新嗎?git

問題2:

   默認事務傳播行爲是REQUIRED,假如是傳播行爲是REQUIRED_NEW,C事務回滾是如何的?github

結論

   在討論這個問題前,先把結論放在問題後面。後面會結合Spring源碼分析這些問題。spring

1)C事務是否存在異常不影響A事務的正常提交,事務C拋出異常後,在默認隔離級別下,異常以前的更新操做不會回滾。sql

2)傳播行爲對事務C回滾策略有所影響,當C事務拋出異常,默認傳播行爲下,C事務方法不回滾;當傳播行爲爲REQUIRES_NEW時,C事務方法會執行回滾策略,這主要與DefaultTransactionStatus中的newTransaction值有關數據庫

代碼演示

演示代碼稍後會貼在github上。緩存

@Override
@Transactional
public Result verify(String orderNo,  Integer value, String checkName) {
    OrderDTO order = orderService.findByBizCode(orderNo);
    order.setValue(value);
    order.updateValue(orderNo, value);

    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            checkService.updateCheckName(orderNo, checkName);
        }
    });
    return Result.success(null);
}
複製代碼
@Override
@Transactional
public void updateCheckName(String orderNo, String checkName) {
    getDao().updateCheckName(orderNo, checkName);
    throw new RuntimeException("測試事務");
}
複製代碼
@Test
public void verify() {
    auctionBizService.verify("A20190614152128000116", 7000009, "tiankunlun88");
}
複製代碼

驗證結果:

1)回調方法中updateCheckName(..)是否報錯,不影響verify(..)方法提交;bash

2)對於updateCheckName(..)方法,執行過程當中拋出異常時,當propagation = Propagation.Propagation.REQUIRES_NEW時,事務會回滾;當propagation = Propagation.Propagation.REQUIRED時,事務不會回滾。session

源碼簡析

spring在掃描配置文件建立BeanDefination時會解析類中註解做爲其屬性,在後續實例及初始化過程當中,Spring會根據是否存在@Transaction註解,利用後置處理器BeanPostProcessor進行代理對象的建立。當執行類中某一方法時首先調用org.springframework.aop.framework.CglibAopProxy.DynamicAdvisedInterceptor#intercept方法,最終判斷是否由代理對象執行方法。關於spring事務網上由不少資料,這裏我就本次解決問題,稍微講一下事務源碼。app

DynamicAdvisedInterceptor#intercept

@Override
public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
    Object oldProxy = null;
    boolean setProxyContext = false;
    Class<?> targetClass = null;
    Object target = null;
    try {
        if (this.advised.exposeProxy) {
            // Make invocation available if necessary.
            oldProxy = AopContext.setCurrentProxy(proxy);
            setProxyContext = true;
        }
        // May be null. Get as late as possible to minimize the time we
        // "own" the target, in case it comes from a pool...
        target = getTarget();
        if (target != null) {
            targetClass = target.getClass();
        }
        // 獲取執行器鏈
        List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
        Object retVal;
        // 判斷切面是否爲空或方法是不是public方法,若是爲空或不是public方法就直接反射調用方法便可
        if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
            // We can skip creating a MethodInvocation: just invoke the target directly.
            Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
            retVal = methodProxy.invoke(target, argsToUse);
        }
        else {
            // 建立method invocation,而後進行事務加強
            retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
        }
        retVal = processReturnType(proxy, target, method, retVal);
        return retVal;
    }
    finally {
        if (target != null) {
            releaseTarget(target);
        }
        if (setProxyContext) {
            // Restore old proxy.
            AopContext.setCurrentProxy(oldProxy);
        }
    }
}
複製代碼

分析: 經過intercept方法,最終調用ReflectiveMethodInvocation的proceed方法,最終經過TransactionAspectSupport#invokeWithinTransaction,執行事務方法,這也是Spring事務源碼中最核心的方法之一了。下面咱們還看下該方法都作了什麼事情。

TransactionAspectSupport#invokeWithinTransaction

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
        throws Throwable {
    //經過事務屬性源TransactionAttributeSource讀取事務的屬性配置,即調用上面名稱匹配  
    //事務屬性源NameMatchTransactionAttributeSource的方法   
    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    
    //獲取Spring事務管理IoC容器配置的事務處理器 
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    
    //獲取目標類指定方法的事務鏈接點 
    final String joinpointIdentification = methodIdentification(method, targetClass);
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // 建立事務,將當前事務狀態和信息保存到TransactionInfo對象中 
        // 這個方法很很重要,設置傳播行爲、隔離級別、手動提交、開啓事務,另一點很是重要
        // 綁定事務txInfo到當前線程,這裏面用了使用許多緩存和threadLocal對象,方便獲取鏈接信息等
        // 重要一點,設置newTransaction標識位!!!!!!!!!!
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            //沿着攔截器鏈調用處理,使得最後目標對象的方法獲得調用
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            //在調用攔截器攔過程當中出現異常,則根據事務配置進行提交或回滾處理  
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            //清除與當前線程綁定的事務信息  
            cleanupTransactionInfo(txInfo);
        }
        // 這個方法用來提交事務,回調方法也是在這裏觸發的。
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
    else {
        //It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. ...................... 略去 } } 複製代碼

分析: 這個方法是Spring事務處理核心方法,咱們簡要介紹下。

  • createTransactionIfNecessary方法建立了TransactionInfo對象,設置傳播行爲、隔離級別、手動提交(autoCommit=false),綁定事務信息到當前線程中。咱們本次要分析的重點對象newTransaction標識位就在這個方法中設置值的。
  • proceedWithInvocation方法,主要執行目標對象方法。
  • completeTransactionAfterThrowing,見名知意,在目標對象方法出現異常後,判斷是否進行回滾處理。
  • cleanupTransactionInfo方法主要清除當前線程綁定的事務信息。
  • commitTransactionAfterReturning,這個方法主要用來提交事務。

言歸正傳,咱們仍是繼續分析newTransaction是如何設置以及它如何影響事務提交和回滾的。

設置事務A newTransaction值

TransactionAspectSupport#createTransactionIfNecessary

protected TransactionInfo createTransactionIfNecessary(
        	PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
        
    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
    	  @Override
    	  public String getName() {
    	     return joinpointIdentification;
    	  }
        };
    }
    TransactionStatus status = null;
    if (txAttr != null) {
    	if (tm != null) {
    	        // 獲取事務狀態,newTransaction就在這個方法中設置的。
    		status = tm.getTransaction(txAttr);
    	}
    	else {
    	   if (logger.isDebugEnabled()) {
    		logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +"] because no transaction manager has been configured");
    	   }
    	}
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
複製代碼

AbstractPlatformTransactionManager#getTransaction

@Override
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    Object transaction = doGetTransaction();

    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();

    if (definition == null) {
        // 若是沒有配置事務屬性,則使用默認的事務屬性
        definition = new DefaultTransactionDefinition();
    }
    
    // 檢查當前線程是否存在事務,若是已存在事務,那麼須要根據在事務屬性中定義的事務傳播屬性來處理事務的產生
    if (isExistingTransaction(transaction)) {
         // 因此afterCommit方法中的事務C方法就是經過這個方法獲取TransactionStatus,
            且newTransaction標識位就是在這裏設置。
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }

    //檢查事務屬性中timeout超時屬性設置是否合理
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }

    // 如下代碼就是對事務傳播行進行處理
    // 若是事務傳播特性配置的是mandatory,當前沒有事務存在,拋出異常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED的邏輯以下
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
        }
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // 開啓事務,開啓手動提交、設置超時時間、綁定資源到當前線程
            doBegin(transaction, definition);
            
            // 初始化和同步事務狀態
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException ex) {
            
            // 恢復上一個事務資源
            resume(null, suspendedResources);
            throw ex;
        }
        catch (Error err) {
            resume(null, suspendedResources);
            throw err;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + definition);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        
        // 走到這步newTransaction標誌爲true,也就是說當事務A中newTransaction標誌爲truereturn prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}
複製代碼

分析: getTransaction方法根據isExistingTransaction(transaction)判斷當前線是否存在事務不會,若是有事務,走handleExistingTransaction方法。若是沒有,會開啓事務(即mysql中的begin),設置autoCommit = false,設置事務隔離級別,綁定session到到當前線程等,最終建立DefaultTransactionStatus,這個過程會設置newTransaction值。經過源碼能夠發現,噹噹前線程不存在其餘事務時,只要事務A的隔離級別不爲PROPAGATION_MANDATORY,newTransaction都是爲true的。因此事務A的newTransaction標誌爲true。

事務A提交流程分析

當retVal = invocation.proceedWithInvocation()執行完updateValue後,沒有異常,執行finally方法cleanupTransactionInfo(txInfo)後,進行事務提交操做,即執行commitTransactionAfterReturning(txInfo)方法。

TransactionAspectSupport#invokeWithinTransaction

-> invocation.proceedWithInvocation()

-> commitTransactionAfterReturning(txInfo)

  -> 提交事務AbstractPlatformTransactionManager#processCommit

    -> DataSourceTransactionManager#doCommit

      -> 執行con.commit(); 這時候事務A更新操做持久化了

    -> 執行AbstractPlatformTransactionManager#triggerAfterCommit       -> TransactionSynchronizationUtils#invokeAfterCommit,執行回調

小結: 執行invokeAfterCommit後,進入updateCheckName,此時,事務A已經提交了。 回答了問題1中:的部分問題C若是報錯,會回滾A嗎?Spring執行afterCommit中的回調方法,無論C事務是否存在異常,都不是影響事務A,由於A此時已經commit持久化了!!!

事務C提交流程分析

  執行流程與事務A執行流程大都類似,但獲取TransactionStatus方法有所不一樣,在事務C執行流程拋出了異常,這時事務又是如何回滾的呢,這部份內容咱們會獲得問題2的答案。

設置事務C newTransaction值

AbstractPlatformTransactionManager#getTransaction

@Override
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    Object transaction = doGetTransaction();

    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();

    if (definition == null) {
        // Use defaults if no transaction definition given.
        definition = new DefaultTransactionDefinition();
    }
    
    // 這時後線程存在事務,走handleExistingTransaction方法,而事務C的newTransaction
    // 標誌位就在這個方法中設置。
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }
    ........
}
複製代碼

分析: 此時當前線程存在事務,執行handleExistingTransaction方法。接下來咱們來看下handleExistingTransaction源碼。

AbstractPlatformTransactionManager#handleExistingTransaction

private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {
    // 若是事務傳播特性爲:never,則拋出異常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }
    
    // 若是事務傳播特性是PROPAGATION_NOT_SUPPORTED
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        // 當前線程存在事務,則將事務掛起
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        // 建立非事務的事務狀態,讓方法不走事務
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
    
    // 重點來來,若是傳播行爲PROPAGATION_REQUIRES_NEW
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                    definition.getName() + "]");
        }
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 設置newnewTransaction位true
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
        catch (Error beginErr) {
            resumeAfterBeginException(transaction, suspendedResources, beginErr);
            throw beginErr;
        }
    }
    
    // 這一段主要是分析嵌套事務的,咱們基本不用。
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        //若是不容許事務嵌套,則拋出異常
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        
        // 若是容許使用savepoint保存點保存嵌套事務,爲當前事務建立一個保存點
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }
    
    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
    
    // 下面這個if主要是判斷隔離級別和數據庫默認隔離級別是否一致,若是不一致,判斷當前的隔離級別和definition的隔離級別
    // 是否一致,不一致就拋異常。這段代碼主要是隔離級別的參數校驗。
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    
    // 剩下的就是PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED,從這裏能夠看出newTransaction爲false
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
複製代碼

小結: 這個方法主要是根據事務傳播屬性,判斷是否要從新建立事務,仍是使用原來的事務。若是是默認傳播行爲,仍是使用原來的事務,此時newTranaction值爲false,在這裏肯定了事務C的newTranaction值爲false。 若是是傳播行爲REQUIRES_NEW,則會新建事務,newTranaction即爲true。

newTranaction如何影響事務C提交

在事務事務C拋出異常後,事務C是否進行會回滾呢,可能咱們感官上覺得出現了異常,應該會回滾的,可是事實可能並不是咱們想象得那般。分了分析問題2,咱們須要再次貼上事務執行的部分源代碼,方便理解。 TransactionAspectSupport#invokeWithinTransaction

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
        throws Throwable {
    // 略......
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            //在調用攔截器攔過程當中出現異常,則根據事務配置進行提交或回滾處理  
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            //清除與當前線程綁定的事務信息  
            cleanupTransactionInfo(txInfo);
        }
        
        // 這個方法用來提交事務
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
    else {
        // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. ...................... 略去 } } 複製代碼

當invocation.proceedWithInvocation()方法報錯,即updateCheckName拋異常時,進入completeTransactionAfterThrowing方法,完成事務操做。 TransactionAspectSupport#completeTransactionAfterThrowing

protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.hasTransaction()) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                    "] after exception: " + ex);
        }
        
        // 知足配置的異常回滾策略,會進入回滾方法rollback
        if (txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
            }
            catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            }
            catch (RuntimeException ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                throw ex2;
            }
            catch (Error err) {
                logger.error("Application exception overridden by rollback error", ex);
                throw err;
            }
        }
        else {
            // 提交事務
            try {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            }
            catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            }
            catch (RuntimeException ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                throw ex2;
            }
            catch (Error err) {
                logger.error("Application exception overridden by commit error", ex);
                throw err;
            }
        }
    }
}
複製代碼

分析: 從completeTransactionAfterThrowing方法中能夠看出,並非全部的異常都會使事務在出現異常時回滾。對於不支持回滾的異常,Spring依然會提交事務。 AbstractPlatformTransactionManager#processRollback

private void processRollback(DefaultTransactionStatus status) {
    try {
        try {
            // 觸發完成前的回調操做
            triggerBeforeCompletion(status);
            //嵌套事務回滾處理
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                //回滾掛起在保存點的事務
                status.rollbackToHeldSavepoint();
            }
            
            // 當前事務中新事務的回滾操做,就是根據newTransaction標誌位進行判斷的
            // 因此在這裏事務C方法是默認傳播行爲時不會進行回滾操做,當傳播行爲爲required_new
            // 時,會進入doRollback方法回滾以前的操做。
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                //回滾處理,由具體的事務處理器實現
                doRollback(status);
            }
            
            // 這個方法沒明白到底起什麼做用,就不分析了。
            else if (status.hasTransaction()) {
                if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                    if (status.isDebug()) {
                        logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                    }
                    doSetRollbackOnly(status);
                }
                else {
                    if (status.isDebug()) {
                        logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                    }
                }
            }
            else {
                logger.debug("Should roll back transaction but cannot - no transaction available");
            }
        }
        catch (RuntimeException ex) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }
        catch (Error err) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw err;
        }
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    }
    finally {
        cleanupAfterCompletion(status);
    }
}
複製代碼

分析: 當執行processRollback方法時,會判斷事務C是不是新事務,有前面當分析咱們已經得知newTransaction爲false。所以,不會執行doRollback,也就是說事務C不會執行回滾操做!

此時,回答了問題1:事務C異常,會回滾C報錯前的更新嗎? 可是由引來了一個問題,事務C的數據是什麼時候持久化的呢?接下來,咱們分析下,在默認隔離級別下,事務C數據在什麼時候進行持久化的。

事務C持久化分析

當事務C拋出異常後,意味着事務A執行執行triggerAfterCommit失敗,進入finally流程,執行cleanupAfterCompletion(status)方法,也就是在這方法中事務C的數據落地了。 DataSourceTransactionManager#cleanupAfterCompletion

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    status.setCompleted();
    if (status.isNewSynchronization()) {
    	TransactionSynchronizationManager.clear();
    }
    // 此時回到事務A的執行流程中,isNewTransaction爲true
    if (status.isNewTransaction()) {
        // 事務結束後的清理工做,接下來會詳細分析
    	doCleanupAfterCompletion(status.getTransaction());
    }
    if (status.getSuspendedResources() != null) {
    	if (status.isDebug()) {
    		logger.debug("Resuming suspended transaction after completion of inner transaction");
    	}
    	resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}
複製代碼

DataSourceTransactionManager#doCleanupAfterCompletion

@Override
protected void doCleanupAfterCompletion(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

    // Remove the connection holder from the thread, if exposed.
    if (txObject.isNewConnectionHolder()) {
        //從TransactionSynchronizationManager中解綁相應的connectionHolder
        TransactionSynchronizationManager.unbindResource(this.dataSource);
    }

    //還原Connection
    Connection con = txObject.getConnectionHolder().getConnection();
    try {
        if (txObject.isMustRestoreAutoCommit()) {
            // 當這段代碼執行後,數據落地了。
            con.setAutoCommit(true);
        }
        DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
    }
    catch (Throwable ex) {
        logger.debug("Could not reset JDBC Connection after transaction", ex);
    }

    if (txObject.isNewConnectionHolder()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
        }
        //若是是newConnection將這個連接關閉,若是是鏈接池將還給鏈接池
        DataSourceUtils.releaseConnection(con, this.dataSource);
    }
    // 設置transactionActive爲false
    txObject.getConnectionHolder().clear();
}
複製代碼

分析: 將將鏈接autoCommit設置爲true時,事務C的數據就落地了。

傳播行爲對事務C回滾的影響分析

  事務C的傳播行爲爲PROPAGATION_REQUIRES_NEW時,newTransaction值爲true,在拋出異常後執行doRollback方法,回滾數據。 此時回答了問題2,也就是說當傳播行爲不一樣時,事務C回滾策略不同。

AbstractPlatformTransactionManager#processRollback

private void processRollback(DefaultTransactionStatus status) {
    try {
        try {
            // 觸發完成前的回調操做
            triggerBeforeCompletion(status);
            //嵌套事務回滾處理
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                //回滾掛起在保存點的事務
                status.rollbackToHeldSavepoint();
            }
            
            // 當前事務中新事務的回滾操做,就是根據newTransaction標誌位進行判斷的
            // 因此在這裏事務C方法是默認傳播行爲時不會進行回滾操做,當傳播行爲爲required_new
            // 時,會進入doRollback方法回滾以前的操做。
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                //回滾處理,由具體的事務處理器實現
                doRollback(status);
            }
         ........
    }
    finally {
        cleanupAfterCompletion(status);
    }
}
複製代碼

DataSourceTransactionManager#doRollback

@Override
protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
    	logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
    }
    try {
    	con.rollback();
    }
    catch (SQLException ex) {
    	throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
    }
}
複製代碼

觸類旁通

若是事務C沒有拋出異常,事務C的提交會在哪裏呢?

   若是是默認傳播行爲,事務C沒有異常,它的數據仍然是在事務A提交事務後才進行落地的,也就是doCleanupAfterCompletion方法中設置autoCommit=true時。可是當傳播行爲爲REQUIRES_NEW時,就不同了,事務C的提交,在本身的事務中進行。

總結

1.Spring事務同步回調afterCommit方法中的事務方法C拋異常,不影響主方法A正常提交;

2.不一樣傳播行爲對事務C回滾影響不一樣,默認傳播行爲時,C事務方法不回滾。當傳播行爲REQUIRES_NEW時,C事務方法在異常是,會執行回滾策略,這主要與DefaultTransactionStatus中的newTransaction值有關;

3.事務A方法執行DataSourceTransactionManager#doCleanupAfterCompletionautoComumit設置爲true時,C事務中的數據就會被持久化。固然若是是隔離級別發生變化,落地時機也會發生改變,這主要與事務C的傳播行爲有關,本質上仍是與newTransaction有關。

4.若是要保證afterCommit中的事務方法有效,須要將傳播行爲設置爲Required_New,這一點Spring也有過建議。

相關文章
相關標籤/搜索