Hasor JDBC 的難關,嵌套事務處理思路

    本文存屬提醒我本身不要忘記的事情。也是向你們展現 Hasor 對於 JDBC 方面即將的又一個重大的進步。目前該方案還在實施中。 java

    前段時間閒着沒事分析了下 Spring JDBC ,以爲 Spring JDBC 的設計實在是太絕了,因而就拷貝了 Spring JDBC 的關鍵接口,而後開始了遷移工做,最後 Hasor - JDBC 問世。 數據庫

    但是 Hasor JDBC 至今仍有一個重大問題沒有搞定,那就是事務控制。 安全

    雖然能夠經過暴露 Connection 簡單的加裝一個 Aop 攔截器在配合 @Tar... 註解能夠完成任務。可是我以爲我有點完美主義了。最近腦殼裏一直都是 Spring 那套事務控制體系,我有種衝動在 Hasor 中從新實現這一套事務控制體系。 多線程

    簡介一下 Spring 事務方面的內容,Spring 對於事務方面支持 7種事務傳播屬性。我用這個接口表示它們: 工具

/**
 * 事務傳播屬性
 * @version : 2013-10-30
 * @author 趙永春(zyc@hasor.net)
 */
public enum TransactionBehavior {
    /**
     * 加入已有事務
     * <p><i><b>釋意</b></i>:嘗試加入已經存在的事務中,若是沒有則開啓一個新的事務。*/
    PROPAGATION_REQUIRED,
    /**
     * 獨立事務
     * <p><i><b>釋意</b></i>:將掛起當前存在的事務,而後開啓一個獨立的事務進行處理(若是存在的話)。
     * 而且開啓一個全新的事務,新事務與已存在的事務之間彼此沒有關係。*/
    RROPAGATION_REQUIRES_NEW,
    /**
     * 嵌套事務
     * <p><i><b>釋意</b></i>:在當前事務中開啓一個子事務。
     * 若是事務回滾將連同上一級事務一同回滾(當主事務提交或回滾,子事務也會提交或回滾)
     * <p><i><b>注意</b></i>:須要驅動支持保存點。*/
    PROPAGATION_NESTED,
    /**
     * 跟隨環境
     * <p><i><b>釋意</b></i>:若是當前沒有事務存在,就以非事務方式執行;若是有,就使用當前事務。*/
    PROPAGATION_SUPPORTS,
    /**
     * 非事務方式
     * <p><i><b>釋意</b></i>:若是當前沒有事務存在,就以非事務方式執行;若是有,就將當前事務掛起。
     * */
    PROPAGATION_NOT_SUPPORTED,
    /**
     * 排除事務
     * <p><i><b>釋意</b></i>:若是當前沒有事務存在,就以非事務方式執行;若是有,就拋出異常。*/
    PROPAGATION_NEVER,
    /**
     * 強制要求事務
     * <p><i><b>釋意</b></i>:若是當前沒有事務存在,就拋出異常;若是有,就使用當前事務。*/
    PROPAGATION_MANDATORY,
}

    因爲分析過 Spring 有關事務控制部分的代碼,所以着手實現起來細節問題倒不是很難。Hasor 目前遇到的問題是結構設計上的難題。 this

    首先 Hasor-JDBC 是一個幾乎是徹底獨立的項目,甚至它都不須要 Hasor-Core 的支持。這就意味着,Hasor-JDBC 是獨立的。 spa

    其次我在設計 Hasor 時候一直保持着,穩定依賴原則,包與包之間的依賴徹底隔離。這也爲設計 Hasor-JDBC 的結構提出了要求。 .net

    之因此這樣的原因是這樣的,首先我在設計 Hasor-JDBC 時候並不想像 Spring JDBC 那樣,讓 JdbcTemplate 部分和事務控制部分產生代碼依賴。所以須要拆解它們。 插件

    正由於如此 Hasor-JDBC 在 v0.0.1 版本時能夠率先發布 JdbcTemplate 部分功能。而事務控制則能夠交給插件體系完成。 線程

    這樣一來 Hasor 的鬆散設計會讓 Hasor 穩定不少,萬一事務控制過於複雜,開發者能夠有選擇的關閉這個插件,從而避免相關邏輯代碼判斷提升運行效率。而這一切在 Spring JDBC 中是不可能的,Spring JDBC 在二者之間有着一些代碼依賴。

    爲了達到這樣的目的,我爲 Hasor-JDBC 創建了一個 DataSourceUtils。經過它的靜態方法 申請/釋放 Connection 對象。這樣一來 JDBC 數據庫操做部分就和事務徹底隔離開了。

    事務控制部分和JDBC 操做部分之間只須要經過 DataSourceUtils 上註冊的 DataSourceHelper 進行耦合。

    默認狀況下提供一個基於線程綁定的 DataSourceHelper 工具類,事務控制能夠擴展這個類從新註冊它。

------------------------------

    我先把負責實現上面 7 個事務傳播屬性的關鍵代碼貼上來分享給你們,因爲代碼約有300行,這部分代碼在本文最後奉獻上,它這個類在 Hasor 中算是比較龐大的了,你們能夠先看一下後面要介紹的實現原理而後在看關鍵代碼。

   首先爲了支持多數據源下的嵌套事務管理,事務管理器是隻針對一個數據源的。

   其次,因爲事務能夠嵌套,所以須要一個「事務棧」先進後出的原則處理每個事務請求。這是因爲考慮到事務「原子性」的問題才這樣設計的。

    比方說:若是連續開啓了 3個事務。當遞交第一個事務時,不管後面兩個事務是否已經遞交都須要遞交。回滾也是如此。至於爲何必定要使用「事務棧」的先進後出去實現,其主要緣由是事務可能位於多個 Connection 中的緣故(詳見事務傳播屬性)。

    此外還有掛起和恢復事務,這須要與線程綁定。

    中和起來設計這個事務控制方案仍是比較棘手的,不過能夠藉助下面這張表述事務鏈的圖來解釋。

    藉助 AOP 思想,若是發生嵌套事務就爲每一層事務建立一個事務狀態,而後將事務狀態放入「事務棧「。

    因爲事務是和線程綁定的,這就能夠保證事務在多線程下的調用安全,不會發生跨線程問題。

    位於事務棧中非頂端事務若是出現 commit/rollback 時,能夠藉助事務棧完成原子操做。

    事務狀態中須要保存具體操做數據庫的那個 JDBC Connection接口。

    每次建立事務狀態時,若是是新申請的數據庫鏈接,那麼就設置其一個 NewConn 標誌。這個標誌能夠用於處理嵌套事務中遞交時不是將整個事務遞交而是遞交一個事務保存點。

    若是傳播屬性要求的是獨立事務,那麼能夠將當前事務的Connection 保存起來,而後從新申請一個再次綁定到線程上。已完成傳播屬性要求,當這個獨立事務處理完成以後,在將保存的 Connection 從新與當前線程綁定。

    若是是跟隨環境的事務傳播屬性,則整個事務控制能夠什麼都不作,若是是不須要事務則能夠經過判斷當前鏈接是否爲 autoCommit 來進行後續處理。

    上面是分析 Spring 事務控制時關鍵點的實現策略,下面是 我在 Hasor 中依照這個思想設計的事務管理器關鍵代碼,因爲是半成品。下面這段代碼只能用於展現具體處理每個不一樣傳播屬性時的細節。它還須要和整個 Hasor-JDBC 事務控制體系串起來才能夠運行,如今和你們分享它們:

/**
 * 某一個數據源的事務管理器
 * 
 * <p><b><i>事務棧:</i></b>
 * <p>事務管理器容許使用不一樣的傳播屬性反覆開啓新的事務。全部被開啓的事務在正確處置(commit,rollback)
 * 它們以前都會按照前後順序依次壓入事務管理器的「事務棧」中。一旦有事務被處理(commit,rollback)這個事務纔會被從事務棧中彈出。
 * <p>假若被彈出的事務(A)並非棧頂的事務,那麼在事務(A)被處理(commit,rollback)時會優先處理自事務(A)之後開啓的其它事務。
 * 
 * @version : 2013-10-30
 * @author 趙永春(zyc@hasor.net)
 */
public abstract class AbstractPlatformTransactionManager implements TransactionManager {
    private int                           defaultTimeout = -1;
    private LinkedList<TransactionStatus> tStatusStack   = new LinkedList<TransactionStatus>();
    
    public boolean hasTransaction() {
        return !tStatusStack.isEmpty();
    }
    public boolean isTopTransaction(TransactionStatus status) {
        if (tStatusStack.isEmpty())
            return false;
        return this.tStatusStack.peek() == status;
    }
    /**開啓事務*/
    public final TransactionStatus getTransaction(TransactionBehavior behavior) throws TransactionDataAccessException {
        Hasor.assertIsNotNull(behavior);
        return getTransaction(behavior, TransactionLevel.ISOLATION_DEFAULT);
    };
    public final TransactionStatus getTransaction(TransactionBehavior behavior, TransactionLevel level) throws TransactionDataAccessException {
        Hasor.assertIsNotNull(behavior);
        Hasor.assertIsNotNull(level);
        Object transaction = doGetTransaction();獲取目前事務對象
        AbstractTransactionStatus defStatus = null;TODO new AbstractTransactionStatus(behavior, level, transaction);
        /*-------------------------------------------------------------
        |                      環境已經存在事務
        |
        | PROPAGATION_REQUIRED     :加入已有事務(不處理)
        | RROPAGATION_REQUIRES_NEW :獨立事務(掛起當前事務,開啓新事務)
        | PROPAGATION_NESTED       :嵌套事務(設置保存點)
        | PROPAGATION_SUPPORTS     :跟隨環境(不處理)
        | PROPAGATION_NOT_SUPPORTED:非事務方式(僅掛起當前事務)
        | PROPAGATION_NEVER        :排除事務(異常)
        | PROPAGATION_MANDATORY    :強制要求事務(不處理)
        ===============================================================*/
        if (this.isExistingTransaction(transaction) == true) {
            /*RROPAGATION_REQUIRES_NEW:獨立事務*/
            if (behavior == RROPAGATION_REQUIRES_NEW) {
                this.suspend(transaction, defStatus);/*掛起當前事務*/
                this.processBegin(transaction, defStatus);/*開啓一個新的事務*/
            }
            /*PROPAGATION_NESTED:嵌套事務*/
            if (behavior == PROPAGATION_NESTED) {
                defStatus.markHeldSavepoint();/*設置保存點*/
            }
            /*PROPAGATION_NOT_SUPPORTED:非事務方式*/
            if (behavior == PROPAGATION_NOT_SUPPORTED) {
                this.suspend(transaction, defStatus);/*掛起當前事務*/
            }
            /*PROPAGATION_NEVER:排除事務*/
            if (behavior == PROPAGATION_NEVER)
                throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");
            return defStatus;
        }
        /*-------------------------------------------------------------
        |                      環境不經存在事務
        |
        | PROPAGATION_REQUIRED     :加入已有事務(開啓新事務)
        | RROPAGATION_REQUIRES_NEW :獨立事務(開啓新事務)
        | PROPAGATION_NESTED       :嵌套事務(開啓新事務)
        | PROPAGATION_SUPPORTS     :跟隨環境(不處理)
        | PROPAGATION_NOT_SUPPORTED:非事務方式(不處理)
        | PROPAGATION_NEVER        :排除事務(不處理)
        | PROPAGATION_MANDATORY    :強制要求事務(異常)
        ===============================================================*/
        /*PROPAGATION_REQUIRED:加入已有事務*/
        if (behavior == PROPAGATION_REQUIRED ||
        /*RROPAGATION_REQUIRES_NEW:獨立事務*/
        behavior == RROPAGATION_REQUIRES_NEW ||
        /*PROPAGATION_NESTED:嵌套事務*/
        behavior == PROPAGATION_NESTED) {
            this.processBegin(transaction, defStatus);/*開啓事務*/
        }
        /*PROPAGATION_MANDATORY:強制要求事務*/
        if (behavior == PROPAGATION_MANDATORY)
            throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
        return defStatus;
    }
    /**使用一個新的鏈接開啓一個新的事務做爲當前事務。請確保在調用該方法時候當前不存在事務。*/
    private void processBegin(Object transaction, AbstractTransactionStatus defStatus) {
        try {
            doBegin(transaction, defStatus);
            this.tStatusStack.push(defStatus);/*入棧*/
        } catch (SQLException ex) {
            throw new TransactionDataAccessException("SQL Exception :", ex);
        }
    }
    /**判斷當前事務對象是否已經處於事務中。該方法會用於評估事務傳播屬性的處理方式。*/
    protected abstract boolean isExistingTransaction(Object transaction);
    /**在當前鏈接上開啓一個全新的事務*/
    protected abstract void doBegin(Object transaction, AbstractTransactionStatus defStatus) throws SQLException;
    
    /**遞交事務*/
    public final void commit(TransactionStatus status) throws TransactionDataAccessException {
        Object transaction = doGetTransaction();獲取底層維護的當前事務對象
        AbstractTransactionStatus defStatus = (AbstractTransactionStatus) status;
        /*已完畢,不須要處理*/
        if (defStatus.isCompleted())
            throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
        /*回滾狀況*/
        if (defStatus.isRollbackOnly()) {
            if (Hasor.isDebugLogger())
                Hasor.logDebug("Transactional code has requested rollback");
            rollBack(defStatus);
            return;
        }
        /*-------------------------------------------------------------
        | 1.不管何種傳播形式,遞交事務操做都會將 isCompleted 屬性置爲 true。
        | 2.若是事務狀態中包含一個未處理的保存點。僅遞交保存點,而非遞交整個事務。
        | 3.事務 isNew 只有爲 true 時才真正觸發遞交事務操做。
        ===============================================================*/
        try {
            prepareCommit(defStatus);
            /*若是包含保存點,在遞交事務時只處理保存點*/
            if (defStatus.hasSavepoint())
                defStatus.releaseHeldSavepoint();
            else if (defStatus.isNewConnection())
                doCommit(transaction, defStatus);
            
        } catch (SQLException ex) {
            rollBack(defStatus);/*遞交失敗,回滾*/
            throw new TransactionDataAccessException("SQL Exception :", ex);
        } finally {
            cleanupAfterCompletion(defStatus);
        }
    }
    /**遞交前的預處理*/
    private void prepareCommit(AbstractTransactionStatus defStatus) {
        /*首先預處理的事務必須存在於管理器的事務棧內某一位置中,不然要處理的事務並不是來源於該事務管理器。*/
        if (this.tStatusStack.contains(defStatus) == false)
            throw new IllegalTransactionStateException("This transaction is not derived from this Manager.");
        /*-------------------------------------------------------------
        | 若是預處理的事務並不是位於棧頂,則進行彈棧操做。
        |--------------------------\
        | T5  ^   <-- pop-up       | 假定預處理的事務爲 T4,那麼:
        | T4  ^   <-- pop-up       | T5 事務會被先遞交,而後是 T4
        | T3  .   <-- defStatus    | 接下來就完成了預處理。
        | T2                       |
        | T1                       |
        |--------------------------/
        |
        ===============================================================*/
        
        TransactionStatus inStackStatus = null;
        while ((inStackStatus = this.tStatusStack.peek()) != defStatus)
            this.commit(inStackStatus);
    }
    /**處理當前底層數據庫鏈接的事務遞交操做。*/
    protected abstract void doCommit(Object transaction, AbstractTransactionStatus defStatus) throws SQLException;
    
    /**回滾事務*/
    public final void rollBack(TransactionStatus status) throws TransactionDataAccessException {
        Object transaction = doGetTransaction();獲取目前事務對象
        AbstractTransactionStatus defStatus = (AbstractTransactionStatus) status;
        /*已完畢,不須要處理*/
        if (defStatus.isCompleted())
            throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
        /*-------------------------------------------------------------
        | 1.不管何種傳播形式,遞交事務操做都會將 isCompleted 屬性置爲 true。
        | 2.若是事務狀態中包含一個未處理的保存點。僅回滾保存點,而非回滾整個事務。
        | 3.事務 isNew 只有爲 true 時才真正觸發回滾事務操做。
        ===============================================================*/
        try {
            prepareRollback(defStatus);
            /*若是包含保存點,在遞交事務時只處理保存點*/
            if (defStatus.hasSavepoint())
                defStatus.rollbackToHeldSavepoint();
            else if (defStatus.isNewConnection())
                doRollback(transaction, defStatus);
            
        } catch (SQLException ex) {
            throw new TransactionDataAccessException("SQL Exception :", ex);
        } finally {
            cleanupAfterCompletion(defStatus);
        }
    }
    /**回滾前的預處理*/
    private void prepareRollback(AbstractTransactionStatus defStatus) {
        /*首先預處理的事務必須存在於管理器的事務棧內某一位置中,不然要處理的事務並不是來源於該事務管理器。*/
        if (this.tStatusStack.contains(defStatus) == false)
            throw new IllegalTransactionStateException("This transaction is not derived from this Manager.");
        /*-------------------------------------------------------------
        | 若是預處理的事務並不是位於棧頂,則進行彈棧操做。
        |--------------------------\
        | T5  ^   <-- pop-up       | 假定預處理的事務爲 T4,那麼:
        | T4  ^   <-- pop-up       | T5 事務會被先回滾,而後是 T4
        | T3  .   <-- defStatus    | 接下來就完成了預處理。
        | T2                       |
        | T1                       |
        |--------------------------/
        |
        ===============================================================*/
        
        TransactionStatus inStackStatus = null;
        while ((inStackStatus = this.tStatusStack.peek()) != defStatus)
            this.rollBack(inStackStatus);
    }
    /**處理當前底層數據庫鏈接的事務回滾操做。*/
    protected abstract void doRollback(Object transaction, AbstractTransactionStatus defStatus) throws SQLException;
    
    private static class SuspendedTransactionHolder {
        public Object transaction = null; /*掛起的底層事務對象*/
    }
    /**掛起當前事務*/
    protected final void suspend(Object transaction, AbstractTransactionStatus defStatus) {
        try {
            /*檢查事務是否爲棧頂事務*/
            prepareCheckStack(defStatus);
            /*建立 SuspendedTransactionHolder 對象,用於保存當前底層數據庫鏈接以及事務對象*/
            doSuspend(transaction, defStatus);
            SuspendedTransactionHolder suspendedHolder = new SuspendedTransactionHolder();
            suspendedHolder.transaction = transaction;/*掛起的事務對象(來自於底層)*/
            defStatus.setSuspendHolder(suspendedHolder);
            
        } catch (SQLException ex) {
            throw new TransactionDataAccessException("SQL Exception :", ex);
        }
    }
    /**掛起事務,子類須要重寫該方法掛起transaction事務,並同時清空底層當前數據庫鏈接,*/
    protected void doSuspend(Object transaction, AbstractTransactionStatus defStatus) throws SQLException {
        throw new TransactionSuspensionNotSupportedException("Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
    }
    /**恢復被掛起的事務,恢復掛起的事務時必須是當前事務請妥善處理當前事務以後在恢復掛起的事務*/
    protected final void resume(Object transaction, AbstractTransactionStatus defStatus) {
        if (defStatus.isCompleted() == false)
            throw new IllegalTransactionStateException("the Transaction has not completed.");
        try {
            /*檢查事務是否爲棧頂事務*/
            prepareCheckStack(defStatus);
            SuspendedTransactionHolder suspendedHolder = (SuspendedTransactionHolder) defStatus.getSuspendedTransactionHolder();
            doResume(suspendedHolder.transaction, defStatus);
        } catch (SQLException ex) {
            throw new TransactionDataAccessException("SQL Exception :", ex);
        }
    }
    /**恢復事務,恢復本來掛起的事務(第一個參數),並使用掛起的狀態恢復當前數據庫鏈接。*/
    protected void doResume(Object resumeTransaction, AbstractTransactionStatus defStatus) throws SQLException {
        throw new TransactionSuspensionNotSupportedException("Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
    }
    
    /**檢查正在處理的事務狀態是否位於棧頂,不然拋出異常*/
    private void prepareCheckStack(AbstractTransactionStatus defStatus) {
        if (!this.isTopTransaction(defStatus))
            throw new IllegalTransactionStateException("the Transaction Status is not top in stack.");
    }
    /**commit,rollback。以後的清理工做,同時也負責恢復事務和操做事務堆棧。*/
    private void cleanupAfterCompletion(AbstractTransactionStatus defStatus) {
        /*清理的事務必須是位於棧頂*/
        prepareCheckStack(defStatus);
        /*標記完成*/
        defStatus.setCompleted();
        /*恢復掛起的事務*/
        if (defStatus.getSuspendedTransactionHolder() != null) {
            if (Hasor.isDebugLogger())
                Hasor.logDebug("Resuming suspended transaction after completion of inner transaction");
            resume(defStatus.getSuspendedTransactionHolder(), defStatus);
        }
    }
    /**獲取當前事務管理器中存在的事務對象。*/
    protected abstract Object doGetTransaction();
}

在最後鏈接一下 @黃勇 的Blog,他這裏有一篇文章詳細介紹了 Spring 事務傳播屬性:http://my.oschina.net/huangyong/blog/160012

相關文章
相關標籤/搜索