摘要: 原創出處 http://www.iocoder.cn/Sharding-JDBC/transaction-bed/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!java
本文主要基於 Sharding-JDBC 1.5.0 正式版git
🙂🙂🙂關注**微信公衆號:【芋道源碼】**有福利:github
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
- 認真的源碼交流微信羣。
數據庫表分庫後,業務場景下的單庫本地事務可能變成跨庫分佈式事務。雖然咱們能夠經過合適的分庫規則讓操做的數據在同庫下,繼續保證單庫本地事務,這也是很是推崇的,但不是全部場景下都能適用。若是這些場景對事務的一致性有要求,咱們就不得不解決分佈式事務的「麻煩」。sql
分佈式事務是個很大的話題,咱們來看看 Sharding-JDBC 對她的權衡:數據庫
Sharding-JDBC因爲性能方面的考量,決定不支持強一致性分佈式事務。咱們已明確規劃線路圖,將來會支持最終一致性的柔性事務。微信
Sharding-JDBC 提供了兩種 柔性事務:架構
本文分享 最大努力送達型 的實現。建議前置閱讀:《Sharding-JDBC 源碼分析 —— SQL 執行》。運維
Sharding-JDBC 正在收集使用公司名單:傳送門。
🙂 你的登記,會讓更多人蔘與和使用 Sharding-JDBC。傳送門
Sharding-JDBC 也會所以,可以覆蓋更多的業務場景。傳送門
登記吧,騷年!傳送門dom
概念異步
在分佈式數據庫的場景下,相信對於該數據庫的操做最終必定能夠成功,因此經過最大努力反覆嘗試送達操做。
從概念看,可能不是很直白的理解是什麼意思,本文會最大努力讓你乾淨理解。
架構圖
執行過程有 四種 狀況:
總體成漏斗倒三角,上一個階段失敗,交給下一個階段重試:
整個過程經過以下 組件 完成:
下面,咱們逐節分享每一個組件。
柔性事務管理器,SoftTransactionManager 實現,負責對柔性事務配置( SoftTransactionConfiguration ) 、柔性事務( AbstractSoftTransaction )的管理。
調用 #init()
初始化柔性管理器:
// SoftTransactionManager.java /** * 柔性事務配置對象 */ @Getter private final SoftTransactionConfiguration transactionConfig; // SoftTransactionManager.java /** * 初始化事務管理器. */ public void init() throws SQLException { // 初始化 最大努力送達型事務監聽器 EventBusInstance.getInstance().register(new BestEffortsDeliveryListener()); // 初始化 事務日誌數據庫存儲表 if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) { Preconditions.checkNotNull(transactionConfig.getTransactionLogDataSource()); createTable(); } // 初始化 內嵌的最大努力送達型異步做業 if (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) { new NestedBestEffortsDeliveryJobFactory(transactionConfig).init(); } }
transaction_log
)**不存在則進行建立。在『事務日誌存儲器』小節會詳細分享SoftTransactionConfiguration
SoftTransactionConfiguration,柔性事務配置對象。
public class SoftTransactionConfiguration { /** * 事務管理器管理的數據源. */ @Getter(AccessLevel.NONE) private final DataSource targetDataSource; /** * 同步的事務送達的最大嘗試次數. */ private int syncMaxDeliveryTryTimes = 3; /** * 事務日誌存儲類型. */ private TransactionLogDataSourceType storageType = RDB; /** * 存儲事務日誌的數據源. */ private DataSource transactionLogDataSource; /** * 內嵌的最大努力送達型異步做業配置對象. */ private Optional<NestedBestEffortsDeliveryJobConfiguration> bestEffortsDeliveryJobConfiguration = Optional.absent(); }
在 Sharding-JDBC 裏,目前柔性事務分紅兩種:
繼承 AbstractSoftTransaction
public abstract class AbstractSoftTransaction { /** * 分片鏈接原自動提交狀態 */ private boolean previousAutoCommit; /** * 分片鏈接 */ @Getter private ShardingConnection connection; /** * 事務類型 */ @Getter private SoftTransactionType transactionType; /** * 事務編號 */ @Getter private String transactionId; }
AbstractSoftTransaction 實現了開啓柔性事務、關閉柔性事務兩個方法提供給子類調用:
#beginInternal()
/** * 開啓柔性 * * @param conn 分片鏈接 * @param type 事務類型 * @throws SQLException */ protected final void beginInternal(final Connection conn, final SoftTransactionType type) throws SQLException { // TODO 判斷若是在傳統事務中,則拋異常 Preconditions.checkArgument(conn instanceof ShardingConnection, "Only ShardingConnection can support eventual consistency transaction."); // 設置執行錯誤,不拋出異常 ExecutorExceptionHandler.setExceptionThrown(false); connection = (ShardingConnection) conn; transactionType = type; // 設置自動提交狀態 previousAutoCommit = connection.getAutoCommit(); connection.setAutoCommit(true); // 生成事務編號 // TODO 替換UUID爲更有效率的id生成器 transactionId = UUID.randomUUID().toString(); }
調用 ExecutorExceptionHandler.setExceptionThrown(false)
設置執行 SQL 錯誤時,也不拋出異常。
調用 connection.setAutoCommit(true);
,設置執行自動提交。使用最大努力型事務時,上層業務執行 SQL 會立刻提交,即便調用 Connection#rollback()
也是沒法回滾的,這點必定要注意。
#end()
/** * 結束柔性事務. */ public final void end() throws SQLException { if (connection != null) { ExecutorExceptionHandler.setExceptionThrown(true); connection.setAutoCommit(previousAutoCommit); SoftTransactionManager.closeCurrentTransactionManager(); } } // SoftTransactionManager.java /** * 關閉當前的柔性事務管理器. */ static void closeCurrentTransactionManager() { ExecutorDataMap.getDataMap().put(TRANSACTION, null); ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, null); }
#end()
清理線程變量。不然,下次請求使用到該線程,會繼續在這個柔性事務內。BEDSoftTransaction
BEDSoftTransaction,最大努力送達型柔性事務。
public class BEDSoftTransaction extends AbstractSoftTransaction { /** * 開啓柔性事務. * * @param connection 數據庫鏈接對象 */ public void begin(final Connection connection) throws SQLException { beginInternal(connection, SoftTransactionType.BestEffortsDelivery); } }
TCCSoftTransaction
TCCSoftTransaction,TCC 型柔性事務,暫未實現。實現後,會更新到 《Sharding-JDBC 源碼分析 —— 分佈式事務(二)之事務補償型》。
經過調用 SoftTransactionManager#getTransaction()
建立柔性事務對象:
/** * {@link ExecutorDataMap#dataMap} 柔性事務對象 key */ private static final String TRANSACTION = "transaction"; /** * {@link ExecutorDataMap#dataMap} 柔性事務配置 key */ private static final String TRANSACTION_CONFIG = "transactionConfig"; // SoftTransactionManager.java /** * 建立柔性事務. * * @param type 柔性事務類型 * @return 柔性事務 */ public AbstractSoftTransaction getTransaction(final SoftTransactionType type) { AbstractSoftTransaction result; switch (type) { case BestEffortsDelivery: result = new BEDSoftTransaction(); break; case TryConfirmCancel: result = new TCCSoftTransaction(); break; default: throw new UnsupportedOperationException(type.toString()); } // TODO 目前使用不支持嵌套事務,之後這裏須要可配置 if (getCurrentTransaction().isPresent()) { throw new UnsupportedOperationException("Cannot support nested transaction."); } ExecutorDataMap.getDataMap().put(TRANSACTION, result); ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig); return result; }
後續能夠從 ExecutorDataMap 中獲取當前線程的柔性事務和柔性事務配置:
// SoftTransactionManager.java /** * 獲取當前線程的柔性事務配置. * * @return 當前線程的柔性事務配置 */ public static Optional<SoftTransactionConfiguration> getCurrentTransactionConfiguration() { Object transactionConfig = ExecutorDataMap.getDataMap().get(TRANSACTION_CONFIG); return (null == transactionConfig) ? Optional.<SoftTransactionConfiguration>absent() : Optional.of((SoftTransactionConfiguration) transactionConfig); } /** * 獲取當前的柔性事務. * * @return 當前的柔性事務 */ public static Optional<AbstractSoftTransaction> getCurrentTransaction() { Object transaction = ExecutorDataMap.getDataMap().get(TRANSACTION); return (null == transaction) ? Optional.<AbstractSoftTransaction>absent() : Optional.of((AbstractSoftTransaction) transaction); }
柔性事務執行過程當中,會經過事務日誌( TransactionLog ) 記錄每條 SQL 執行狀態:
經過實現事務日誌存儲器接口( TransactionLogStorage ),提供存儲功能。目前有兩種實現:
本節只分析 RdbTransactionLogStorage。對 MemoryTransactionLogStorage 感興趣的同窗能夠點擊連接傳送到達。
TransactionLogStorage 有五個接口方法,下文每一個小標題都是一個方法。
// TransactionLogStorage.java /** * 存儲事務日誌. * * @param transactionLog 事務日誌 */ void add(TransactionLog transactionLog); // RdbTransactionLogStorage.java @Override public void add(final TransactionLog transactionLog) { String sql = "INSERT INTO `transaction_log` (`id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?);"; try ( // ... 省略你熟悉的代碼 } catch (final SQLException ex) { throw new TransactionLogStorageException(ex); } }
#add()
和下文的 #remove()
異常時,都打印下異常日誌都文件系統TransactionLog (transaction_log) 數據庫表結構以下:
字段 | 名字 | 數據庫類型 | 備註 |
---|---|---|---|
id | 事件編號 | VARCHAR(40) | EventBus 事件編號,非事務編號 |
transaction_type | 柔性事務類型 | VARCHAR(30) | |
data_source | 真實數據源名 | VARCHAR(255) | |
sql | 執行 SQL | TEXT | 已經改寫過的 SQL |
parameters | 佔位符參數 | TEXT | JSON 字符串存儲 |
creation_time | 記錄時間 | LONG | |
async_delivery_try_times | 已異步重試次數 | INT |
// TransactionLogStorage.java /** * 根據主鍵刪除事務日誌. * * @param id 事務日誌主鍵 */ void remove(String id); // RdbTransactionLogStorage.java @Override public void remove(final String id) { String sql = "DELETE FROM `transaction_log` WHERE `id`=?;"; try ( // ... 省略你熟悉的代碼 } catch (final SQLException ex) { throw new TransactionLogStorageException(ex); } }
// TransactionLogStorage.java /** * 讀取須要處理的事務日誌. * * <p>須要處理的事務日誌爲: </p> * <p>1. 異步處理次數小於最大處理次數.</p> * <p>2. 異步處理的事務日誌早於異步處理的間隔時間.</p> * * @param size 獲取日誌的數量 * @param maxDeliveryTryTimes 事務送達的最大嘗試次數 * @param maxDeliveryTryDelayMillis 執行送達事務的延遲毫秒數. */ List<TransactionLog> findEligibleTransactionLogs(int size, int maxDeliveryTryTimes, long maxDeliveryTryDelayMillis); // RdbTransactionLogStorage.java @Override public List<TransactionLog> findEligibleTransactionLogs(final int size, final int maxDeliveryTryTimes, final long maxDeliveryTryDelayMillis) { List<TransactionLog> result = new ArrayList<>(size); String sql = "SELECT `id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`, `async_delivery_try_times` " + "FROM `transaction_log` WHERE `async_delivery_try_times`<? AND `transaction_type`=? AND `creation_time`<? LIMIT ?;"; try (Connection conn = dataSource.getConnection()) { // ... 省略你熟悉的代碼 } catch (final SQLException ex) { throw new TransactionLogStorageException(ex); } return result; }
// TransactionLogStorage.java /** * 增長事務日誌異步重試次數. * * @param id 事務主鍵 */ void increaseAsyncDeliveryTryTimes(String id); // RdbTransactionLogStorage.java @Override public void increaseAsyncDeliveryTryTimes(final String id) { String sql = "UPDATE `transaction_log` SET `async_delivery_try_times`=`async_delivery_try_times`+1 WHERE `id`=?;"; try ( // ... 省略你熟悉的代碼 } catch (final SQLException ex) { throw new TransactionLogStorageException(ex); } }
// TransactionLogStorage.java /** * 處理事務數據. * * @param connection 業務數據庫鏈接 * @param transactionLog 事務日誌 * @param maxDeliveryTryTimes 事務送達的最大嘗試次數 */ boolean processData(Connection connection, TransactionLog transactionLog, int maxDeliveryTryTimes); // RdbTransactionLogStorage.java @Override public boolean processData(final Connection connection, final TransactionLog transactionLog, final int maxDeliveryTryTimes) { // 重試執行失敗 SQL try ( Connection conn = connection; PreparedStatement preparedStatement = conn.prepareStatement(transactionLog.getSql())) { for (int parameterIndex = 0; parameterIndex < transactionLog.getParameters().size(); parameterIndex++) { preparedStatement.setObject(parameterIndex + 1, transactionLog.getParameters().get(parameterIndex)); } preparedStatement.executeUpdate(); } catch (final SQLException ex) { // 重試失敗,更新事務日誌,增長已異步重試次數 increaseAsyncDeliveryTryTimes(transactionLog.getId()); throw new TransactionCompensationException(ex); } // 移除重試執行成功 SQL 對應的事務日誌 remove(transactionLog.getId()); return true; }
#processData()
是帶有一些邏輯的。根據事務日誌( TransactionLog )重試執行失敗的 SQL,若成功,移除事務日誌;若失敗,更新事務日誌,增長已異步重試次數最大努力送達型事務監聽器,BestEffortsDeliveryListener,負責記錄事務日誌、同步重試執行失敗 SQL。
// BestEffortsDeliveryListener.java @Subscribe @AllowConcurrentEvents public void listen(final DMLExecutionEvent event) { if (!isProcessContinuously()) { return; } SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get(); TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource()); BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get(); switch (event.getEventExecutionType()) { case BEFORE_EXECUTE: // 執行前,插入事務日誌 //TODO 對於批量執行的SQL須要解析成兩層列表 transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0)); return; case EXECUTE_SUCCESS: // 執行成功,移除事務日誌 transactionLogStorage.remove(event.getId()); return; case EXECUTE_FAILURE: // 執行失敗,同步重試 boolean deliverySuccess = false; for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) { // 同步【屢次】重試 if (deliverySuccess) { return; } boolean isNewConnection = false; Connection conn = null; PreparedStatement preparedStatement = null; try { // 得到數據庫鏈接 conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML); if (!isValidConnection(conn)) { // 由於可能執行失敗是數據庫鏈接異常,因此判斷一次,若是無效,從新獲取數據庫鏈接 bedSoftTransaction.getConnection().release(conn); conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML); isNewConnection = true; } preparedStatement = conn.prepareStatement(event.getSql()); // 同步重試 //TODO 對於批量事件須要解析成兩層列表 for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) { preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex)); } preparedStatement.executeUpdate(); deliverySuccess = true; // 同步重試成功,移除事務日誌 transactionLogStorage.remove(event.getId()); } catch (final SQLException ex) { log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex); } finally { close(isNewConnection, conn, preparedStatement); } } return; default: throw new UnsupportedOperationException(event.getEventExecutionType().toString()); } }
BestEffortsDeliveryListener 經過 EventBus 實現監聽 SQL 的執行。Sharding-JDBC 如何實現 EventBus 的,請看《Sharding-JDBC 源碼分析 —— SQL 執行》
調用 #isProcessContinuously()
方法判斷是否處於最大努力送達型事務中,當且僅當處於該狀態才進行監聽事件處理
SQL 執行前,插入事務日誌
SQL 執行成功,移除事務日誌
SQL 執行失敗,根據柔性事務配置( SoftTransactionConfiguration )同步的事務送達的最大嘗試次數( syncMaxDeliveryTryTimes
)進行屢次重試直到成功。整體邏輯和 RdbTransactionLogStorage#processData()
方法邏輯相似,區別在於獲取分片數據庫鏈接的特殊處理:此處調用失敗,數據庫鏈接多是異常無效的,所以調用了 #isValidConnection()
判斷鏈接的有效性。若無效,則從新獲取分片數據庫鏈接。另外,如果從新獲取分片數據庫鏈接,須要進行關閉釋放 (Connection#close()
):
// BestEffortsDeliveryListener.java /** * 經過 SELECT 1 校驗數據庫鏈接是否有效 * * @param conn 數據庫鏈接 * @return 是否有效 */ private boolean isValidConnection(final Connection conn) { try (PreparedStatement preparedStatement = conn.prepareStatement("SELECT 1")) { try (ResultSet rs = preparedStatement.executeQuery()) { return rs.next() && 1 == rs.getInt("1"); } } catch (final SQLException ex) { return false; } } /** * 關閉釋放預編譯SQL對象和數據庫鏈接 * * @param isNewConnection 是否新建立的數據庫鏈接,是的狀況下才釋放 * @param conn 數據庫鏈接 * @param preparedStatement 預編譯SQL */ private void close(final boolean isNewConnection, final Connection conn, final PreparedStatement preparedStatement) { if (null != preparedStatement) { try { preparedStatement.close(); } catch (final SQLException ex) { log.error("PreparedStatement closed error:", ex); } } if (isNewConnection && null != conn) { try { conn.close(); } catch (final SQLException ex) { log.error("Connection closed error:", ex); } } }
當最大努力送達型事務監聽器( BestEffortsDeliveryListener )屢次同步重試失敗後,交給最大努力送達型異步做業進行屢次異步重試,而且屢次執行有固定間隔。
Sharding-JDBC 提供了兩個最大努力送達型異步做業實現:
二者實現代碼邏輯基本一致。前者相比後者,用於開發測試,去除對 Zookeeper 依賴,沒法實現高可用,所以生產環境下不適合使用。
BestEffortsDeliveryJob 所在 Maven 項目爲 sharding-jdbc-transaction-async-job
,基於噹噹開源的 Elastic-Job 實現。以下是官方對該 Maven 項目的簡要說明:
因爲柔性事務採用異步嘗試,須要部署獨立的做業和Zookeeper。sharding-jdbc-transaction採用elastic-job實現的sharding-jdbc-transaction-async-job,經過簡單配置便可啓動高可用做業異步送達柔性事務,啓動腳本爲start.sh。
BestEffortsDeliveryJob
public class BestEffortsDeliveryJob extends AbstractIndividualThroughputDataFlowElasticJob<TransactionLog> { /** * 最大努力送達型異步做業配置對象 */ @Setter private BestEffortsDeliveryConfiguration bedConfig; /** * 事務日誌存儲器對象 */ @Setter private TransactionLogStorage transactionLogStorage; @Override public List<TransactionLog> fetchData(final JobExecutionMultipleShardingContext context) { return transactionLogStorage.findEligibleTransactionLogs(context.getFetchDataCount(), bedConfig.getJobConfig().getMaxDeliveryTryTimes(), bedConfig.getJobConfig().getMaxDeliveryTryDelayMillis()); } @Override public boolean processData(final JobExecutionMultipleShardingContext context, final TransactionLog data) { try ( Connection conn = bedConfig.getTargetDataSource(data.getDataSource()).getConnection()) { transactionLogStorage.processData(conn, data, bedConfig.getJobConfig().getMaxDeliveryTryTimes()); } catch (final SQLException | TransactionCompensationException ex) { log.error(String.format("Async delivery times %s error, max try times is %s, exception is %s", data.getAsyncDeliveryTryTimes() + 1, bedConfig.getJobConfig().getMaxDeliveryTryTimes(), ex.getMessage())); return false; } return true; } @Override public boolean isStreamingProcess() { return false; } }
#fetchData()
方法獲取須要處理的事務日誌 (TransactionLog),內部調用了 TransactionLogStorage#findEligibleTransactionLogs()
方法#processData()
方法處理事務日誌,重試執行失敗的 SQL,內部調用了 TransactionLogStorage#processData()
#fetchData()
和 #processData()
調用是 Elastic-Job 控制的。每一輪定時調度,每條事務日誌只執行一次。當超過最大異步調用次數後,該條事務日誌再也不處理,因此生產使用時,最好增長下相應監控超過最大異步重試次數的事務日誌。AsyncSoftTransactionJobConfiguration,異步柔性事務做業配置對象。
public class AsyncSoftTransactionJobConfiguration { /** * 做業名稱. */ private String name = "bestEffortsDeliveryJob"; /** * 觸發做業的cron表達式. */ private String cron = "0/5 * * * * ?"; /** * 每次做業獲取的事務日誌最大數量. */ private int transactionLogFetchDataCount = 100; /** * 事務送達的最大嘗試次數. */ private int maxDeliveryTryTimes = 3; /** * 執行事務的延遲毫秒數. * * <p>早於此間隔時間的入庫事務纔會被做業執行.</p> */ private long maxDeliveryTryDelayMillis = 60 * 1000L; }
Sharding-JDBC 提供的最大努力送達型異步做業實現( BestEffortsDeliveryJob ),經過與 Elastic-Job 集成,能夠很便捷而且有質量保證的高可用、高性能使用。一部分團隊,可能已經引入或自研了相似 Elastic-Job 的分佈式做業中間件解決方案,每多一箇中間件,就是多一個學習與運維成本。那麼是否可使用本身的分佈式做業解決方案?答案是,能夠的。參考 BestEffortsDeliveryJob 的實現,經過調用 TransactionLogStorage 來實現:
// 僞代碼(不考慮性能、異常) List<TransactionLog> transactionLogs = transactionLogStorage.findEligibleTransactionLogs(....); for (TransactionLog transactionLog : transactionLogs) { transactionLogStorage.processData(conn, log, maxDeliveryTryTimes); }
固然,我的仍是很推薦 Elastic-Job。
😈 筆者要開始寫《Elastic-Job 源碼分析》。
另外,若是有支持事務消息的分佈式隊列系統,能夠經過 TransactionLogStorage 實現存儲事務消息存儲成消息。爲何要支持事務消息?若是 SQL 執行是成功的,須要回滾(刪除)事務消息。
哈哈哈
算是堅持把這個系列寫完了,給本身 32 個贊。
知足!
《Elastic-Job 源碼分析》 走起!不 High 不結束!