純代理模式(pure proxy-based mode)有缺陷,其失效原因分析、使用方法及運行機制(LoadTimeWeaverBeanDefinitionParser 和 AspectJWeavingEnabler)之前已經寫過了,這裡不再贅述。
基本操作模板:setAutoCommit(false) -> setTransactionIsolation(isolation_level) -> statement.execute()... -> commit();發生異常時執行 conn.rollback()。
說明:
@Transactional(rollbackFor = Exception.class) @Override public void test() throws Exception { /** * 從數據源獲取connection ,不會回滾 */ // Connection connection = masterDataSource.getConnection(); // int i = connection.createStatement().executeUpdate("insert into test (name)values ('test2')"); // if (i ==1){ // throw new Exception("test"); // } /** * 從同步資源管理器獲取connection,會回滾 */ ConnectionHolder connectionHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(masterDataSource); int i = connectionHolder.getConnection().createStatement().executeUpdate("insert into test (name)values ('test2')"); logger.info("after insert excute"); if (i ==1){ throw new Exception("test"); } /** * mybatis dao 模板類,會回滾,也是從TransactionSynchronizationManager bind並獲取的資源SQL session Factory,能夠看下SqlSessionUtils.getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) 這段代碼的實現, */ // int i = testDao.deleteByPrimaryKey(3); // if (i == 1){ // throw new Exception("test"); // } }
配置文件在 spring-aspects.jar/META-INF/aop.xml,如下:
<?xml version="1.0"?>
<!-- AspectJ load-time weaving config file to install common Spring aspects. -->
<aspectj>
    <!-- Weaver diagnostics option, left disabled in this excerpt. -->
    <!--<weaver options="-showWeaveInfo"/>-->
    <aspects>
        ...
        <!-- The transaction aspect discussed in this article; the other entries are elided. -->
        <aspect name="org.springframework.transaction.aspectj.AnnotationTransactionAspect"/>
        ...
    </aspects>
</aspectj>
/**
 * Matches the execution of any public method declared on a type annotated with
 * {@code @Transactional} or on a subtype of such a type, restricted to code that
 * lies within a type that itself carries {@code @Transactional}.
 */
private pointcut executionOfAnyPublicMethodInAtTransactionalType() :execution(public * ((@Transactional *)+).*(..)) && within(@Transactional *);

/**
 * Matches the execution of any method that is itself annotated with {@code @Transactional}.
 */
private pointcut executionOfTransactionalMethod() :execution(@Transactional * *(..));

/**
 * The combined pointcut used for transaction management: either of the two pointcuts above,
 * with the currently executing object exposed as {@code txObject}.
 */
protected pointcut transactionalMethodExecution(Object txObject) :(executionOfAnyPublicMethodInAtTransactionalType() || executionOfTransactionalMethod() ) && this(txObject);
/**
 * Around advice for every join point selected by {@code transactionalMethodExecution}:
 * hands the intercepted call to {@code invokeWithinTransaction}, passing the reflective
 * method, the runtime class of the executing object and a callback that proceeds with
 * the original invocation.
 *
 * Runtime exceptions and errors are rethrown unchanged; any other throwable is passed
 * to {@code Rethrower.rethrow}.
 */
@SuppressAjWarnings("adviceDidNotMatch")
Object around(final Object txObject): transactionalMethodExecution(txObject) {
    final MethodSignature signature = (MethodSignature) thisJoinPoint.getSignature();
    final InvocationCallback businessCall = new InvocationCallback() {
        public Object proceedWithInvocation() throws Throwable {
            // Continue into the advised method, i.e. the application's own business logic.
            return proceed(txObject);
        }
    };
    try {
        return invokeWithinTransaction(signature.getMethod(), txObject.getClass(), businessCall);
    }
    catch (RuntimeException runtimeFailure) {
        throw runtimeFailure;
    }
    catch (Error fatal) {
        throw fatal;
    }
    catch (Throwable checkedFailure) {
        Rethrower.rethrow(checkedFailure);
        // Only reached if the rethrow helper returns normally.
        throw new IllegalStateException("Should never get here", checkedFailure);
    }
}
結合上面這些代碼,可以看到 Spring 將所有事務方法的執行全部攔截,並實際轉到了 TransactionAspectSupport.invokeWithinTransaction 方法內。
// 默認的transactionManager 緩存key private static final Object DEFAULT_TRANSACTION_MANAGER_KEY = new Object(); // Named線程本地資源(resource not Inheritable from parent thread) 當前線程執行的事務 private static final ThreadLocal<TransactionInfo> transactionInfoHolder = new NamedThreadLocal<TransactionInfo>("Current aspect-driven transaction"); // 默認就是 transactionManager private String transactionManagerBeanName; // bean private PlatformTransactionManager transactionManager; // 事務管理的屬性 就是個map<methodName,TransactionAttribute>:methodName就是一個事務方法的描述符號=全類名+方法名;TransactionAttribute 就是描述一個Spring事務的基本屬性 隔離類型ISOLATION,傳播類型PROPAGATION,什麼狀況要回滾,使用那個事務管理器etc; private TransactionAttributeSource transactionAttributeSource; // SpringBean Factory private BeanFactory beanFactory; // 事務管理器緩存(若是有多個的話) private final ConcurrentMap<Object, PlatformTransactionManager> transactionManagerCache =new ConcurrentReferenceHashMap<Object, PlatformTransactionManager>(4);
invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
/**
 * Runs the given invocation inside a transaction described by the method's transaction attribute.
 *
 * <p>In the branch shown here a transaction is created if necessary, the invocation is executed,
 * the transaction is completed via {@code completeTransactionAfterThrowing} when the invocation
 * throws, the thread-bound transaction info is cleaned up in all cases, and the transaction is
 * committed after a normal return.
 *
 * @param method the method being invoked
 * @param targetClass the class the method is invoked on
 * @param invocation callback that proceeds with the actual invocation
 * @return the value returned by the invocation
 * @throws Throwable whatever the invocation throws
 */
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)throws Throwable {
    // Look up the transaction definition for this method; see AnnotationTransactionAttributeSource
    // for the default implementation.
    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    // Resolve the transaction manager for that definition (per the original note: looked up
    // from the bean factory by the manager name given in the definition).
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    // Build the identifier of this join point.
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    // Taken when there is no transaction attribute, or when the manager is not a
    // CallbackPreferringPlatformTransactionManager. The original note says neither the
    // DataSource nor the Hibernate manager is of that kind.
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Create the transaction (if one is needed).
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            // Execute the transactional method.
            retVal = invocation.proceedWithInvocation();
        }catch (Throwable ex) {
            // Decide whether to roll back after the exception, then propagate it.
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }finally {
            // Always clean up the transaction info.
            cleanupTransactionInfo(txInfo);
        }
        // Normal return: commit.
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }else {
        // Elided by the author. Per the original note, this branch is normally entered only when
        // the transaction management is provided by an application server (e.g. WebLogic,
        // WebSphere); the author recommends keeping to Spring's lightweight approach rather than
        // tying application code to the runtime environment.
        ....
    }
}
/**
 * Obtains a transaction status for the given attribute, if possible, and wraps the
 * outcome in a {@code TransactionInfo}.
 *
 * <p>An attribute without a name is wrapped so that it reports the join point
 * identification as its name. No transaction is requested when the attribute is
 * {@code null}; when the attribute is present but the manager is {@code null}, the
 * join point is skipped with a debug message.
 *
 * @param tm the transaction manager to use (may be {@code null})
 * @param txAttr the transaction attribute (may be {@code null})
 * @param joinpointIdentification identifier of the join point, used as fallback name
 * @return the result of {@code prepareTransactionInfo} for the resolved values
 */
protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
    TransactionAttribute namedAttr = txAttr;
    if (namedAttr != null && namedAttr.getName() == null) {
        // Fall back to the join point identification as the transaction name.
        namedAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus txStatus = null;
    if (namedAttr != null && tm != null) {
        txStatus = tm.getTransaction(namedAttr);
    }
    else if (namedAttr != null && logger.isDebugEnabled()) {
        logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
    }
    return prepareTransactionInfo(tm, namedAttr, joinpointIdentification, txStatus);
}
AbstractPlatformTransactionManager.getTransaction(txAttr)
/**
 * Returns a transaction status for the given definition, based on whether a transaction
 * already exists and on the definition's propagation behaviour.
 *
 * @param definition the transaction definition; a {@code DefaultTransactionDefinition} is
 *        used when {@code null}
 * @return the status produced by the existing-transaction handling, the status of a newly
 *         begun transaction, or an "empty" status without an actual transaction
 * @throws InvalidTimeoutException if the timeout is below {@code TIMEOUT_DEFAULT}
 * @throws IllegalTransactionStateException if propagation is MANDATORY and no transaction exists
 */
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    // Worth reading in detail; DataSourceTransactionManager is the example used in this article.
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();
    if (definition == null) {// No definition supplied: fall back to a default one.
        definition = new DefaultTransactionDefinition();
    }
    if (isExistingTransaction(transaction)) {
        // A transaction already exists: delegate to the existing-transaction handling
        // (per the original note, driven by the isolation and propagation settings).
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        // Reject timeouts below TIMEOUT_DEFAULT. The original note remarks that no timeout
        // is set by default, so this rarely matters.
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }
    // From here on no transaction exists; proceed according to the propagation behaviour.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {// MANDATORY requires an existing transaction, so fail.
        throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
    }else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // REQUIRED, REQUIRES_NEW and NESTED: begin a new transaction (there is none to join here).
        // The holder returned by suspend(null) is handed back to resume(...) if beginning fails.
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
        }
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // Per the original note: binds the database connection and the transaction
            // information to the current thread.
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }catch (RuntimeException ex) {
            resume(null, suspendedResources);
            throw ex;
        }catch (Error err) {
            resume(null, suspendedResources);
            throw err;
        }
    }else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +"isolation level will effectively be ignored: " + definition);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}
DataSourceTransactionManager.doGetTransaction()
protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject();//數據源型的事務屬性對象,是一個鏈表結構 txObject.setSavepointAllowed(isNestedTransactionAllowed());//true.容許設置sql save_point ConnectionHolder conHolder =(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource); // 若是是新開啓動事務的話,conHolder==null,TransactionSynchronizationManager的ThreadLocal resource在TransactionManger.dobegin()時纔會賦值,而噹噹前事務方法是被其餘事務方法調用時 這個conHolder調用方方法bind的ConnectionHolder txObject.setConnectionHolder(conHolder, false); return txObject; }
doBegin(transaction, definition)
// Excerpt of the statements surrounding the doBegin(...) call.
;
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// Per the original note: this call mainly binds resources.
doBegin(transaction, definition);
prepareSynchronization(status, definition);
/**
 * Begins a JDBC transaction on the given transaction object: makes sure it has a
 * connection, prepares that connection for the definition, switches off auto-commit if
 * necessary, marks the holder as active and, for a newly created holder, binds it to the
 * current thread.
 *
 * @param transaction the transaction object (a {@code DataSourceTransactionObject})
 * @param definition the transaction definition to apply
 * @throws CannotCreateTransactionException wrapping any throwable raised while beginning
 */
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
        // Fetch a fresh connection from the data source when the transaction object has no
        // holder yet, or when its holder is already synchronized with a transaction.
        if (!txObject.hasConnectionHolder() ||txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = this.dataSource.getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        // Use the holder's connection; per the original note, for a nested transaction this
        // is the outer transaction's connection.
        con = txObject.getConnectionHolder().getConnection();
        // Prepare the connection for the definition (per the original note: read-only flag and
        // isolation level) and remember the previous isolation level.
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        // If the connection is in auto-commit mode, switch to manual commit and remember to
        // restore it afterwards.
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            con.setAutoCommit(false);
        }
        // Per the original note: enforces read-only mode when that is required.
        prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }
        // Bind the holder to the current thread, keyed by the data source. This is the resource
        // that doGetTransaction() later reads back.
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
        }
    }catch (Throwable ex) {
        // Only a holder created by this call is released and detached again.
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, this.dataSource);
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}
isExistingTransaction(Object transaction)
這個就簡寫吧:主要是事務傳播機制的各種相應處理,例如是否加入已有事務、savepoint 的設置等等;最終還是要調用 doBegin 方法。
prepareTransactionInfo(PlatformTransactionManager tm,TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status)
// Excerpt: create the TransactionInfo for this join point.
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
// Bind it to the current thread; the original note describes this as a linked structure
// (not visible in this excerpt).
txInfo.bindToThread();
TransactionAspectSupport.commitTransactionAfterReturning(txInfo)
/**
 * Commits the transaction described by {@code txInfo} after the invocation returned
 * normally. Does nothing when {@code txInfo} is {@code null} or holds no transaction.
 *
 * @param txInfo information about the current transaction (may be {@code null})
 */
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
    if (txInfo == null || !txInfo.hasTransaction()) {
        return;
    }
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.hasTransaction()) { if (txInfo.transactionAttribute.rollbackOn(ex)) { try { // 事務回滾 txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by rollback exception", ex); ex2.initApplicationException(ex); throw ex2; }catch (RuntimeException ex2) { logger.error("Application exception overridden by rollback exception", ex); throw ex2; }catch (Error err) { logger.error("Application exception overridden by rollback error", ex); throw err; } }else { try { // 非回滾異常,commit txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by commit exception", ex); ex2.initApplicationException(ex); throw ex2; }catch (RuntimeException ex2) { logger.error("Application exception overridden by commit exception", ex); throw ex2; }catch (Error err) { logger.error("Application exception overridden by commit error", ex); throw err; } } } }
cleanupTransactionInfo(txInfo)
/**
 * Restores the thread-local transaction status recorded in {@code txInfo}.
 * Does nothing when {@code txInfo} is {@code null}.
 *
 * @param txInfo information about the current transaction (may be {@code null})
 */
protected void cleanupTransactionInfo(TransactionInfo txInfo) {
    if (txInfo == null) {
        return;
    }
    txInfo.restoreThreadLocalStatus();
}