先推薦一下碼雲上的一個GVP(最有價值的開源項目) AgileBPM(下面簡稱ab),我下面講解的方案也是它的Bo支持多數據源操做事務管理器,友情連接:http://doc.agilebpm.cn/java
目前是解決的是處理單系統內的多數據源問題,簡單來講就是在單系統中的一個線程內,保護多個數據源事務,這也是ab項目所須要的場景。redis
參考了碼雲上的開源的lcn分佈式事務解決方案,以爲再拓展一下也是能夠解決微服務間的分佈式事務處理,利用redis放一個事務處理的共同空間,而後在共同空間內來統籌事務,不過它處理commit異常的問題也是用通用方式(commit失敗不少項目都是採起tcc的方式處理)。spring
ps:以前本人試過使用jta事務管理器,這個性能真看不下去。一會就卡。。因此就想着本身定義個管理器,本身來釋放資源。sql
1 用AbstractRoutingDataSource讓系統支持多數據源數據庫
動態數據源配置:緩存
真正的數據源(druid數據源):分佈式
展現一下DynamicDataSource是繼承了AbstractRoutingDataSource的實現,這裏不是重點。ide
2 實現支持這種路由數據源的事務管理器微服務
先繼承AbstractPlatformTransactionManager(事務管理器的抽象類,咱們很經常使用的DataSourceTransactionManager就是繼承它的)
性能
裏面須要實現幾個關鍵點就行(筆者只考慮了事務傳播性爲PROPAGATION_REQUIRED的狀況,這也是項目最經常使用的,其餘我沒支持,畢竟是定製化的事務管理器)
package com.dstz.bus.service.impl; import java.sql.Connection; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import javax.sql.DataSource; import org.springframework.beans.factory.InitializingBean; import org.springframework.jdbc.datasource.ConnectionHolder; import org.springframework.jdbc.datasource.DataSourceUtils; import org.springframework.transaction.CannotCreateTransactionException; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; import org.springframework.transaction.TransactionSystemException; import org.springframework.transaction.support.AbstractPlatformTransactionManager; import org.springframework.transaction.support.DefaultTransactionStatus; import org.springframework.transaction.support.ResourceTransactionManager; import org.springframework.transaction.support.TransactionSynchronizationManager; import com.dstz.base.core.util.AppUtil; import com.dstz.base.core.util.ThreadMapUtil; import com.dstz.base.db.datasource.DataSourceUtil; import com.dstz.base.db.datasource.DbContextHolder; import com.dstz.base.db.datasource.DynamicDataSource; /** * <pre> * 描述:ab 結合sys多數據源操做 專門爲bo db實例化作的事務管理器 * 它只保護系統數據源(包含dataSourceDefault),不會保護datasource * 其實能夠作到,可是這個事務管理器目前只爲bo多數據源的保護,因此我沒支持 * 做者:aschs * 郵箱:aschs@qq.com * 日期:2018年10月10日 * 版權:summer * </pre> */ public class AbDataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean { private int i = 0; @Override public void afterPropertiesSet() throws Exception { logger.debug("ab的事務管理器已就緒"); } @Override public Object getResourceFactory() { return DataSourceUtil.getDataSourceByAlias(DataSourceUtil.GLOBAL_DATASOURCE); } /** * <pre> * 生成一個在整個事務處理都用到的資源 * 這裏我放了在過程當中的全部鏈接 Map<數據源別名,鏈接> * </pre> */ @Override protected Object doGetTransaction() { return new HashMap<String, Connection>(); } /** * 判斷是否已存在事務 */ @Override protected boolean isExistingTransaction(Object transaction) { return (boolean) ThreadMapUtil.getOrDefault("abTransactionManagerExist", false); } /** * <pre> * 必須實現的一個方法,設置線程內的事務爲回滾狀態。 * 這裏實際上是爲了預防傳播性設置爲 讓線程內能夠屢次管理器操做的狀況下,用來通知你們不要只作回滾,別commit了。 * 在該事務管理器只支持PROPAGATION_REQUIRED 的狀況下(線程只有一個管理器操做),沒多大用,只是必需要實現這個 * 否則抽象類那裏會有報錯代碼。 * </pre> */ @Override protected void doSetRollbackOnly(DefaultTransactionStatus status) { ThreadMapUtil.put("abTransactionManagerRollbackOnly", true);//標記ab事務管理器在線程內已準備要回滾了 } /** * <pre> * 準備事務,獲取連接 * </pre> */ @Override protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { logger.info("分佈式事務開始:"+i); Map<String, Connection> conMap = (Map<String, Connection>) transaction; Map<String, DataSource> dsMap = DataSourceUtil.getDataSources(); // 遍歷系統中的全部數據源,打開鏈接 for (Entry<String, DataSource> entry : dsMap.entrySet()) { Connection con = null; try { ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(entry.getValue()); if (conHolder == null) { con = entry.getValue().getConnection(); con.setAutoCommit(false); // 緩存連接 TransactionSynchronizationManager.bindResource(entry.getValue(), new ConnectionHolder(con)); } else { con = conHolder.getConnection(); } //系統數據源放進資源裏 if(DbContextHolder.getDataSource().equals(entry.getKey())) { DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE); TransactionSynchronizationManager.bindResource(dynamicDataSource, new ConnectionHolder(con)); } conMap.put(entry.getKey(), con); logger.debug("數據源別名[" + entry.getKey() + "]打開鏈接成功"); } catch (Throwable ex) { doCleanupAfterCompletion(conMap); throw new CannotCreateTransactionException("數據源別名[" + entry.getKey() + "]打開鏈接錯誤", ex); } } ThreadMapUtil.put("abTransactionManagerExist", true);//標記ab事務管理器已經在線程內啓動了 } @Override protected void doCommit(DefaultTransactionStatus status) { Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction(); for (Entry<String, Connection> entry : conMap.entrySet()) { try { entry.getValue().commit(); logger.debug("數據源別名[" + entry.getKey() + "]提交事務成功"); } catch (SQLException ex) { doCleanupAfterCompletion(conMap); throw new TransactionSystemException("數據源別名[" + entry.getKey() + "]提交事務失敗", ex); } } logger.info("分佈式事務提交:"+i); } /** * 回滾 */ @Override protected void doRollback(DefaultTransactionStatus status) throws TransactionException { Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction(); for (Entry<String, Connection> entry : conMap.entrySet()) { try { entry.getValue().rollback(); logger.debug("數據源別名[" + entry.getKey() + "]回滾事務成功"); } catch (SQLException ex) { doCleanupAfterCompletion(conMap); throw new TransactionSystemException("數據源別名[" + entry.getKey() + "]回滾事務失敗", ex); } } logger.info("分佈式事務回滾:"+i); } /** * 回收連接 */ @Override protected void doCleanupAfterCompletion(Object transaction) { Map<String, Connection> conMap = (Map<String, Connection>) transaction; for (Entry<String, Connection> entry : conMap.entrySet()) { DataSource dataSource = DataSourceUtil.getDataSourceByAlias(entry.getKey()); TransactionSynchronizationManager.unbindResource(dataSource); DataSourceUtils.releaseConnection(entry.getValue(), dataSource); logger.debug("數據源別名[" + entry.getKey() + "]關閉連接成功"); } //最後把本地資源也釋放了 DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE); TransactionSynchronizationManager.unbindResource(dynamicDataSource); ThreadMapUtil.remove("abTransactionManagerExist"); ThreadMapUtil.remove("abTransactionManagerRollbackOnly"); ThreadMapUtil.remove(); logger.info("分佈式事務釋放:"+(i++)); } }
事務管理器的方法調用順序和時機大概說一下:
1 doGetTransaction方法:來初始化事務處理過程當中的公共資源,後面調用的其餘方法都是以它爲媒介的。
2 doBegin方法:開始事務操做,主要是打開數據源的連接,記得要放到事務資源管理服務中TransactionSynchronizationManager,很是重要,由於這個過程當中用到的jdbc操做是從這裏面拿的。
3 doCommit(doRollback):如題,把獲取的連接提交或者回滾操做。
4 doCleanupAfterCompletion:回收連接資源
至此,事務管理器的邏輯已經結束了~
最後,實現務必實現isExistingTransaction,用來處理重複線程內屢次觸發了事務切面的邏輯
這裏筆者用簡單的線程變量來標記是否線程內已存在了事務管理,由於我只支持PROPAGATION_REQUIRED傳播性,因此沒考慮內部嵌入的其餘狀況,其實也是內部commit一下,資源確定是最後統一釋放的。
3 使用自定義事務管理器
先提一下,這裏筆者只保護會使用到多數據源的模塊,其實大部分系統邏輯仍是用DataSourceTransactionManager就夠,不須要保護太多數據源(由於釋放和打開連接是有性能損耗的)。
能夠看出,主要的邏輯系統仍是使用傳統管理器,而後在特定地方聲明特殊管理器則可:
5 到這裏,整個分佈式事務管理已完成了,主要是利用了路由數據源AbstractRoutingDataSource和自定義事務管理器實現的~
6 AgileBPM的多數據源結合展現(可跳過)
這裏展現一下,這個開源的流程系統的強大可配置性(讓人髮指的靈活性-。-)的數據源管理功能。
數據源模板,在這裏你可使用定義不一樣的數據源實現類,只要在項目import就行
這裏有一個內置的阿里的數據源,後面有須要你能夠增長其餘模板,例如BasicDataSOurce這個,最經常使用的數據源。
有了模板,就能夠新建數據源了:
這裏的是特殊默認數據源,系統本地數據源,用戶能夠隨便添加。
而後,我就基於AgileBPM的強大數據源管理下,進行了分佈式數據源測試。測試邏輯很簡單,就是在一個線程內,操做多個數據源,而後看一下會不會一塊兒回滾。
這裏展現了一下,AgileBPM中使用數據源的便捷性,根據配置的別名,直接拿來代碼開發則可,測試代碼比較隨意了,能保證一致性。
7 這樣的實如今壓測中的表現
本人用的是jmetter來壓測事務處理,它的表現跟傳統的DataSourceTransactionManager表現是同樣的!!!!(雖然過程遇到了線程變量的坑,但已修復)。
配置,400進程同時施壓:
這是壓測結果
這是日誌輸出,我故意輸出了每一次獲取連接,提交,和釋放的事務處理過程
8 挫敗:原來行業內的問題主要卡在commit上。
因爲對分佈式事務產生極大興趣,因此專研了一下,這麼簡單的實現爲啥別人都以爲是打問題呢?原來是由於commit會出錯的狀況,第一個連接commit成功後,第二個連接commit失敗,那麼第一個連接已不能回滾了!!!!因此行業內大部分方案都在處理這種狀況,雖然到了commit階段,數據庫已經對相關資源產生了寫鎖,數據也寫入磁盤,就等commit刷進去了,產生錯誤的機率是極少了。做爲行業內的大難題,不少方案在處理這個問題。什麼2pc原則。。等等,有空我整理一下。大部分主流項目解決方案仍是tcc爲主,畢竟這個最通用直接。
8.1頓悟!!
頓悟!!其實我以上這種實現方案就是2pc的實現方案,在jdbc都操做了sql沒問題後,再一併提交的方案就是2pc。可是2pc有這個commit提交存在的設計缺陷(這種時機是存在不多可能性的),因此別人就提出tcc和消息隊列的解決commit異常的更可靠的方案(可是,只要是串行邏輯就沒有百分百可靠的方案,只是下降了可能性罷了)。因此,ab項目關於分佈式是採用了2pc的解決方案,順帶提一下jta事務也是相似的邏輯,不過他們的性能主要卡在消息通知上。例如全部連接操做sql都成功了,我須要通知AB連接去提交,我通知了A,A提交成功,而後我通知B,B沒收到消息,那麼AB資源都會卡住不釋放,而後B會超時致使回滾了。因此,jta在消息通知上比較損耗性能……關於2pc的友情連接:
落寞的流月城(632266504) 14:13:42
https://cloud.tencent.com/developer/article/1355859
9關於AB項目的多數據bo場景方案
在經歷挫敗以後,理性分析了一下,其實當前這種方案已經知足了ab的分佈式事務處理的需求了。首先,其實commit失敗的場景是少之又少,筆者調整了邏輯,後面把重要的系統數據源放在最後提交,保證了系統數據源的強一致性,也就是說保證了流程數據的一致性。
緣由分析,這裏細想一個場景,我先把業務數據從1改爲2,而後驅動流程流轉,假如個人bo是其餘數據源A,A先提交成功,可是系統的本地數據源B提交失敗了,那麼致使B操做的流程數據會回滾,可是A的數據已提交沒法回滾。結果是,流程沒有流轉,可是業務數據已更新了爲2了。這種場景在ab中,至關於,我操做了一下業務數據的保存,由於流程沒有變,只是保存了一下數據,對於流程系統自己來講,有時候仍是好事,由於雖然流程流轉失敗了,可是業務數據不想再填寫一次。因此我說這種方案已經知足ab項目的多數據源下的分佈式場景的需求了。
固然,若是用戶仍是執着於全部數據源的強一致性,在ab項目中能夠在bo保存前,先備份一下bo數據,而後在doCommit時恢復備份數據則能夠,ab裏有不少時機插件,定義了一些時機插件列表,而後你多實現了插件則會運行,ab的插件代碼展現,以下:
ab做爲面向技術人員的流程系統,裏面內嵌提供了豐富的便捷開發的寫法和實現。