sharding-jdbc在分庫分表方面提供了很大的便利性,在使用DB的時候,一般都會涉及到事務這個概念,而在分庫分表的環境上再加上事務,就會使事情變得複雜起來。本章試圖剖析sharding-jdbc在事務方面的解決思路。spring
傳統的事務模型以下:sql
Connection conn = getConnection(); try{ Statement stmt1 = conn.parpareStatement(sql1); stmt1.executeUpdate(); Statement stmt2 = conn.parpareStatement(sql2); stmt2.executeUpdate(); conn.commit(); }catch(Exception e){ conn.rollback(); }
對於同一個鏈接,能夠執行多條sql語句,任何一條語句出現錯誤的時候,整個操做流程均可以回滾,從而達到事務的原子操做。數據庫
再來看最基本的spring事務操做:緩存
class ServiceA(){ public void updateA(){...} } class ServiceB(){ public void updateB(){...} } @Transactional class ServiceC(){ public void updateC(){ serviceA.updateA(); serviceB.updateB(); } }
咱們知道,當updateC執行的時候,不論是updateA仍是updateB出現了異常,updateC均可以總體回滾,達到原子操做的效果,其主要緣由是updateA和updateB共享了同一個Connection,這是spring底層經過ThreadLocal緩存了Connection實現的。網絡
以上介紹的這兩種狀況都只是針對單庫單表的原子操做,事務的實現並不難理解,那麼在跨庫的狀況下,sharding-jdbc又是如何解決事務問題的呢?異步
在官方文檔中,針對弱事務有以下三點說明:分佈式
爲了理解以上幾點,咱們來看看sharding-jdbc默認是如何處理事務的。ide
這是一個很是常見的處理模式,一個總鏈接處理了多條sql語句,最後一次性提交整個事務,每一條sql語句可能會分爲多條子sql分庫分表去執行,這意味着底層可能會關聯多個真正的數據庫鏈接,咱們先來看看若是一切正常,commit會如何去處理。post
public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection { @Override public final void commit() throws SQLException { Collection<SQLException> exceptions = new LinkedList<>(); for (Connection each : cachedConnections.values()) { try { each.commit(); } catch (final SQLException ex) { exceptions.add(ex); } } throwSQLExceptionIfNecessary(exceptions); } }
引擎會遍歷底層全部真正的數據庫鏈接,一個個進行commit操做,若是任何一個出現了異常,直接捕獲異常,可是也只是捕獲而已,而後接着下一個鏈接的commit,這也就很好的說明了,若是在執行任何一條sql語句出現了異常,整個操做是能夠原子性回滾的,由於此時全部鏈接都不會執行commit,但若是已經到了commit這一步的話,若是有鏈接commit失敗了,是不會影響到其餘鏈接的。google
sharding-jdbc的弱事務並非完美的,有時可能會致使數據的一致性問題,因此針對某些特定的場景,又提出了柔性事務的概念。先來看一張官方的說明圖:
這裏想表達兩個意思:
1. 對於sql的執行,在執行前記錄日誌,若是執行成功,把日誌刪除,若是執行失敗,重試必定次數(若是未達到最大嘗試次數便執行成功了,同樣刪除日誌)。
2. 異步任務不斷掃描執行日誌,若是重試次數未達到最大上限,嘗試從新執行,若是執行成功,刪除日誌。
從上面兩點分析能夠看出,因爲採用的是重試的模式,也就是說同一條語句,是有可能被屢次執行的,因此官方提到了柔性事務的適用場景:
並且它還有必定的限制: SQL須要知足冪等性,具體爲:
在有了一個大概的瞭解以後,咱們來更加深刻的瞭解。
sharding-jdbc使用了google的EventBus事件模型,註冊了一個Listener,監聽器對三種事件進行了處理,以下代碼所示:
switch (event.getEventExecutionType()) { case BEFORE_EXECUTE: transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0)); return; case EXECUTE_SUCCESS: transactionLogStorage.remove(event.getId()); return; case EXECUTE_FAILURE: boolean deliverySuccess = false; for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) { if (deliverySuccess) { return; } boolean isNewConnection = false; Connection conn = null; PreparedStatement preparedStatement = null; try { conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML); if (!isValidConnection(conn)) { bedSoftTransaction.getConnection().release(conn); conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML); isNewConnection = true; } preparedStatement = conn.prepareStatement(event.getSql()); //TODO for batch event need split to 2-level records for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) { preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex)); } preparedStatement.executeUpdate(); deliverySuccess = true; transactionLogStorage.remove(event.getId()); } catch (final SQLException ex) { log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex); } finally { close(isNewConnection, conn, preparedStatement); } } return; default: throw new UnsupportedOperationException(event.getEventExecutionType().toString()); }
以上代碼能夠抽取爲以下圖的描述:
監聽器根據三種不一樣的事件類型對事務日誌進行不一樣的操做。有監聽 ,必然就有事件的投遞,那麼引擎是何時產生這些事件的呢?
咱們知道每一條sql語句拆分後有可能對應多條子sql語句,而每一條子sql語句是單獨執行的,執行是封裝在一個內部方法的:
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback, final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception { synchronized (baseStatementUnit.getStatement().getConnection()) { T result; ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown); ExecutorDataMap.setDataMap(dataMap); List<AbstractExecutionEvent> events = new LinkedList<>(); if (parameterSets.isEmpty()) { events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList())); } for (List<Object> each : parameterSets) { events.add(getExecutionEvent(sqlType, baseStatementUnit, each)); } for (AbstractExecutionEvent event : events) { EventBusInstance.getInstance().post(event); } try { result = executeCallback.execute(baseStatementUnit); } catch (final SQLException ex) { for (AbstractExecutionEvent each : events) { each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE); each.setException(Optional.of(ex)); EventBusInstance.getInstance().post(each); ExecutorExceptionHandler.handleException(ex); } return null; } for (AbstractExecutionEvent each : events) { each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS); EventBusInstance.getInstance().post(each); } return result; } }
以上代碼能夠簡化爲以下流程:
其實執行流程比較簡單,但還有兩個重要的細節這裏沒有體現:
當使用柔性事務的時候,須要建立事務管理器,並獲取事務對象,調用事務對象的begin開始一個事務,在這一步,會強制設置鏈接的autoCommit=true,這會致使全部的sql語句執時後當即提交,想一想若是能回滾,那柔性事務也就失去了意義。
當事務執行begin時,會標記當前不拋出異常,這樣當執行sql語句有異常時,會生成相應的EXECUTE_FAILURE事件,從而進行事務日誌處理,而不是往外拋出異常,當事務結束時,調用事務對象的end方法,恢復異常的捕獲。
一個常見的代碼編寫模式以下(來自官方的demo)
private static void updateFailure(final DataSource dataSource) throws SQLException { String sql1 = "UPDATE t_order SET status='UPDATE_1' WHERE user_id=10 AND order_id=1000"; String sql2 = "UPDATE t_order SET not_existed_column=1 WHERE user_id=1 AND order_id=?"; String sql3 = "UPDATE t_order SET status='UPDATE_2' WHERE user_id=10 AND order_id=1000"; SoftTransactionManager transactionManager = new SoftTransactionManager(getSoftTransactionConfiguration(dataSource)); transactionManager.init(); BEDSoftTransaction transaction = (BEDSoftTransaction) transactionManager.getTransaction(SoftTransactionType.BestEffortsDelivery); Connection conn = null; try { conn = dataSource.getConnection(); transaction.begin(conn); PreparedStatement preparedStatement1 = conn.prepareStatement(sql1); PreparedStatement preparedStatement2 = conn.prepareStatement(sql2); preparedStatement2.setObject(1, 1000); PreparedStatement preparedStatement3 = conn.prepareStatement(sql3); preparedStatement1.executeUpdate(); preparedStatement2.executeUpdate(); preparedStatement3.executeUpdate(); } finally { transaction.end(); if (conn != null) { conn.close(); } } }
看到這個編寫模式,你必定會想,若是我使用MyBatis和spring,這一切可否整合起來,這個話題有興趣你們能夠去嘗試。
分佈式事務處理起來有必定的難度,sharding-jdbc採用了簡單的弱事務模式和特殊場景下的柔性事務模式,沒有最好,只有更好,根據自身業務去選擇事務模式纔是最重要的。