Spring在事務管理時,對事務的處理作了極致的抽象,即PlatformTransactionManager。對事務的操做,簡單地來講,只有三步操做:獲取事務,提交事務,回滾事務。數據庫
public interface PlatformTransactionManager { // 獲取事務 TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException; // 提交事務 void commit(TransactionStatus status) throws TransactionException; // 回滾事務 void rollback(TransactionStatus status) throws TransactionException; }
固然Spring不會僅僅只提供一個接口,同時會有一個抽象模版類,實現了事務管理的具體骨架。AbstractPlatformTransactionManager類能夠說是Spring事務管理的控制檯,決定事務如何建立,提交和回滾。ide
在Spring事務管理(二)-TransactionProxyFactoryBean原理中,分析TransactionInterceptor加強時,在invoke方法中最重要的三個操做:ui
在具體操做中,最後都是經過事務管理器PlatformTransactionManager的接口實現來執行的,其實也就是上面列出的三個接口方法。咱們分別介紹這三個方法的實現,並以DataSourceTransactionManager爲實現類觀察JDBC方式事務的具體實現。.net
getTransaction方法根據事務定義來獲取事務狀態,事務狀態中記錄了事務定義,事務對象及事務相關的資源信息。對於事務的獲取,除了調用事務管理器的實現來獲取事務對象自己外,另外的很重要的一點是處理了事務的傳播方式。debug
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // 1.獲取事務對象 Object transaction = doGetTransaction(); // Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } // 2.若是已存在事務,根據不一樣的事務傳播方式處理獲取事務 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(definition, transaction, debugEnabled); } // Check definition settings for new transaction. if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // 3. 若是當前沒有事務,不一樣的事務傳播方式不一樣處理方式 // 3.1 事務傳播方式爲mandatory(強制必須有事務),則拋出異常 // No existing transaction found -> check propagation behavior to find out how to proceed. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } // 3.2 事務傳播方式爲required或required_new或nested(嵌套),建立一個新的事務狀態 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // 建立新的事務狀態對象 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 事務初始化 doBegin(transaction, definition); // 準備其餘同步操做 prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } // 3.3 其餘事務傳播方式,返回一個事務對象爲null的事務狀態對象 else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
獲取事務的方法主要作兩件事情:code
獲取事務對象,在DataSourceTransactionManager的實現中,返回一個DataSourceTransactionObject對象orm
protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); ConnectionHolder conHolder = (ConnectionHolder) // 從事務同步管理器中根據DataSource獲取數據庫鏈接資源 TransactionSynchronizationManager.getResource(obtainDataSource()); txObject.setConnectionHolder(conHolder, false); return txObject; }
每次執行doGetTransaction方法,即會建立一個DataSourceTransactionObject對象txObject,並從事務同步管理器中根據DataSource獲取數據庫鏈接持有對象ConnectionHolder,而後存入txObject中。**事務同步管理類持有一個ThreadLocal級別的resources對象,存儲DataSource和ConnectionHolder的映射關係。**所以返回的txObject中持有的ConnectionHolder可能有值,也可能爲空。而不一樣的事務傳播方式下,事務管理的處理根據txObejct中是否存在事務有不一樣的處理方式。對象
關於關注事務傳播方式的實現,不少人對事務傳播方式都是隻知其一;不知其二,只是由於沒有了解源碼的實現。如今就來看看具體的實現。事務傳播方式的實現分爲兩種狀況,事務不存在和事務已經存在。isExistingTransaction方法判斷事務是否存在,默認在AbstractPlatformTransactionManager抽象類中返回false,而在DataSourceTransactionManager實現中,則根據是否有數據庫鏈接來決定。blog
protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); }
若是當前沒有事務,則不一樣事務傳播方式的處理以下:繼承
如何建立一個新的事務狀態
// 1. 新建事務狀態,返回DefaultTransactionStatus對象 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 2. 事務初始化 doBegin(transaction, definition); // 3. 準備同步操做 prepareSynchronization(status, definition); return status;
第一步,新建事務狀態,就是構建一個DefaultTransactionStatus對象
protected DefaultTransactionStatus newTransactionStatus( TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive(); return new DefaultTransactionStatus( transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources); }
第二步,事務初始化,AbstractPlatformTransactionManager沒有實現,來看DataSourceTransactionManager的實現:獲取一個新的數據庫鏈接並開啓事務,完成事務的基本屬性設置。
protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { // 若是當前沒有事務,由DataSource獲取一個新的數據庫鏈接,並賦予txObject if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection newCon = obtainDataSource().getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } // 設置數據庫鏈接與事務同步 txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); // 設置事務隔離級別 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); // Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). // 很是重要的一點,JDBC經過設置自動提交爲false,開啓一個新的事務 if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } con.setAutoCommit(false); } // 若是設置事務只讀屬性,執行Statement設置只讀 prepareTransactionalConnection(con, definition); // 激活事務狀態 txObject.getConnectionHolder().setTransactionActive(true); // 設置超時時間 int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // Bind the connection holder to the thread. // 若是爲新鏈接,綁定DataSource和數據庫鏈接持有者的映射關係 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, obtainDataSource()); txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }
第三步:準備同步操做,若是事務狀態開啓同步,則在事務同步管理器中設置事務基礎屬性
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null); TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); TransactionSynchronizationManager.initSynchronization(); } }
若是當前已經有事務存在,由handleExistingTransaction方法完成事務操做。
private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { // 傳播方式爲never(不容許事務),拋出異常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } // 傳播方式爲not_supported(不支持),則掛起當前事務,以無事務方式運行 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } // 傳播方式爲required_new,掛起原有事務,並開啓新的事務 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } // 傳播方式爲nested(嵌套),建立嵌套事務 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } // 使用保存點支持嵌套事務 if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status; } // 只適用於JTA事務:經過嵌套的begin和commit/rollback建立嵌套事務 else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. // 傳播方式爲supports或required,返回當前事務 if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
這裏關注兩個點
第一是事務的掛起,Spring並非真的對數據庫鏈接作了什麼掛起操做,而是在邏輯上由事務同步管理器作了事務信息和狀態的重置,並將原事務信息和狀態返回,並記錄在新的事務狀態對象中,從而造成一種鏈式結構。
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException { if (TransactionSynchronizationManager.isSynchronizationActive()) { List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null; if (transaction != null) { suspendedResources = doSuspend(transaction); } String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException | Error ex) { // doSuspend failed - original transaction is still active... doResumeSynchronization(suspendedSynchronizations); throw ex; } } else if (transaction != null) { // Transaction active but no synchronization active. Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { // Neither transaction nor synchronization active. return null; } }
第二是嵌套事務設置保存點,一般由JDBC3.0支持的savepoint API完成,而後將保存點記錄在事務狀態中。
DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status;
至此,完成了獲取事務
首先要說的,commit方法並非必定提交事務,也可能回滾。
public final void commit(TransactionStatus status) throws TransactionException { // 事務已經提交,再次提交拋出異常 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; // 若是事務狀態設置了回滾標識,則執行回滾 if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus, false); return; } // 設置全局回滾標識爲true,則執行回滾 if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus, true); return; } // 提交事務 processCommit(defStatus); }
processCommit執行事務的提交,但事務的提交也分爲幾種狀況:
且在processCommit方法中,不一樣時候設置了不一樣狀態的觸發監控,用來提示事務同步相關資源,觸發須要的操做。
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { boolean unexpectedRollback = false; prepareForCommit(status); // 提交前提示 triggerBeforeCommit(status); // 完成前提示 triggerBeforeCompletion(status); beforeCompletionInvoked = true; if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } unexpectedRollback = status.isGlobalRollbackOnly(); // 執行事務提交 doCommit(status); } else if (isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = status.isGlobalRollbackOnly(); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit // 回滾完成提示 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { // 未知狀態完成提示 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException | Error ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { // 事務提交完成提示 triggerAfterCommit(status); } finally { // 操做完成完成提示 triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { // 完成後清理 cleanupAfterCompletion(status); } }
DataSourceTransactionManager對doCommit的實現,就是執行數據庫鏈接的提交
protected void doCommit(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Committing JDBC transaction on Connection [" + con + "]"); } try { // 提交事務 con.commit(); } catch (SQLException ex) { throw new TransactionSystemException("Could not commit JDBC transaction", ex); } }
回滾事務時也分爲幾種狀況:
private void processRollback(DefaultTransactionStatus status, boolean unexpected) { try { boolean unexpectedRollback = unexpected; try { triggerBeforeCompletion(status); if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint"); } // 回滾保存點 status.rollbackToHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } // 回滾事務 doRollback(status); } else { // Participating in larger transaction if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } doSetRollbackOnly(status); } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available"); } // Unexpected rollback only matters here if we're asked to fail early if (!isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false; } } } catch (RuntimeException | Error ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); // Raise UnexpectedRollbackException if we had a global rollback-only marker if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } } finally { cleanupAfterCompletion(status); } }
對於DataSourceTransactionManager實現,回滾保存點和回滾事務都由JDBC的API來完成。
至此,事務管理器對事務的三種操做就簡單地介紹完了,但其中事務同步資源的控制十分精妙,這裏就不作詳細的介紹。有興趣的本身去研究源碼。於我而言,任何強大的機制都是由一行行地源碼精妙地組建出來的,深刻進去,一切都將真相大白。