數據庫分庫分表中間件 Sharding-JDBC 源碼分析 —— SQL 執行

摘要: 原創出處 www.iocoder.cn/Sharding-JD… 「芋道源碼」歡迎轉載,保留摘要,謝謝!html

本文主要基於 Sharding-JDBC 1.5.0 正式版 java


🙂🙂🙂關注微信公衆號:【芋道源碼】有福利: mysql

  1. RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
  3. 您對於源碼的疑問每條留言將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢
  4. 新的源碼解析文章實時收到通知。每週更新一篇左右
  5. 認真的源碼交流微信羣。
  6. 掘金Java QQ 羣:217878901

1. 概述

越過千山萬水(SQL 解析、SQL 路由、SQL 改寫),咱們終於來到了 SQL 執行。開森不開森?!git

本文主要分享SQL 執行的過程,不包括結果聚合《結果聚合》 東半球第二良心筆者會更新,關注微信公衆號【芋道源碼】完稿後第一時間通知您喲。github

綠框部分 SQL 執行主流程。sql


Sharding-JDBC 正在收集使用公司名單:傳送門
🙂 你的登記,會讓更多人蔘與和使用 Sharding-JDBC。傳送門
Sharding-JDBC 也會所以,可以覆蓋更多的業務場景。傳送門
登記吧,騷年!傳送門
數據庫

2. ExecutorEngine

ExecutorEngine,SQL執行引擎。編程

分表分庫,須要執行的 SQL 數量從單條變成了多條,此時有兩種方式執行:數組

  • 串行執行 SQL
  • 並行執行 SQL

前者,編碼容易,性能較差,總耗時是多條 SQL 執行時間累加。
後者,編碼複雜,性能較好,總耗時約等於執行時間最長的 SQL。安全

👼 ExecutorEngine 固然採用的是後者,並行執行 SQL。

2.1 ListeningExecutorService

Guava( Java 工具庫 ) 提供的繼承自 ExecutorService 的線程服務接口,提供建立 ListenableFuture 功能。ListenableFuture 接口,繼承 Future 接口,有以下好處:

咱們強烈地建議你在代碼中多使用ListenableFuture來代替JDK的 Future, 由於:

  • 大多數Futures 方法中須要它。
  • 轉到ListenableFuture 編程比較容易。
  • Guava提供的通用公共類封裝了公共的操做方方法,不須要提供Future和ListenableFuture的擴展方法。

傳統JDK中的Future經過異步的方式計算返回結果:在多線程運算中可能或者可能在沒有結束返回結果,Future是運行中的多線程的一個引用句柄,確保在服務執行返回一個Result。

ListenableFuture能夠容許你註冊回調方法(callbacks),在運算(多線程執行)完成的時候進行調用, 或者在運算(多線程執行)完成後當即執行。這樣簡單的改進,使得能夠明顯的支持更多的操做,這樣的功能在JDK concurrent中的Future是不支持的。

如上內容來自《Google Guava包的ListenableFuture解析
,文章寫的很棒。下文你會看到 Sharding-JDBC 是如何經過 ListenableFuture 簡化併發編程的

下面看看 ExecutorEngine 如何初始化 ListeningExecutorService

// ShardingDataSource.java
public ShardingDataSource(final ShardingRule shardingRule, final Properties props) {
    // .... 省略部分代碼
   shardingProperties = new ShardingProperties(props);
   int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
   executorEngine = new ExecutorEngine(executorSize);
   // .... 省略部分代碼
}

// ExecutorEngine
public ExecutorEngine(final int executorSize) {
   executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(
           executorSize, executorSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShardingJDBC-%d").build()));
   MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
}複製代碼
  • 一個分片數據源( ShardingDataSource ) 獨佔 一個 SQL執行引擎( ExecutorEngine )。
  • MoreExecutors#listeningDecorator() 建立 ListeningExecutorService,這樣 #submit()#invokeAll() 能夠返回 ListenableFuture。
  • 默認狀況下,線程池大小爲 8。能夠根據實際業務須要,設置 ShardingProperties 進行調整。
  • #setNameFormat() 併發編程時,必定要對線程名字作下定義,這樣排查問題會方便不少。
  • MoreExecutors#addDelayedShutdownHook()應用關閉時,等待全部任務所有完成再關閉。默認配置等待時間爲 60 秒,建議將等待時間作成可配的。

2.2 關閉

數據源關閉時,會調用 ExecutorEngine 也進行關閉。

// ShardingDataSource.java
@Override
public void close() {
   executorEngine.close();
}

// ExecutorEngine
@Override
public void close() {
   executorService.shutdownNow();
   try {
       executorService.awaitTermination(5, TimeUnit.SECONDS);
   } catch (final InterruptedException ignored) {
   }
   if (!executorService.isTerminated()) {
       throw new ShardingJdbcException("ExecutorEngine can not been terminated");
   }
}複製代碼
  • #shutdownNow() 嘗試使用 Thread.interrupt() 打斷正在執行中的任務,未執行的任務再也不執行。建議打印下哪些任務未執行,由於 SQL 未執行,可能數據未能持久化。
  • #awaitTermination() 由於 #shutdownNow() 打斷不是當即結束,須要一個過程,所以這裏等待了 5 秒。
  • 等待 5 秒後,線程池不必定已經關閉,此時拋出異常給上層。建議打印下日誌,記錄出現這個狀況。

2.3 執行 SQL 任務

ExecutorEngine 對外暴露 #executeStatement()#executePreparedStatement()#executeBatch()

三個方法分別提供給 StatementExecutor、PreparedStatementExecutor、BatchPreparedStatementExecutor 調用。而這三個方法,內部調用的都是 #execute() 私有方法。

// ExecutorEngine.java
/** * 執行Statement. * @param sqlType SQL類型 * @param statementUnits 語句對象執行單元集合 * @param executeCallback 執行回調函數 * @param <T> 返回值類型 * @return 執行結果 */
public <T> List<T> executeStatement(final SQLType sqlType, final Collection<StatementUnit> statementUnits, final ExecuteCallback<T> executeCallback) {
   return execute(sqlType, statementUnits, Collections.<List<Object>>emptyList(), executeCallback);
}

/** * 執行PreparedStatement. * @param sqlType SQL類型 * @param preparedStatementUnits 語句對象執行單元集合 * @param parameters 參數列表 * @param executeCallback 執行回調函數 * @param <T> 返回值類型 * @return 執行結果 */
public <T> List<T> executePreparedStatement( final SQLType sqlType, final Collection<PreparedStatementUnit> preparedStatementUnits, final List<Object> parameters, final ExecuteCallback<T> executeCallback) {
   return execute(sqlType, preparedStatementUnits, Collections.singletonList(parameters), executeCallback);
}

/** * 執行Batch. * @param sqlType SQL類型 * @param batchPreparedStatementUnits 語句對象執行單元集合 * @param parameterSets 參數列表集 * @param executeCallback 執行回調函數 * @return 執行結果 */
public List<int[]> executeBatch(
       final SQLType sqlType, final Collection<BatchPreparedStatementUnit> batchPreparedStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<int[]> executeCallback) {
   return execute(sqlType, batchPreparedStatementUnits, parameterSets, executeCallback);
}複製代碼

#execute() 執行過程大致流程以下圖:

/** * 執行 * * @param sqlType SQL 類型 * @param baseStatementUnits 語句對象執行單元集合 * @param parameterSets 參數列表集 * @param executeCallback 執行回調函數 * @param <T> 返回值類型 * @return 執行結果 */
private  <T> List<T> execute( final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
   if (baseStatementUnits.isEmpty()) {
       return Collections.emptyList();
   }
   Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
   BaseStatementUnit firstInput = iterator.next();
   // 第二個任務開始全部 SQL任務 提交線程池【異步】執行任務
   ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
   T firstOutput;
   List<T> restOutputs;
   try {
       // 第一個任務【同步】執行任務
       firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
       // 等待第二個任務開始全部 SQL任務完成
       restOutputs = restFutures.get();
       //CHECKSTYLE:OFF
   } catch (final Exception ex) {
       //CHECKSTYLE:ON
       ExecutorExceptionHandler.handleException(ex);
       return null;
   }
   // 返回結果
   List<T> result = Lists.newLinkedList(restOutputs);
   result.add(0, firstOutput);
   return result;
}複製代碼
  • 第一個任務【同步】調用 #executeInternal() 執行任務。
private <T> T syncExecute(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws Exception {
   // 【同步】執行任務
   return executeInternal(sqlType, baseStatementUnit, parameterSets, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap());
}複製代碼
  • 第二個開始的任務提交線程池異步調用 #executeInternal() 執行任務。
private <T> ListenableFuture<List<T>> asyncExecute(
       final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
   List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
   final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
   final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
   for (final BaseStatementUnit each : baseStatementUnits) {
       // 提交線程池【異步】執行任務
       result.add(executorService.submit(new Callable<T>() {

           @Override
           public T call() throws Exception {
               return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
           }
       }));
   }
   // 返回 ListenableFuture
   return Futures.allAsList(result);
}複製代碼
  • 咱們注意下 Futures.allAsList(result);restOutputs = restFutures.get();。神器 Guava 簡化併發編程 的好處就提現出來了。ListenableFuture#get()全部任務都成功時,返回全部任務執行結果;當任何一個任務失敗時,立刻拋出異常,無需等待其餘任務執行完成。

_😮 Guava 真她喵神器,公衆號:【芋道源碼】會更新 Guava 源碼分享的一個系列喲!老司機還不趕忙上車?_

  • 爲何會分同步執行和異步執行呢?猜想,當SQL 執行是單表時,只要進行第一個任務的同步調用,性能更加優秀。等跟張亮大神請教確認緣由後,咱會進行更新。
// ExecutorEngine.java
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback, final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
   synchronized (baseStatementUnit.getStatement().getConnection()) {
       T result;
       ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
       ExecutorDataMap.setDataMap(dataMap);
       List<AbstractExecutionEvent> events = new LinkedList<>();
       // 生成 Event
       if (parameterSets.isEmpty()) {
           events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));
       } else {
           for (List<Object> each : parameterSets) {
               events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
           }
       }
       // EventBus 發佈 EventExecutionType.BEFORE_EXECUTE
       for (AbstractExecutionEvent event : events) {
           EventBusInstance.getInstance().post(event);
       }
       try {
           // 執行回調函數
           result = executeCallback.execute(baseStatementUnit);
       } catch (final SQLException ex) {
           // EventBus 發佈 EventExecutionType.EXECUTE_FAILURE
           for (AbstractExecutionEvent each : events) {
               each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
               each.setException(Optional.of(ex));
               EventBusInstance.getInstance().post(each);
               ExecutorExceptionHandler.handleException(ex);
           }
           return null;
       }
       // EventBus 發佈 EventExecutionType.EXECUTE_SUCCESS
       for (AbstractExecutionEvent each : events) {
           each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
           EventBusInstance.getInstance().post(each);
       }
       return result;
   }
}複製代碼
  • result = executeCallback.execute(baseStatementUnit); 執行回調函數。StatementExecutor,PreparedStatementExecutor,BatchPreparedStatementExecutor 經過傳遞執行回調函數( ExecuteCallback )實現給 ExecutorEngine 實現並行執行。
public interface ExecuteCallback<T> {
    /** * 執行任務. * * @param baseStatementUnit 語句對象執行單元 * @return 處理結果 * @throws Exception 執行期異常 */
    T execute(BaseStatementUnit baseStatementUnit) throws Exception;
}複製代碼
  • synchronized (baseStatementUnit.getStatement().getConnection()) 原覺得 Connection 非線程安全,所以須要用同步,後翻查資料《數據庫鏈接池爲何要創建多個鏈接》,Connection 是線程安全的。等跟張亮大神請教確認緣由後,咱會進行更新。

    • 解答:MySQL、Oracle 的 Connection 實現是線程安全的。數據庫鏈接池實現的 Connection 不必定是線程安全,例如 Druid 的線程池 Connection 非線程安全

      FROM github.com/dangdangdot…
      druid的數據源的stat這種filter在併發使用同一個connection連接時沒有考慮線程安全的問題,故形成多個線程修改filter中的狀態異常。
      改造這個問題時,考慮到mysql驅動在執行statement時對同一個connection是線程安全的。也就是說同一個數據庫連接的會話是串行執行的。故在sjdbc的executor對於多線程執行的狀況也進行了針對數據庫連接級別的同步。故該方案不會下降sjdbc的性能。
      同時jdk1.7版本的同步採用了鎖升級技術,在碰撞較低的狀況下開銷也是很小的。

  • ExecutionEvent 這裏先不解釋,在本文第四節【EventBus】分享。

  • ExecutorExceptionHandler、ExecutorDataMap 和 柔性事務 ( AbstractSoftTransaction ),放在《柔性事務》分享。

3. Executor

Executor,執行器,目前一共有三個執行器。不一樣的執行器對應不一樣的執行單元 (BaseStatementUnit)。

執行器類 執行器名 執行單元
StatementExecutor 靜態語句對象執行單元 StatementUnit
PreparedStatementExecutor 預編譯語句對象請求的執行器 PreparedStatementUnit
BatchPreparedStatementExecutor 批量預編譯語句對象請求的執行器 BatchPreparedStatementUnit
  • 執行器提供的方法不一樣,所以不存在公用接口或者抽象類。
  • 執行單元繼承自 BaseStatementUnit

3.1 StatementExecutor

StatementExecutor,多線程執行靜態語句對象請求的執行器,一共有三類方法:

  • #executeQuery()
// StatementExecutor.java
/** * 執行SQL查詢. * @return 結果集列表 */
public List<ResultSet> executeQuery() {
   Context context = MetricsContext.start("ShardingStatement-executeQuery");
   List<ResultSet> result;
   try {
       result = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<ResultSet>() {
           @Override
           public ResultSet execute(final BaseStatementUnit baseStatementUnit) throws Exception {
               return baseStatementUnit.getStatement().executeQuery(baseStatementUnit.getSqlExecutionUnit().getSql());
           }
       });
   } finally {
       MetricsContext.stop(context);
   }
   return result;
}複製代碼
  • #executeUpdate() 由於有四個不一樣狀況的#executeUpdate(),因此抽象了 Updater 接口,從而達到邏輯重用。
// StatementExecutor.java
/** * 執行SQL更新. * @return 更新數量 */
public int executeUpdate() {
   return executeUpdate(new Updater() {
       @Override
       public int executeUpdate(final Statement statement, final String sql) throws SQLException {
           return statement.executeUpdate(sql);
       }
   });
}
private int executeUpdate(final Updater updater) {
   Context context = MetricsContext.start("ShardingStatement-executeUpdate");
   try {
       List<Integer> results = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<Integer>() {
           @Override
           public Integer execute(final BaseStatementUnit baseStatementUnit) throws Exception {
               return updater.executeUpdate(baseStatementUnit.getStatement(), baseStatementUnit.getSqlExecutionUnit().getSql());
           }
       });
       return accumulate(results);
   } finally {
       MetricsContext.stop(context);
   }
}
/** * 計算總的更新數量 * @param results 更新數量數組 * @return 更新數量 */
private int accumulate(final List<Integer> results) {
   int result = 0;
   for (Integer each : results) {
       result += null == each ? 0 : each;
   }
   return result;
}複製代碼
  • #execute() 由於有四個不一樣狀況的#execute(),因此抽象了 Executor 接口,從而達到邏輯重用。
/** * 執行SQL請求. * @return true表示執行DQL語句, false表示執行的DML語句 */
public boolean execute() {
   return execute(new Executor() {

       @Override
       public boolean execute(final Statement statement, final String sql) throws SQLException {
           return statement.execute(sql);
       }
   });
}
private boolean execute(final Executor executor) {
   Context context = MetricsContext.start("ShardingStatement-execute");
   try {
       List<Boolean> result = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<Boolean>() { 
           @Override
           public Boolean execute(final BaseStatementUnit baseStatementUnit) throws Exception {
               return executor.execute(baseStatementUnit.getStatement(), baseStatementUnit.getSqlExecutionUnit().getSql());
           }
       });
       if (null == result || result.isEmpty() || null == result.get(0)) {
           return false;
       }
       return result.get(0);
   } finally {
       MetricsContext.stop(context);
   }
}複製代碼

3.2 PreparedStatementExecutor

PreparedStatementExecutor,多線程執行預編譯語句對象請求的執行器。比 StatementExecutor 多了 parameters 參數,方法邏輯上基本一致,就不重複分享啦。

3.3 BatchPreparedStatementExecutor

BatchPreparedStatementExecutor,多線程執行批量預編譯語句對象請求的執行器。

// BatchPreparedStatementExecutor.java
/** * 執行批量SQL. * * @return 執行結果 */
public int[] executeBatch() {
   Context context = MetricsContext.start("ShardingPreparedStatement-executeBatch");
   try {
       return accumulate(executorEngine.executeBatch(sqlType, batchPreparedStatementUnits, parameterSets, new ExecuteCallback<int[]>() {

           @Override
           public int[] execute(final BaseStatementUnit baseStatementUnit) throws Exception {
               return baseStatementUnit.getStatement().executeBatch();
           }
       }));
   } finally {
       MetricsContext.stop(context);
   }
}
/** * 計算每一個語句的更新數量 * * @param results 每條 SQL 更新數量 * @return 每一個語句的更新數量 */
private int[] accumulate(final List<int[]> results) {
   int[] result = new int[parameterSets.size()];
   int count = 0;
   // 每一個語句按照順序,讀取到其對應的每一個分片SQL影響的行數進行累加
   for (BatchPreparedStatementUnit each : batchPreparedStatementUnits) {
       for (Map.Entry<Integer, Integer> entry : each.getJdbcAndActualAddBatchCallTimesMap().entrySet()) {
           result[entry.getKey()] += null == results.get(count) ? 0 : results.get(count)[entry.getValue()];
       }
       count++;
   }
   return result;
}複製代碼

眼尖的同窗會發現,爲何有 BatchPreparedStatementExecutor,而沒有 BatchStatementExecutor 呢?目前 Sharding-JDBC 不支持 Statement 批量操做,只能進行 PreparedStatement 的批操做。

// PreparedStatement 批量操做,不會報錯
PreparedStatement ps = conn.prepareStatement(sql)
ps.addBatch();
ps.addBatch();

// Statement 批量操做,會報錯
ps.addBatch(sql); // 報錯:at com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationStatement.addBatch複製代碼

4. ExecutionEvent

AbstractExecutionEvent,SQL 執行事件抽象接口。

public abstract class AbstractExecutionEvent {
    /** * 事件編號 */
    private final String id;
    /** * 數據源 */
    private final String dataSource;
    /** * SQL */
    private final String sql;
    /** * 參數 */
    private final List<Object> parameters;
    /** * 事件類型 */
    private EventExecutionType eventExecutionType;
    /** * 異常 */
    private Optional<SQLException> exception;
}複製代碼

AbstractExecutionEvent 有兩個實現子類:

  • DMLExecutionEvent:DML類SQL執行時事件
  • DQLExecutionEvent:DQL類SQL執行時事件

EventExecutionType,事件觸發類型。

  • BEFORE_EXECUTE:執行前
  • EXECUTE_SUCCESS:執行成功
  • EXECUTE_FAILURE:執行失敗

4.1 EventBus

那究竟有什麼用途呢? Sharding-JDBC 使用 Guava(沒錯,又是它)的 EventBus 實現了事件的發佈和訂閱。從上文 ExecutorEngine#executeInternal() 咱們能夠看到每一個分片 SQL 執行的過程當中會發布相應事件:

  • 執行 SQL 前:發佈類型類型爲 BEFORE_EXECUTE 的事件
  • 執行 SQL 成功:發佈類型類型爲 EXECUTE_SUCCESS 的事件
  • 執行 SQL 失敗:發佈類型類型爲 EXECUTE_FAILURE 的事件

怎麼訂閱事件呢?很是簡單,例子以下:

EventBusInstance.getInstance().register(new Runnable() {

  @Override
  public void run() {
  }

  @Subscribe // 訂閱
  @AllowConcurrentEvents // 是否容許併發執行,即線程安全
  public void listen(final DMLExecutionEvent event) { // DMLExecutionEvent
      System.out.println("DMLExecutionEvent:" + event.getSql() + "\t" + event.getEventExecutionType());
  }

  @Subscribe // 訂閱
  @AllowConcurrentEvents // 是否容許併發執行,即線程安全
  public void listen2(final DQLExecutionEvent event) { //DQLExecutionEvent
      System.out.println("DQLExecutionEvent:" + event.getSql() + "\t" + event.getEventExecutionType());
  }

});複製代碼
  • #register() 任何類均可以,並不是必定須要使用 Runnable 類。此處例子單純由於方便
  • @Subscribe 註解在方法上,實現對事件的訂閱
  • @AllowConcurrentEvents 註解在方法上,表示線程安全,容許併發執行
  • 方法上的參數對應的類便是訂閱的事件。例如,#listen() 訂閱了 DMLExecutionEvent 事件
  • EventBus#post() 發佈事件,同步調用訂閱邏輯

Sharding-JDBC 正在收集使用公司名單:傳送門
🙂 你的登記,會讓更多人蔘與和使用 Sharding-JDBC。傳送門
Sharding-JDBC 也會所以,可以覆蓋更多的業務場景。傳送門
登記吧,騷年!傳送門

4.2 BestEffortsDeliveryListener

BestEffortsDeliveryListener,最大努力送達型事務監聽器。

本文暫時暫時不分析其實現,僅僅做爲另一個訂閱者的例子。咱們會在《柔性事務》進行分享。

public final class BestEffortsDeliveryListener {
    @Subscribe
    @AllowConcurrentEvents
    public void listen(final DMLExecutionEvent event) {
        if (!isProcessContinuously()) {
            return;
        }
        SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
        TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
        BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
        switch (event.getEventExecutionType()) {
            case BEFORE_EXECUTE:
                //TODO 對於批量執行的SQL須要解析成兩層列表
                transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), 
                        event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
                return;
            case EXECUTE_SUCCESS: 
                transactionLogStorage.remove(event.getId());
                return;
            case EXECUTE_FAILURE: 
                boolean deliverySuccess = false;
                for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
                    if (deliverySuccess) {
                        return;
                    }
                    boolean isNewConnection = false;
                    Connection conn = null;
                    PreparedStatement preparedStatement = null;
                    try {
                        conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.UPDATE);
                        if (!isValidConnection(conn)) {
                            bedSoftTransaction.getConnection().release(conn);
                            conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.UPDATE);
                            isNewConnection = true;
                        }
                        preparedStatement = conn.prepareStatement(event.getSql());
                        //TODO 對於批量事件須要解析成兩層列表
                        for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
                            preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
                        }
                        preparedStatement.executeUpdate();
                        deliverySuccess = true;
                        transactionLogStorage.remove(event.getId());
                    } catch (final SQLException ex) {
                        log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
                    } finally {
                        close(isNewConnection, conn, preparedStatement);
                    }
                }
                return;
            default: 
                throw new UnsupportedOperationException(event.getEventExecutionType().toString());
        }
    }
}複製代碼

666. 彩蛋

本文完,但也未完。

跨分片事務問題。例如:

UPDATE t_order SET nickname = ? WHERE user_id = ?複製代碼

A 節點 connection.commit() 時,應用忽然掛了!B節點 connection.commit() 還來不及執行。
咱們一塊兒去《柔性事務》尋找答案。

道友,分享一波朋友圈可好?

相關文章
相關標籤/搜索