最近讀了Spring聲明式事務相關源碼,如今將相關原理及本人註釋過的實現源碼整理到博客上並對一些工做中的案例與事務源碼中的參數進行總結。java
名詞 | 概念 |
---|---|
PlatformTransactionManager | 事務管理器,管理事務的各生命週期方法,下文簡稱TxMgr |
TransactionAttribute | 事務屬性, 包含隔離級別,傳播行爲,是否只讀等信息,下文簡稱TxAttr |
TransactionStatus | 事務狀態,下文簡稱TxStatus |
TransactionInfo | 事務信息,內含TxMgr, TxAttr, TxStatus等信息,下文簡稱TxInfo |
TransactionSynchronization | 事務同步回調,內含多個鉤子方法,下文簡稱TxSync / transaction synchronization |
TransactionSynchronizationManager | 事務同步管理器,維護當前線程事務資源,信息以及TxSync集合 |
Spring中的Propagation枚舉和TransactionDefinition接口定義了7種事務傳播行爲:mysql
這裏介紹如下Spring聲明式事務的套路。若是能知曉大體套路會對接下來看源碼有很大的幫助。文筆不是太好,下面的描述若有不清,還請指出。git
咱們給一個bean的方法加上@Transactional
註解後,Spring容器給咱們的是一個代理的bean。當咱們對事務方法調用時,會進入Spring的ReflectiveMethodInvocation#proceed方法。這是AOP的主要實現,在進入業務方法前會調用各類方法攔截器,咱們須要關注的攔截器是org.springframework.transaction.interceptor.TransactionInterceptor。
TransactionInterceptor的職責相似於一個「環繞切面」,在業務方法調用前根據狀況開啓事務,在業務方法調用完回到攔截器後進行善後清理。github
事務切面在源碼中具體的實現方法是TransactionAspectSupport#invokeWithinTransaction,相信平時你們debug的時候在調用棧中常常能看到此方法。事務切面關注的是TransactionInfo(TxInfo),TxInfo是一個「很是大局觀」的東西(裏面啥都有:TxMgr, TxAttr, TxStatus還有前一次進入事務切面的TransactionInfo)。web
所以事務切面會調用createTransactionIfNecessary方法來建立事務並拿到一個TxInfo(不管是否真的物理建立了一個事務)。若是事務塊內的代碼發生了異常,則會根據TxInfo裏面的TxAttr配置的rollback規則看看這個異常是否是須要回滾,不須要回滾就嘗試提交,不然就嘗試回滾。若是未發生異常,則嘗試提交。spring
事務切面對於嘗試提交會判斷是否到了最外層事務(某個事務邊界)。舉個例子:有四個事務方法依次調用,傳播行爲分別是 方法1:REQUIRED, 方法2:REQUIRED, 方法3: REQUIRES_NEW, 方法4: REQUIRED。很顯然這其中包含了兩個獨立的物理事務,當退棧到方法4的事務切面時,會發現沒有到事務最外層,因此不會有真正的物理提交。而在退棧到了方法3對應的事務切面時會發現是外層事務,此時會發生物理提交。同理,退棧到方法1的事務切面時也會觸發物理提交。sql
那麼問題來了,Spring是怎麼判斷這所謂「最外層事務」的呢。
答案是TxStatus中有個屬性叫newTransaction用於標記是不是新建事務(根據事務傳播行爲得出,好比加入已有事務則會是false),以及一個名爲transaction的Object用於表示物理事務對象(由具體TxMgr子類負責給出)。Spring會根據每一層事務切面建立的TxStatus內部是否持有transaction對象以及newTransaction標誌位判斷是否屬於外層事務。session
相似的,Spring對於回滾事務也是會在最外層事務方法對應的切面中進行物理回滾。而在非最外層事務的時候會由具體txMgr子類給對應的事務打個的標記用於標識這個事務該回滾,這樣的話在全部同一物理事務方法退棧過程當中在事務切面中都能讀取到事務被打了應該回滾的標記。能夠說這是同一物理事務方法之間進行通訊的機制。ide
Spring事務代碼中用ThreadLocal來進行資源與事務的生命週期的同步管理。函數
在事務切面層面,TransactionAspectSupport裏面有個transactionInfoHolder的ThreadLocal對象,用於把TxInfo綁定到線程。那麼這樣在咱們的業務代碼或者其餘切面中,咱們能夠拿到TxInfo,也能拿到TxStatus。拿到TxStatus咱們就能夠調用setRollbackOnly來打標以手動控制事務必須回滾。
TransactionSynchronizationManager是Spring事務代碼中對ThreadLocal使用最多的類,目前它內部含有6個ThreadLocal,分別是:
Map<Object, Object>
用於保存事務相關資源,好比咱們經常使用的DataSourceTransactionManager會在開啓物理事務的時候把<DataSource, ConnectionHolder>
綁定到線程。Set<TransactionSynchronization>
用於保存transaction synchronization,這個能夠理解爲是回調鉤子對象,內部含有beforeCommit, afterCommit, beforeCompletion等鉤子方法。下面是我作的幾張圖,比較醜陋。舉了三個不一樣的事務傳播狀況,列一下TxInfo的信息(TxInfo中主要列了TxStatus的一些關鍵字段以及oldTransactionInfo字段)
最多見的REQUIRED調用REQUIRED
REQUIRED調用REQUIRES_NEW
REQUIRED調用NOT_SUPPORTED
直接進入源碼實現部分,最好須要對Spring AOP,攔截器,代理等有個基本認知閱讀起來會比較輕鬆。
TransactionInterceptor是Spring實現聲明式事務的攔截器,它實現了AOP聯盟的MethodInterceptor接口,它的父類TransactionAspectSupport封裝了一些用於實現事務切面對事務進行管理的基本代碼。下面來看一下TransactionInterceptor的繼承關係。
TransactionInterceptor對於MethodInterceptor#invoke的實現很簡單,就是調用父類的的invokeWithinTransaction,並傳遞給此方法一個回調用於繼續後續的攔截調用。
@Override public Object invoke(final MethodInvocation invocation) throws Throwable { // 由於這裏的invocation.getThis多是一個代理類,須要獲取目標原生class。 Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // 調用父類TransactionAspectSupport的invokeWithinTransaction方法,第三個參數是一個簡易回調實現,用於繼續方法調用鏈。 return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() { @Override public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } }); }
這裏就是上面TransactionInterceptor調用的invokeWithinTransaction實現,能夠將之看做是一個大的環繞切面,將事務的建立與提交/回滾包在事務方法的外圍。
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // 獲取TransactionAttribute、PlatformTransactionManager、以及鏈接點方法信息。 final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. // 根據上面抓取出來的txAttribute, tm, 鏈接點方法等信息判斷是否須要開啓事務。 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // 執行回調,若是沒有後續攔截器的話,就進入事務方法了。 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 事務發生異常。 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 把上一層事務的TxInfo從新綁到ThreadLocal中。 cleanupTransactionInfo(txInfo); } // 事務未發生異常。 commitTransactionAfterReturning(txInfo); return retVal; } else { try { Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, new TransactionCallback<Object>() { @Override public Object doInTransaction(TransactionStatus status) { TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { return new ThrowableHolder(ex); } } finally { cleanupTransactionInfo(txInfo); } } }); if (result instanceof ThrowableHolder) { throw ((ThrowableHolder) result).getThrowable(); } else { return result; } } catch (ThrowableHolderException ex) { throw ex.getCause(); } } }
protected TransactionInfo createTransactionIfNecessary( PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) { // 若是事務屬性中name爲null,則建立一個簡易委託類,name爲鏈接點方法標識。 if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // 根據事務屬性判斷是否須要開啓事務,並返回狀態。 status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); } protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm, TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) { TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); // 事務方法。 if (txAttr != null) { if (logger.isTraceEnabled()) { logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]"); } // The transaction manager will flag an error if an incompatible tx already exists. txInfo.newTransactionStatus(status); } else { // 非事務方法。 if (logger.isTraceEnabled()) logger.trace("Don't need to create transaction for [" + joinpointIdentification + "]: This method isn't transactional."); } // 不管是否建立了新事務,這裏都會把當前的txInfo對象經過threadLocal變量綁定到當前線程。 txInfo.bindToThread(); return txInfo; }
AbstractPlatformTransactionManager是各類事務管理器的抽象基類,也能夠說是骨架。它封裝了不少事務管理的流程代碼,子類須要實現一些模板方法。下面列出一些主要的模板方法。
咱們首先從上面createTransactionIfNecessary方法中調用到的getTransaction方法開始看起。
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { // 根據具體的tm實現獲取一個transaction對象。 Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { definition = new DefaultTransactionDefinition(); } // 已經存在事務的狀況。 if (isExistingTransaction(transaction)) { return handleExistingTransaction(definition, transaction, debugEnabled); } // timeout不能小於-1。 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // 若是傳播行爲是MANDATORY,則應該拋出異常(由於此時不存在事務) if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } // 傳播行爲是REQUIRED, REQUIRES_NEW, NESTED。 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // 注意這裏的newTransaction標識位是true。 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 調用供子類實現的模板方法doBegin來開啓事務。 doBegin(transaction, definition); // 調用TransactionSynchronizationManager來保存一些事務上下文信息。 prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } /* * 這裏的else分支說明傳播行爲是SUPPORTS或NOT_SUPPORTED或NEVER,這幾種狀況對於當前無事務的邏輯都是直接繼續運行。 */ else { // 若是有指定事務隔離級別,則能夠打warn日誌報出指定隔離級別開啓但沒有事務的警告。 if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); // 注意這裏的transaction傳的是null,這在嘗試commit的時候會判斷出其實沒有實際須要提交的事務。 return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } } /** * prepareSynchronization方法根據status是否須要維護新的事務相關資源,信息與回調。 */ protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null); TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); TransactionSynchronizationManager.initSynchronization(); } }
下面咱們來看一下對於當前已經有事務的狀況下,Spring是如何處理的:
private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { // 傳播行爲爲NEVER,拋出異常。 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } // 傳播行爲爲NOT_SUPPORTED,則掛起當前事務。 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } // 傳播行爲爲REQUIRES_NEW,則掛起當前事務並開啓新事務運行。 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 開啓事務,由具體子類TxMgr實現。 doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } catch (Error beginErr) { resumeAfterBeginException(transaction, suspendedResources, beginErr); throw beginErr; } } // 傳播行爲爲NESTED。 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 若是不支持嵌套事務拋出異常。 if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } // 是否對嵌套事務建立還原點。 if (useSavepointForNestedTransaction()) { DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); // 這裏最終會委託SavePointManager去建立還原點。 status.createAndHoldSavepoint(); return status; } else { // 一般tm若是是JTA的話會走到這裏來,這種狀況就經過嵌套的begin和commit/rollback實現。 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // 此時說明傳播行爲爲SUPPORTS或REQUIRED或MANDATORY,則直接加入當前事務便可。 if (debugEnabled) { logger.debug("Participating in existing transaction"); } /* * validateExistingTransaction是用來對SUPPORTS和REQUIRED的傳播行爲進行事務定義校驗的開關。 * 默認是不開啓的,此時在加入事務的時候內層註解的一些設定至關於會被忽略。 */ if (isValidateExistingTransaction()) { // 若是自定義了隔離級別校驗與已有事務是否匹配。 if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); // 若是已有事務隔離級別爲null(說明是默認級別)或者已有事務隔離級別不等於當前事務定義的隔離級別拋出異常。 if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } // 若是已有事務爲只讀,但本方法事務定義爲非只讀,則拋出異常。 if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
/** * 本方法作的事情主要就是抓取當前事務資源,事務基本信息以及事務回調transaction synchronization等塞到SuspendedResourcesHolder中返回。 */ protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException { /* * 在TransactionSynchronizationManager的JavaDoc上已經寫明瞭在須要進行transaction synchronization註冊的時候須要先檢查當前線程是否激活 * isSynchronizationActive是去讀TransactionSynchronizationManager中當前線程綁定的TransactionSynchronization集合是否爲null。 */ if (TransactionSynchronizationManager.isSynchronizationActive()) { // 對全部本線程當前註冊的transaction synchronization調用suspend方法。 List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null; if (transaction != null) { // 掛起當前事務,由具體的TxMgr子類實現。 suspendedResources = doSuspend(transaction); } /* * 將當前線程綁定的事務名,是否只讀,隔離級別,是否有實際事務等信息抓取出來。 * 與剛纔抓取出來的transaction synchronization集合一塊兒包到SuspendedResourcesHolder中返回。 */ String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException ex) { // 恢復transaction synchronization。 doResumeSynchronization(suspendedSynchronizations); throw ex; } catch (Error err) { // 恢復transaction synchronization。 doResumeSynchronization(suspendedSynchronizations); throw err; } } else if (transaction != null) { // 掛起當前事務。 Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { // 啥都沒有。 return null; } } private List<TransactionSynchronization> doSuspendSynchronization() { // getSynchronizations返回的是一個不可變的快照。 List<TransactionSynchronization> suspendedSynchronizations = TransactionSynchronizationManager.getSynchronizations(); // 逐一掛起。 for (TransactionSynchronization synchronization : suspendedSynchronizations) { synchronization.suspend(); } // 清理以後除非從新init不然沒法再register新的transaction synchronization了。 TransactionSynchronizationManager.clearSynchronization(); return suspendedSynchronizations; }
protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder) throws TransactionException { if (resourcesHolder != null) { Object suspendedResources = resourcesHolder.suspendedResources; if (suspendedResources != null) { // 恢復掛起的事務資源,由具體的TxMgr子類實現。 doResume(transaction, suspendedResources); } List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; if (suspendedSynchronizations != null) { // 還原掛起的事務的一些信息。 TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel); TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly); TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name); // 恢復掛起的transaction syncrhonization。 doResumeSynchronization(suspendedSynchronizations); } } } /** * 從新初始化當前線程維護的synchronization集合,逐一對這些synchronization調用resume方法並加入到集合中。 */ private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) { TransactionSynchronizationManager.initSynchronization(); for (TransactionSynchronization synchronization : suspendedSynchronizations) { synchronization.resume(); TransactionSynchronizationManager.registerSynchronization(synchronization); } }
commit與rollback兩個方法是PlatformTransactionManager接口兩個關鍵方法。
通常在事務切面加強的方法成功狀況下會調用commit方法。在事務發生異常後,completeTransactionAfterThrowing方法會根據異常與事務規則是否匹配來決定是否須要回滾。若是須要回滾則調用rollback方法。
須要注意的是commit/rollback方法只是嘗試,Spring會根據事務狀態信息來具體處理,不表明必定會物理提交/回滾,Spring會在事務最外層邊界纔可能觸發物理提交/回滾,甚至也有可能調用commit後發現須要rollback。
public final void commit(TransactionStatus status) throws TransactionException { // 若是事務已經完成了,則拋出異常。 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; /* * 經過前文的源碼分析能夠知道不管事務怎麼傳播,每一次進入事務攔截器,在進入業務方法以前都會把一個TransactionInfo對象塞到 * transactionInfoHolder這個線程本地變量中。而TransactionInfo包含了一個TransactionStatus對象。 * commit方法是在業務方法正常完成後調用的,所謂isLocalRollbackOnly就是讀當前TransactionStatus對象中的rollbackOnly標誌位。 * 正如其名,它是一個局部的標誌位,只有建立該status的那一層在業務方法執行完畢後會讀到本層status的這個局部標誌位。 * * 咱們能夠在用戶代碼(業務方法或者切面)中經過TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); * 來置當前事務層的status對象的rollbackOnly標誌位爲true以手動控制回滾。 */ if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus); return; } /* * shouldCommitOnGlobalRollbackOnly默認實現是false。 * 這裏判斷的語義就是若是發現事務被標記全局回滾而且在全局回滾標記狀況下不該該提交事務的話,則進行回滾。 * * 咱們一般用的DataSourceTransactionManager對於isGlobalRollbackOnly的判斷是去讀status中transaction對象的ConnectionHolder的rollbackOnly標誌位。 */ if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus); /* * isNewTransaction用來判斷是不是最外層(事務邊界)。 * 舉個例子:傳播行爲REQUIRE方法調REQUIRE方法再調REQUIRE方法,第三個方法拋出異常,第二個方法捕獲,第一個方法走到這裏會發現到了最外層事務邊界。 * 而failEarlyOnGlobalRollbackOnly是一個標誌位,若是開啓了則會盡早拋出異常。 * * 默認狀況下failEarlyOnGlobalRollbackOnly開關是關閉的。這樣若是內層事務發生了異常,退棧到外層事務後,代碼走到這裏回滾完後會拋出UnexpectedRollbackException。 */ if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } return; } processCommit(defStatus); } private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { // 鉤子函數,TxMgr子類能夠覆蓋默認的空實現。 prepareForCommit(status); // 回調transaction synchronization的beforeCommit方法。 triggerBeforeCommit(status); // 回調transaction synchronization的beforeCompletion方法。 triggerBeforeCompletion(status); beforeCompletionInvoked = true; boolean globalRollbackOnly = false; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); } if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } status.releaseHeldSavepoint(); } // 最外層事務邊界。 else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } // 由具體TxMgr子類實現。 doCommit(status); } /* * 咱們通常用的DataSourceTransactionManager是不會走到這裏的。 * 由於默認shouldCommitOnGlobalRollbackOnly開關是關閉的,檢測到golobalRollbackOnly是不會走到processCommit方法的。 * * 但shouldCommitOnGlobalRollbackOnly這個開關對於JtaTransactionManager來講是默認開啓的,這裏主要是須要針對檢測到globalRollbackOnly可是doCommit沒有拋出異常的狀況。 */ if (globalRollbackOnly) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // 回調transaction synchronization的afterCompletion方法。 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // doCommit發生異常後,根據rollbackOnCommitFailure開關決定是否回滾,此開關默認關閉。 if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { // 回調transaction synchronization的afterCompletion方法。 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } catch (Error err) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, err); throw err; } try { // 回調transaction synchronization的afterCommit方法。 triggerAfterCommit(status); } finally { // 回調transaction synchronization的afterCompletion方法。 triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { // 後續工做:status標記completed,在最外層清空transaction synchronization集合,恢復掛起事務資源等等。 cleanupAfterCompletion(status); } }
AbstractPlatformTransactionManager#rollback方法,不然調用commit方法。
public final void rollback(TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; processRollback(defStatus); } private void processRollback(DefaultTransactionStatus status) { try { try { // 回調transaction synchronization對象的beforeCompletion方法。 triggerBeforeCompletion(status); if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint"); } status.rollbackToHeldSavepoint(); } // 在最外層事務邊界進行回滾。 else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } // 由具體TxMgr子類實現回滾。 doRollback(status); } else if (status.hasTransaction()) { /* * 內層事務被標記爲rollBackOnly或者globalRollbackOnParticipationFailure開關開啓時,給當前事務標記須要回滾。 * * 若是內層事務顯式打上了rollBackOnly的標記,最終全事務必定是回滾掉的。 * * 但若是沒有被打上rollBackOnly標記,則globalRollbackOnParticipationFailure開關就很重要了。 * globalRollbackOnParticipationFailure開關默認是開啓的,也就是說內層事務掛了,最終的結果只能是全事務回滾。 * 但若是globalRollbackOnParticipationFailure開關被關閉的話,內層事務掛了,外層事務業務方法中能夠根據狀況控制是否回滾。 */ if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } // 由具體TxMgr子類實現回滾。 doSetRollbackOnly(status); } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available"); } } catch (RuntimeException ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } catch (Error err) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw err; } // 回調transaction synchronization對象的afterCompletion方法。 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); } finally { // 後續工做:status標記completed,在最外層清空transaction synchronization集合,恢復掛起事務資源等等。 cleanupAfterCompletion(status); } }
這個問題也是使用Spring聲明式事務很常見的問題。
首先保證配置都是正確的,而且開啓了Spring事務(好比@EnableTransactionManagement
)。
要明白進事務的本質就是進到事務切面的代理方法中,最多見的是同一個類的非事務方法調用一個加了事務註解的方法沒進入事務。
咱們以cglib代理爲例,因爲Spring的對於cglib AOP代理的實現,進入被代理方法的時候實際上已經離開了「代理這一層殼子」,能夠認爲代碼走到的是一個樸素的bean,調用同一個bean中方法天然與代理沒有半毛錢關係了。
通常對於聲明式事務都是以調用另外一個類的加了@Transactional
註解的public方法做爲入口的。
這個案例是工做中同事讓我看的一個線上真實案例。
下面是我將原問題用簡化代碼的形式模擬業務場景的簡單demo,只有core Java代碼和Spring相關注解的可運行代碼。
首先建立一個表。
create table t (value varchar(20));
新建三個類, ServiceA, ServiceB和Response。
/** * 模擬加了事務註解的外層service。 */ @Service public class ServiceA { @Autowired private DataSource dataSource; @Autowired private ServiceB serviceB; @Transactional public Response insert() { executeSql("insert into t select 'serviceA 開始'"); try { serviceB.insert(); } catch (Exception e) { System.out.println("serviceB#insert掛了,緣由: " + e); return Response.FAIL; } return Response.SUCC; } private void executeSql(String sql) { Connection connection = DataSourceUtils.getConnection(dataSource); try { connection.createStatement().execute(sql); } catch (SQLException e) { throw new RuntimeException(e); } } }
/** * 被外層ServiceA調用的ServiceB,拋出異常模擬來模擬掛了的狀況。 */ @Service public class ServiceB { @Autowired private DataSource dataSource; // 用於控制是否模擬insert方法掛了的狀況。 private boolean flag = true; @Transactional public void insert() { executeSql("insert into t select '這裏是ServiceB掛以前'"); if (true) { throw new RuntimeException("模擬內層事務某條語句掛了的狀況"); } executeSql("insert into t select '這裏是ServiceB掛以後'"); } private void executeSql(String sql) { Connection connection = DataSourceUtils.getConnection(dataSource); try { connection.createStatement().execute(sql); } catch (SQLException e) { throw new RuntimeException(e); } } }
/** * Response類。 */ public class Response { public static final Response SUCC = new Response(); public static final Response FAIL = new Response(); }
若是調用ServiceA#insert能夠觀察到控制檯輸出
serviceB#insert掛了,緣由: java.lang.RuntimeException: 模擬內層事務掛了的狀況
的報錯日誌。
但ServiceA#insert沒有返回Response.FAIL而且Spring還拋出了異常
org.springframework.transaction.UnexpectedRollbackException: Transaction rolled back because it has been marked as rollback-only
觀察MySQL通用日誌,結果以下:
2017-10-03T12:46:21.791139Z 382 Connect root@localhost on test using SSL/TLS 2017-10-03T12:46:21.796530Z 382 Query /* mysql-connector-java-5.1.42 ( Revision: 1f61b0b0270d9844b006572ba4e77f19c0f230d4 ) */SELECT @@session.auto_increment_increment AS auto_increment_increment, @@character_set_client AS character_set_client, @@character_set_connection AS character_set_connection, @@character_set_results AS character_set_results, @@character_set_server AS character_set_server, @@init_connect AS init_connect, @@interactive_timeout AS interactive_timeout, @@license AS license, @@lower_case_table_names AS lower_case_table_names, @@max_allowed_packet AS max_allowed_packet, @@net_buffer_length AS net_buffer_length, @@net_write_timeout AS net_write_timeout, @@query_cache_size AS query_cache_size, @@query_cache_type AS query_cache_type, @@sql_mode AS sql_mode, @@system_time_zone AS system_time_zone, @@time_zone AS time_zone, @@tx_isolation AS tx_isolation, @@wait_timeout AS wait_timeout 2017-10-03T12:46:21.821206Z 382 Query SET character_set_results = NULL 2017-10-03T12:46:21.821757Z 382 Query SET autocommit=1 2017-10-03T12:46:21.826013Z 382 Query SET autocommit=0 2017-10-03T12:46:21.837792Z 382 Query select @@session.tx_read_only 2017-10-03T12:46:21.840326Z 382 Query insert into t select 'serviceA 開始' 2017-10-03T12:46:21.850478Z 382 Query select @@session.tx_read_only 2017-10-03T12:46:21.853736Z 382 Query insert into t select '這裏是ServiceB掛以前' 2017-10-03T12:46:21.854520Z 382 Query rollback 2017-10-03T12:46:21.855058Z 382 Query SET autocommit=1 2017-10-03T12:46:21.855514Z 382 Query select @@session.tx_read_only 2017-10-03T12:46:21.856284Z 382 Quit
能夠很清楚的看到整個事務最終被回滾掉了, ServiceB#insert並無執行insert into t select '這裏是ServiceB掛以後'
。
其實對於Spring事務來講,這樣的結果是正確的,但對於開發者來講,這個結果確實看似有些「不能理解」。
咱們不妨來分析一下緣由:
首先ServiceB#insert自己是直接拋出RuntimeException的,那麼退棧到事務切面後,事務切面會發現須要回滾但由於ServiceB#insert還不是事務的最外層邊界,因此在AbstractPlatformTransactionManager#processRollback方法僅僅會調用doSetRollbackOnly(status);
,子類DataSourceTransactionManager會拿出TxStatus中的transaction對象打上回滾標記,具體來講就是transaction對象(對於DataSourceTransactionManager來講類型是DataSourceTransactionObject)會取出ConnectionHolder,調用setRollbackOnly。咱們知道這樣就至關於標記是一個全局的標記了,由於只要是隸屬於同一個物理事務的Spring事務都可以讀到同一個ConnectionHolder。
好了,接下來到了ServiceA在catch塊準備返回Response.FAIL的時候,退棧到事務切面,在AbstractPlatformTransactionManager#commit方法讀到if(!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly())
條件成立,接下來調用processRollback
,因爲在事務最外層邊界會物理回滾掉,而且也正是到了事務最外層邊界,Spring拋出UnexpectedRollbackException。
至此緣由已經分析完畢。
那麼問題怎麼解決呢,這個問題有好幾種解決辦法,可是得根據具體狀況決定。
第一種: 根據實際代碼與業務狀況看看ServiceB#insert是否有必要加事務。若是不加事務的話,其實就事務角度來分析,ServiceB#insert至關於被內聯到了ServiceA#insert中。就上面的示例而言,若是咱們把ServiceB#insert的事務註解拿掉,則事務是能夠順利提交的,Spring也不會拋出UnexpectedRollbackException。可是ServiceB#insert實際上並無完整執行,因此這樣的解決思路很容易致使出現不完整的髒數據。固然仍是要看具體業務需求,若是能夠接受的話也無所謂。
第二種:手動控制是否回滾。若是不能接受ServiceB掛的話,能夠在catch塊里加上TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
用於顯式控制回滾。這樣Spring就明白你本身要求回滾事務,而不是unexpected了。Spring也不會拋出UnexpectedRollbackException了。那麼若是在ServiceA中捕獲到異常,真的就是不想回滾,即使ServiceA發生了異常,也想要最終提交整個事務呢?若是有這樣的需求的話,能夠給TxMgr配置一個參數setGlobalRollbackOnParticipationFailure(false);
。這樣只要沒有顯式在代碼裏經過給TxStatus設置回滾標記,Spring在內層事務掛了的狀況,不會去給該事務打上須要回滾的標記。換句話說,這時候處於徹底手動擋來控制內層事務掛了的狀況到底整個事務的提交/回滾了。
第三種:繼續向上拋出異常。能夠考慮繼續向上拋出異常,通常在web項目都會配置一個大的異常處理切面,統一返回失敗Response。Service層不須要考慮Response。所以能夠考慮重構掉ServiceA的方法簽名,改成void,不關心Response。也不用去捕捉ServiceB的異常了。
本文主要以Spring聲明式事務爲切入點,介紹了Spring事務的實現原理與源碼。因爲在前文的套路簡介中也以文字描述了Spring聲明式事務的大體套路,這裏再也不贅述。這裏順便提一句,閱讀Spring源碼除了看註釋,調試代碼,其實很個東西很容易忽視——Spring打印log的語句,那些語句的內容不少時候都是頗有啓發的,會讓你忽然明白整個分支邏輯究竟是想幹什麼。
這裏再整理一下整個事務切面的流程:
下面總結一下Spring事務控制的一些重要參數。掌握這些參數能夠更靈活地配置TxMgr。
Spring事務在控制提交和回滾中用了很多判斷條件,瞭解其中一些關鍵參數的含義對debug問題頗有幫助。下文描述的一些控制參數的默認是指在AbstractPlatformTransactionManager中的默認值。
TransactionSynchronizationManager.getCurrentTransactionName()
也能拿到當前的事務名(爲此NOT_SUPPORTED事務方法名)。同理,在沒有事務的狀況下進入SUPPORTS傳播行爲的方法也可以讀到當前事務名currentTransactionName。在閱讀源碼的過程當中,避免不了閱讀Java Doc註釋,發現有一處寫了個"PROPAGATION_REQUIRES",按照TransactionDefinition中定義的常量應該是"PROPAGATION_REQUIRED"以及"PROPAGATION_REQUIRES_NEW",前者是"REQUIRED"而不是"REQUIRES"。我用正則在Spring全源碼中搜了一下把"PROPAGATION_REQUIRED"寫成"PROPAGATION_REQUIRES"的有三處,而後給Spring發了一個PR。已經合入Spring主幹。
Spring 4.3.5 源碼 StackOverflow