Spring的事務實現原理

 

主流程html

Spring的事務採用AOP的方式實現。數據庫

@Transactional 註解的屬性信息

name                當在配置文件中有多個 TransactionManager , 能夠用該屬性指定選擇哪一個事務管理器ide

propagation      事務的傳播行爲,默認值爲 REQUIRED。ui

isolation            事務的隔離度,默認值採用 DEFAULT。this

timeout              事務的超時時間,默認值爲-1。若是超過該時間限制但事務尚未完成,則自動回滾事務。spa

read-only           指定事務是否爲只讀事務,默認值爲 false;爲了忽略那些不須要事務的方法,好比讀取數據,能夠設置 read-only 爲 true。.net

rollback-for         用於指定可以觸發事務回滾的異常類型,若是有多個異常類型須要指定,各種型之間能夠經過逗號分隔。線程

no-rollback- for    拋出 no-rollback-for 指定的異常類型,不回滾事務。debug

除此之外,@Transactional 註解也能夠添加到類級別上。當把@Transactional 註解放在類級別時,表示全部該類的公共方法都配置相同的事務屬性信息。見清單 2,EmployeeService 的全部方法都支持事務而且是隻讀。當類級別配置了@Transactional,方法級別也配置了@Transactional,應用程序會以方法級別的事務屬性信息來管理事務,換言之,方法級別的事務屬性信息會覆蓋類級別的相關配置信息。3d

 

咱們從TransactionAspectSupport這個類開始f分析。

  1. 獲取事務的屬性(@Transactional註解中的配置)
  2. 加載配置中的TransactionManager.
  3. 獲取收集事務信息TransactionInfo
  4. 執行目標方法
  5. 出現異常,嘗試處理。
  6. 清理事務相關信息
  7. 提交事務
//1. 獲取@Transactional註解的相關參數
TransactionAttributeSource tas = getTransactionAttributeSource();
// 2. 獲取事務管理器
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        // 3. 獲取TransactionInfo,包含了tm和TransactionStatus
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            // 4.執行目標方法
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
           //5.回滾
            // target invocation exception
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
          // 6. 清理當前線程的事務相關信息
            cleanupTransactionInfo(txInfo);
        }
        // 提交事務
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
View Code

        在執行createTransactionIfNecessary獲取事務狀態時,就準備好了開啓事務的全部內容,主要是執行了JDBC的con.setAutoCommit(false)方法。同時處理了不少和數據庫鏈接相關的ThreadLocal變量。

 

關鍵對象介紹
PlatformTransactionManager
TransactionManager是作什麼的?它保存着當前的數據源鏈接,對外提供對該數據源的事務提交回滾操做接口,同時實現了事務相關操做的方法。一個數據源DataSource須要一個事務管理器。

屬性:DataSource

內部核心方法:
public commit 提交事務
public rollback 回滾事務
public getTransaction 得到當前事務狀態

protected doSuspend 掛起事務

protected doBegin 開始事務,主要是執行了JDBC的con.setAutoCommit(false)方法。同時處理了不少和數據庫鏈接相關的ThreadLocal變量。

protected doCommit 提交事務
protected doRollback 回滾事務
protected doGetTransaction() 獲取事務信息
final getTransaction 獲取事務狀態

獲取對應的TransactionManager

@Nullable
    protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
        // Do not attempt to lookup tx manager if no tx attributes are set
        if (txAttr == null || this.beanFactory == null) {
            return getTransactionManager();
        }

        String qualifier = txAttr.getQualifier();
        //若是指定了Bean則取指定的PlatformTransactionManager類型的Bean
        if (StringUtils.hasText(qualifier)) {
            return determineQualifiedTransactionManager(this.beanFactory, qualifier);
        }
        //若是指定了Bean的名稱,則根據bean名稱獲取對應的bean
        else if (StringUtils.hasText(this.transactionManagerBeanName)) {
            return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
        }
        else {
        // 默認取一個PlatformTransactionManager類型的Bean
            PlatformTransactionManager defaultTransactionManager = getTransactionManager();
            if (defaultTransactionManager == null) {
                defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
                if (defaultTransactionManager == null) {
                    defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
                    this.transactionManagerCache.putIfAbsent(
                            DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
                }
            }
            return defaultTransactionManager;
        }
    }
View Code

能夠看到,若是沒有指定TransactionManager參數的話,會使用默認的一個實現,因此當程序中有多個數據庫時,事務執行最好是指定事務管理器。

1.核心屬性:事務狀態TransactionStatus
2.事務管理器
3.事務屬性
4.上一個事務信息oldTransactionInfo,REQUIRE_NEW傳播級別時,事務掛起後前一個事務的事務信息

 

 

當前事務狀態TransactionStatus
經過TransactionManager的getTransaction方法,獲取當前事務的狀態。
具體是在AbstractPlatformTransactionManager中實現.
TransactionStatus被用來作什麼:TransactionManager對事務進行提交或回滾時須要用到該對象
具體方法有:

 

 

做用:

判斷當前事務是不是一個新的事務,不然加入到一個已經存在的事務中。事務傳播級別REQUIRED和REQUIRE_NEW有用到。
當前事務是否攜帶保存點,嵌套事務用到。
setRollbackOnly,isRollbackOnly,當子事務回滾時,並不真正回滾事務,而是對子事務設置一個標誌位。
事務是否已經完成,已經提交或者已經回滾。

 

傳播級別
介紹
Spring事務的傳播級別描述的是多個使用了@Transactional註解的方法互相調用時,Spring對事務的處理。包涵的傳播級別有:(在TransactionDefinition接口中定義)

7種事務傳播行爲以下:默認  propagation = PROPAGATION_REQUIRED

1.PROPAGATION_REQUIRED

  若是當前沒有事務,就建立一個新事務,若是當前存在事務,就加入該事務,這是最多見的選擇,也是Spring默認的事務傳播行爲。

2.PROPAGATION_SUPPORTS

  支持當前事務,若是當前存在事務,就加入該事務,若是當前不存在事務,就以非事務執行。

若是外圍事務回滾,內部事務也要回滾。

3.PROPAGATION_MANDATORY

  支持當前事務,若是當前存在事務,就加入該事務,若是當前不存在事務,就拋出異常。

4.PROPAGATION_REQUIRES_NEW

  建立新事務,不管當前存不存在事務,都建立新事務。

5.PROPAGATION_NOT_SUPPORTED

  以非事務方式執行操做,若是當前存在事務,就把當前事務掛起。

6.PROPAGATION_NEVER

  以非事務方式執行,若是當前存在事務,則拋出異常。

7.PROPAGATION_NESTED

  若是當前存在事務,則在嵌套事務內執行。若是當前沒有事務,則按REQUIRED屬性執行。

 

原理:

咱們從兩個角度看他們的實現原理,一個是剛進入事務時,針對不一樣的傳播級別,它們的行爲有什麼區別。另外一個角度是當事務提交或者回滾時,傳播級別對事務行爲的影響。

首先在嘗試獲取事務信息時,若是當前已經存在一個事務,則會根據傳播級別作一些處理:

隔離級別對開始事務的影響(獲取TransactionStatus)

@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
  // 從當前的transactionManager獲取DataSource對象
  // 而後以該DataSource對象爲Key,
  // 去一個ThreadLocal變量中的map中獲取該DataSource的鏈接
  // 而後設置到DataSourceTransactionObject中返回。
    Object transaction = doGetTransaction();
    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();
    if (definition == null) {
        // Use defaults if no transaction definition given.
        definition = new DefaultTransactionDefinition();
    }
// 若是當前線程已經在一個事務中了,則須要根據事務的傳播級別
//來決定如何處理並獲取事務狀態對象
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }
    // Check definition settings for new transaction.
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
        }
        try {
          //若是當前不在一個事務中,則執行事務的準備操做
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 構造事務狀態對象,注意這裏第三個參數爲true,表明是一個新事務
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            //執行begin操做,核心操做是設置隔離級別,執行   conn.setAutoCommit(false); 同時將數據鏈接綁定到當前線程
            doBegin(transaction, definition);
            // 針對事務相關屬性如隔離級別,是否在事務中,設置綁定到當前線程
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + definition);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}
View Code

 

能夠看到,若是當前不存在事務時,建立一個新的TransactionStatus對象,不然進入到handleExistingTransaction。下面來看這個方法

1.若是是NEVER傳播級別則拋出異常
2. 若是是不支持,則掛起事務,將當前事務對象設置爲null,newTransaction設置爲false,把線程的相關Threadlocal變量改的就像當前不存在事務同樣
3. 若是是required_NEW的話,則掛起當前事務,同時建立一個新的事務,執行doBegin操做,設置newTransaction爲true
4.若是是嵌入事務,則建立一個SAVEPOINT
5.對於REQUIRED傳播級別會把newTransaction標誌位設置爲false

/**
 * Create a TransactionStatus for an existing transaction.
 */
private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {
    //若是是NEVER傳播級別則拋出異常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }
    // 若是是不支持,則掛起事務
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        //掛起事務同時將當前事務設置爲null,newTransaction設置爲false,把線程的相關Threadlocal變量改的就像當前不存在事務同樣
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
    //若是是required_NEW的話,則掛起當前事務,同時建立一個新的事務,執行doBegin操做
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                    definition.getName() + "]");
        }
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }
    // 若是是嵌入事務,則建立一個SAVEPOINT
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
//這裏判斷是否須要對已經存在的事務進行校驗,這個能夠經過AbstractPlatformTransactionManager.setValidateExistingTransaction(boolean)來設置,設置爲true後須要校驗當前事務的隔離級別和已經存在的事務的隔離級別是否一致 
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    // 若是不設置是否校驗已經存在的事務,則對於REQUIRED傳播級別會走到這裏來,這裏把newTransaction標誌位設置爲false,
    // 這裏用的definition是當前事務的相關屬性,因此隔離級別等依然是當前事務的(子事務),而不是已經存在的事務的隔離級別(父事務)
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
View Code

 

隔離級別對回滾事務的影響

  1. 若是newTransaction設置爲ture,則真正執行回滾。
  2. 若是有保存點,則回滾到保存點。
  3. 不然不會真正回滾,而是設置回滾標誌位。
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
        try {
            boolean unexpectedRollback = unexpected;
            try {
                triggerBeforeCompletion(status);
                // 若是有保存點
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Rolling back transaction to savepoint");
                    }
                    status.rollbackToHeldSavepoint();
                }
                // 若是是新的事務,當傳播級別爲RUQUIRED_NEW時會走到這裏來
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction rollback");
                    }
                    doRollback(status);
                }
                else {
                    // 加入到事務中,設置回滾狀態,適用於REQUIRED傳播級別
                    // 並不會真的回滾,而是設置回滾標誌位
                    // Participating in larger transaction
                    if (status.hasTransaction()) {
                        if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                            }
                            doSetRollbackOnly(status);
                        }
                        else {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                            }
                        }
                    }
                    else {
                        logger.debug("Should roll back transaction but cannot - no transaction available");
                    }
                    // Unexpected rollback only matters here if we're asked to fail early
                    if (!isFailEarlyOnGlobalRollbackOnly()) {
                        unexpectedRollback = false;
                    }
                }
            }
            catch (RuntimeException | Error ex) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw ex;
            }

            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

            // Raise UnexpectedRollbackException if we had a global rollback-only marker
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction rolled back because it has been marked as rollback-only");
            }
        }
        finally {
            cleanupAfterCompletion(status);
        }
    }
View Code

 

隔離級別對提交事務的影響
就算事務方法沒有拋出異常,走到了commit方法中,可是依然有可能回滾事務。
對於REQUIRED傳播級別,即便父事務中沒有拋出異常,可是子事務中已經設置了回滾標誌,那麼父事務依然會回滾
只有newTransaction標誌位爲true的事務纔會真正執行commit操做。

@Override
public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus, false);
        return;
    }
 // 對於REQUIRED傳播級別,即便父事務中沒有拋出異常,可是子事務中已經設置了回滾標誌,那麼父事務依然會回滾
    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(defStatus, true);
        return;
    }

    processCommit(defStatus);
}

 

 

爲何個人事務不生效
1.若是不是Innodb存儲引擎,MyISAM不支持事務。
2.沒有指定rollbackFor參數。
3. 沒有指定transactionManager參數,默認的transactionManager並非我指望的,以及一個事務中涉及到了多個數據庫。
4. 若是AOP使用了JDK動態代理,對象內部方法互相調用不會被Spring的AOP攔截,@Transactional註解無效。
5. 若是AOP使用了CGLIB代理,事務方法或者類不是public,沒法被外部包訪問到,或者是final沒法繼承,@transactional註解無效。

 

原文連接:https://blog.csdn.net/weixin_44366439/article/details/89030080

參考: https://www.cnblogs.com/haha12/p/11855001.html

相關文章
相關標籤/搜索