sharding-jdbc 事務支持部分觀後感

官網說明支持的事務

  • 圖就不上了,官網有http://shardingjdbc.io/docs/02-guide/transaction/
  • 幾個關鍵字弱XA事務支持,非跨庫事務,不支持因網絡、硬件異常致使的跨庫事務
  • 開始擼代碼官網demo,柔性事務管理器,見名知意。功能同spring,事務管理器大同小異,定義getTransaction方法,中間操做包裝後的jdbc Connection,後面可看見更直觀

柔性事務管理器DEMO

// 1. 配置SoftTransactionConfiguration
    SoftTransactionConfiguration transactionConfig = new SoftTransactionConfiguration(dataSource);
    transactionConfig.setXXX();
    // 2. 初始化SoftTransactionManager
    SoftTransactionManager transactionManager = new SoftTransactionManager(transactionConfig);
    transactionManager.init();
    // 3. 獲取BEDSoftTransaction
    BEDSoftTransaction transaction = (BEDSoftTransaction) transactionManager.getTransaction(SoftTransactionType.BestEffortsDelivery);
    // 4. 開啓事務
    transaction.begin(connection);
    // 5. 執行JDBC
    // 6.關閉事務
    transaction.end();
  • 分佈式事務除了強XA,無非都是借住外部存儲,心存這個概念,看代碼順暢不少。
  • @1處可發現,事務庫可支持數據庫或內存hashMap存儲,可配置。
  • @2處,先準備數據,建立事務庫的sql,本質是個table,記錄相關信息,包括sql,參數,時間,重試次數等。 上下文參數保存傳遞,經過threadlocal包裝後的工具,很通用,不作重點說明 其中關鍵方法SoftTransactionManager 中的getTransaction

柔性事務管理器

@RequiredArgsConstructor
public final class SoftTransactionManager {
    private static final String TRANSACTION = "transaction";
    private static final String TRANSACTION_CONFIG = "transactionConfig";
    @Getter
    private final SoftTransactionConfiguration transactionConfig;
	
    public void init() throws SQLException {
        //@3
        EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());    
        //@1
        if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {
            Preconditions.checkNotNull(transactionConfig.getTransactionLogDataSource());
            //@2
            createTable();
        }
		......略
    }
    private void createTable() throws SQLException {
        String dbSchema = "CREATE TABLE IF NOT EXISTS `transaction_log` ("
                + "`id` VARCHAR(40) NOT NULL, "
                + "`transaction_type` VARCHAR(30) NOT NULL, "
                + "`data_source` VARCHAR(255) NOT NULL, "
                + "`sql` TEXT NOT NULL, "
                + "`parameters` TEXT NOT NULL, "
                + "`creation_time` LONG NOT NULL, "
                + "`async_delivery_try_times` INT NOT NULL DEFAULT 0, "
                + "PRIMARY KEY (`id`));";
        try (
                Connection conn = transactionConfig.getTransactionLogDataSource().getConnection();
                PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {
            preparedStatement.executeUpdate();
        }
    }
  • @3比較關鍵,guava的EventBus工具,沒接觸過的話,可理解爲一個內存隊列,在柔性事務管理器裏註冊消費者的行爲,舉例:如更新10條記錄,執行前會發送這10個sql相關信息,在事務管理器裏收到消息,記錄到事務庫。
  • @4 更改表以前,預先插入sql相關信息到事務庫,@5表更新完成後,清除事務庫對應記錄,@6執行sql重試操做,包括處理重試次數,成功後刪除事務庫記錄

最大努力推送監聽器

@Slf4j
public final class BestEffortsDeliveryListener {
    
    @Subscribe
    @AllowConcurrentEvents
    public void listen(final DMLExecutionEvent event) {
        .....略
        BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
        switch (event.getEventExecutionType()) {
            //@4
            case BEFORE_EXECUTE:
                //TODO for batch SQL need split to 2-level records
                transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), 
                        event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
                return;
            //@5
            case EXECUTE_SUCCESS: 
                transactionLogStorage.remove(event.getId());
                return;
            //@6
            case EXECUTE_FAILURE: 
                boolean deliverySuccess = false;
                for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
                    if (deliverySuccess) {
                        return;
                    }
                    boolean isNewConnection = false;
                    Connection conn = null;
                    PreparedStatement preparedStatement = null;
                    try {
                         .....
                        preparedStatement.executeUpdate();
                        .....
                        transactionLogStorage.remove(event.getId());
                    } catch (final SQLException ex) {
                        log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
                    } finally {
                        close(isNewConnection, conn, preparedStatement);
                    }
//2種事務實現
public AbstractSoftTransaction getTransaction(final SoftTransactionType type) {
        AbstractSoftTransaction result;
        switch (type) {
            case BestEffortsDelivery: 
                result = new BEDSoftTransaction();
                break;
            case TryConfirmCancel:
                result = new TCCSoftTransaction();
                break;
            default: 
                throw new UnsupportedOperationException(type.toString());
        }
//繼續深刻
public class BEDSoftTransaction extends AbstractSoftTransaction {
    public void begin(final Connection connection) throws SQLException {
        beginInternal(connection, SoftTransactionType.BestEffortsDelivery);
    }
  • 看到了熟悉的connection.setAutoCommit(true);開啓自動提交,由事務管理器操做Connection完成
  • 由於是 單Connection裏異常了數據也不會改變,跨Connection的事務由定時任務+事務庫保證,因此設置自動提交。

開啓事務

public abstract class AbstractSoftTransaction {
    
    private boolean previousAutoCommit;
     @Getter
    private ShardingConnection connection;
    @Getter
    private SoftTransactionType transactionType;
    @Getter
    private String transactionId;
    
    protected final void    beginInternal(final Connection conn, final SoftTransactionType type) throws SQLException {
        connection = (ShardingConnection) conn;
     
        previousAutoCommit = connection.getAutoCommit();

        connection.setAutoCommit(true);

    }
}
  • 到此,可大概瞭解事務管理實現分佈式功能,操做要被事務管理的Connection,而且接受來自EventBus的消息,操做事務庫,完成單Connection的事務

具體的Connection的包裝,執行更改發送EventBus消息,下面繼續

  • 比較直接的源碼,根據查詢流程一步步進到ShardingPreparedStatement
  • @1,route()根據分庫分表生成路由PreoareStatement,到@2去執行

路由,執行

@Override
    public boolean execute() throws SQLException {
        try {
            //@1
            Collection<PreparedStatementUnit> preparedStatementUnits = route();
            //@2
            return new PreparedStatementExecutor(
                    getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).execute();
        } finally {
            clearBatch();
        }
    }
  • 一路跟進到ExecutorEngine執行引擎,發現最後更新表方法最後到execute方法
  • @1,無論成不成功,先插入事務表,成功再刪。比較可靠的方法了....
  • @2,仍是Guava的異步執行方法,見Guava API,事實上在這裏已經執行完表更改,哪怕有異常,只要沒有get(),一切繼續
  • @3,獲取異步執行結果,不懂的話,見JDK Future API
  • @4,異常出現了,投遞消息去重試了
  • @5,一切正常,投遞消息,清除事務庫。前面已設置自動提交,或交給spring事務管理器等。操做定義在事務器裏,下面繼續

具體執行流程

@Slf4j
public final class ExecutorEngine implements AutoCloseable {
    
    private final ListeningExecutorService executorService;
    
    private  <T> List<T> execute(
            final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
        if (baseStatementUnits.isEmpty()) {
            return Collections.emptyList();
        }
        //@1 執行前插入
        OverallExecutionEvent event = new OverallExecutionEvent(sqlType, baseStatementUnits.size());
        //投遞到EventBus,事務管理區去處理
        EventBusInstance.getInstance().post(event);
        Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
        BaseStatementUnit firstInput = iterator.next();
        //@2 
        ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
        T firstOutput;
        List<T> restOutputs;
        try {
            firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
            //@3 
            restOutputs = restFutures.get();
            //CHECKSTYLE:OFF
        } catch (final Exception ex) {
            //CHECKSTYLE:ON
            event.setException(ex);
            event.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
            //@4
            EventBusInstance.getInstance().post(event);
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
        //@5
        event.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
        EventBusInstance.getInstance().post(event);
        List<T> result = Lists.newLinkedList(restOutputs);
        result.add(0, firstOutput);
        return result;
    }
  • @1,收集Connection的setAutoCommit操做,在準備執行sql前反射調用,更改數據庫的自動提交。代碼比較簡單,一筆帶過
public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection {
    
    @Getter
    private final Map<String, Connection> cachedConnections = new HashMap<>();
 
    @Override
    public final boolean getAutoCommit() throws SQLException {
        return autoCommit;
    }
    
    @Override
    public final void setAutoCommit(final boolean autoCommit) throws SQLException {
        this.autoCommit = autoCommit;
        //@1
        recordMethodInvocation(Connection.class, "setAutoCommit", new Class[] {boolean.class}, new Object[] {autoCommit});
        for (Connection each : cachedConnections.values()) {
            each.setAutoCommit(autoCommit);
        }
    }
    
    @Override
    public final void commit() throws SQLException {
        Collection<SQLException> exceptions = new LinkedList<>();
        for (Connection each : cachedConnections.values()) {
            try {
                    each.commit();
            } catch (final SQLException ex) {
                exceptions.add(ex);
            }
        }
        throwSQLExceptionIfNecessary(exceptions);
    }
    
    @Override
    public final void rollback() throws SQLException {
        Collection<SQLException> exceptions = new LinkedList<>();
        for (Connection each : cachedConnections.values()) {
            try {
                each.rollback();
            } catch (final SQLException ex) {
                exceptions.add(ex);
            }
        }
        throwSQLExceptionIfNecessary(exceptions);
    }

分佈式事務在Sharding-jdbc的實現

  • 3個經典的JDBC操做,Sharding-jdbc,在這裏實現了具體Connection操做的實現,這就比較靈活了,經常使用的如Spring的DataSourceTransactionManager事務管理器,在Connection上調用實現時,交給sharding-jdbc裏的這些實現了。
  • 最後總結一下上面沒說清的問題,分佈式事務若XA在Sharding-jdbc中的流程。
  1. 開啓自動提交,收集Connection到集合
  2. 執行前插入事務庫
  3. 執行JDBC代碼
  4. 異常,遍歷集合中Connection,事務回滾/ 正常,遍歷集合中Connection,事務提交
  5. 異常會有定時任務輪訓重試事務庫中sql/ 正常,清除事務庫記錄
  • 解釋了上面的3個關鍵字弱XA事務支持,非跨庫事務,不支持因網絡、硬件異常致使的跨庫事務
  1. 弱XA,單Connection事務保證
  2. 跨庫,仍是由於是基於Connection
  3. 硬件啊,網絡異常,可能致使事務庫記錄有問題,沒辦法恢復。不像Mysql會有binlog崩潰備份恢復。
  • 分庫分表的路由,sql改寫,專門在另外一篇研究
相關文章
相關標籤/搜索