從AnnotationTransactionAspect開始rushSpring事務

0. Spring 事務 with LTW

0.1. Spring 事務 With LTW的緣由:

Pure Proxy-base mode有缺陷,其失效緣由分析及使用方法及運行機制(LoadTimeWeaverBeanDefinitionParserAspectJWeavingEnabler )已經寫過了,再也不多寫;java

0.2 JDBC事務是Connection級別的東西:

基本操做模板: setAutoCommit(false)-> setTransactionIsolation(isolation_level)->statement.excute()... —>commit()-> onException conn.rollBack() web

0.3 Spring事務基於JDBC事務

  1. Spring 須要保證當前線程內全部的事務方法:獲取到的Connection對象是同一個;
  2. 在最外層的事務開始以前:根據事務的定義,設置connectionC.autoCommit(false),setTransactionIsolation 等等;
  3. 在事務方法之間作好savepoint並根據嵌入的事務定義作savepoint;
  4. 在最外層事務以後作好rollback及commit;

0.4 一個事務失效的代碼樣例

說明:spring

  1. Spring並未攔截對SqlConnection的獲取;
  2. 當你須要Spring事務支持的時候須要從TransactionSynchronizationManager獲取各類線程級別資源或者把資源bind到這個類上,用於方法調用間的參數非侵入性傳遞(尤爲是聲明式事務時)
@Transactional(rollbackFor = Exception.class)
@Override
public void test() throws Exception {
   /**
    *  從數據源獲取connection ,不會回滾
    */
//  Connection connection = masterDataSource.getConnection();
//  int i = connection.createStatement().executeUpdate("insert into test (name)values ('test2')");
//  if (i ==1){
//            throw new Exception("test");
//  }
   /**
    *  從同步資源管理器獲取connection,會回滾
    */
   ConnectionHolder connectionHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(masterDataSource);
   int i = connectionHolder.getConnection().createStatement().executeUpdate("insert into test (name)values ('test2')");
   logger.info("after insert excute");
   if (i ==1){
      throw new Exception("test");
   }

   /**
    * mybatis dao 模板類,會回滾,也是從TransactionSynchronizationManager bind並獲取的資源SQL session Factory,能夠看下SqlSessionUtils.getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) 這段代碼的實現,
    */
//   int i = testDao.deleteByPrimaryKey(3);
//   if (i == 1){
//       throw new Exception("test");
//   }
}

1. Spring是怎麼攔截全部的事務方法的(以LTW AOP,@Transactional註解爲例子)

1.1 aop.xml 配置文件

配置文件在spring-aspects.jar/META-INF/aop.xml,以下:sql

<?xml version="1.0"?>
<!--AspectJ load-time weaving config file to install common Spring aspects.-->
<aspectj>
    <!--<weaver options="-showWeaveInfo"/>-->
    <aspects>
        ...
        <aspect name="org.springframework.transaction.aspectj.AnnotationTransactionAspect"/>
        ...
    </aspects>
</aspectj>

1.2 切點:pointcut(AnnotationTransactionAspect)

/**匹配全部的全部被@Transactional註解標註的類及其子類的任何public method */
private pointcut executionOfAnyPublicMethodInAtTransactionalType() :execution(public * ((@Transactional *)+).*(..)) && within(@Transactional *);

/**
 * 匹配全部被@Transactional註解標註的方法
 */
private pointcut executionOfTransactionalMethod() :execution(@Transactional * *(..));

/**全部被Spring事務管理的其切面*/
protected pointcut transactionalMethodExecution(Object txObject) :(executionOfAnyPublicMethodInAtTransactionalType() || executionOfTransactionalMethod() ) && this(txObject);

1.2 加強:around advice(AbstractTransactionAspect)

@SuppressAjWarnings("adviceDidNotMatch")
Object around(final Object txObject): transactionalMethodExecution(txObject) {
    MethodSignature methodSignature = (MethodSignature) thisJoinPoint.getSignature();
    try {
        return invokeWithinTransaction(methodSignature.getMethod(), txObject.getClass(), new InvocationCallback() {
            public Object proceedWithInvocation() throws Throwable {
                return proceed(txObject); //繼續執行被加強的方法,也就是咱們本身寫的業務邏輯
            }
        });
    }catch (RuntimeException ex) {
        throw ex;
    }catch (Error err) {
        throw err;
    }catch (Throwable thr) {
        Rethrower.rethrow(thr);
        throw new IllegalStateException("Should never get here", thr);
    }
}

結合上面這些代碼,能夠看到Spring將全部的事務方法執行所有攔截並實際轉到了TransactionAspectSupport.invokeWithinTransaction方法內;緩存

2. 事務的大致流程(TransactionAspectSupport)

2.1 Field

// 默認的transactionManager 緩存key
private static final Object DEFAULT_TRANSACTION_MANAGER_KEY = new Object();
// Named線程本地資源(resource not Inheritable from parent thread) 當前線程執行的事務
private static final ThreadLocal<TransactionInfo> transactionInfoHolder = new NamedThreadLocal<TransactionInfo>("Current aspect-driven transaction");
// 默認就是 transactionManager
private String transactionManagerBeanName;
// bean
private PlatformTransactionManager transactionManager;
// 事務管理的屬性 就是個map<methodName,TransactionAttribute>:methodName就是一個事務方法的描述符號=全類名+方法名;TransactionAttribute 就是描述一個Spring事務的基本屬性 隔離類型ISOLATION,傳播類型PROPAGATION,什麼狀況要回滾,使用那個事務管理器etc;
private TransactionAttributeSource transactionAttributeSource;
// SpringBean Factory
private BeanFactory beanFactory;
// 事務管理器緩存(若是有多個的話)
private final ConcurrentMap<Object, PlatformTransactionManager> transactionManagerCache =new ConcurrentReferenceHashMap<Object, PlatformTransactionManager>(4);

2.2 總體事務流程:invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)throws Throwable {
    //獲取當前方法事務方法的定義,參看默認的 AnnotationTransactionAttributeSource類就能夠;
    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    //根據方法的事務定義中指定的事務管理器name從beanFactory中獲取事務管理器
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    // 生成事務的Id
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    //若是不是事務方法,事務管理器也不是servletServer提供的話,不論DataSource仍是Hibernate都不是這一類
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        //建立事務
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            //執行事務方法
            retVal = invocation.proceedWithInvocation();
        }catch (Throwable ex) {
            // 拋出異常以後決定是否要回滾
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }finally {
            // 最終清理事務資源等等
            cleanupTransactionInfo(txInfo);
        }
        // 事務提交
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }else {
    // 這裏能夠省略了;通常在使用應用服務器(好比weblogic,WebSphere)提供的事務管理時纔會進入這個分支,在非土豪單位不多能使用這些服務器,並且既然用Spring了最好聽從Spring輕量級這個約定,不要把本身的代碼和應用環境綁定在一塊
    ....
    }
}

3. 開始事務須要作的準備工做

3.1 建立事務流程:`createTransactionIfNecessary(tm, txAttr, joinpointIdentification)

protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            status = tm.getTransaction(txAttr);
        }else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
            }
        }
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

3.1.1 由事務管理建立事務:AbstractPlatformTransactionManager.getTransaction(txAttr)

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    // 這個須要細看一下,以DataSourceTransactionManager爲例子;
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();
    if (definition == null) {// 若是不是事務方法,生成一個默認的事務定義
        definition = new DefaultTransactionDefinition();
    }
    if (isExistingTransaction(transaction)) { // 當存在事務時,按照級別和傳播等級的定義處理邏輯
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { // 事務超時時間,並沒什麼用,默認不設置超時時間的
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }
    // 若是當前線程不存在事務,根據TransactionAttr的開始執行業務邏輯
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {//只支持已存在的事務,不存在的話拋異常
        throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
    }else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        //對於PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED,那麼建立或者加入當前事務
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
        }
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            //完成 DataBaseConnection及事務信息與線程的綁定
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }catch (RuntimeException ex) {
            resume(null, suspendedResources);
            throw ex;
        }catch (Error err) {
            resume(null, suspendedResources);
            throw err;
        }
    }else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +"isolation level will effectively be ignored: " + definition);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}
3.1.1.1 建立新的事務定義對象 DataSourceTransactionManager.doGetTransaction()
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();//數據源型的事務屬性對象,是一個鏈表結構
    txObject.setSavepointAllowed(isNestedTransactionAllowed());//true.容許設置sql save_point
    ConnectionHolder conHolder =(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
    // 若是是新開啓動事務的話,conHolder==null,TransactionSynchronizationManager的ThreadLocal resource在TransactionManger.dobegin()時纔會賦值,而噹噹前事務方法是被其餘事務方法調用時 這個conHolder調用方方法bind的ConnectionHolder
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}
3.1.1.2 開啓新事務doBegin(transaction, definition);
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//主要是綁定資源
doBegin(transaction, definition);
prepareSynchronization(status, definition);
protected void doBegin(Object transaction, TransactionDefinition definition) {
  DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  Connection con = null;
  try {
  //若是是新事務則從datasource獲取一個connection
  if (!txObject.hasConnectionHolder() ||txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
    Connection newCon = this.dataSource.getConnection();
    if (logger.isDebugEnabled()) {
       logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
    }
    txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
   }
   txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
   //若是是嵌套事務的話,使用外部事物的connection
   con = txObject.getConnectionHolder().getConnection();
   // 對connection 設定 超時時間,是否只讀,以及事務隔離級別
   Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
   txObject.setPreviousIsolationLevel(previousIsolationLevel);
   //若是是自動提交的話,設置爲手動提交
   if (con.getAutoCommit()) {
      txObject.setMustRestoreAutoCommit(true);
      con.setAutoCommit(false);
   }
   //強制要求之制度的話,強制只讀
   prepareTransactionalConnection(con, definition);
   txObject.getConnectionHolder().setTransactionActive(true);

   int timeout = determineTimeout(definition);
   if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
      txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
    }
   // 把數據源和當前connection 綁定到線程上,ThreadLocal 系列的操做;這裏就和3.1.1.1 建立新的事務定義對象對起來了
   if (txObject.isNewConnectionHolder()) {
      TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
   }
  }catch (Throwable ex) {
  if (txObject.isNewConnectionHolder()) {
      DataSourceUtils.releaseConnection(con, this.dataSource);
      txObject.setConnectionHolder(null, false);
   }
   throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
  }
}
3.1.1.3 若是已有事務isExistingTransaction(Object transaction);

這個就簡寫吧,主要是事務傳播機制各類相應處理,是否加入老事務,savepoint的設置等等;仍是要調用dobegin方法服務器

3.2 事務信息綁定到當前線程 prepareTransactionInfo(PlatformTransactionManager tm,TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status)

TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
//綁定到線程,這個是鏈表結構
txInfo.bindToThread();

4. 事務提交TransactionAspectSupport.commitTransactionAfterReturning(txInfo)

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
	if (txInfo != null && txInfo.hasTransaction()) {
		txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
	}
}

5. 異常產生

5.1 回滾或者提交

protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
   if (txInfo != null && txInfo.hasTransaction()) {
      if (txInfo.transactionAttribute.rollbackOn(ex)) {
         try {
         // 事務回滾
            txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
         } catch (TransactionSystemException ex2) {
            logger.error("Application exception overridden by rollback exception", ex);
            ex2.initApplicationException(ex);
            throw ex2;
         }catch (RuntimeException ex2) {
            logger.error("Application exception overridden by rollback exception", ex);
            throw ex2;
         }catch (Error err) {
            logger.error("Application exception overridden by rollback error", ex);
            throw err;
         }
      }else {
         try {
            // 非回滾異常,commit
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
         } catch (TransactionSystemException ex2) {
            logger.error("Application exception overridden by commit exception", ex);
            ex2.initApplicationException(ex);
            throw ex2;
         }catch (RuntimeException ex2) {
            logger.error("Application exception overridden by commit exception", ex);
            throw ex2;
         }catch (Error err) {
            logger.error("Application exception overridden by commit error", ex);
            throw err;
         }
      }
   }
}

5.2 回滾或者提交形成異常 清理線程的事務資源資源cleanupTransactionInfo(txInfo)

protected void cleanupTransactionInfo(TransactionInfo txInfo) {
    if (txInfo != null) {
        txInfo.restoreThreadLocalStatus();
    }
}
相關文章
相關標籤/搜索