如何將一個操做「綁定到數據庫事務上」

  • 摘要spring

  • spring-cache簡介數據庫

  • 基本機制緩存

  • 事務上下文中的問題session

  • 將操做綁定到數據庫事務上app

  • spring-cache的相關實現less

  • TransactionSynchronizationManager和TransactionSynchronizationAdapter分佈式

  • 事務相關操做註冊與回調流程ide

  • 其它應用this

摘要spa

在開發中,咱們經常會遇到(或者須要)把一些操做「綁定到數據庫事務上」。也就是說,若是數據庫事務成功提交,則執行這個操做;若是數據庫事務回滾,則不執行這個操做(或者執行另外一個操做)。

例如,《JMS與數據庫事務》中介紹了一種JmsTemplate的配置方法,能夠把「發送JMS消息」的操做綁定到數據庫事務上。除此以外,更新緩存的操做也須要作相似的綁定處理。不然,數據庫事務回滾了,而緩存中卻完成了更新操做,可能致使一段時間內都會發生「髒讀」。

那麼,這種「綁定到數據庫事務上」的功能,是如何實現的呢?spring-cache中就有一個很好的例子。

 


 

spring-cache簡介

spring-cache本質上不是一個具體的緩存實現方案(例如EHCache 或者 OSCache),而是一個對緩存使用的抽象,經過在既有代碼中添加少許它定義的各類 annotation,即可以簡單而快捷地操做緩存。

 spring-cache提供了一個CacheManager接口,用於抽象和管理緩存;緩存則抽象爲Cache接口;而業務數據的CRUD操做,則由@CachePut/@Cacheable/@CacheEviet註解來進行配置後,由Cache接口下的各類實現類來處理。此外還有一些輔助類、配置類,因爲這裏是「簡介」,按下不表。

基本機制

顯然,spring-cache使用了基於註解的AOP機制。以@CachePut註解爲例,它的基本操做流程是這樣的:

 

其中,「獲取緩存實例Cache」就是由CacheManager接口負責的。這裏的「緩存實例」只是一個「邏輯」上的實例;在物理實現上,它多是同一個緩存中的不一樣命名空間、也可能確實是不一樣的物理緩存。

「將返回結果寫入緩存」,以及其它的緩存讀、寫操做,都由Cache接口來負責。

事務上下文中的問題

在事務上下文中,上面所說的「基本流程」是存在問題的:若是「寫緩存」操做成功、而數據庫事務回滾了,那麼緩存中就會出現一筆髒數據。以下圖所示:

 

這種場景下,咱們就須要把緩存操做綁定到數據庫事務上。


 

將操做綁定到數據庫事務上

spring-cache的相關實現

與JmsTemplate相似,Spring-cache提供了一個「綁定數據庫事務」的CacheManager實現類:AbstractTransactionSupportingCacheManager。不過,這個類只提供一個「是否綁定到數據庫事務上」的配置項(transactionAware),自身並不處理「綁定數據庫事務」這個操做。真正實現了「綁定」處理的,是AbstractTransactionSupportingCacheManager提供的Cache實現類:TransactionAwareCacheDecorator。這個類的put方法代碼以下:

TransactionAwareCacheDecorator 

public class TransactionAwareCacheDecorator implements Cache {

    private final Cache targetCache;

 

    @Override

    public void put(final Object key, final Object value) {

        // 判斷是否開啓了事務

        if (TransactionSynchronizationManager.isSynchronizationActive()) {

            // 將操做註冊到「afterCommit」階段

            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

                @Override

                public void afterCommit() {

                    targetCache.put(key, value);

                }

            });

        }

        else {

            this.targetCache.put(key, value);

        }

    }

    // 省略其它方法

}

 

AbstractTransactionSupportingCacheManager是基於「繼承」來提供TransactionAwareCacheDecorator。除了它以外,spring-cache還提供了一個基於「組合」的CacheManager實現類:TransactionAwareCacheManagerProxy。不過,後者本質上也要經過TransactionAwareCacheDecorator來實現所需功能。

TransactionSynchronizationManager和TransactionSynchronizationAdapter

TransactionSynchronizationManager中的代碼有點複雜。可是其功能能夠「一言以蔽之」:維護事務狀態。在這個類中有一系列的ThreadLocal類型的類變量,它們就負責存儲當前線程中的事務數據。相關代碼以下:

TransactionSynchronizationManager中的ThreadLocal 

private static final ThreadLocal<Map<Object, Object>> resources =

        new NamedThreadLocal<Map<Object, Object>>("Transactional resources");

// 關注點:事務相關操做的回調模板

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =

        new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");

private static final ThreadLocal<String> currentTransactionName =

        new NamedThreadLocal<String>("Current transaction name");

private static final ThreadLocal<Boolean> currentTransactionReadOnly =

        new NamedThreadLocal<Boolean>("Current transaction read-only status");

private static final ThreadLocal<Integer> currentTransactionIsolationLevel =

        new NamedThreadLocal<Integer>("Current transaction isolation level");

private static final ThreadLocal<Boolean> actualTransactionActive =

        new NamedThreadLocal<Boolean>("Actual transaction active");

這些類變量中,咱們須要關注的是synchronizations 。在TransactionAwareCacheDecorator中使用到的TransactionSynchronizationManager.isSynchronizationActive()、TransactionSynchronizationManager.registerSynchronization()和new TransactionSynchronizationAdapter(),都與它有關。

 

先看isSynchronizationActive()方法。它的代碼實現很是簡單,僅僅是判斷了synchronizations中是否有數據(Set<TransactionSynchronization>非null便可,並不要求其中有TransactionSynchronization實例)。之因此能夠這樣判斷,是由於Spring在開啓數據庫事務(不管是使用@Transactional註解,仍是用xml配置)時,都會向其中寫入一個實例,用於自動處理Connection的獲取、提交或回滾等操做。這個方法的代碼以下:

isSynchronizationActive() 

/**

 * Return if transaction synchronization is active for the current thread.

 * Can be called before register to avoid unnecessary instance creation.

 * @see #registerSynchronization

 */

public static boolean isSynchronizationActive() {

    return (synchronizations.get() != null);

}

 

再看registerSynchronization()方法。它其實也很是簡單:首先調用isSynchronizationActive()作一個校驗;而後將入參synchronization添加到synchronizations 中。入參synchronization中的方法不會在這裏執行,而是要等到事務執行到必定階段時纔會被調用。這個方法的代碼以下:

registerSynchronization() 

/**

 * Register a new transaction synchronization for the current thread.

 * Typically called by resource management code.

 * <p>Note that synchronizations can implement the

 * {@link org.springframework.core.Ordered} interface.

 * They will be executed in an order according to their order value (if any).

 * @param synchronization the synchronization object to register

 * @throws IllegalStateException if transaction synchronization is not active

 * @see org.springframework.core.Ordered

 */

public static void registerSynchronization(TransactionSynchronization synchronization)

        throws IllegalStateException {

    Assert.notNull(synchronization, "TransactionSynchronization must not be null");

    if (!isSynchronizationActive()) {

        throw new IllegalStateException("Transaction synchronization is not active");

    }

    synchronizations.get().add(synchronization);

}

 

比較複雜的是TransactionSynchronizationAdapter類。在進入這個類以前,咱們得先看看TransactionSynchronization接口。

TransactionSynchronization接口定義了一系列的回調方法,對應一個事務執行的不一樣階段:掛起、恢復、flush、提交(前、後)、完成(事務成功或失敗)等。當事務運行到對應階段時,事務管理器會從TransactionSynchronizationManager維護的synchronizations中拿出全部的回調器,逐個回調其中的對應方法。這個接口的代碼以下:

TransactionSynchronization 

/**

 * Interface for transaction synchronization callbacks.

 * Supported by AbstractPlatformTransactionManager.

 *

 * <p>TransactionSynchronization implementations can implement the Ordered interface

 * to influence their execution order. A synchronization that does not implement the

 * Ordered interface is appended to the end of the synchronization chain.

 *

 * <p>System synchronizations performed by Spring itself use specific order values,

 * allowing for fine-grained interaction with their execution order (if necessary).

 *

 * @author Juergen Hoeller

 * @since 02.06.2003

 * @see TransactionSynchronizationManager

 * @see AbstractPlatformTransactionManager

 * @see org.springframework.jdbc.datasource.DataSourceUtils#CONNECTION_SYNCHRONIZATION_ORDER

 */

public interface TransactionSynchronization extends Flushable {

    /** Completion status in case of proper commit */

    int STATUS_COMMITTED = 0;

    /** Completion status in case of proper rollback */

    int STATUS_ROLLED_BACK = 1;

    /** Completion status in case of heuristic mixed completion or system errors */

    int STATUS_UNKNOWN = 2;

 

    /**

     * Suspend this synchronization.

     * Supposed to unbind resources from TransactionSynchronizationManager if managing any.

     * @see TransactionSynchronizationManager#unbindResource

     */

    void suspend();

    /**

     * Resume this synchronization.

     * Supposed to rebind resources to TransactionSynchronizationManager if managing any.

     * @see TransactionSynchronizationManager#bindResource

     */

    void resume();

    /**

     * Flush the underlying session to the datastore, if applicable:

     * for example, a Hibernate/JPA session.

     * @see org.springframework.transaction.TransactionStatus#flush()

     */

    @Override

    void flush();

    /**

     * Invoked before transaction commit (before "beforeCompletion").

     * Can e.g. flush transactional O/R Mapping sessions to the database.

     * <p>This callback does <i>not</i> mean that the transaction will actually be committed.

     * A rollback decision can still occur after this method has been called. This callback

     * is rather meant to perform work that's only relevant if a commit still has a chance

     * to happen, such as flushing SQL statements to the database.

     * <p>Note that exceptions will get propagated to the commit caller and cause a

     * rollback of the transaction.

     * @param readOnly whether the transaction is defined as read-only transaction

     * @throws RuntimeException in case of errors; will be <b>propagated to the caller</b>

     * (note: do not throw TransactionException subclasses here!)

     * @see #beforeCompletion

     */

    void beforeCommit(boolean readOnly);

    /**

     * Invoked before transaction commit/rollback.

     * Can perform resource cleanup <i>before</i> transaction completion.

     * <p>This method will be invoked after {@code beforeCommit}, even when

     * {@code beforeCommit} threw an exception. This callback allows for

     * closing resources before transaction completion, for any outcome.

     * @throws RuntimeException in case of errors; will be <b>logged but not propagated</b>

     * (note: do not throw TransactionException subclasses here!)

     * @see #beforeCommit

     * @see #afterCompletion

     */

    void beforeCompletion();

    /**

     * Invoked after transaction commit. Can perform further operations right

     * <i>after</i> the main transaction has <i>successfully</i> committed.

     * <p>Can e.g. commit further operations that are supposed to follow on a successful

     * commit of the main transaction, like confirmation messages or emails.

     * <p><b>NOTE:</b> The transaction will have been committed already, but the

     * transactional resources might still be active and accessible. As a consequence,

     * any data access code triggered at this point will still "participate" in the

     * original transaction, allowing to perform some cleanup (with no commit following

     * anymore!), unless it explicitly declares that it needs to run in a separate

     * transaction. Hence: <b>Use {@code PROPAGATION_REQUIRES_NEW} for any

     * transactional operation that is called from here.</b>

     * @throws RuntimeException in case of errors; will be <b>propagated to the caller</b>

     * (note: do not throw TransactionException subclasses here!)

     */

    void afterCommit();

    /**

     * Invoked after transaction commit/rollback.

     * Can perform resource cleanup <i>after</i> transaction completion.

     * <p><b>NOTE:</b> The transaction will have been committed or rolled back already,

     * but the transactional resources might still be active and accessible. As a

     * consequence, any data access code triggered at this point will still "participate"

     * in the original transaction, allowing to perform some cleanup (with no commit

     * following anymore!), unless it explicitly declares that it needs to run in a

     * separate transaction. Hence: <b>Use {@code PROPAGATION_REQUIRES_NEW}

     * for any transactional operation that is called from here.</b>

     * @param status completion status according to the {@code STATUS_*} constants

     * @throws RuntimeException in case of errors; will be <b>logged but not propagated</b>

     * (note: do not throw TransactionException subclasses here!)

     * @see #STATUS_COMMITTED

     * @see #STATUS_ROLLED_BACK

     * @see #STATUS_UNKNOWN

     * @see #beforeCompletion

     */

    void afterCompletion(int status);

}

 

TransactionSynchronizationAdapter顯然是一個適配器:它實現了TransactionSynchronization接口,併爲每個接口方法提供了一個空的實現。這類適配器的基本思想是:接口中定義了不少方法,然而業務代碼每每只須要實現其中一小部分。利用這種「空實現」適配器,咱們能夠專一於業務上須要處理的回調方法,而不用在業務類中放大量並且重複的空方法。

TransactionSynchronizationAdapter類的代碼以下:

TransactionSynchronizationAdapter 

public abstract class TransactionSynchronizationAdapter implements TransactionSynchronization, Ordered {

    @Override

    public int getOrder() {

        return Ordered.LOWEST_PRECEDENCE;

    }

    @Override

    public void suspend() {

    }

    @Override

    public void resume() {

    }

    @Override

    public void flush() {

    }

    @Override

    public void beforeCommit(boolean readOnly) {

    }

    @Override

    public void beforeCompletion() {

    }

    @Override

    public void afterCommit() {

    }

    @Override

    public void afterCompletion(int status) {

    }

}

 

事務相關操做註冊與回調流程

說了這麼多,都是靜態的代碼,抽象而費解。這裏再提供一張流程圖(省略了一些與緩存操做不太相關的事務相關操做),但願能幫助你們更好的理解相關代碼和機制。

上圖與事務上下文中的問題相比,所謂「寫入」緩存操做實際並無真正去操做緩存,而僅僅是註冊了一個回調實例。直到數據庫事務執行到afterCommit階段時,這個回調實例纔會被調用,並真正地向緩存中寫入新的數據。

順帶一提,TransactionSynchronization中沒有afterRollback()。若是須要在事務回滾後作某些處理,須要在afterCompletion(int)方法中判斷入參的值,而後再作處理。


 

其它應用

「綁定到數據庫事務上」這一功能,除了JmsTemplate、Cache操做中能夠用到以外,在一些弱/最終一致性分佈式事務中也有應用。如TCC模型中,業務代碼中只調用Try服務,而在afterCommit或afterCompletion中處理Commit或Cancel服務。兩階段也是相似地在"afterRollback"中去調用第二階段的回滾服務。

相關文章
相關標籤/搜索