【分佈式事務系列二】Spring事務管理器PlatformTransactionManager

#0 系列目錄#sql

#1 jdbc事務# ##1.1 例子##數據庫

public void save(User user) throws SQLException{
    Connection conn=jdbcDao.getConnection();
    conn.setAutoCommit(false);
    try {
        PreparedStatement ps=conn.prepareStatement("insert into user(name,age) value(?,?)");
        ps.setString(1,user.getName());
        ps.setInt(2,user.getAge());
        ps.execute();
        conn.commit();
    } catch (Exception e) {
        e.printStackTrace();
        conn.rollback();
    }finally{
        conn.close();
    }
}

##1.2 分析##session

  1. 怎麼使用事務

將自動提交設置爲false,即conn.setAutoCommit(false),而後手動來conn.commit()、或者conn.rollback()。分佈式

  1. 一個重要意識

conn.commit();conn.rollback()等這些屬於事務代碼,其餘執行sql的代碼屬於業務代碼。源碼分析

只有事務代碼和業務代碼使用的是同一個Connection的時候,事務的回滾和提交才能正常執行,因此若是咱們要實現事務代碼和業務代碼的分離,必需要保證他們使用的是同一個Connection。this

  1. 誰在執行事務

咱們能夠看到,能夠經過操做Connection鏈接來執行事務,這並不表明Connection具備事務功能,而是使用了數據庫自身的事務功能,Connection僅僅是把一些命令如commit、rollback傳遞給數據庫.net

##1.3 存在的問題##hibernate

  1. 業務代碼都要嵌套在try catch事務模板代碼中。線程

  2. 當存在多個相似save(User user)的業務邏輯時,無法保證他們的原子性:debug

login(user); save(user);

這兩個業務邏輯都是類似的代碼,獲取Connection鏈接,而後執行sql語句。無法保證它們的原子性,是由於它們使用的不是同一個鏈接,不在同一個事務內

#2 Hibernate的事務# ##2.1 例子##

public void save(User user){
    Session session=hibernateDao.openSession();
    Transaction tx=null;
    try {
        tx=session.beginTransaction();  
        session.save(user);  
        tx.commit();  
    } catch (Exception e) {
        if(tx!=null){
            tx.rollback();
        }
    }finally{
        session.close();
    }
}

##2.2 分析##

  1. 事務功能和業務功能的分離

jdbc事務中Connection負擔了兩方面的功能,事務功能和執行sql的功能。這裏的Transaction是Hibernate本身定義的事務,Hibernate則把這二者的功能單獨開來,將事務功能交給了Transaction,使得職責更加分工明確。

  1. 事務的原理

其實Session、Transaction內部會有一個相同的Connection,這樣就保證了業務代碼和事務代碼使用的是同一個Connection,Transaction事務的回滾都是依託內部的Connection來完成的,以下:

  • 事務的開始,設置自動提交爲false

輸入圖片說明

  • 事務的提交,經過connection的commit方法來提交

輸入圖片說明

#3 Spring事務功能的整體接口設計# 因爲上述各家實現事務功能的方式各不相同,Spring進行了統一的抽象,造成了PlatformTransactionManager事務管理器接口,事務的提交、回滾等操做所有交給它來實現。Spring的事務體系也是在PlatformTransactionManager事務管理器接口上開展開來的,因此先來了解下PlatformTransactionManager事務管理器。

##3.1 事務功能的整體接口設計## 先來看下三大接口:

PlatformTransactionManager:事務管理器;

TransactionDefinition:事務的一些基礎信息,如超時時間、隔離級別、傳播屬性等;

TransactionStatus:事務的一些狀態信息,如是不是一個新的事務、是否已被標記爲回滾;

看下PlatformTransactionManager如何來操做事務:

public interface PlatformTransactionManager {

    //根據事務定義TransactionDefinition,獲取事務
    TransactionStatus getTransaction(TransactionDefinition definition);

    //提交事務
    void commit(TransactionStatus status);

    //回滾事務
    void rollback(TransactionStatus status);
}

##3.2 接口對應的實現## ###3.2.1 事務定義接口TransactionDefinition### 輸入圖片說明

  • 紅線上方是一些常量定義(事務的隔離級別和事務的傳播屬性,具體再也不說,網上一大堆)
  • 事務的定義包含:事務的隔離級別、事務的傳播屬性、超時時間設置、是否只讀

**要明白的地方:**事務的隔離級別是數據庫自己的事務功能,然而事務的傳播屬性則是Spring本身爲咱們提供的功能,數據庫事務沒有事務的傳播屬性這一說法。

該接口的實現DefaultTransactionDefinition:默認的事務定義

public class DefaultTransactionDefinition implements TransactionDefinition, Serializable {
    private int propagationBehavior = PROPAGATION_REQUIRED;
    private int isolationLevel = ISOLATION_DEFAULT;
    private int timeout = TIMEOUT_DEFAULT;
    private boolean readOnly = false;
    //略
}

事務的傳播屬性爲PROPAGATION_REQUIRED,即當前沒有事務的時候,建立一個,若是有則使用當前事務

事務的隔離級別採用底層數據庫默認的隔離級別

超時時間採用底層數據庫默認的超時時間

是否只讀爲false

###3.2.2 事務接口定義TransactionStatus### 先引出Connection鏈接中的保存點功能:

//建立一個保存點
conn.setSavepoint(name);
//回滾到某個保存點
conn.rollback(savepoint);
//釋放某個保存點
conn.releaseSavepoint(savepoint);

TransactionStatus它繼承了SavepointManager接口,SavepointManager是對事務中上述保存點功能的封裝,以下:

public interface SavepointManager {
    Object createSavepoint() throws TransactionException;
    void rollbackToSavepoint(Object savepoint) throws TransactionException;
    void releaseSavepoint(Object savepoint) throws TransactionException;
}

Spring利用保存點功能實現了事務的嵌套功能。後面會詳細說明。TransactionStatus自己更多存儲的是事務的一些狀態信息:

輸入圖片說明

是不是一個新的事務

是否有保存點

是否已被標記爲回滾

經常使用的TransactionStatus接口實現爲DefaultTransactionStatus:

輸入圖片說明

目前jdbc事務是經過Connection來實現事務的,Hibernate是經過它本身定義的Transaction來實現的,因此各家的事務都不一樣,因此Spring只能以Object transaction的形式來表示各家的事務,事務的回滾和提交等操做都會最終委託給上述Object transaction來完成。

Object transaction的職責就是提交回滾事務,這個transaction的選擇可能以下:

DataSourceTransactionObject

HibernateTransactionObject

JpaTransactionObject(以後再詳細說)

詳細信息分別以下:

  • 對於DataSourceTransactionObject:

咱們使用了dataSource來獲取鏈接,要想實現事務功能,必然須要使用Connection,因此它中確定有一個Connection來執行事務的操做。

DataSourceTransactionObject中有一個ConnectionHolder,它封裝了一個Connection。

  • 對於HibernateTransactionObject:

咱們使用了hibenrate,此時要想實現事務功能,必然須要經過hibernate本身定義的Transaction來實現。

HibernateTransactionObject中含有一個SessionHolder,和上面的ConnectionHolder同樣,它封裝了一個Session,有了Session,咱們就能夠經過Session來產生一個Hibernate的Transaction,從而實現事務操做。

###3.2.3 事務管理器接口定義PlatformTransactionManager### 類圖關係以下:

輸入圖片說明

重點來講下:

  • AbstractPlatformTransactionManager
  • DataSourceTransactionManager
  • HibernateTransactionManager
  • JpaTransactionManager(以後詳細再說)

這就須要來看看事務管理器的接口,上述的他們都是怎麼實現的:

  1. 第一個接口:TransactionStatus getTransaction(TransactionDefinition definition) 根據事務定義獲取事務狀態

大致內容就是先獲取上述說明的Object transaction,判斷當前事務是否已存在,若是存在則進行事務的傳播屬性處理,後面詳細說明,若是不存在new DefaultTransactionStatus,新建立一個事務,同時使用Object transaction開啓事務。 分紅了幾個過程:

1.1 獲取Object transaction:不一樣的事務管理器獲取不一樣的Object transaction。

  • DataSourceTransactionManager就是獲取上述的DataSourceTransactionObject

從當前線程中獲取綁定的ConnectionHolder,可能爲null,若是爲null,則會在下一個開啓事務的過程當中,從dataSource中獲取一個Connection,封裝成ConnectionHolder,而後再綁定到當前線程。

而後咱們new 一個DataSourceTransactionObject了,具體過程以下:

輸入圖片說明

  • HibernateTransactionManager獲取HibernateTransactionObject

從當前線程中獲取綁定的SessionHolder,可能爲null,若是爲null,則會在下一個開啓事務的過程當中從sessionFactory中獲取一個session,而後封裝成SessionHolder,而後再綁定到當前線程

而後咱們就能夠new 一個HibernateTransactionObject了,具體過程以下:

輸入圖片說明

1.2 構建DefaultTransactionStatus,使用Object transaction開啓事務

  • DataSourceTransactionManager的DataSourceTransactionObject開啓過程以下:

首先判斷以前的獲取當前線程綁定的ConnectionHolder是否爲null,若是爲null,從dataSource中獲取一個Connection,封裝成ConnectionHolder,而後再綁定到當前線程

由於開啓了一個事務,則必需要關閉DataSourceTransactionObject中Connection的自動提交,代碼以下(省略一些):

protected void doBegin(Object transaction, TransactionDefinition definition) {
     DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
     Connection con = null;

     //若是ConnectionHolder是否爲null,重新獲取
     if (txObject.getConnectionHolder() == null ||
             txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
         Connection newCon = this.dataSource.getConnection();
         txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
     }
     con = txObject.getConnectionHolder().getConnection();

     Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
     txObject.setPreviousIsolationLevel(previousIsolationLevel);

     // 取消自動提交
     if (con.getAutoCommit()) {
         txObject.setMustRestoreAutoCommit(true);
         if (logger.isDebugEnabled()) {
             logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
         }
         con.setAutoCommit(false);
     }
     txObject.getConnectionHolder().setTransactionActive(true);
     
     // 若是是新增的ConnectionHolder,則綁定到當前線程
     if (txObject.isNewConnectionHolder()) {
         TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
     }
  }
  • HibernateTransactionManager的HibernateTransactionObject開啓過程以下:

也是一樣的邏輯,若是SessionHolder爲null,則從SessionFactory中獲取一個Session,而後封裝成SessionHolder,而後把這個SessionHolder綁定到當前線程:

Session newSession = (entityInterceptor != null ?
              getSessionFactory().withOptions().interceptor(entityInterceptor).openSession() :
              getSessionFactory().openSession());
txObject.setSession(newSession);

同時,使用上述session開啓一個事務,把事務對象也保存到上述的SessionHolder中:

Transaction hibTx=session.beginTransaction();
txObject.getSessionHolder().setTransaction(hibTx);

若是是新建立的SessionHolder,則綁定到當前線程:

// Bind the session holder to the thread.
if (txObject.isNewSessionHolder()) {
    TransactionSynchronizationManager.bindResource(getSessionFactory(), txObject.getSessionHolder());
}
  1. 第二個接口:void rollback(TransactionStatus status) 回滾事務

回滾,則仍是利用DefaultTransactionStatus內部的Object transaction來執行回滾操做

  • DataSourceTransactionManager就是使用DataSourceTransactionObject中的Connection來進行回滾操做:
protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    try {
        con.rollback();
    } catch (SQLException ex) {
        throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
    }
}
  • HibernateTransactionManager就是使用HibernateTransactionObject中的SessionHolder中的Session建立的事務Transaction來進行回滾操做:
protected void doRollback(DefaultTransactionStatus status) {
    HibernateTransactionObject txObject = (HibernateTransactionObject) status.getTransaction();
    try {
        txObject.getSessionHolder().getTransaction().rollback();
    }
}
  1. 第三個接口: void commit(TransactionStatus status) 提交事務

同理,DataSourceTransactionManager依託內部的Connection來完成提交操做;HibernateTransactionManager依託內部的Transaction來完成提交操做。

##3.3 事務的傳播屬性解析## 能夠參考這篇文章的案例說明Spring事務的傳播行爲和隔離級別,下面重點源碼分析下Spring的事務傳播屬性:

對於事務的傳播屬性的代碼以下:

輸入圖片說明

在獲取Object transaction以後,先進行判斷,是不是已存在的事務。由於這個Object transaction的獲取過程就是直接從線程綁定的獲取的,可能當前線程已經存在事務,具體判斷以下:

  • **DataSourceTransactionManager:**就是依據和當前線程綁定的ConnectionHolder中是否已存在事務。
protected boolean isExistingTransaction(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive());
}
  • **HibernateTransactionManager:**也是依據和當前線程綁定的SessionHolder是否已存在事務。
public boolean hasSpringManagedTransaction() {
    return (this.sessionHolder != null && this.sessionHolder.getTransaction() != null);
}

若是是已存在事務:則須要對事務的傳播屬性進行處理,以下即上述截圖中的的handleExistingTransaction方法:

  1. PROPAGATION_NEVER:不容許存在事務,若是存在拋出異常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
    throw new IllegalTransactionStateException(
          "Existing transaction found for transaction marked with propagation 'never'");
}
  1. PROPAGATION_NOT_SUPPORTED:不支持事務,若是存在事務,則需將事務掛起,保存起來,當執行完成以後,須要將掛起的事務繼續恢復
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
    if (debugEnabled) {
        logger.debug("Suspending current transaction");
    }
    Object suspendedResources = suspend(transaction);
    boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    return prepareTransactionStatus(
          definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}

掛起以後,產生一個Object transaction=null的事務,即不執行事務代碼,同時把掛起的資源信息傳遞給新建立的事務,當這個事務執行完成以後,再把掛起的資源恢復過來。

2.1 對於DataSourceTransactionManager來講,事務的掛起,就是把當前線程關聯的ConnectionHolder解除綁定:

protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.unbindResource(this.dataSource);
    return conHolder;
}

同理事務的恢復就是把上述ConnectionHolder再從新綁定到當前線程,繼續執行該事務。

2.2 對於HibernateTransactionManager來講,事務的掛起,就是把當前線程關聯的SessionHolder解除綁定

protected Object doSuspend(Object transaction) {
    HibernateTransactionObject txObject = (HibernateTransactionObject) transaction;
    txObject.setSessionHolder(null);
    SessionHolder sessionHolder =
         (SessionHolder) TransactionSynchronizationManager.unbindResource(getSessionFactory());
    txObject.setConnectionHolder(null);
    ConnectionHolder connectionHolder = null;
    if (getDataSource() != null) {
        connectionHolder = (ConnectionHolder) TransactionSynchronizationManager.unbindResource(getDataSource());
    }
    return new SuspendedResourcesHolder(sessionHolder, connectionHolder);
}

同理事務的恢復就是把上述SessionHolder再從新綁定到當前線程,繼續執行該事務。

  1. PROPAGATION_REQUIRES_NEW:開啓一個新的事務,若是當前存在事務則把當前事務掛起來
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
    SuspendedResourcesHolder suspendedResources = suspend(transaction);
    try {
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        doBegin(transaction, definition);
        prepareSynchronization(status, definition);
        return status;
    }
}

能夠看到,建立新的事務,就會調用doBegin(transaction, definition);方法,將事務開啓。

  1. PROPAGATION_NESTED : 原理不少人已詳細說明,能夠參考Spring事務的傳播行爲和隔離級別,源碼證明有待繼續研究

這裏使用了Object transaction來建立了SavePoint,仍舊使用原事務。

  1. PROPAGATION_SUPPORTS 和 PROPAGATION_REQUIRED : 若是當前存在事務,則仍舊使用該事物。
相關文章
相關標籤/搜索