Spring 源碼學習(九) Transaction 事務

Spring Transaction 事務的使用和實現原理php


前言

業務系統的數據,通常最後都會落入到數據庫中,例如 MySQLOracle 等主流數據庫,不可避免的,在數據更新時,有可能會遇到錯誤,這時須要將以前的數據更新操做撤回,避免錯誤數據。html

Spring 的聲明式事務能幫咱們處理回滾操做,讓咱們不須要去關注數據庫底層的事務操做,能夠不用在出現異常狀況下,在 try / catch / finaly 中手寫回滾操做。java

Spring 的事務保證程度比行業中其它技術(例如 TCC / 2PC / 3PC 等)稍弱一些,但使用 Spring 事務已經知足大部分場景,因此它的使用和配置規則也是值得學習的。mysql

接下來一塊兒學習 Spring 事務是如何使用以及實現原理吧。git


使用例子

1.建立數據庫表github

create table test.user(
id int auto_increment	
primary key,
name varchar(20) null, age int(3) null)
engine=InnoDB charset=utf8;
複製代碼

2.建立對應數據庫表的 POspring

public class JdbcUser {

	private Integer id;

	private String name;

	private Integer age;
	
	...(使用 ctrl + N 進行代碼補全 setter 和 getter)
}
複製代碼

3.建立表與實體間的映射sql

在使用 JdbcTemplate 時很糾結,在 Java 類中寫了不少硬編碼 SQL,與 MyBatis 使用方法不同,爲了示例簡單,使用了 JdbcTemplate,不過仍是建議朋友們用 MyBatis,讓代碼風格整潔。數據庫

public class UserRowMapper implements RowMapper {


	@Override
	public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
		JdbcUser user = new JdbcUser();
		user.setId(rs.getInt("id"));
		user.setName(rs.getString("name"));
		user.setAge(rs.getInt("age"));
		return user;
	}
}
複製代碼

4.建立數據操做接口編程

public interface UserDao {

	/** * 插入 * @param user 用戶信息 */
	void insertUser(JdbcUser user);

	/** * 根據 id 進行刪除 * @param id 主鍵 */
	void deleteById(Integer id);

	/** * 查詢 * @return 所有 */
	List<JdbcUser> selectAll();
}
複製代碼

5.建立數據操做接口實現類

跟書中例子不同,沒有在接口上加入事務註解,而是在公共方法上進行添加,能夠在每一個方法上自定義傳播事件、隔離級別。

public class UserJdbcTemplate implements UserDao {

	private DataSource dataSource;

	private JdbcTemplate jdbcTemplate;


	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public void insertUser(JdbcUser user) {
		String sql = "insert into user (id, name, age) values (?, ?, ?)";
		jdbcTemplate.update(sql, user.getId(), user.getName(), user.getAge());
		System.out.println("Create record : " + user.toString());
	}

	@Override
	@Transactional
	public void deleteById(Integer id) {
		String sql = "delete from user where id = ?";
		jdbcTemplate.update(sql, id);
		System.out.println("Delete record, id = " + id);
		// 事務測試,拋出異常,讓上面的插入操做回滾
		throw new RuntimeException("aa");
	}


	@Override
	public List<JdbcUser> selectAll() {
		String sql = "select * from user";
		List<JdbcUser> users = jdbcTemplate.query(sql, new UserRowMapper());
		return users;
	}



	public void setDataSource(DataSource dataSource) {
		// 使用 setter 注入參數時,同時初始化 jdbcTemplate
		this.dataSource = dataSource;
		this.jdbcTemplate = new JdbcTemplate(dataSource);
	}

}
複製代碼

6.建立配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

	<!-- 數據源 MySQL -->
	<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
		<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
		<property name="url" value="jdbc:mysql://localhost:3306/test?characterEncoding=utf8"/>
		<property name="username" value="root"/>
		<property name="password" value="12345678"/>
	</bean>

	<bean id="userJdbcTemplate" class="transaction.UserJdbcTemplate">
		<property name="dataSource" ref="dataSource"/>
	</bean>

	<!-- 開啓事務,若是將這行去掉,將不會建立事務 -->
	<tx:annotation-driven/>
	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource"/>
	</bean>
</beans>
複製代碼

7.添加依賴

記得添加數據庫鏈接和 jdbctx 這兩個 spring 模塊的依賴

optional(project(":spring-jdbc"))  // for Quartz support
optional(project(":spring-tx"))  // for Quartz support
compile group: 'mysql', name: 'mysql-connector-java', version: '5.1.6'
複製代碼

8.啓動代碼

public class TransactionBootstrap {

	public static void main(String[] args) {
			ApplicationContext context = new ClassPathXmlApplicationContext("transaction/transaction.xml");
			UserJdbcTemplate jdbcTemplate = (UserJdbcTemplate) context.getBean("userJdbcTemplate");
			System.out.println("--- Records Creation start ----");
			JdbcUser user = new JdbcUser(4, "test", 21);
			jdbcTemplate.insertUser(user);
	}
}
複製代碼

經過上面的代碼,我作了兩個測試:

  1. 配置文件中,沒開啓事務。 也就是 <tx:annotation-driven/> 這一行被註釋了,雖然咱們執行的方法中拋出了 RuntimeExcepton,可是數據庫中依舊被插入了數據。
  2. 配置文件中,開啓事務。 將上面的註釋去掉,刪掉數據庫中的記錄,從新執行啓動代碼,發現數據沒有被插入, 在程序拋出異常狀況下,Spring 成功執行了事務,回滾了插入操做。

註解屬性 @Transactional

具體位置在:org.springframework.transaction.annotation.Transactional

屬性 類型 做用
value String 可選的限定描述符,指定使用的事務管理器
propagation 枚舉:Propagation 可選的事務傳播行爲
isolation 枚舉:Isolation 可選的事務隔離級別設置
readOnly boolean 設置讀寫或只讀事務,默認是隻讀
rollbackFor Class 數組,必須繼承自 Throwable 致使事務回滾的異常類數組
rollbackForClassName 類名稱數組,必須繼承自 Throwable
noRollbackFor Class 數組,必須繼承自 Throwable 不會致使事務回滾的異常類數組
noRollbackForClassName 類名稱數組,必須繼承自 Throwable

事務的傳播性 Propagation

  • REQUIRED

這是默認的傳播屬性,若是外部調用方有事務,將會加入到事務,沒有的話新建一個。

  • PROPAGATION_SUPPORTS

若是當前存在事務,則加入到該事務;若是當前沒有事務,則以非事務的方式繼續運行。

  • PROPAGATION_NOT_SUPPORTED

以非事務方式運行,若是當前存在事務,則把當前事務掛起。

  • PROPAGATION_NEVER

以非事務方式運行,若是當前存在事務,則拋出異常。


事務的隔離性 Isolation

  • READ_UNCOMMITTED

最低級別,只能保證不讀取 物理上損害的數據,容許髒讀

  • READ_COMMITTED

只能讀到已經提交的數據

  • REPEATABLE_READ

可重複讀

  • SERIALIZABLE

串行化讀,讀寫相互阻塞

這裏只是簡單描述了一下這兩個主要屬性,由於底層與數據庫相關,能夠看下我以前整理過的 MySQL鎖機制


Spring 中實現邏輯

介紹完如何使用還有關鍵屬性設定,本着知其然,知其因此然的學習精神,來了解代碼是如何實現的吧。


解析

以前在解析自定義標籤時提到,AOPTX 都使用了自定義標籤,按照咱們上一篇 AOP 的學習,再來一遍解析自定義標籤的套路:事務自定義標籤。

定位到 TxNamespaceHandler 類的初始化方法:

@Override
public void init() {
	registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
	// 使用 AnnotationDrivenBeanDefinitionParser 解析器,解析 annotation-driven 標籤
	registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
	registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}
複製代碼

根據上面的方法,Spring 在初始化時候,若是遇到諸如 <tx:annotation-driven> 開頭的配置後,將會使用 AnnotationDrivenBeanDefinitionParser 解析器的 parse 方法進行解析。

public BeanDefinition parse(Element element, ParserContext parserContext) {
	registerTransactionalEventListenerFactory(parserContext);
	String mode = element.getAttribute("mode");
	// AspectJ 另外處理
	if ("aspectj".equals(mode)) {
		// mode="aspectj"
		registerTransactionAspect(element, parserContext);
		if (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader())) {
			registerJtaTransactionAspect(element, parserContext);
		}
	}
	else {
		// mode="proxy"
		AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
	}
	return null;
}
複製代碼

Spring 中的事務默認是以 AOP 爲基礎,若是須要使用 AspectJ 的方式進行事務切入,須要在 mode 屬性中配置:

<tx:annotation-driven mode="aspectj"/>
複製代碼

本篇筆記主要圍繞着默認實現方式,動態 AOP 來學習,若是對於 AspectJ 實現感興趣請查閱更多資料~


註冊 InfrastructureAdvisorAutoProxyCreator

AOP 同樣,在解析時,會建立一個自動建立代理器,在事務 TX 模塊中,使用的是 InfrastructureAdvisorAutoProxyCreator

首先來看,在默認配置狀況下,AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext) 作了什麼操做:

private static class AopAutoProxyConfigurer {
	public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {
		// 註冊 InfrastructureAdvisorAutoProxyCreator 自動建立代理器
		AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);
		// txAdvisorBeanName = org.springframework.transaction.config.internalTransactionAdvisor
		String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;
		if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
			Object eleSource = parserContext.extractSource(element);
			// Create the TransactionAttributeSource definition.
			// 建立 TransactionAttributeSource 的 bean
			RootBeanDefinition sourceDef = new RootBeanDefinition(
					"org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");
			// 註冊 bean,並使用 Spring 中的定義規則生成 beanName
			String sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef);
			// 建立 TransactionInterceptor 的 bean
			RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
			interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
			String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);
			// 建立 TransactionAttributeSourceAdvisor 的 bean
			RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
			// 將 sourceName 的 bean 注入 advisor 的 transactionAttributeSource 屬性中
			advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
			// 將 interceptorName 的 bean 注入到 advisor 的 adviceBeanName 屬性中
			advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
			if (element.hasAttribute("order")) {
				// 若是配置了 order 屬性,則加入到 bean 中
				advisorDef.getPropertyValues().add("order", element.getAttribute("order"));
			}
			// 以 txAdvisorBeanName 名字註冊 advisorDef
			parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);
			// 建立 CompositeComponentDefinition
			CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
			compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
			compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
			compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
			parserContext.registerComponent(compositeDef);
		}
	}
}
複製代碼

在這裏註冊了代理類和三個 bean,這三個關鍵 bean 支撐了整個事務功能,爲了待會更好的理解這三者的關聯關係,咱們先來回顧下 AOP 的核心概念:

  1. Pointcut 定義一個切點,能夠在這個被攔截的方法先後進行切面邏輯。
  2. Advice 用來定義攔截行爲,在這裏實現加強的邏輯,它是一個祖先接口 org.aopalliance.aop.Advice。還有其它繼承接口,例如 MethodBeforeAdvice ,特定指方法執行前的加強。
  3. Advisor 用來封裝切面的全部信息,主要是上面兩個,它用來充當 AdvicePointcut 的適配器。

advisor_consist

回顧完 AOP 的概念後,繼續來看下這三個關鍵 bean:

  • TransactionInterceptor: 實現了 Advice 接口,在這裏定義了攔截行爲。
  • AnnotationTransactionAttributeSource:封裝了目標方法是否被攔截的邏輯,雖然沒有實現 Pointcut 接口,可是在後面目標方法判斷的時候,實際上仍是委託給了 AnnotationTransactionAttributeSource.getTransactionAttributeSource,經過適配器模式,返回了 Pointcut 切點信息。
  • TransactionAttributeSourceAdvisor: 實現了 Advisor 接口,包裝了上面兩個信息。

這三個 bean 組成的結構與 AOP 切面環繞實現的結構一致,因此先學習 AOP 的實現,對事務的瞭解會有所幫助


接着看咱們的自動建立代理器是如何建立的:

AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element)

public static void registerAutoProxyCreatorIfNecessary( ParserContext parserContext, Element sourceElement) {
	BeanDefinition beanDefinition = AopConfigUtils.registerAutoProxyCreatorIfNecessary(
			parserContext.getRegistry(), parserContext.extractSource(sourceElement));
	useClassProxyingIfNecessary(parserContext.getRegistry(), sourceElement);
	registerComponentIfNecessary(beanDefinition, parserContext);
}

private static void registerComponentIfNecessary(@Nullable BeanDefinition beanDefinition, ParserContext parserContext) {
	if (beanDefinition != null) {
	    // 註冊的 beanName 是 org.springframework.aop.config.internalAutoProxyCreator
		parserContext.registerComponent(
				new BeanComponentDefinition(beanDefinition, AopConfigUtils.AUTO_PROXY_CREATOR_BEAN_NAME));
	}
}
複製代碼

在這一步中,註冊了一個 beanNameorg.springframework.aop.config.internalAutoProxyCreatorbeanInfrastructureAdsivorAutoProxyCreator,下圖是它的繼承體系圖:

infrastructrue_advisor_auto_proxy_creator_diagram

能夠看到,它實現了 InstantiationAwareBeanPostProcessor 這個接口,也就是說在 Spring 容器中,全部 bean 實例化時,Spring 都會保證調用其 postProcessAfterInitialization 方法。

與上一篇介紹的 AOP 代理器同樣,在實例化 bean 的時候,調用了代理器父類 AbstractAutoProxyCreatorpostProcessAfterInitialization 方法:

public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
	if (bean != null) {
		// 組裝 key
		Object cacheKey = getCacheKey(bean.getClass(), beanName);
		if (this.earlyProxyReferences.remove(cacheKey) != bean) {
			// 若是適合被代理,則須要封裝指定的 bean
			return wrapIfNecessary(bean, beanName, cacheKey);
		}
	}
	return bean;
}
複製代碼

其中關於 wrapIfNecessory 方法,在上一篇 AOP 中已經詳細講過,這裏講下這個方法作了什麼工做:

  1. 找出指定 bean 對應的加強器
  2. 根據找出的加強器建立代理

與建立 AOP 代理類似的過程就再也不重複說,講下它們的不一樣點:


判斷目標方法是否適合 canApply

AopUtils#canApply(Advisor, Class<?>, boolean)

public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
	if (advisor instanceof IntroductionAdvisor) {
		return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
	}
	else if (advisor instanceof PointcutAdvisor) {
		PointcutAdvisor pca = (PointcutAdvisor) advisor;
		return canApply(pca.getPointcut(), targetClass, hasIntroductions);
	}
	else {
		// It doesn't have a pointcut so we assume it applies.
		return true;
	}
}
複製代碼

咱們在前面看到,TransactionAttributeSourceAdvisor 的父類是 PointcutAdvisor,因此在目標方法判斷的時候,會取出切點信息 pca.getPointcut()

咱們以前注入的切面類型 beanAnnotationTransactionAttributeSource,經過下面的方法包裝,最後返回對象類型是 TransactionAttributeSourcePointcut 的切點信息

private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
	@Override
	@Nullable
	protected TransactionAttributeSource getTransactionAttributeSource() {
		// 實現父類的方法,在子類中進行了擴展,返回以前在標籤註冊時的AnnotationTransactionAttributeSource
		return transactionAttributeSource;
	}
};
複製代碼

匹配標籤 match

在匹配 match 操做中,區別的是 AOP 識別的是 @Before@After,而咱們的事務 TX 識別的是 @Transactional 標籤。

判斷是不是事務方法的入口方法在這:

org.springframework.transaction.interceptor.TransactionAttributeSourcePointcut#matches

@Override
public boolean matches(Method method, Class<?> targetClass) {
	// 事務切點匹配的方法
	TransactionAttributeSource tas = getTransactionAttributeSource();
	return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}
複製代碼

那它到底到哪一步解析事務註解的呢,繼續跟蹤代碼,答案是:

AnnotationTransactionAttributeSource#determineTransactionAttribute

protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
	for (TransactionAnnotationParser parser : this.annotationParsers) {
		TransactionAttribute attr = parser.parseTransactionAnnotation(element);
		if (attr != null) {
			return attr;
		}
	}
	return null;
}
複製代碼

在這一步中,遍歷註冊的註解解析器進行解析,因爲咱們關注的是事務解析,因此直接定位到事務註解的解析器:

SpringTransactionAnnotationParser#parseTransactionAnnotation(AnnotatedElement)

public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
	// 解析事務註解的屬性
	AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
			element, Transactional.class, false, false);
	if (attributes != null) {
		return parseTransactionAnnotation(attributes);
	}
	else {
		return null;
	}
}
複製代碼

首先判斷是否含有 @Transactional 註解,若是有的話,才繼續調用 parse 解析方法:

protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
	RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
	// 註釋 9.4 解析事務註解的每個屬性
	Propagation propagation = attributes.getEnum("propagation");
	rbta.setPropagationBehavior(propagation.value());
	Isolation isolation = attributes.getEnum("isolation");
	rbta.setIsolationLevel(isolation.value());
	rbta.setTimeout(attributes.getNumber("timeout").intValue());
	rbta.setReadOnly(attributes.getBoolean("readOnly"));
	rbta.setQualifier(attributes.getString("value"));
	List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
	for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
		rollbackRules.add(new RollbackRuleAttribute(rbRule));
	}
	for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
		rollbackRules.add(new RollbackRuleAttribute(rbRule));
	}
	for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
		rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
	}
	for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
		rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
	}
	rbta.setRollbackRules(rollbackRules);
	return rbta;
}
複製代碼

小結

經過上面的步驟,完成了對應類或者方法的事務屬性解析。

主要步驟在於尋找加強器,以及判斷這些加強器是否與方法或者類匹配。

若是某個 bean 屬於可被事務加強時,也就是適用於加強器 BeanFactoryTransactionAttributeSourceAdvisor 進行加強。

以前咱們注入了 TransactionInterceptorBeanFactoryTransactionAttributeSourceAdvisor 中,因此在調用事務加強器加強的代理類時,會執行 TransactionInterceptor 進行加強。同時,也就是在 TransactionInterceptor 類中的 invoke 方法中完成整個事務的邏輯。


運行


事務加強器 TransactionInterceptor

TransactionInterceptor 支撐着整個事務功能的架構。跟以前 AOPJDK 動態代理 分析的同樣,TransactionInterceptor 攔截器繼承於 MethodInterceptor,因此咱們要從它的關鍵方法 invoke() 看起:

public Object invoke(MethodInvocation invocation) throws Throwable {
	// 註釋 9.5 執行事務攔截器,完成整個事務的邏輯
	Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
	// Adapt to TransactionAspectSupport's invokeWithinTransaction...
	return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
複製代碼

實際調用了父類的方法:TransactionAspectSupport#invokeWithinTransaction

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
	// 若是transaction屬性爲null,則該方法是非事務性的
	TransactionAttributeSource tas = getTransactionAttributeSource();
	// 獲取對應事務屬性
	final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
	// 獲取事務管理器
	final PlatformTransactionManager tm = determineTransactionManager(txAttr);
	// 構造方法惟一標識(類.方法)
	final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
	if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
		// 聲明式事務處理
		// 標準事務劃分 : 使用 getTransaction 和 commit / rollback 調用
		TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
		Object retVal;
		try {
			//傳入的是回調函數對象: invocation.proceed。 執行被加強的方法
			retVal = invocation.proceedWithInvocation();
		}
		catch (Throwable ex) {
			// 異常回滾
			completeTransactionAfterThrowing(txInfo, ex);
			throw ex;
		}
		finally {
			// 清除信息
			cleanupTransactionInfo(txInfo);
		}
		// 提交事務
		commitTransactionAfterReturning(txInfo);
		return retVal;
	}
	else {
		// 編程式事務處理
		final ThrowableHolder throwableHolder = new ThrowableHolder();
		// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
		try {
			Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
				TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
			...
			return result;
		}
	}
}
複製代碼

貼出的代碼有刪減,簡略了錯誤異常的 try / catch 和編程式事務處理的邏輯。由於咱們更多使用到的是聲明式事務處理,就是在 XML 文件配置或者 @Transactional 註解編碼,實際經過 AOP 實現,而編程式事務處理是經過 Transaction Template 實現,比較少使用到,因此省略了這部分處理代碼。


事務管理器

經過該方法,肯定要用於給定事務的特定事務管理器

TransactionAspectSupport#determineTransactionManager

protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
	// Do not attempt to lookup tx manager if no tx attributes are set
	// 註釋 9.6 尋找事務管理器
	if (txAttr == null || this.beanFactory == null) {
		// 若是沒有事務屬性或者 BeanFactory 爲空時,從緩存裏面尋找
		return asPlatformTransactionManager(getTransactionManager());
	}

	String qualifier = txAttr.getQualifier();
	// 若是註解配置中指定了事務管理器,直接取出使用
	if (StringUtils.hasText(qualifier)) {
		return determineQualifiedTransactionManager(this.beanFactory, qualifier);
	}
	else if (StringUtils.hasText(this.transactionManagerBeanName)) {
		return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
	}
	else {
		// 上面步驟都沒找到,最後纔去容器中,根據 className 來尋找
		PlatformTransactionManager defaultTransactionManager = asPlatformTransactionManager(getTransactionManager());
		...
		return defaultTransactionManager;
	}
}
複製代碼

因爲最開始咱們在 XML 文件中配置過 transactionManager 屬性,因此該方法在咱們例子中將會返回類型是 DataSourceTransactionManager 的事務管理器,下面是 DataSourceTransactionManager 的繼承體系:

datasource_transaction_manager

它實現了 InitializingBean 接口,不過只是在 afterPropertiesSet() 方法中,簡單校驗 dataSource 是否爲空,不細說這個類。


事務開啓

TransactionAspectSupport#createTransactionIfNecessary

protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
	// 若是沒有名稱指定則使用方法惟一標識,並使用 DelegatingTransactionAttribute 包裝 txAttr
	if (txAttr != null && txAttr.getName() == null) {
		txAttr = new DelegatingTransactionAttribute(txAttr) {
			@Override
			public String getName() {
				return joinpointIdentification;
			}
		};
	}

	TransactionStatus status = null;
	if (txAttr != null) {
		if (tm != null) {
			// 獲取 TransactionStatus
			status = tm.getTransaction(txAttr);
		}
	}
	// 根據指定的屬性與 status 準備一個 TransactionInfo
	return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
複製代碼

在建立事務方法中,主要完成如下三件事:

  1. 使用 DelegatingTransactionAttribute 包裝 txAttr 實例
  2. 獲取事務:tm.getTransaction(txAttr)
  3. 構建事務信息:prepareTransactionInfo(tm, txAttr, joinpointIdentification, status)

核心方法在第二點和第三點,分別摘取核心進行熟悉。


獲取 TransactionStatus

status = tm.getTransaction(txAttr);

因爲代碼較長,直接來總結其中幾個關鍵點

獲取事務

建立對應的事務實例,咱們使用的是 DataSourceTransactionManager 中的 doGetTransaction 方法,建立基於 JDBC 的事務實例。

protected Object doGetTransaction() {
	DataSourceTransactionObject txObject = new DataSourceTransactionObject();
	txObject.setSavepointAllowed(isNestedTransactionAllowed());
	// 若是當前線程已經記錄數據庫連接則使用原有連接
	ConnectionHolder conHolder =
			(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
	// false 表示非新建立鏈接
	txObject.setConnectionHolder(conHolder, false);
	return txObject;
}
複製代碼

其中在同一個線程中,判斷是否有重複的事務,是在 TransactionSynchronizationManager.getResource(obtainDataSource()) 中完成的,關鍵判斷邏輯是下面這個:

private static final ThreadLocal<Map<Object, Object>> resources =
			new NamedThreadLocal<>("Transactional resources");
			
private static Object doGetResource(Object actualKey) {
	Map<Object, Object> map = resources.get();
	if (map == null) {
		return null;
	}
	Object value = map.get(actualKey);
	// Transparently remove ResourceHolder that was marked as void...
	if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
		map.remove(actualKey);
		// Remove entire ThreadLocal if empty...
		if (map.isEmpty()) {
			resources.remove();
		}
		value = null;
	}
	return value;
}
複製代碼

結論:resources 是一個 ThreadLocal 線程私有對象,每一個線程獨立存儲,因此判斷是否存在事務,判斷的依據是當前線程、當前數據源(DataSource)中是否存在活躍的事務 - map.get(actualKey)


處理已經存在的事務

根據前面說的,判斷當前線程是否存在事務,判斷依據爲當前線程記錄的鏈接不爲空且鏈接中(connectionHolder)中的 transactionActive 屬性不爲空,若是當前線程存在事務,將根據不一樣的事務傳播特性進行處理。具體代碼邏輯以下:

if (isExistingTransaction(transaction)) {
	// Existing transaction found -> check propagation behavior to find out how to behave.
	// 當前線程存在事務,分狀況進行處理
	return handleExistingTransaction(def, transaction, debugEnabled);
}
複製代碼

PROPAGATION_NEVER

在配置中配置設定爲 PROPAGATION_NEVER,表示該方法須要在非事務的環境下運行,但處於事務處理的狀態(多是外部帶事務的方法調用了非事務的方法),將會拋出異常:

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
		throw new IllegalTransactionStateException(
				"Existing transaction found for transaction marked with propagation 'never'");
	}
複製代碼

PROPAGATION_NOT_SUPPORTED

若是有事務存在,將事務掛起,而不是拋出異常:

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
	Object suspendedResources = suspend(transaction);
	boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
	return prepareTransactionStatus(
			definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
複製代碼

事務掛起

對於掛起操做,主要目的是記錄原有事務的狀態,以便於後續操做對事務的恢復:

實際上,suspend() 方法調用的是事務管理器 DataSourceTransactionManager 中的 doSuspend() 方法

protected Object doSuspend(Object transaction) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
	// 將數據庫鏈接設置爲 null
	txObject.setConnectionHolder(null);
	return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
複製代碼

最後調用的關鍵方法是 TransactionSynchronizationManager#doUnbindResource

private static Object doUnbindResource(Object actualKey) {
	Map<Object, Object> map = resources.get();
	if (map == null) {
		return null;
	}
	Object value = map.remove(actualKey);
	if (map.isEmpty()) {
		resources.remove();
	}
	if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
		value = null;
	}
	if (value != null && logger.isTraceEnabled()) {
        Thread.currentThread().getName() + "]");
	}
	return value;
}
複製代碼

看了第七條參考資料中的文章,結合代碼理解了事務掛起的操做:移除當前線程、數據源活動事務對象的一個過程

那它是如何實現事務掛起的呢,答案是在 doSuspend() 方法中的 txObject.setConnectionHolder(null),將 connectionHolder 設置爲 null

一個 connectionHolder 表示一個數據庫鏈接對象,若是它爲 null,表示在下次須要使用時,得從緩存池中獲取一個鏈接,新鏈接的自動提交是 true


PROPAGATION_REQUIRES_NEW

表示當前方法必須在它本身的事務裏運行,一個新的事務將被啓動,而若是有一個事務正在運行的話,則這個方法運行期間被掛起。

SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
	boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
	DefaultTransactionStatus status = newTransactionStatus(
			definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
	// 新事務的創建
	doBegin(transaction, definition);
	prepareSynchronization(status, definition);
	return status;
}
catch (RuntimeException | Error beginEx) {
	resumeAfterBeginException(transaction, suspendedResources, beginEx);
	throw beginEx;
}
複製代碼

與前一個方法相同的是,在 PROPAGATION_REQUIRES_NEW 廣播特性下,也會使用 suspend 方法將原事務掛起。方法 doBegin(),是事務開啓的核心。


PROPAGATION_NESTED

表示若是當前正有一個事務在運行中,則該方法應該運行在一個嵌套的事務中,被嵌套的事務能夠獨立於封裝事務進行提交或者回滾,若是封裝事務不存在,行爲就像 PROPAGATION_REQUIRES_NEW

在代理處理上,有兩個分支,與 PROPAGATION_REQUIRES_NEW 類似的不貼出來,講下使用 savepoint 保存點的方式事務處理:

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
	// 嵌入式事務的處理
	if (useSavepointForNestedTransaction()) {
		DefaultTransactionStatus status =
				prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
		// 建立 savepoint
		status.createAndHoldSavepoint();
		return status;
	}
}
複製代碼

學習過數據庫的朋友應該清楚 savepoint,能夠利用保存點回滾部分事務,從而使事務處理更加靈活和精細。跟蹤代碼,發現建立保存點調用的方法是 org.hsqldb.jdbc.JDBCConnection#setSavepoint(java.lang.String),感興趣的能夠往下繼續深刻學習~


事務建立

其實在前面方法中,都出現過這個方法 doBegin(),在這個方法中建立事務,順便設置數據庫的隔離級別、timeout 屬性和設置 connectionHolder

DataSourceTransactionManager#doBegin

protected void doBegin(Object transaction, TransactionDefinition definition) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
	Connection con = null;
	if (!txObject.hasConnectionHolder() ||
			txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
		Connection newCon = obtainDataSource().getConnection();

		txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
	}

	txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
	con = txObject.getConnectionHolder().getConnection();
	// 設置隔離級別
	Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
	txObject.setPreviousIsolationLevel(previousIsolationLevel);

	// configured the connection pool to set it already).
	// 更改自動提交設置,由 spring 進行控制
	if (con.getAutoCommit()) {
		txObject.setMustRestoreAutoCommit(true);
		con.setAutoCommit(false);
	}
	// 準備事務鏈接
	prepareTransactionalConnection(con, definition);
	// 設置判斷當前線程是否存在事務的依據
	txObject.getConnectionHolder().setTransactionActive(true);

	int timeout = determineTimeout(definition);
	if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
		txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
	}

	// Bind the connection holder to the thread.
	if (txObject.isNewConnectionHolder()) {
		// 將當前獲取到的鏈接綁定到當前線程
		TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
		}
	}
}
複製代碼

結論:Spring 事務的開啓,就是將數據庫自動提交屬性設置爲 false


小結

在聲明式的事務處理中,主要有如下幾個處理步驟:

  1. 獲取事務的屬性tas.getTransactionAttribute(method, targetClass)
  2. 加載配置中配置的 TransactionManagerdetermineTransactionManager(txAttr);
  3. 不一樣的事務處理方式使用不一樣的邏輯:關於聲明式事務和編程式事務,能夠查看這篇文章-Spring編程式和聲明式事務實例講解
  4. 在目標方法執行前獲取事務並收集事務信息:createTransactionIfNecessary(tm, txAttr, joinpointIdentification)
  5. 執行目標方法invocation.proceed()
  6. 出現異常,嘗試異常處理completeTransactionAfterThrowing(txInfo, ex);
  7. 提交事務前的事務信息消除cleanupTransactionInfo(txInfo)
  8. 提交事務commitTransactionAfterReturning(txInfo)

事務回滾 & 提交

這兩步操做,主要調用了底層數據庫鏈接的 API,因此沒有細說。


總結

本篇文章簡單記錄瞭如何使用 Spring 的事務,以及在代碼中如何實現。

在以前的使用場景中,只用到了默認配置的聲明式事務 @Transactional,不瞭解其它屬性設置的含義,也不知道在默認配置下,若是是同一個類中的方法自調用是不支持事務。

因此,通過這一次學習和總結,在下一次使用時,就可以知道不一樣屬性設置能解決什麼問題,例如修改廣播特性 PROPAGATION,讓事務支持方法自調用,還有設置事務超時時間 timeout、隔離級別等屬性。


因爲我的技術有限,若是有理解不到位或者錯誤的地方,請留下評論,我會根據朋友們的建議進行修正

Gitee 地址 https://gitee.com/vip-augus/spring-analysis-note.git

Github 地址 https://github.com/Vip-Augus/spring-analysis-note


參考資料

  1. 透徹的掌握 Spring 中@transactional 的使用
  2. Spring—@Transactional註解
  3. spring 中經常使用的兩種事務配置方式以及事務的傳播性、隔離級別
  4. Spring事務之切點解析詳解
  5. Spring中的Advisor,Advice,Pointcut
  6. Spring編程式和聲明式事務實例講解
  7. spring-transaction
  8. savepoint原理
  9. Spring 源碼深度解析 / 郝佳編著. -- 北京 : 人民郵電出版社

傳送門:

相關文章
相關標籤/搜索