本專題大綱:java
本文爲本專題倒數第二篇文章。程序員
在上篇文章中咱們一塊兒學習了Spring中的事務抽象機制以及動手模擬了一下Spring中的事務管理機制,那麼本文咱們就經過源碼來分析一下Spring中的事務管理究竟是如何實現的,本文將選用Spring5.2.x
版本。sql
Spring事務管理的入口就是@EnableTransactionManagement
註解,因此咱們直接從這個註解入手,其源碼以下:數據庫
public @interface EnableTransactionManagement { // 是否使用cglib代理,默認是jdk代理 boolean proxyTargetClass() default false; // 使用哪一種代理模式,Spring AOP仍是AspectJ AdviceMode mode() default AdviceMode.PROXY; // 爲了完成事務管理,會向容器中添加通知 // 這個order屬性表明了通知的執行優先級 // 默認是最低優先級 int order() default Ordered.LOWEST_PRECEDENCE; }
須要注意的是,@EnableTransactionManagement
的proxyTargetClass
會影響Spring中全部經過自動代理生成的對象。若是將proxyTargetClass
設置爲true,那麼意味經過@EnableAspectJAutoProxy
所生成的代理對象也會使用cglib進行代理。關於@EnableTransactionManagement
跟@EnableAspectJAutoProxy
混用時的一些問題等咱們在對@EnableTransactionManagement
有必定了解後再專門作一個比較,如今咱們先來看看這個註解到底在作了什麼?緩存
從上圖中能夠看出這個註解作的就是向容器中註冊了AutoProxyRegistrar
跟一個ProxyTransactionManagementConfiguration
(這裏就不考慮AspectJ了,咱們日常都是使用SpringAOP),微信
AutoProxyRegistrar
用於開啓自動代理,其源碼以下:mybatis
這個類實現了ImportBeanDefinitionRegistrar
,它的做用是向容器中註冊別的BeanDefinition
,咱們直接關注它的registerBeanDefinitions
方法便可app
// AnnotationMetadata,表明的是AutoProxyRegistrar的導入類的元信息 // 既包含了類元信息,也包含了註解元信息 public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { boolean candidateFound = false; // 獲取@EnableTransactionManagement所在配置類上的註解元信息 Set<String> annTypes = importingClassMetadata.getAnnotationTypes(); // 遍歷註解 for (String annType : annTypes) { // 能夠理解爲將註解中的屬性轉換成一個map AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType); if (candidate == null) { continue; } // 直接從map中獲取對應的屬性 Object mode = candidate.get("mode"); Object proxyTargetClass = candidate.get("proxyTargetClass"); // mode,代理模型,通常都是SpringAOP // proxyTargetClass,是否使用cglib代理 if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() && Boolean.class == proxyTargetClass.getClass()) { // 註解中存在這兩個屬性,而且屬性類型符合要求,表示找到了合適的註解 candidateFound = true; // 實際上會往容器中註冊一個InfrastructureAdvisorAutoProxyCreator if (mode == AdviceMode.PROXY) { AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry); if ((Boolean) proxyTargetClass) { AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry); return; } } } } // ...... }
若是對AOP比較瞭解的話,那麼應該知道@EnableAspectJAutoProxy
註解也向容器中註冊了一個能實現自動代理的bd,那麼當@EnableAspectJAutoProxy
跟@EnableTransactionManagement
同時使用會有什麼問題嗎?答案你們確定知道,不會有問題,那麼爲何呢?咱們查看源碼會發現,@EnableAspectJAutoProxy
最終調用的是異步
AopConfigUtils#registerAspectJAnnotationAutoProxyCreatorIfNecessary
,其源碼以下ide
public static BeanDefinition registerAspectJAnnotationAutoProxyCreatorIfNecessary( BeanDefinitionRegistry registry, @Nullable Object source) { return registerOrEscalateApcAsRequired(AnnotationAwareAspectJAutoProxyCreator.class, registry, source); }
@EnableTransactionManagement
最終調用的是,AopConfigUtils#registerAutoProxyCreatorIfNecessary
,其源碼以下
public static BeanDefinition registerAutoProxyCreatorIfNecessary( BeanDefinitionRegistry registry, @Nullable Object source) { return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source); }
它們最終都會調用registerOrEscalateApcAsRequired
方法,只不過傳入的參數不同而已,一個是AnnotationAwareAspectJAutoProxyCreator
,另外一個是InfrastructureAdvisorAutoProxyCreator
。
registerOrEscalateApcAsRequired
源碼以下:
private static BeanDefinition registerOrEscalateApcAsRequired( Class<?> cls, BeanDefinitionRegistry registry, @Nullable Object source) { if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) { BeanDefinition apcDefinition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME); if (!cls.getName().equals(apcDefinition.getBeanClassName())) { // 當前已經註冊到容器中的Bean的優先級 int currentPriority = findPriorityForClass(apcDefinition.getBeanClassName()); // 當前準備註冊到容器中的Bean的優先級 int requiredPriority = findPriorityForClass(cls); // 誰的優先級大就註冊誰,AnnotationAwareAspectJAutoProxyCreator是最大的 // 因此AnnotationAwareAspectJAutoProxyCreator會覆蓋別的Bean if (currentPriority < requiredPriority) { apcDefinition.setBeanClassName(cls.getName()); } } return null; } // 註冊bd RootBeanDefinition beanDefinition = new RootBeanDefinition(cls); beanDefinition.setSource(source); beanDefinition.getPropertyValues().add("order", Ordered.HIGHEST_PRECEDENCE); beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, beanDefinition); return beanDefinition; }
InfrastructureAdvisorAutoProxyCreator
跟AnnotationAwareAspectJAutoProxyCreator
的優先級是如何定義的呢?咱們來看看AopConfigUtils
這個類中的一個靜態代碼塊
static { APC_PRIORITY_LIST.add(InfrastructureAdvisorAutoProxyCreator.class); APC_PRIORITY_LIST.add(AspectJAwareAdvisorAutoProxyCreator.class); APC_PRIORITY_LIST.add(AnnotationAwareAspectJAutoProxyCreator.class); }
實際上它們的優先級就是在APC_PRIORITY_LIST
這個集合中的下標,下標越大優先級越高,因此AnnotationAwareAspectJAutoProxyCreator
的優先級最高,因此AnnotationAwareAspectJAutoProxyCreator
會覆蓋InfrastructureAdvisorAutoProxyCreator
,那麼這種覆蓋會不會形成問題呢?答案確定是不會的,由於你用了這麼久了也沒出過問題嘛~那麼再思考一個問題,爲何不會出現問題呢?這是由於InfrastructureAdvisorAutoProxyCreator
只會使用容器內部定義的Advisor
,可是AnnotationAwareAspectJAutoProxyCreator
會使用全部實現了Advisor
接口的通知,也就是說AnnotationAwareAspectJAutoProxyCreator
的做用範圍大於InfrastructureAdvisorAutoProxyCreator
,所以這種覆蓋是沒有問題的。限於篇幅緣由這個問題我不作詳細解答了,有興趣的同窗能夠看下兩個類的源碼。
@EnableTransactionManagement
除了註冊了一個AutoProxyRegistrar
外,還向容器中註冊了一個ProxyTransactionManagementConfiguration
。
那麼這個ProxyTransactionManagementConfiguration
有什麼做用呢?
若是你們對我文章的風格有一些瞭解的話就會知道,分析一個類通常有兩個切入點
- 它的繼承關係
- 它提供的API
你們本身在閱讀源碼時也能夠參考這種思路,分析一個類的繼承關係可讓咱們瞭解它從抽象到實現的過程,即便不去細看API也能知道它的大致做用。僅僅知道它的大體做用是不夠的,爲了更好了解它的細節咱們就須要進一步去探索它的具體實現,也就是它提供的API。這算是我看了這麼就源碼的一點心得,正好想到了因此在這裏分享下,以後會專門寫一篇源碼心得的文章
這個類的繼承關係仍是很簡單的,只有一個父類AbstractTransactionManagementConfiguration
源碼以下:
@Configuration public abstract class AbstractTransactionManagementConfiguration implements ImportAware { @Nullable protected AnnotationAttributes enableTx; @Nullable protected TransactionManager txManager; // 這個方法就是獲取@EnableTransactionManagement的屬性 // importMetadata:就是@EnableTransactionManagement這個註解所在類的元信息 @Override public void setImportMetadata(AnnotationMetadata importMetadata) { // 將EnableTransactionManagement註解中的屬性對存入到map中 // AnnotationAttributes實際上就是個map this.enableTx = AnnotationAttributes.fromMap( importMetadata.getAnnotationAttributes(EnableTransactionManagement.class.getName(), false)); // 這裏能夠看到,限定了導入的註解必須使用@EnableTransactionManagement if (this.enableTx == null) { throw new IllegalArgumentException( "@EnableTransactionManagement is not present on importing class " + importMetadata.getClassName()); } } // 咱們能夠配置TransactionManagementConfigurer // 經過TransactionManagementConfigurer向容器中註冊一個事務管理器 // 通常不會這麼使用,更多的是經過@Bean的方式直接註冊 @Autowired(required = false) void setConfigurers(Collection<TransactionManagementConfigurer> configurers) { // ..... TransactionManagementConfigurer configurer = configurers.iterator().next(); this.txManager = configurer.annotationDrivenTransactionManager(); } // 向容器中註冊一個TransactionalEventListenerFactory // 這個類用於處理@TransactionalEventListener註解 // 能夠實現對事件的監聽,而且在事務的特定階段對事件進行處理 @Bean(name = TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public static TransactionalEventListenerFactory transactionalEventListenerFactory() { return new TransactionalEventListenerFactory(); } }
上面的代碼中你們可能比較不熟悉的就是TransactionalEventListenerFactory
,這個類主要是用來處理@TransactionalEventListener
註解的,咱們來看一個實際使用的例子
@Component public class DmzListener { // 添加一個監聽器 // phase = TransactionPhase.AFTER_COMMIT意味着這個方法在事務提交後執行 @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void listen(DmzTransactionEvent transactionEvent){ System.out.println("事務已提交"); } } // 定義一個事件 public class DmzTransactionEvent extends ApplicationEvent { public DmzTransactionEvent(Object source) { super(source); } } @Component public class DmzService { @Autowired ApplicationContext applicationContext; // 一個須要進行事務管理的方法 @Transactional public void invokeWithTransaction() { // 發佈一事件 applicationContext.publishEvent(new DmzTransactionEvent(this)); // 以一條sout語句提代sql執行過程 System.out.println("sql invoked"); } } // 測試方法 public class Main { public static void main(String[] args) { AnnotationConfigApplicationContext ac = new AnnotationConfigApplicationContext(Config.class); DmzService dmzService = ac.getBean(DmzService.class); dmzService.invokeWithTransaction(); } } // 最後程序會按順序輸出 // sql invoked // 事務已提交
經過上面的例子咱們能夠看到,雖然咱們在invokeWithTransaction
方法中一開始就發佈了一個事件,可是監聽事件的方法倒是在invokeWithTransaction
才執行的,正常事件的監聽是同步的,假設咱們將上述例子中的@TransactionalEventListener
註解替換成爲@EventListener
註解,以下:
@Component public class DmzListener { // 添加一個監聽器 @EventListener public void listen(DmzTransactionEvent transactionEvent){ System.out.println("事務已提交"); } }
這個時候程序的輸出就會是
// 事務已提交 // sql invoked
那麼@TransactionalEventListener
註解是實現這種看似異步(實際上並非)的監聽方式的呢?
你們按照上面這個調用鏈能夠找到這麼一段代碼
經過上面的代碼,咱們能夠發現最終會調用到TransactionalEventListenerFactory
的createApplicationListener
方法,經過這個方法建立一個事件監聽器而後添加到容器中,createApplicationListener
方法源碼以下:
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) { return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method); }
就是建立了一個ApplicationListenerMethodTransactionalAdapter
,這個類自己就是一個事件監聽器(實現了ApplicationListener
接口)。咱們直接關注它的事件監聽方法,也就是onApplicationEvent
方法,其源碼以下:
@Override public void onApplicationEvent(ApplicationEvent event) { // 激活了同步,而且真實存在事務 if (TransactionSynchronizationManager.isSynchronizationActive() && TransactionSynchronizationManager.isActualTransactionActive()) { // 其實是依賴事務的同步機制實現的事件監聽 TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event); TransactionSynchronizationManager.registerSynchronization(transactionSynchronization); } // 在沒有開啓事務的狀況下是否處理事件 else if (this.annotation.fallbackExecution()) { // .... // 若是註解中的fallbackExecution爲true,意味着沒有事務開啓的話 // 也會執行監聽邏輯 processEvent(event); } else { // .... } }
到這一步邏輯已經清楚了,@TransactionalEventListener
所標註的方法在容器啓動時被解析成了一個ApplicationListenerMethodTransactionalAdapter
,這個類自己就是一個事件監聽器,當容器中的組件發佈了一個事件後,若是事件匹配,會進入它的onApplicationEvent
方法,這個方法並無直接執行咱們所定義的監聽邏輯,而是給當前事務註冊了一個同步的行爲,當事務到達某一個階段時,這個行爲會被觸發。經過這種方式,實現一種僞異步。實際上註冊到事務的的同步就是TransactionSynchronizationEventAdapter
,這個類的源碼很是簡單,這裏就單獨取它一個方法看下
// 這個方法會在事務提交前執行 public void beforeCommit(boolean readOnly) { // 在執行時會先判斷在@TransactionalEventListener註解中定義的phase是否是BEFORE_COMMIT // 若是不是的話,什麼事情都不作 if (this.phase == TransactionPhase.BEFORE_COMMIT) { processEvent(); } }
別看上面這麼多內容,到目前爲止咱們仍是隻對ProxyTransactionManagementConfiguration
的父類作了介紹,接下來咱們就來看看ProxyTransactionManagementConfiguration
自身作了什麼事情。
// proxyBeanMethods=false,意味着不對配置類生成代理對象 @Configuration(proxyBeanMethods = false) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration { // 註冊了一個BeanFactoryTransactionAttributeSourceAdvisor // advisor就是一個綁定了切點的通知 // 能夠看到通知就是TransactionInterceptor // 切點會經過TransactionAttributeSource去解析@Transacational註解 // 只會對有這個註解的方法進行攔截 @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME) // BeanDefinition的角色是一個基礎設施類 @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor( TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) { BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor(); advisor.setTransactionAttributeSource(transactionAttributeSource); advisor.setAdvice(transactionInterceptor); if (this.enableTx != null) { advisor.setOrder(this.enableTx.<Integer>getNumber("order")); } return advisor; } // 註冊一個AnnotationTransactionAttributeSource // 這個類的主要做用是用來解析@Transacational註解 @Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public TransactionAttributeSource transactionAttributeSource() { return new AnnotationTransactionAttributeSource(); } // 事務是經過AOP實現的,AOP的核心就是攔截器 // 這裏就是註冊了實現事務須要的攔截器 @Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) { TransactionInterceptor interceptor = new TransactionInterceptor(); interceptor.setTransactionAttributeSource(transactionAttributeSource); if (this.txManager != null) { interceptor.setTransactionManager(this.txManager); } return interceptor; } }
實現事務管理的核心就是SpringAOP,而完成SpringAOP的核心就是通知(Advice),通知的核心就是攔截器,關於SpringAOP、切點、通知在以前的文章中已經作過詳細介紹了,因此對這一塊本文就跳過了,咱們直接定位到事務管理的核心TransactionInterceptor
。
TransactionInterceptor
實現了MethodInterceptor
,核心方法就是invoke
方法,咱們直接定位到這個方法的具體實現邏輯
// invocation:表明了要進行事務管理的方法 public Object invoke(MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // 核心方法就是invokeWithinTransaction return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }
這個方法很長,可是主要能夠分爲三段
這裏咱們只分析標準事務管理
,下面的源碼也只保留標準事務管理相關代碼
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // 以前在配置類中註冊了一個AnnotationTransactionAttributeSource // 這裏就是直接返回了以前註冊的那個Bean,經過它去獲取事務屬性 TransactionAttributeSource tas = getTransactionAttributeSource(); // 解析@Transactional註解獲取事務屬性 final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); // 獲取對應的事務管理器 final TransactionManager tm = determineTransactionManager(txAttr); // ... // 忽略響應式的事務管理 // ... // 作了個強轉PlatformTransactionManager PlatformTransactionManager ptm = asPlatformTransactionManager(tm); // 切點名稱(類名+方法名),會被做爲事務的名稱 final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { // 建立事務 TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { // 這裏執行真正的業務邏輯 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 方法執行出現異常,在異常狀況下完成事務 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 清除線程中的事務信息 cleanupTransactionInfo(txInfo); } // ... // 省略不重要代碼 // ... // 提交事務 commitTransactionAfterReturning(txInfo); return retVal; } // .... // 省略回調實現事務管理相關代碼 // .... return result; } }
經過上面這段代碼能夠概括出事務管理的流程以下:
tas.getTransactionAttribute
createTransactionIfNecessary
invocation.proceedWithInvocation
completeTransactionAfterThrowing
cleanupTransactionInfo
commitTransactionAfterReturning
接下來咱們一步步分析
// 獲取事務對應的屬性,實際上返回一個AnnotationTransactionAttributeSource // 以後再調用AnnotationTransactionAttributeSource的getTransactionAttribute // getTransactionAttribute:先從攔截的方法上找@Transactional註解 // 若是方法上沒有的話,再從方法所在的類上找,若是類上尚未的話嘗試從接口或者父類上找 public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) { if (method.getDeclaringClass() == Object.class) { return null; } // 在緩存中查找 Object cacheKey = getCacheKey(method, targetClass); TransactionAttribute cached = this.attributeCache.get(cacheKey); if (cached != null) { if (cached == NULL_TRANSACTION_ATTRIBUTE) { return null; } else { return cached; } } else { // 這裏真正的去執行解析 TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass); // 緩存解析的結果,若是爲事務屬性爲null,也放入一個標誌 // 表明這個方法不須要進行事務管理 if (txAttr == null) { this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE); } else { String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass); if (txAttr instanceof DefaultTransactionAttribute) { ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification); } this.attributeCache.put(cacheKey, txAttr); } return txAttr; } }
真正解析註解時調用了computeTransactionAttribute
方法,其代碼以下:
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) { // 默認狀況下allowPublicMethodsOnly爲true // 這意味着@Transactional若是放在非public方法上不會生效 if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) { return null; } // method是接口中的方法 // specificMethod是具體實現類的方法 Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass); // 如今目標類方法上找 TransactionAttribute txAttr = findTransactionAttribute(specificMethod); if (txAttr != null) { return txAttr; } // 再在目標類上找 txAttr = findTransactionAttribute(specificMethod.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } // 降級到接口跟接口中的方法上找這個註解 if (specificMethod != method) { txAttr = findTransactionAttribute(method); if (txAttr != null) { return txAttr; } txAttr = findTransactionAttribute(method.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } } return null; }
能夠看到在computeTransactionAttribute
方法中又進一步調用了findTransactionAttribute
方法,咱們一步步跟蹤最終會進入到SpringTransactionAnnotationParser#parseTransactionAnnotation(AnnotationAttributes)
這個方法中
整個調用鏈我這裏也畫出來了,感興趣的你們能夠跟一下
對於本文,咱們直接定位到最後一步,對應源碼以下:
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) { // 最終返回的是一個RuleBasedTransactionAttribute // 在上篇文章分析過了,定義了在出現異常時如何回滾 RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute(); Propagation propagation = attributes.getEnum("propagation"); rbta.setPropagationBehavior(propagation.value()); Isolation isolation = attributes.getEnum("isolation"); rbta.setIsolationLevel(isolation.value()); rbta.setTimeout(attributes.getNumber("timeout").intValue()); rbta.setReadOnly(attributes.getBoolean("readOnly")); rbta.setQualifier(attributes.getString("value")); List<RollbackRuleAttribute> rollbackRules = new ArrayList<>(); for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) { rollbackRules.add(new RollbackRuleAttribute(rbRule)); } for (String rbRule : attributes.getStringArray("rollbackForClassName")) { rollbackRules.add(new RollbackRuleAttribute(rbRule)); } for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) { rollbackRules.add(new NoRollbackRuleAttribute(rbRule)); } for (String rbRule : attributes.getStringArray("noRollbackForClassName")) { rollbackRules.add(new NoRollbackRuleAttribute(rbRule)); } rbta.setRollbackRules(rollbackRules); return rbta; }
到這一步很明確了,解析了@Transactional
註解,並將這個註解的屬性封裝到了一個RuleBasedTransactionAttribute
對象中返回。
對應代碼以下:
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // 若是沒有爲事務指定名稱,使用切點做爲事務名稱 if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // 調用事務管理器的方法,獲取一個事務並返回事務的狀態 status = tm.getTransaction(txAttr); } // ....省略日誌 } // 將事務相關信息封裝到TransactionInfo對象中 // 並將TransactionInfo綁定到當前線程 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
從上面方法簽名上咱們能夠看到,建立事務實際上就是建立了一個TransactionInfo
。一個TransactionInfo
對象包含了事務相關的全部信息,例如實現事務使用的事務管理器(PlatformTransactionManager
),事務的屬性(TransactionAttribute
),事務的狀態(transactionStatus
)以及與當前建立的關聯的上一個事務信息(oldTransactionInfo
)。咱們能夠經過TransactionInfo
對象的hasTransaction
方法判斷是否真正建立了一個事務。
上面的核心代碼只有兩句
tm.getTransaction
,經過事務管理器建立事務prepareTransactionInfo
,封裝TransactionInfo
並綁定到線程上咱們先來看看getTransaction
幹了啥,對應代碼以下:
這個代碼應該是整個Spring實現事務管理裏面最難的了,由於牽涉到事務的傳播機制,不一樣傳播級別是如何進行處理的就是下面這段代碼決定的,比較難,但願你們能耐心看完
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // 事務的屬性(TransactionAttribute),經過解析@Transacational註解de'dao TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); // 獲取一個數據庫事務對象(DataSourceTransactionObject), // 這個對象中封裝了一個從當前線程上下文中獲取到的鏈接 Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); // 判斷是否存在事務 // 若是以前獲取到的鏈接不爲空,而且鏈接上激活了事務,那麼就爲true if (isExistingTransaction(transaction)) { // 若是已經存在了事務,須要根據不一樣傳播機制進行不一樣的處理 return handleExistingTransaction(def, transaction, debugEnabled); } // 校驗事務的超時設置,默認爲-1,表明不進行超時檢查 if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); } // 檢查隔離級別是否爲mandatory(強制性要求必須開啓事務) // 若是爲mandatory,可是沒有事務存在,那麼拋出異常 if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } // 目前爲止沒有事務,而且隔離級別不是mandatory else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 當隔離級別爲required,required_new,nested時均須要新建事務 // 若是存在同步,將註冊的同步掛起 SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { // 開啓一個新事務 return startTransaction(def, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { // 建立一個空事務,沒有實際的事務提交以及回滾機制 // 會激活同步:將數據庫鏈接綁定到當前線程上 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } }
上面這段代碼能夠分爲兩種狀況分析
應用程序直接調用了一個被事務管理的方法(直接調用)
在一個須要事務管理的方法中調用了另一個須要事務管理的方法(嵌套調用)
用代碼表示以下:
@Service public class IndexService { @Autowired DmzService dmzService; // 直接調用 @Transactional public void directTransaction(){ // ...... } // 嵌套調用 @Transactional public void nestedTransaction(){ dmzService.directTransaction(); } }
ps:咱們暫且不考慮自調用的狀況,由於自調用可能會出現事務失效,在下篇文章咱們專門來聊一聊事務管理中的那些坑
咱們先來看看直接調用的狀況下上述代碼時如何執行的
doGetTransaction
源碼分析protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); // 在建立DataSourceTransactionManager將其設置爲了true // 標誌是否容許 txObject.setSavepointAllowed(isNestedTransactionAllowed()); // 從線程上下文中獲取到對應的這個鏈接池中的鏈接 // 獲取對應數據源下的這個綁定的鏈接 // 當咱們將數據庫鏈接綁定到線程上時,實際上綁定到當前線程的是一個map // 其中key是對應的數據源,value是經過這個數據源獲取的一個鏈接 ConnectionHolder conHolder =(ConnectionHolder)TransactionSynchronizationManager .getResource(obtainDataSource()); // 若是當前上下文中已經有這個鏈接了,那麼將newConnectionHolder這個標誌設置爲false // 表明複用了以前的鏈接(不是一個新鏈接) txObject.setConnectionHolder(conHolder, false); return txObject; }
直接調用的狀況下,獲取到的鏈接確定爲空,因此這裏返回的是一個沒有持有數據庫鏈接的DataSourceTransactionObject
。
isExistingTransaction
源碼分析protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // 只有在存在鏈接而且鏈接上已經激活了事務纔會返回true return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); }
直接調用的狀況下,這裏確定是返回fasle
MANDATORY
的方法將會拋出異常required、requires_new、nested
時,會真正開啓事務,對應代碼以下else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 若是存在同步,將註冊的同步掛起 SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { // 開啓一個新事務 return startTransaction(def, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } }
咱們先來看看它的掛起操做幹了什麼,對應代碼以下:
// 直接調用時,傳入的transaction爲null protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException { // 在以前的代碼中沒有進行任何激活同步的操做,因此不會進入下面這個判斷 if (TransactionSynchronizationManager.isSynchronizationActive()) { // ... } // 傳入的transaction爲null,這個判斷也不進 else if (transaction != null) { // ... } else { return null; } }
從上面能夠看出,這段代碼其實啥都沒幹,OK,減負,直接跳過。
接着,就是真正的開啓事務了,會調用一個startTransaction
方法,對應代碼以下:
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) { // 默認狀況下,getTransactionSynchronization方法會返回SYNCHRONIZATION_ALWAYS // 因此這裏是true boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // 根據以前的事務定義等相關信息構造一個事務狀態對象 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 真正開啓事務,會從數據源中獲取鏈接並綁定到線程上 doBegin(transaction, definition); // 在這裏會激活同步 prepareSynchronization(status, definition); return status; }
newTransactionStatus
實際上就是調用了DefaultTransactionStatus
的構造函數,咱們來看一看每一個參數的含義以及實際傳入的是什麼。對應代碼以下:
// definition:事務的定義,解析@Transactional註解獲得的 // transaction:經過前面的doGetTransaction方法獲得的,關聯了一個數據庫鏈接 // newTransaction:是不是一個新的事務 // debug:是否開啓debug級別日誌 // newSynchronization:是否須要一個新的同步 // suspendedResources:表明了執行當前事務時掛起的資源 protected DefaultTransactionStatus newTransactionStatus( TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { // 是不是一個真實的新的同步 // 除了傳入的標誌以外還須要判斷當前線程上的同步是否激活 // 沒有激活纔算是一個真正的新的同步 boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive(); // 返回一個事務狀態對象 // 包含了事務的定於、事務使用的鏈接、事務是否要開啓一個新的同步、事務掛起的資源等 return new DefaultTransactionStatus( transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources); }
在完成事務狀態對象的構造以後,就是真正的開啓事務了,咱們也不難猜出所謂開啓事務其實就是從數據源中獲取一個一個鏈接並設置autoCommit爲false。對應代碼以下:
protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { // 判斷txObject中是否存在鏈接而且鏈接上已經激活了事務 // txObject是經過以前的doGetTransaction方法獲得的 // 直接調用的狀況下,這個判斷確定爲true if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { // 從數據源中獲取一個鏈接 Connection newCon = obtainDataSource().getConnection(); } // 將鏈接放入到txObject中 // 第二個參數爲true,標誌這是一個新鏈接 txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } // 標誌鏈接資源跟事務同步 txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); // 應用事務定義中的read only跟隔離級別 // 實際就是調用了Connection的setReadOnly跟setTransactionIsolation方法 // 若是事務定義中的隔離級別跟數據庫默認的隔離級別不一致會返回的是數據庫默認的隔離級別 // 不然返回null // 主要是爲了在事務完成後能將鏈接狀態恢復 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); txObject.setReadOnly(definition.isReadOnly()); // 設置autoCommit爲false,顯示開啓事務 if (con.getAutoCommit()) { // 表明在事務完成後須要將鏈接重置爲autoCommit=true // 跟以前的previousIsolationLevel做用同樣,都是爲了恢復鏈接 txObject.setMustRestoreAutoCommit(true); con.setAutoCommit(false); } // 是否經過顯示的語句設置read only,默認是不須要的 prepareTransactionalConnection(con, definition); txObject.getConnectionHolder().setTransactionActive(true); // 設置超時時間 int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // 將鏈接綁定到當前線程上下文中,實際存入到線程上下文的是一個map // 其中key爲數據源,value爲從該數據源中獲取的一個鏈接 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { // 出現異常的話,須要歸還鏈接 if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, obtainDataSource()); txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }
概括起來,doBegin
作了這麼幾件事
在經過doBegin
開啓了事務後,接下來調用了prepareSynchronization
,這個方法的主要目的就是爲了準備這個事務須要的同步,對應源碼以下:
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { // 到這裏真正激活了事務 TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); // 隔離級別 // 只有在不是默認隔離級別的狀況下才會綁定到線程上,不然綁定一個null TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null); // 是否只讀 TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); // 初始化同步行爲 TransactionSynchronizationManager.initSynchronization(); } }
主要對一些信息進行了同步,例如事務真正激活了事務,事務隔離級別,事務名稱,是否只讀。同時初始化了同步事務過程當中要執行的一些回調(也就是一些同步的行爲)
在前面咱們已經介紹了在直接調用的狀況下,若是傳播級別爲mandatory
會直接拋出異常,傳播級別爲required、requires_new、nested
時,會調用startTransaction
真正開啓一個事務,可是除了這幾種傳播級別以外還有supports、not_supported、never
。這幾種傳播級別在直接調用時會作什麼呢?前面那張圖我其實已經畫出來了,會開啓一個空事務("empty" transaction
),對應代碼以下:
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // ...... // mandatory拋出異常 // required、requires_new、nested調用startTransaction開啓事務 // supports、not_supported、never會進入下面這個判斷 else { // 默認同步級別就是always boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); // 直接調用prepareTransactionStatus返回一個事務狀態對象 return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } }
prepareTransactionStatus
代碼以下
// 傳入的參數爲:def, null, true, newSynchronization, debugEnabled, null protected final DefaultTransactionStatus prepareTransactionStatus( TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { // 調用這個方法時 // definition:傳入的是解析@Transactional註解獲得的事務屬性 // transaction:寫死的傳入爲null,意味着沒有真正的事務() // newTransaction:寫死的傳入爲true // newSynchronization:默認同步級別爲always,在沒有真正事務的時候也進行同步 // suspendedResources:寫死了,傳入爲null,不掛起任何資源 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, newTransaction, newSynchronization, debug, suspendedResources); prepareSynchronization(status, definition); return status; }
能夠看到這個方法跟以前介紹的startTransaction
方法相比較下就有幾點不一樣,最明顯的是少了一個步驟,在prepareTransactionStatus
方法中沒有調用doBegin
方法,這意味這個這個方法不會去獲取數據庫鏈接,更不會綁定數據庫鏈接到上下文中,僅僅是作了一個同步的初始化。
其次,startTransaction
方法在調用newTransactionStatus
傳入的第二個參數是從doGetTransaction
方法中獲取的,不可能爲null,而調用prepareTransactionStatus
方法時,寫死的傳入爲null。這也表明了prepareTransactionStatus
不會真正開啓一個事務。
雖然不會真正開啓一個事務,只是開啓了一個「空事務」,可是當這個空事務完成時仍然會觸發註冊的回調。
前面已經介紹了在直接調用下七種不一樣隔離級別在建立事務時的不一樣表現,代碼看似不少,實際仍是比較簡單的,接下來咱們要介紹的就是嵌套調用
,也就是已經存在事務的狀況下,調用了另一個被事務管理的方法(而且事務管理是生效的)。咱們須要關注的是下面這段代碼
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // 事務的屬性(TransactionAttribute) TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); // 從線程上下文中獲取到一個鏈接,並封裝到一個DataSourceTransactionObject對象中 Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); // 判斷以前獲取到的事務上是否有鏈接,而且鏈接上激活了事務 if (isExistingTransaction(transaction)) // 嵌套調用處理在這裏 return handleExistingTransaction(def, transaction, debugEnabled); } // ... // 下面是直接調用的狀況,前文已經分析過了 // 省略直接調用的相關代碼 }
處理嵌套調用的核心代碼其實就是handleExistingTransaction
。可是進入這個方法前首先isExistingTransaction
這個方法得返回true才行,因此咱們先來看看isExistingTransaction
這個方法作了什麼,代碼以下:
protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // 首先判斷txObject中是否已經綁定了鏈接 // 其次判斷這個鏈接上是否已經激活了事務 return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); }
結合咱們以前分析的直接調用的代碼邏輯,能夠看出,只有外圍的事務的傳播級別爲required、requires_new、nested
時,這個判斷纔會成立。由於只有在這些傳播級別下才會真正的開啓事務,纔會將鏈接綁定到當前線程,doGetTransaction
方法才能返回一個已經綁定了數據庫鏈接的事務對象。
在知足了isExistingTransaction
時,會進入嵌套調用的處理邏輯,也就是handleExistingTransaction
方法,其代碼以下:
代碼很長,可是你們跟着我一步步梳理就能夠了,其實邏輯也不算特別複雜
整個代碼邏輯其實就是爲了實現事務的傳播,在不一樣的傳播級別下作不一樣的事情
private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { // 若是嵌套的事務的傳播級別爲never,那麼直接拋出異常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } // 若是嵌套的事務的傳播級別爲not_soupported,那麼掛起外圍事務 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } // 掛起外圍事務 Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); // 開啓一個新的空事務 return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } // 若是嵌套的事務傳播級別爲requires_new,那麼掛起外圍事務,而且新建一個新的事務 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { SuspendedResourcesHolder suspendedResources = suspend(transaction); try { return startTransaction(definition, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } // 若是嵌套事務的傳播級別爲nested,會獲取當前線程綁定的數據庫鏈接 // 並經過數據庫鏈接建立一個保存點(save point) // 其實就是調用Connection的setSavepoint方法 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 默認是容許的,因此這個判斷不會成立 if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } // 默認是true if (useSavepointForNestedTransaction()) { DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status; } else { // JTA進行事務管理纔會進入這這裏,咱們不作考慮 return startTransaction(definition, transaction, debugEnabled, null); } } if (debugEnabled) { logger.debug("Participating in existing transaction"); } // 嵌套事務傳播級別爲supports、required、mandatory時,是否須要校驗嵌套事務的屬性 // 主要校驗的是個隔離級別跟只讀屬性 // 默認是不須要校驗的 // 若是開啓了校驗,那麼會判斷若是外圍事務的隔離級別跟嵌套事務的隔離級別是否一致 // 若是不一致,直接拋出異常 if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; // 這裏會拋出異常 } } // 嵌套事務的只讀爲false if (!definition.isReadOnly()) { // 可是外圍事務的只讀爲true,那麼直接拋出異常 if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { // 這裏會拋出異常 } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
觀察上面的代碼咱們能夠發現,返回值只有兩個狀況(不考慮拋出異常)
startTransaction
方法prepareTransactionStatus
方法在前面咱們已經介紹了這兩個方法的區別,相比較於startTransaction
方法,prepareTransactionStatus
並不會再去數據源中獲取鏈接。
另外咱們還能夠發現,只有傳播級別爲required_new
的狀況纔會去調用startTransaction
方法(不考慮JTA),也就是說,只有required_new
纔會真正的獲取一個新的數據庫鏈接,其他的狀況若是支持事務的話都是複用了外圍事務獲取到的鏈接,也就是說它們實際上是加入了外圍的事務中,例如supports、required、mandatory、nested
,其中nested
又比較特殊,由於它不只僅是單純的加入了外圍的事務,並且在加入前設置了一個保存點,若是僅僅是嵌套事務發生了異常,會回滾到以前設置的這個保存點上。另外須要注意的是,由於是直接複用了外部事務的鏈接,因此supports、required、mandatory、nested
這幾種傳播級別下,嵌套的事務會隨着外部事務的提交而提交,同時也會跟着外部事物的回滾而回滾。
接下來咱們開始細節性的分析上邊的代碼,對於傳播級別爲never,沒啥好說的,直接拋出異常,由於不支持在事務中運行嘛~
當傳播級別爲not_supported
時會進入下面這段代碼
// 若是嵌套的事務的傳播級別爲not_soupported,那麼掛起外圍事務 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } // 掛起外圍事務 Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); // 開啓一個新的空事務 return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); }
主要就是兩個動做,首先掛起外圍事務。不少同窗可能不是很理解掛起這個詞的含義,掛起實際上作了什麼呢?
其中第一步對應的代碼以下:
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException { // 嵌套調用下,同步是已經被激活了的 if (TransactionSynchronizationManager.isSynchronizationActive()) { // 解綁線程上綁定的同步回調,並返回 List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null; if (transaction != null) { // 這裏實際上就是解綁線程上綁定的數據庫鏈接 // 同時返回這個鏈接 suspendedResources = doSuspend(transaction); } // 解綁線程上綁定的事務屬性並返回 String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); // 最後集中封裝爲一個SuspendedResourcesHolder返回 return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException | Error ex) { // 出現異常的話,恢復 doResumeSynchronization(suspendedSynchronizations); throw ex; } } else if (transaction != null) { // 若是沒有激活同步,那麼也須要將鏈接掛起 Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { // Neither transaction nor synchronization active. return null; } }
在上面的代碼咱們要注意到一個操做,它會清空線程綁定的數據庫鏈接,同時在後續操做中也不會再去獲取一個數據庫鏈接從新綁定到當前線程上,因此not_supported
傳播級別下每次執行SQL均可能使用的不是同一個數據庫鏈接對象(依賴於業務中獲取鏈接的方式)。這點你們要注意,跟後面的幾種有很大的區別。
獲取到須要掛起的資源後,調用了prepareTransactionStatus
,這個方法咱們以前分析過了,可是在這裏傳入的參數是不一樣的
// definition:非null,解析事務註解得來的 // transaction:null // newTransaction:false // newSynchronization:true // suspendedResources:表明掛起的資源,包括鏈接以及同步 protected final DefaultTransactionStatus prepareTransactionStatus( TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { DefaultTransactionStatus status = newTransactionStatus( definition, transaction, newTransaction, newSynchronization, debug, suspendedResources); prepareSynchronization(status, definition); return status; }
最終會返回一個這樣的事務狀態對象,其中的transaction
爲null、newTransaction
爲false,這兩者表明了不存在一個真正的事務。在後續的事務提交跟回滾時會根據事務狀態對象中的這兩個屬性來判斷是否須要真正執行回滾,若是不存在真正的事務,那麼也就沒有必要去回滾(固然,這只是針對內部的空事務而言,若是拋出的異常同時中斷了外部事務,那麼外部事務仍是會回滾的)。除了這兩個屬性外,還有newSynchronization
,由於在掛起同步時已經將以前的同步清空了,因此newSynchronization
仍然爲true,這個屬性會影響後續的一些同步回調,只有爲true的時候纔會執行回調操做。最後就是suspendedResources
,後續須要根據這個屬性來恢復外部事務的狀態。
當傳播級別爲requires_new
時,會進入下面這段代碼
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { // 第一步,也是掛起 SuspendedResourcesHolder suspendedResources = suspend(transaction); try { // 第二步,開啓一個新事務,會綁定一個新的鏈接到當前線程上 return startTransaction(definition, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error beginEx) { // 出現異常,恢復外部事務狀態 resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } }
很簡單吧,掛起事務,而後新開一個事務,而且新事務的鏈接跟外圍事務的不同。也就是說這兩個事務互不影響。它也會返回一個事務狀態對象,可是不一樣的是,transaction
不爲null、newTransaction
爲true。也就是說它有本身的提交跟回滾機制,也不難理解,畢竟是兩個不一樣的數據庫鏈接嘛~
當傳播級別爲nested
進入下面這段代碼
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (useSavepointForNestedTransaction()) { DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status; } else { // JTA相關,不考慮 return startTransaction(definition, transaction, debugEnabled, null); } }
前面其實已經介紹過了,也很簡單。仍是把重點放在返回的事務狀態對象中的那幾個關鍵屬性上,transaction
不爲null,可是newTransaction
爲false,也就是說它也不是一個新事務,另外須要注意的是,它沒有掛起任何事務相關的資源,僅僅是建立了一個保存點而已。這個事務在回滾時,只會回滾到指定的保存點。同時由於它跟外圍事務共用一個鏈接,因此它會跟隨外圍事務的提交而提交,回滾而回滾。
剩下的supports、required、mandatory
這幾種傳播級別都會進入下面這段代碼
// 省略了校驗相關代碼,前面已經介紹過了,默認是關閉校驗的 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
咱們會發現相比較於nested
只是少了一個建立保存點的動做。最終返回的事務狀態對象中的屬性,transaction
不爲null,可是newTransaction
爲false,也就是說它也不是一個新事務。同時由於沒有掛起外圍事務的同步,因此它也不是一個新的同步(newSynchronization爲false
)。
對於每一個隔離級別下返回的事務狀態對象中的屬性但願你們有必定了解,由於後續的回滾、提交等操做都依賴於這個事務狀態對象。
到目前爲止,咱們就介紹完了事務的建立,緊接着就是真正的執行業務代碼了,要保證業務代碼能被事務管理,最重要的一點是保證在業務代碼中執行SQL時仍然是使用咱們在開啓事務時綁定到線程上的數據庫鏈接。那麼是如何保證的呢?咱們分爲兩種狀況討論
JdbcTemplate
訪問數據庫Mybatis
訪問數據庫對於第二種,可能須要你對Mybatis
稍微有些瞭解
咱們要討論的只是
JdbcTemplate
是如何獲取到以前綁定在線程上的鏈接這個問題,不要想的太複雜
在事務專題的第一篇我就對JdbcTemplate
作了一個簡單的源碼分析,它底層獲取數據庫鏈接實際上就是調用了DataSourceUtils#doGetConnection
方法,代碼以下:
關鍵代碼我已經標出來了,實際上獲取鏈接時也是從線程上下文中獲取
Mybatis相對來講比較複雜,你們目前作爲了解便可。當Spring整合Mybatis時,事務是交由Spring來管理的,那麼Spring是如何接管Mybatis的事務的呢?核心代碼位於SqlSessionFactoryBean#buildSqlSessionFactory
方法中。其中有這麼一段代碼
在這裏替換掉了Mybatis的事務工廠(Mybatis
依賴事務工廠建立的事務對象來獲取鏈接),使用了Spring本身實現的一個事務工廠SpringManagedTransactionFactory
。經過它能夠獲取一個事務對象SpringManagedTransaction
。咱們會發現這個事務對象在獲取鏈接時調用的也是DataSourceUtils
的方法
private void openConnection() throws SQLException { this.connection = DataSourceUtils.getConnection(this.dataSource); this.autoCommit = this.connection.getAutoCommit(); this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource); }
因此它也能保證使用的是開啓事務時綁定在線程上的鏈接,從而保證事務的正確性。
出現異常時其實分爲兩種狀況,並非必定會回滾
對應代碼以下:
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.getTransactionStatus() != null) { // transactionAttribute是從@Transactional註解中解析得來的 if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { try { // 回滾 txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } // 省略異常處理 } else { try { // 即便出現異常仍然提交事務 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } // 省略異常處理 } } }
能夠看到,只有在知足txInfo.transactionAttribute.rollbackOn(ex)
這個條件時纔會真正執行回滾,不然即便出現了異常也會先提交事務。這個條件取決於@Transactiona
註解中的rollbackFor
屬性是如何配置的,若是不進行配置的話,默認只會對RuntimeException
或者Error
進行回滾。
在進行回滾時會調用事務管理器的rollback
方法,對應代碼以下:
public final void rollback(TransactionStatus status) throws TransactionException { // 事務狀態爲已完成的時候調用回滾會拋出異常 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; // 這裏真正處理回滾 processRollback(defStatus, false); }
private void processRollback(DefaultTransactionStatus status, boolean unexpected) { try { // 傳入時寫死的爲false boolean unexpectedRollback = unexpected; try { // 觸發以前註冊的同步回調 triggerBeforeCompletion(status); // 存在保存點,根據咱們以前的分析,說明這是一個嵌套調用的事務 // 而且內部事務的傳播級別爲nested if (status.hasSavepoint()) { // 這裏會回滾到定義的保存點 status.rollbackToHeldSavepoint(); } // 根據咱們以前的分析有兩種狀況會知足下面這個判斷 // 1.直接調用,傳播級別爲nested、required、requires_new // 2.嵌套調用,而且內部事務的傳播級別爲requires_new else if (status.isNewTransaction()) { // 直接獲取當前線程上綁定的數據庫鏈接並調用其rollback方法 doRollback(status); } else { // 到這裏說明存在事務,可是不是一個新事務而且沒有保存點 // 也就是嵌套調用而且內部事務的傳播級別爲supports、required、mandatory if (status.hasTransaction()) { // status.isLocalRollbackOnly,表明事務的結果只能爲回滾 // 默認是false的,在整個流程中沒有看到修改這個屬性 // isGlobalRollbackOnParticipationFailure // 這個屬性的含義是在加入的事務失敗時是否回滾整個事務,默認爲true if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { // 從這裏能夠看出,但內部的事務發生異常時會將整個大事務標記成回滾 doSetRollbackOnly(status); } else { // 進入這個判斷說明修改了全局配置isGlobalRollbackOnParticipationFailure // 內部事務異常並不影響外部事務 } } else { // 不存在事務,回滾不作任何操做 logger.debug("Should roll back transaction but cannot - no transaction available"); } // isFailEarlyOnGlobalRollbackOnly這個參數默認爲false // unexpectedRollback的值一開始就被賦值成了false if (!isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false; } } } catch (RuntimeException | Error ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } // 觸發同步回調 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); // unexpectedRollback是false // 這個值若是爲true,說明 if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } } finally { // 在事務完成後須要執行一些清理動做 cleanupAfterCompletion(status); } }
上面的代碼結合註釋看起來應該都很是簡單,咱們最後關注一下cleanupAfterCompletion
這個方法,對應代碼以下
private void cleanupAfterCompletion(DefaultTransactionStatus status) { // 將事務狀態修改成已完成 status.setCompleted(); // 是不是新的同步 // 清理掉線程綁定的全部同步信息 // 直接調用時,在任意傳播級別下這個條件都是知足的 // 嵌套調用時,只有傳播級別爲not_supported、requires_new纔會知足 if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } // 是不是一個新的事務 // 直接調用下,required、requires_new、nested都是新開的一個事務 // 嵌套調用下,只有requires_new會新起一個事務 if (status.isNewTransaction()) { // 真正執行清理 doCleanupAfterCompletion(status.getTransaction()); } // 若是存在掛起的資源,將掛起的資源恢復 // 恢復的操做跟掛起的操做正好相反 // 就是將以前從線程解綁的資源(數據庫鏈接等)已經同步回調從新綁定到線程上 if (status.getSuspendedResources() != null) { if (status.isDebug()) { logger.debug("Resuming suspended transaction after completion of inner transaction"); } Object transaction = (status.hasTransaction() ? status.getTransaction() : null); resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources()); } }
再來看看doCleanupAfterCompletion
到底作了什麼,源碼以下:
protected void doCleanupAfterCompletion(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // 先判斷是不是一個新鏈接 // 直接調用,若是真正開啓了一個事務一定是個鏈接 // 可是嵌套調用時,只有requires_new會新起一個鏈接,其他的都是複用外部事務的鏈接 // 這種狀況下不能將鏈接從線程上下文中清除,由於外部事務還須要使用 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.unbindResource(obtainDataSource()); } // 恢復鏈接的狀態 // 1.從新將鏈接設置爲自動提交 // 2.恢復隔離級別 // 3.將read only從新設置爲false Connection con = txObject.getConnectionHolder().getConnection(); try { if (txObject.isMustRestoreAutoCommit()) { con.setAutoCommit(true); } DataSourceUtils.resetConnectionAfterTransaction( con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly()); } catch (Throwable ex) { logger.debug("Could not reset JDBC Connection after transaction", ex); } // 最後,由於事務已經完成了因此歸還鏈接 if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.dataSource); } // 將事務對象中綁定的鏈接相關信息也清空掉 txObject.getConnectionHolder().clear(); }
提交事務有兩種狀況
- 正常執行完成,提交事務
- 出現異常,可是不知足回滾條件,仍然提交事務
可是無論哪一種清空最終都會調用
AbstractPlatformTransactionManager#commit
方法,因此咱們就直接分析這個方法
代碼以下:
public final void commit(TransactionStatus status) throws TransactionException { // 跟處理回滾時是同樣的,都會先校驗事務的狀態 // 若是事務已經完成了,那麼直接拋出異常 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } // 這裏會檢查事務的狀態是否被設置成了只能回滾 // 這裏檢查的是事務狀態對象中的rollbackOnly屬性 DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { processRollback(defStatus, false); return; } // 這裏會檢查事務對象自己是否被設置成了rollbackOnly // 以前咱們在分析回滾的代碼時知道,當內部的事務發生回滾時(supports、required) // 默認狀況下會將整個事務對象標記爲回滾,實際上在外部事務提交時就會進入這個判斷 // shouldCommitOnGlobalRollbackOnly:在全局被標記成回滾時是否還要提交,默認爲false if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { processRollback(defStatus, true); return; } // 真正處理提交 processCommit(defStatus); }
能夠看到,即便進入了提交事務的邏輯仍是可能回滾。
rollbackOnly
rollbackOnly
兩者在回滾時都是調用了processRollback
方法,可是稍有區別,經過事務狀態對象形成的回滾最終在回滾後並不會拋出異常,可是事務對象自己會拋出異常給調用者。
真正處理提交的邏輯在processCommit
方法中,代碼以下:
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { boolean unexpectedRollback = false; // 留給子類複寫的一個方法,沒有作實質性的事情 prepareForCommit(status); // 觸發同步回調 triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; // 存在保存點,將事務中的保存點清理掉 if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } // 直接調用,傳播級別爲required、nested、requires_new // 嵌套調用,且傳播級別爲requires_new else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } // 雖然前面已經檢查過rollbackOnly了,在shouldCommitOnGlobalRollbackOnly爲true時 // 仍然須要提交事務,將unexpectedRollback設置爲true,意味着提交事務後仍要拋出異常 unexpectedRollback = status.isGlobalRollbackOnly(); // 提交的邏輯很簡單,調用了connection的commit方法 doCommit(status); } else if (isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = status.isGlobalRollbackOnly(); } if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // unexpectedRollback爲true時,進入這個catch塊 // 觸發同步回調 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // connection的doCommit方法拋出SqlException時進入這裏 if (isRollbackOnCommitFailure()) { // 在doCommit方法失敗後是否進行回滾,默認爲false doRollbackOnCommitException(status, ex); } else { // 觸發同步回調 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException | Error ex) { // 剩餘其它異常進入這個catch塊 if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } // 觸發同步回調 try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { // 跟回滾時同樣,作一些清理動做 cleanupAfterCompletion(status); } }
整個邏輯仍是比較簡單的,最核心的步驟就是調用了數據庫鏈接對象(Connection)的commit方法。
本文主要分析了Spring中事務的實現機制,從事務實現的入口---->EnableTransactionManagement
註解,到事務實現的核心TransactionInterceptor
都作了詳細的分析。其中最複雜的一塊在於事務的建立,也就是createTransactionIfNecessary
,既要考慮直接調用的狀況,也要考慮嵌套調用的狀況,而且還須要針對不一樣的傳播級別作不一樣的處理。對於整個事務管理,咱們須要分清楚下面這幾個對象的含義
TransactionDefinition
表明的是咱們經過@Transactional
註解指定的事務的屬性,包括是否只讀、超時、回滾等。
TransactionStatus
表明的是事務的狀態,這個狀態分爲不少方面,好比事務的運行狀態(是否完成、是否被標記爲rollbakOnly),事務的同步狀態(這個事務是否開啓了一個新的同步),還有事務是否設置了保存點。更準確的來講,TransactionStatus
表明的是被@Transactional
註解修飾的方法的狀態,只要被@Transactional
註解修飾了,在執行通知邏輯時就會生成一個TransactionStatus
對象,即便這個方法沒有真正的開啓一個數據庫事務
DataSourceTransactionObject
表明的是一個數據庫事務對象,實際上保存的就是一個數據庫鏈接以及這個鏈接的狀態。
TransactionInfo
表明的是事務相關的全部信息,組合了事務管理器,事務狀態,事務定義並持有一箇舊的TransactionInfo
引用,這個對象在事務管理的流程中其實沒有實際的做用,主要的目的是爲了讓咱們在事務的運行過程當中獲取到事務的相關信息,咱們能夠直接調用TransactionAspectSupport
的currentTransactionInfo
方法獲取到當前線程綁定的TransactionInfo
對象。
另外,看完本文但願你們對於事務的傳播機制能有更深一點的認知,而不是停留在概念上,要講清楚事務的傳播機制咱們先作如下定。
凡是在執行代理方法的過程當中從數據源從新獲取了鏈接並調用了其setAtuoCommit(false)
方法,並且還將這個鏈接綁定到了線程上下文中,咱們就認爲它新建了一個事務。注意了,有三個要素
鏈接是從數據源中獲取到的
調用了鏈接的setAtuoCommit(false)
方法
這個鏈接被綁定到了線程上下文中
凡是在執行代理方法的過程當中,掛起了外部事務,而且沒有新建事務,那麼咱們認爲這個這個方法是以非事務的方式運行的。
凡是在執行代理方法的過程當中,沒有掛起外部事務,可是也沒有新建事務,那麼咱們認爲這個方法加入了外部的事務
掛起事務的表如今於清空了線程上下文中綁定的鏈接!
同時咱們分爲兩種狀況討論來分析傳播機制
直接調用
不存在加入外部事務這麼一說,要麼就是新建事務,要麼就是以非事務的方式運行,固然,也可能拋出異常。
傳播級別 | 運行方式 |
---|---|
requires_new | 新建事務 |
nested | 新建事務 |
required | 新建事務 |
supports | 非事務的方式運行 |
not_supported | 非事務的方式運行 |
never | 非事務的方式運行 |
mandatory | 拋出異常 |
間接調用
時咱們要分兩種狀況討論
在外部方法新建事務的狀況下時(說明外部事務的傳播級別爲requires_new
,nested
或者required
),當前方法的運行方式以下表所示
傳播級別 | 運行方式 |
---|---|
requires_new | 新建事務 |
nested | |
required | 直接加入到外部事務 |
supports | 直接加入到外部事務 |
not_supported | 掛起外部事務,以非事務的方式運行 |
never | 拋出異常 |
mandatory | 直接加入到外部事務 |
當外部方法沒有新建事務時,其實它的運行方式跟直接調用是同樣的,這裏就不贅述了。
本文到這裏就結束了,文章很長,但願你能夠耐心看完哈~
碼到手痠,求個三連啊!啊!啊!
若是本文對你由幫助的話,記得點個贊吧!也歡迎關注個人公衆號,微信搜索:程序員DMZ,或者掃描下方二維碼,跟着我一塊兒認認真真學Java,踏踏實實作一個coder。
我叫DMZ,一個在學習路上匍匐前行的小菜鳥!