單系統下的分佈式數據庫事務方案(拓展spring的事務管理器)AgileBPM多數據的解決方案

先推薦一下碼雲上的一個GVP(最有價值的開源項目) AgileBPM(下面簡稱ab),我下面講解的方案也是它的Bo支持多數據源操做事務管理器,友情連接:http://doc.agilebpm.cn/java

目前是解決的是處理單系統內的多數據源問題,簡單來講就是在單系統中的一個線程內,保護多個數據源事務,這也是ab項目所須要的場景。redis

參考了碼雲上的開源的lcn分佈式事務解決方案,以爲再拓展一下也是能夠解決微服務間的分佈式事務處理,利用redis放一個事務處理的共同空間,而後在共同空間內來統籌事務,不過它處理commit異常的問題也是用通用方式(commit失敗不少項目都是採起tcc的方式處理)。spring

ps:以前本人試過使用jta事務管理器,這個性能真看不下去。一會就卡。。因此就想着本身定義個管理器,本身來釋放資源。sql

1 用AbstractRoutingDataSource讓系統支持多數據源數據庫

動態數據源配置:緩存

 

真正的數據源(druid數據源):分佈式

展現一下DynamicDataSource是繼承了AbstractRoutingDataSource的實現,這裏不是重點。ide

2 實現支持這種路由數據源的事務管理器微服務

先繼承AbstractPlatformTransactionManager(事務管理器的抽象類,咱們很經常使用的DataSourceTransactionManager就是繼承它的)
性能

裏面須要實現幾個關鍵點就行(筆者只考慮了事務傳播性爲PROPAGATION_REQUIRED的狀況,這也是項目最經常使用的,其餘我沒支持,畢竟是定製化的事務管理器)

package com.dstz.bus.service.impl;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import javax.sql.DataSource;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.datasource.ConnectionHolder;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.ResourceTransactionManager;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import com.dstz.base.core.util.AppUtil;
import com.dstz.base.core.util.ThreadMapUtil;
import com.dstz.base.db.datasource.DataSourceUtil;
import com.dstz.base.db.datasource.DbContextHolder;
import com.dstz.base.db.datasource.DynamicDataSource;

/**
 * <pre>
 * 描述:ab 結合sys多數據源操做 專門爲bo db實例化作的事務管理器
 * 它只保護系統數據源(包含dataSourceDefault),不會保護datasource
 * 其實能夠作到,可是這個事務管理器目前只爲bo多數據源的保護,因此我沒支持
 * 做者:aschs
 * 郵箱:aschs@qq.com
 * 日期:2018年10月10日
 * 版權:summer
 * </pre>
 */
public class AbDataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
    private int i = 0;
    
    @Override
    public void afterPropertiesSet() throws Exception {
        logger.debug("ab的事務管理器已就緒");
    }

    @Override
    public Object getResourceFactory() {
        return DataSourceUtil.getDataSourceByAlias(DataSourceUtil.GLOBAL_DATASOURCE);
    }

    /**
     * <pre>
     * 生成一個在整個事務處理都用到的資源
     * 這裏我放了在過程當中的全部鏈接 Map<數據源別名,鏈接>
     * </pre>
     */
    @Override
    protected Object doGetTransaction() {
        return new HashMap<String, Connection>();
    }
    
    /**
     * 判斷是否已存在事務
     */
    @Override
    protected boolean isExistingTransaction(Object transaction) {
        return (boolean) ThreadMapUtil.getOrDefault("abTransactionManagerExist", false);
    }
    
    /**
     * <pre>
     * 必須實現的一個方法,設置線程內的事務爲回滾狀態。
     * 這裏實際上是爲了預防傳播性設置爲 讓線程內能夠屢次管理器操做的狀況下,用來通知你們不要只作回滾,別commit了。
     * 在該事務管理器只支持PROPAGATION_REQUIRED 的狀況下(線程只有一個管理器操做),沒多大用,只是必需要實現這個
     * 否則抽象類那裏會有報錯代碼。
     * </pre>
     */
    @Override
    protected void doSetRollbackOnly(DefaultTransactionStatus status) {
        ThreadMapUtil.put("abTransactionManagerRollbackOnly", true);//標記ab事務管理器在線程內已準備要回滾了
    }
    
    /**
     * <pre>
     * 準備事務,獲取連接
     * </pre>
     */
    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
        logger.info("分佈式事務開始:"+i);
        
        Map<String, Connection> conMap = (Map<String, Connection>) transaction;
        Map<String, DataSource> dsMap = DataSourceUtil.getDataSources();
        // 遍歷系統中的全部數據源,打開鏈接
        for (Entry<String, DataSource> entry : dsMap.entrySet()) {
            Connection con = null;
            try {
                ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(entry.getValue());
                if (conHolder == null) {
                    con = entry.getValue().getConnection();
                    con.setAutoCommit(false);
                    // 緩存連接
                    TransactionSynchronizationManager.bindResource(entry.getValue(), new ConnectionHolder(con));
                } else {
                    con = conHolder.getConnection();
                }
                
                //系統數據源放進資源裏
                if(DbContextHolder.getDataSource().equals(entry.getKey())) {
                    DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE);
                    TransactionSynchronizationManager.bindResource(dynamicDataSource, new ConnectionHolder(con));
                }
                
                conMap.put(entry.getKey(), con);
                logger.debug("數據源別名[" + entry.getKey() + "]打開鏈接成功");
            } catch (Throwable ex) {
                doCleanupAfterCompletion(conMap);
                throw new CannotCreateTransactionException("數據源別名[" + entry.getKey() + "]打開鏈接錯誤", ex);
            }
        }
        
        ThreadMapUtil.put("abTransactionManagerExist", true);//標記ab事務管理器已經在線程內啓動了
    }

    @Override
    protected void doCommit(DefaultTransactionStatus status) {
        Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction();
        for (Entry<String, Connection> entry : conMap.entrySet()) {
            try {
                entry.getValue().commit();
                logger.debug("數據源別名[" + entry.getKey() + "]提交事務成功");
            } catch (SQLException ex) {
                doCleanupAfterCompletion(conMap);
                throw new TransactionSystemException("數據源別名[" + entry.getKey() + "]提交事務失敗", ex);
            }
        }
        logger.info("分佈式事務提交:"+i);
    }
    
    /**
     * 回滾
     */
    @Override
    protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
        Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction();
        for (Entry<String, Connection> entry : conMap.entrySet()) {
            try {
                entry.getValue().rollback();
                logger.debug("數據源別名[" + entry.getKey() + "]回滾事務成功");
            } catch (SQLException ex) {
                doCleanupAfterCompletion(conMap);
                throw new TransactionSystemException("數據源別名[" + entry.getKey() + "]回滾事務失敗", ex);
            }
        }
        logger.info("分佈式事務回滾:"+i);
    }
    
    /**
     * 回收連接
     */
    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        Map<String, Connection> conMap = (Map<String, Connection>) transaction;
        for (Entry<String, Connection> entry : conMap.entrySet()) {
            DataSource dataSource = DataSourceUtil.getDataSourceByAlias(entry.getKey());
            TransactionSynchronizationManager.unbindResource(dataSource);
            DataSourceUtils.releaseConnection(entry.getValue(), dataSource);
            logger.debug("數據源別名[" + entry.getKey() + "]關閉連接成功");
        }
        
        //最後把本地資源也釋放了
        DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE);
        TransactionSynchronizationManager.unbindResource(dynamicDataSource);
        
        ThreadMapUtil.remove("abTransactionManagerExist");
        ThreadMapUtil.remove("abTransactionManagerRollbackOnly");
        ThreadMapUtil.remove();
        
        logger.info("分佈式事務釋放:"+(i++));
    }
}

 

 事務管理器的方法調用順序和時機大概說一下:

1 doGetTransaction方法:來初始化事務處理過程當中的公共資源,後面調用的其餘方法都是以它爲媒介的。

2 doBegin方法:開始事務操做,主要是打開數據源的連接,記得要放到事務資源管理服務中TransactionSynchronizationManager,很是重要,由於這個過程當中用到的jdbc操做是從這裏面拿的。

3 doCommit(doRollback):如題,把獲取的連接提交或者回滾操做。

4 doCleanupAfterCompletion:回收連接資源

至此,事務管理器的邏輯已經結束了~

最後,實現務必實現isExistingTransaction,用來處理重複線程內屢次觸發了事務切面的邏輯

這裏筆者用簡單的線程變量來標記是否線程內已存在了事務管理,由於我只支持PROPAGATION_REQUIRED傳播性,因此沒考慮內部嵌入的其餘狀況,其實也是內部commit一下,資源確定是最後統一釋放的。

3 使用自定義事務管理器

先提一下,這裏筆者只保護會使用到多數據源的模塊,其實大部分系統邏輯仍是用DataSourceTransactionManager就夠,不須要保護太多數據源(由於釋放和打開連接是有性能損耗的)。

能夠看出,主要的邏輯系統仍是使用傳統管理器,而後在特定地方聲明特殊管理器則可:

5 到這裏,整個分佈式事務管理已完成了,主要是利用了路由數據源AbstractRoutingDataSource和自定義事務管理器實現的~

6 AgileBPM的多數據源結合展現(可跳過)

這裏展現一下,這個開源的流程系統的強大可配置性(讓人髮指的靈活性-。-)的數據源管理功能。

數據源模板,在這裏你可使用定義不一樣的數據源實現類,只要在項目import就行

這裏有一個內置的阿里的數據源,後面有須要你能夠增長其餘模板,例如BasicDataSOurce這個,最經常使用的數據源。

有了模板,就能夠新建數據源了:

這裏的是特殊默認數據源,系統本地數據源,用戶能夠隨便添加。

而後,我就基於AgileBPM的強大數據源管理下,進行了分佈式數據源測試。測試邏輯很簡單,就是在一個線程內,操做多個數據源,而後看一下會不會一塊兒回滾。

 

 這裏展現了一下,AgileBPM中使用數據源的便捷性,根據配置的別名,直接拿來代碼開發則可,測試代碼比較隨意了,能保證一致性。

 7 這樣的實如今壓測中的表現

本人用的是jmetter來壓測事務處理,它的表現跟傳統的DataSourceTransactionManager表現是同樣的!!!!(雖然過程遇到了線程變量的坑,但已修復)。

配置,400進程同時施壓:

這是壓測結果

 這是日誌輸出,我故意輸出了每一次獲取連接,提交,和釋放的事務處理過程

 8 挫敗:原來行業內的問題主要卡在commit上。

因爲對分佈式事務產生極大興趣,因此專研了一下,這麼簡單的實現爲啥別人都以爲是打問題呢?原來是由於commit會出錯的狀況,第一個連接commit成功後,第二個連接commit失敗,那麼第一個連接已不能回滾了!!!!因此行業內大部分方案都在處理這種狀況,雖然到了commit階段,數據庫已經對相關資源產生了寫鎖,數據也寫入磁盤,就等commit刷進去了,產生錯誤的機率是極少了。做爲行業內的大難題,不少方案在處理這個問題。什麼2pc原則。。等等,有空我整理一下。大部分主流項目解決方案仍是tcc爲主,畢竟這個最通用直接。

8.1頓悟!!

頓悟!!其實我以上這種實現方案就是2pc的實現方案,在jdbc都操做了sql沒問題後,再一併提交的方案就是2pc。可是2pc有這個commit提交存在的設計缺陷(這種時機是存在不多可能性的),因此別人就提出tcc和消息隊列的解決commit異常的更可靠的方案(可是,只要是串行邏輯就沒有百分百可靠的方案,只是下降了可能性罷了)。因此,ab項目關於分佈式是採用了2pc的解決方案,順帶提一下jta事務也是相似的邏輯,不過他們的性能主要卡在消息通知上。例如全部連接操做sql都成功了,我須要通知AB連接去提交,我通知了A,A提交成功,而後我通知B,B沒收到消息,那麼AB資源都會卡住不釋放,而後B會超時致使回滾了。因此,jta在消息通知上比較損耗性能……關於2pc的友情連接:

落寞的流月城(632266504) 14:13:42
https://cloud.tencent.com/developer/article/1355859

 

 

9關於AB項目的多數據bo場景方案

在經歷挫敗以後,理性分析了一下,其實當前這種方案已經知足了ab的分佈式事務處理的需求了。首先,其實commit失敗的場景是少之又少,筆者調整了邏輯,後面把重要的系統數據源放在最後提交,保證了系統數據源的強一致性,也就是說保證了流程數據的一致性。

 

緣由分析,這裏細想一個場景,我先把業務數據從1改爲2,而後驅動流程流轉,假如個人bo是其餘數據源A,A先提交成功,可是系統的本地數據源B提交失敗了,那麼致使B操做的流程數據會回滾,可是A的數據已提交沒法回滾。結果是,流程沒有流轉,可是業務數據已更新了爲2了。這種場景在ab中,至關於,我操做了一下業務數據的保存,由於流程沒有變,只是保存了一下數據,對於流程系統自己來講,有時候仍是好事,由於雖然流程流轉失敗了,可是業務數據不想再填寫一次。因此我說這種方案已經知足ab項目的多數據源下的分佈式場景的需求了。

固然,若是用戶仍是執着於全部數據源的強一致性,在ab項目中能夠在bo保存前,先備份一下bo數據,而後在doCommit時恢復備份數據則能夠,ab裏有不少時機插件,定義了一些時機插件列表,而後你多實現了插件則會運行,ab的插件代碼展現,以下:

ab做爲面向技術人員的流程系統,裏面內嵌提供了豐富的便捷開發的寫法和實現。

相關文章
相關標籤/搜索