數據庫中間件 Sharding-JDBC 源碼分析 —— 分佈式事務(一)之最大努力型

摘要: 原創出處 http://www.iocoder.cn/Sharding-JDBC/transaction-bed/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!java

本文主要基於 Sharding-JDBC 1.5.0 正式版git


🙂🙂🙂關注**微信公衆號:【芋道源碼】**有福利:github

  1. RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
  3. 您對於源碼的疑問每條留言將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢
  4. 新的源碼解析文章實時收到通知。每週更新一篇左右
  5. 認真的源碼交流微信羣。

1. 概述

數據庫表分庫後,業務場景下的單庫本地事務可能變成跨庫分佈式事務。雖然咱們能夠經過合適的分庫規則讓操做的數據在同庫下,繼續保證單庫本地事務,這也是很是推崇的,但不是全部場景下都能適用。若是這些場景對事務的一致性有要求,咱們就不得不解決分佈式事務的「麻煩」。sql

分佈式事務是個很大的話題,咱們來看看 Sharding-JDBC 對她的權衡:數據庫

Sharding-JDBC因爲性能方面的考量,決定不支持強一致性分佈式事務。咱們已明確規劃線路圖,將來會支持最終一致性的柔性事務。微信

Sharding-JDBC 提供了兩種 柔性事務架構

  • 最大努力送達型 BED :已經實現
  • 事務補償型 TCC :計劃中

本文分享 最大努力送達型 的實現。建議前置閱讀:《Sharding-JDBC 源碼分析 —— SQL 執行》運維

Sharding-JDBC 正在收集使用公司名單:傳送門
🙂 你的登記,會讓更多人蔘與和使用 Sharding-JDBC。傳送門
Sharding-JDBC 也會所以,可以覆蓋更多的業務場景。傳送門
登記吧,騷年!傳送門
dom

2. 最大努力送達型

概念異步

在分佈式數據庫的場景下,相信對於該數據庫的操做最終必定能夠成功,因此經過最大努力反覆嘗試送達操做。

從概念看,可能不是很直白的理解是什麼意思,本文會最大努力讓你乾淨理解。

架構圖

執行過程有 四種 狀況:

  1. 【紅線】執行成功
  2. 【棕線】執行失敗,同步重試成功
  3. 【粉線】執行失敗,同步重試失敗,異步重試成功
  4. 【綠線】執行失敗,同步重試失敗,異步重試失敗,事務日誌保留

總體成漏斗倒三角,上一個階段失敗,交給下一個階段重試:

整個過程經過以下 組件 完成:

  • 柔性事務管理器
  • 最大努力送達型柔性事務
  • 最大努力送達型事務監聽器
  • 事務日誌存儲器
  • 最大努力送達型異步做業

下面,咱們逐節分享每一個組件。

3. 柔性事務管理器

3.1 概念

柔性事務管理器,SoftTransactionManager 實現,負責對柔性事務配置( SoftTransactionConfiguration ) 、柔性事務( AbstractSoftTransaction )的管理。

3.2 柔性事務配置

調用 #init() 初始化柔性管理器:

// SoftTransactionManager.java
/**
* 柔性事務配置對象
*/
@Getter
private final SoftTransactionConfiguration transactionConfig;  

// SoftTransactionManager.java
/**
* 初始化事務管理器.
*/
public void init() throws SQLException {
   // 初始化 最大努力送達型事務監聽器
   EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());
   // 初始化 事務日誌數據庫存儲表
   if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {
       Preconditions.checkNotNull(transactionConfig.getTransactionLogDataSource());
       createTable();
   }
   // 初始化 內嵌的最大努力送達型異步做業
   if (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) {
       new NestedBestEffortsDeliveryJobFactory(transactionConfig).init();
   }
}
  • 將最大努力送達型事務監聽器( BestEffortsDeliveryListener )註冊到事務總線 ( EventBus )。在『最大努力送達型事務監聽器』小節會詳細分享
  • 當使用數據庫存儲事務日誌( TransactionLog ) 時,若**事務日誌表( transaction_log )**不存在則進行建立。在『事務日誌存儲器』小節會詳細分享
  • 當配置使用內嵌的最大努力送達型異步做業( NestedBestEffortsDeliveryJob ) 時,進行初始化。在『最大努力送達型異步做業』小節會詳細分享

SoftTransactionConfiguration

SoftTransactionConfiguration,柔性事務配置對象。

public class SoftTransactionConfiguration {
    /**
     * 事務管理器管理的數據源.
     */
    @Getter(AccessLevel.NONE)
    private final DataSource targetDataSource;
    
    /**
     * 同步的事務送達的最大嘗試次數.
     */
    private int syncMaxDeliveryTryTimes = 3;
    
    /**
     * 事務日誌存儲類型.
     */
    private TransactionLogDataSourceType storageType = RDB;
    /**
     * 存儲事務日誌的數據源.
     */
    private DataSource transactionLogDataSource;
    
    /**
     * 內嵌的最大努力送達型異步做業配置對象.
     */
    private Optional<NestedBestEffortsDeliveryJobConfiguration> bestEffortsDeliveryJobConfiguration = Optional.absent();
}

3.3 柔性事務

在 Sharding-JDBC 裏,目前柔性事務分紅兩種:

  • BEDSoftTransaction :最大努力送達型柔性事務
  • TCCSoftTransaction :TCC型柔性事務

繼承 AbstractSoftTransaction

public abstract class AbstractSoftTransaction {
    /**
     * 分片鏈接原自動提交狀態
     */
    private boolean previousAutoCommit;
    /**
     * 分片鏈接
     */
    @Getter
    private ShardingConnection connection;
    /**
     * 事務類型
     */
    @Getter
    private SoftTransactionType transactionType;
    /**
     * 事務編號
     */
    @Getter
    private String transactionId;
}

AbstractSoftTransaction 實現了開啓柔性事務、關閉柔性事務兩個方法提供給子類調用:

  • #beginInternal()

    /**
    * 開啓柔性
    *
    * @param conn 分片鏈接
    * @param type 事務類型
    * @throws SQLException
    */
    protected final void beginInternal(final Connection conn, final SoftTransactionType type) throws SQLException {
       // TODO 判斷若是在傳統事務中,則拋異常
       Preconditions.checkArgument(conn instanceof ShardingConnection, "Only ShardingConnection can support eventual consistency transaction.");
       // 設置執行錯誤,不拋出異常
       ExecutorExceptionHandler.setExceptionThrown(false);
       connection = (ShardingConnection) conn;
       transactionType = type;
       // 設置自動提交狀態
       previousAutoCommit = connection.getAutoCommit();
       connection.setAutoCommit(true);
       // 生成事務編號
       // TODO 替換UUID爲更有效率的id生成器
       transactionId = UUID.randomUUID().toString();
    }
    • 調用 ExecutorExceptionHandler.setExceptionThrown(false) 設置執行 SQL 錯誤時,也不拋出異常。

      • 對異常處理的代碼:ExecutorExceptionHandler#setExceptionThrown()
      • 對於其餘 SQL,不會由於 SQL 錯誤不執行,會繼續執行
      • 對於上層業務,不會由於 SQL 錯誤終止邏輯,會繼續執行。這裏有一點要注意下,上層業務不能對該 SQL 執行結果有強依賴,由於 SQL 錯誤須要重試達到數據最終一致性
      • 對於最大努力型事務( TCC暫未實現 ),會對執行錯誤的 SQL 進行重試
    • 調用 connection.setAutoCommit(true);,設置執行自動提交。使用最大努力型事務時,上層業務執行 SQL 會立刻提交,即便調用 Connection#rollback() 也是沒法回滾的,這點必定要注意。

  • #end()

    /**
    * 結束柔性事務.
    */
    public final void end() throws SQLException {
      if (connection != null) {
          ExecutorExceptionHandler.setExceptionThrown(true);
          connection.setAutoCommit(previousAutoCommit);
          SoftTransactionManager.closeCurrentTransactionManager();
      }
    }
    
    // SoftTransactionManager.java
    /**
    * 關閉當前的柔性事務管理器.
    */
    static void closeCurrentTransactionManager() {
       ExecutorDataMap.getDataMap().put(TRANSACTION, null);
       ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, null);
    }
    • 事務結束後,必定要記得調用 #end() 清理線程變量。不然,下次請求使用到該線程,會繼續在這個柔性事務內。

BEDSoftTransaction

BEDSoftTransaction,最大努力送達型柔性事務。

public class BEDSoftTransaction extends AbstractSoftTransaction {
    
    /**
     * 開啓柔性事務.
     * 
     * @param connection 數據庫鏈接對象
     */
    public void begin(final Connection connection) throws SQLException {
        beginInternal(connection, SoftTransactionType.BestEffortsDelivery);
    }
}

TCCSoftTransaction

TCCSoftTransaction,TCC 型柔性事務,暫未實現。實現後,會更新到 《Sharding-JDBC 源碼分析 —— 分佈式事務(二)之事務補償型》


3.3.1 建立柔性事務

經過調用 SoftTransactionManager#getTransaction() 建立柔性事務對象:

/**
* {@link ExecutorDataMap#dataMap} 柔性事務對象 key
*/
private static final String TRANSACTION = "transaction";
/**
* {@link ExecutorDataMap#dataMap} 柔性事務配置 key
*/
private static final String TRANSACTION_CONFIG = "transactionConfig";

// SoftTransactionManager.java
/**
* 建立柔性事務.
* 
* @param type 柔性事務類型
* @return 柔性事務
*/
public AbstractSoftTransaction getTransaction(final SoftTransactionType type) {
   AbstractSoftTransaction result;
   switch (type) {
       case BestEffortsDelivery: 
           result = new BEDSoftTransaction();
           break;
       case TryConfirmCancel:
           result = new TCCSoftTransaction();
           break;
       default: 
           throw new UnsupportedOperationException(type.toString());
   }
   // TODO 目前使用不支持嵌套事務,之後這裏須要可配置
   if (getCurrentTransaction().isPresent()) {
       throw new UnsupportedOperationException("Cannot support nested transaction.");
   }
   ExecutorDataMap.getDataMap().put(TRANSACTION, result);
   ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig);
   return result;
}
  • 後續能夠從 ExecutorDataMap 中獲取當前線程的柔性事務和柔性事務配置:

    // SoftTransactionManager.java
    /**
    * 獲取當前線程的柔性事務配置.
    * 
    * @return 當前線程的柔性事務配置
    */
    public static Optional<SoftTransactionConfiguration> getCurrentTransactionConfiguration() {
       Object transactionConfig = ExecutorDataMap.getDataMap().get(TRANSACTION_CONFIG);
       return (null == transactionConfig)
               ? Optional.<SoftTransactionConfiguration>absent()
               : Optional.of((SoftTransactionConfiguration) transactionConfig);
    }
    
    /**
    * 獲取當前的柔性事務.
    * 
    * @return 當前的柔性事務
    */
    public static Optional<AbstractSoftTransaction> getCurrentTransaction() {
       Object transaction = ExecutorDataMap.getDataMap().get(TRANSACTION);
       return (null == transaction)
               ? Optional.<AbstractSoftTransaction>absent()
               : Optional.of((AbstractSoftTransaction) transaction);
    }

4. 事務日誌存儲器

柔性事務執行過程當中,會經過事務日誌( TransactionLog ) 記錄每條 SQL 執行狀態:

  • SQL 執行前,記錄一條事務日誌
  • SQL 執行成功,移除對應的事務日誌

經過實現事務日誌存儲器接口( TransactionLogStorage ),提供存儲功能。目前有兩種實現:

  • MemoryTransactionLogStorage :基於內存的事務日誌存儲器。主要用於開發測試,生產環境下不要使用
  • RdbTransactionLogStorage :基於數據庫的事務日誌存儲器。

本節只分析 RdbTransactionLogStorage。對 MemoryTransactionLogStorage 感興趣的同窗能夠點擊連接傳送到達。

TransactionLogStorage 有五個接口方法,下文每一個小標題都是一個方法。

4.1 #add()

// TransactionLogStorage.java
/**
* 存儲事務日誌.
* 
* @param transactionLog 事務日誌
*/
void add(TransactionLog transactionLog);

// RdbTransactionLogStorage.java
@Override
public void add(final TransactionLog transactionLog) {
   String sql = "INSERT INTO `transaction_log` (`id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?);";
   try (
    // ... 省略你熟悉的代碼
   } catch (final SQLException ex) {
       throw new TransactionLogStorageException(ex);
   }
}
  • 注意:若是插入事務日誌失敗,SQL 會繼續執行,若是此時 SQL 執行失敗,則該 SQL 會不見了。建議:#add() 和下文的 #remove() 異常時,都打印下異常日誌都文件系統

TransactionLog (transaction_log) 數據庫表結構以下:

字段 名字 數據庫類型 備註
id 事件編號 VARCHAR(40) EventBus 事件編號,非事務編號
transaction_type 柔性事務類型 VARCHAR(30)
data_source 真實數據源名 VARCHAR(255)
sql 執行 SQL TEXT 已經改寫過的 SQL
parameters 佔位符參數 TEXT JSON 字符串存儲
creation_time 記錄時間 LONG
async_delivery_try_times 已異步重試次數 INT

4.2 #remove()

// TransactionLogStorage.java
/**
* 根據主鍵刪除事務日誌.
* 
* @param id 事務日誌主鍵
*/
void remove(String id);
    
// RdbTransactionLogStorage.java    
@Override
public void remove(final String id) {
   String sql = "DELETE FROM `transaction_log` WHERE `id`=?;";
   try (
          // ... 省略你熟悉的代碼
   } catch (final SQLException ex) {
       throw new TransactionLogStorageException(ex);
   }
}

4.3 #findEligibleTransactionLogs()

// TransactionLogStorage.java
/**
* 讀取須要處理的事務日誌.
* 
* <p>須要處理的事務日誌爲: </p>
* <p>1. 異步處理次數小於最大處理次數.</p>
* <p>2. 異步處理的事務日誌早於異步處理的間隔時間.</p>
* 
* @param size 獲取日誌的數量
* @param maxDeliveryTryTimes 事務送達的最大嘗試次數
* @param maxDeliveryTryDelayMillis 執行送達事務的延遲毫秒數.
*/
List<TransactionLog> findEligibleTransactionLogs(int size, int maxDeliveryTryTimes, long maxDeliveryTryDelayMillis);

// RdbTransactionLogStorage.java
@Override
public List<TransactionLog> findEligibleTransactionLogs(final int size, final int maxDeliveryTryTimes, final long maxDeliveryTryDelayMillis) {
   List<TransactionLog> result = new ArrayList<>(size);
   String sql = "SELECT `id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`, `async_delivery_try_times` "
       + "FROM `transaction_log` WHERE `async_delivery_try_times`<? AND `transaction_type`=? AND `creation_time`<? LIMIT ?;";
   try (Connection conn = dataSource.getConnection()) {
       // ... 省略你熟悉的代碼
   } catch (final SQLException ex) {
       throw new TransactionLogStorageException(ex);
   }
   return result;
}

4.4 #increaseAsyncDeliveryTryTimes()

// TransactionLogStorage.java
/**
* 增長事務日誌異步重試次數.
* 
* @param id 事務主鍵
*/
void increaseAsyncDeliveryTryTimes(String id);

// RdbTransactionLogStorage.java
@Override
public void increaseAsyncDeliveryTryTimes(final String id) {
   String sql = "UPDATE `transaction_log` SET `async_delivery_try_times`=`async_delivery_try_times`+1 WHERE `id`=?;";
   try (
       // ... 省略你熟悉的代碼
   } catch (final SQLException ex) {
       throw new TransactionLogStorageException(ex);
   }
}

4.5 #processData()

// TransactionLogStorage.java
/**
* 處理事務數據.
*
* @param connection 業務數據庫鏈接
* @param transactionLog 事務日誌
* @param maxDeliveryTryTimes 事務送達的最大嘗試次數
*/
boolean processData(Connection connection, TransactionLog transactionLog, int maxDeliveryTryTimes);

// RdbTransactionLogStorage.java
@Override
public boolean processData(final Connection connection, final TransactionLog transactionLog, final int maxDeliveryTryTimes) {
   // 重試執行失敗 SQL
   try (
       Connection conn = connection;
       PreparedStatement preparedStatement = conn.prepareStatement(transactionLog.getSql())) {
       for (int parameterIndex = 0; parameterIndex < transactionLog.getParameters().size(); parameterIndex++) {
           preparedStatement.setObject(parameterIndex + 1, transactionLog.getParameters().get(parameterIndex));
       }
       preparedStatement.executeUpdate();
   } catch (final SQLException ex) {
       // 重試失敗,更新事務日誌,增長已異步重試次數
       increaseAsyncDeliveryTryTimes(transactionLog.getId());
       throw new TransactionCompensationException(ex);
   }
   // 移除重試執行成功 SQL 對應的事務日誌
   remove(transactionLog.getId());
   return true;
}
  • 不一樣於前四個增刪改查接口方法的實現,#processData() 是帶有一些邏輯的。根據事務日誌( TransactionLog )重試執行失敗的 SQL,若成功,移除事務日誌;若失敗,更新事務日誌,增長已異步重試次數
  • 該方法會被最大努力送達型異步做業調用到

5. 最大努力送達型事務監聽器

最大努力送達型事務監聽器,BestEffortsDeliveryListener,負責記錄事務日誌、同步重試執行失敗 SQL。

// BestEffortsDeliveryListener.java
@Subscribe
@AllowConcurrentEvents
public void listen(final DMLExecutionEvent event) {
   if (!isProcessContinuously()) {
       return;
   }
   SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
   TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
   BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
   switch (event.getEventExecutionType()) {
       case BEFORE_EXECUTE: // 執行前,插入事務日誌
           //TODO 對於批量執行的SQL須要解析成兩層列表
           transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), 
                   event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
           return;
       case EXECUTE_SUCCESS: // 執行成功,移除事務日誌
           transactionLogStorage.remove(event.getId());
           return;
       case EXECUTE_FAILURE: // 執行失敗,同步重試
           boolean deliverySuccess = false;
           for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) { // 同步【屢次】重試
               if (deliverySuccess) {
                   return;
               }
               boolean isNewConnection = false;
               Connection conn = null;
               PreparedStatement preparedStatement = null;
               try {
                   // 得到數據庫鏈接
                   conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
                   if (!isValidConnection(conn)) { // 由於可能執行失敗是數據庫鏈接異常,因此判斷一次,若是無效,從新獲取數據庫鏈接
                       bedSoftTransaction.getConnection().release(conn);
                       conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
                       isNewConnection = true;
                   }
                   preparedStatement = conn.prepareStatement(event.getSql());
                   // 同步重試
                   //TODO 對於批量事件須要解析成兩層列表
                   for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
                       preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
                   }
                   preparedStatement.executeUpdate();
                   deliverySuccess = true;
                   // 同步重試成功,移除事務日誌
                   transactionLogStorage.remove(event.getId());
               } catch (final SQLException ex) {
                   log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
               } finally {
                   close(isNewConnection, conn, preparedStatement);
               }
           }
           return;
       default: 
           throw new UnsupportedOperationException(event.getEventExecutionType().toString());
   }
}
  • BestEffortsDeliveryListener 經過 EventBus 實現監聽 SQL 的執行。Sharding-JDBC 如何實現 EventBus 的,請看《Sharding-JDBC 源碼分析 —— SQL 執行》

  • 調用 #isProcessContinuously() 方法判斷是否處於最大努力送達型事務中,當且僅當處於該狀態才進行監聽事件處理

  • SQL 執行,插入事務日誌

  • SQL 執行成功,移除事務日誌

  • SQL 執行失敗,根據柔性事務配置( SoftTransactionConfiguration )同步的事務送達的最大嘗試次數( syncMaxDeliveryTryTimes )進行屢次重試直到成功。整體邏輯和 RdbTransactionLogStorage#processData() 方法邏輯相似,區別在於獲取分片數據庫鏈接的特殊處理:此處調用失敗,數據庫鏈接多是異常無效的,所以調用了 #isValidConnection() 判斷鏈接的有效性。若無效,則從新獲取分片數據庫鏈接。另外,如果從新獲取分片數據庫鏈接,須要進行關閉釋放 (Connection#close()):

    // BestEffortsDeliveryListener.java
    /**
    * 經過 SELECT 1 校驗數據庫鏈接是否有效
    *
    * @param conn 數據庫鏈接
    * @return 是否有效
    */
    private boolean isValidConnection(final Connection conn) {
       try (PreparedStatement preparedStatement = conn.prepareStatement("SELECT 1")) {
           try (ResultSet rs = preparedStatement.executeQuery()) {
               return rs.next() && 1 == rs.getInt("1");
           }
       } catch (final SQLException ex) {
           return false;
       }
    }
    
    /**
    * 關閉釋放預編譯SQL對象和數據庫鏈接
    *
    * @param isNewConnection 是否新建立的數據庫鏈接,是的狀況下才釋放
    * @param conn 數據庫鏈接
    * @param preparedStatement 預編譯SQL
    */
    private void close(final boolean isNewConnection, final Connection conn, final PreparedStatement preparedStatement) {
       if (null != preparedStatement) {
           try {
               preparedStatement.close();
           } catch (final SQLException ex) {
               log.error("PreparedStatement closed error:", ex);
           }
       }
       if (isNewConnection && null != conn) {
           try {
               conn.close();
           } catch (final SQLException ex) {
               log.error("Connection closed error:", ex);
           }
       }
    }

6. 最大努力送達型異步做業

當最大努力送達型事務監聽器( BestEffortsDeliveryListener )屢次同步重試失敗後,交給最大努力送達型異步做業進行屢次異步重試,而且屢次執行有固定間隔

Sharding-JDBC 提供了兩個最大努力送達型異步做業實現:

  • NestedBestEffortsDeliveryJob :內嵌的最大努力送達型異步做業
  • BestEffortsDeliveryJob :最大努力送達型異步做業

二者實現代碼邏輯基本一致。前者相比後者,用於開發測試,去除對 Zookeeper 依賴,沒法實現高可用,所以生產環境下不適合使用

6.1 BestEffortsDeliveryJob

BestEffortsDeliveryJob 所在 Maven 項目爲 sharding-jdbc-transaction-async-job,基於噹噹開源的 Elastic-Job 實現。以下是官方對該 Maven 項目的簡要說明:

因爲柔性事務採用異步嘗試,須要部署獨立的做業和Zookeeper。sharding-jdbc-transaction採用elastic-job實現的sharding-jdbc-transaction-async-job,經過簡單配置便可啓動高可用做業異步送達柔性事務,啓動腳本爲start.sh。

BestEffortsDeliveryJob

public class BestEffortsDeliveryJob extends AbstractIndividualThroughputDataFlowElasticJob<TransactionLog> {

    /**
     * 最大努力送達型異步做業配置對象
     */
    @Setter
    private BestEffortsDeliveryConfiguration bedConfig;
    /**
     * 事務日誌存儲器對象
     */
    @Setter
    private TransactionLogStorage transactionLogStorage;

    @Override
    public List<TransactionLog> fetchData(final JobExecutionMultipleShardingContext context) {
        return transactionLogStorage.findEligibleTransactionLogs(context.getFetchDataCount(),
            bedConfig.getJobConfig().getMaxDeliveryTryTimes(), bedConfig.getJobConfig().getMaxDeliveryTryDelayMillis());
    }

    @Override
    public boolean processData(final JobExecutionMultipleShardingContext context, final TransactionLog data) {
        try (
            Connection conn = bedConfig.getTargetDataSource(data.getDataSource()).getConnection()) {
            transactionLogStorage.processData(conn, data, bedConfig.getJobConfig().getMaxDeliveryTryTimes());
        } catch (final SQLException | TransactionCompensationException ex) {
            log.error(String.format("Async delivery times %s error, max try times is %s, exception is %s", data.getAsyncDeliveryTryTimes() + 1, 
                bedConfig.getJobConfig().getMaxDeliveryTryTimes(), ex.getMessage()));
            return false;
        }
        return true;
    }
    
    @Override
    public boolean isStreamingProcess() {
        return false;
    }
}
  • 調用 #fetchData() 方法獲取須要處理的事務日誌 (TransactionLog),內部調用了 TransactionLogStorage#findEligibleTransactionLogs() 方法
  • 調用 #processData() 方法處理事務日誌,重試執行失敗的 SQL,內部調用了 TransactionLogStorage#processData()
  • #fetchData()#processData() 調用是 Elastic-Job 控制的。每一輪定時調度,每條事務日誌只執行一次。當超過最大異步調用次數後,該條事務日誌再也不處理,因此生產使用時,最好增長下相應監控超過最大異步重試次數的事務日誌

6.2 AsyncSoftTransactionJobConfiguration

AsyncSoftTransactionJobConfiguration,異步柔性事務做業配置對象。

public class AsyncSoftTransactionJobConfiguration {
    
    /**
     * 做業名稱.
     */
    private String name = "bestEffortsDeliveryJob";
    
    /**
     * 觸發做業的cron表達式.
     */
    private String cron = "0/5 * * * * ?";
    
    /**
     * 每次做業獲取的事務日誌最大數量.
     */
    private int transactionLogFetchDataCount = 100;
    
    /**
     * 事務送達的最大嘗試次數.
     */
    private int maxDeliveryTryTimes = 3;
    
    /**
     * 執行事務的延遲毫秒數.
     *
     * <p>早於此間隔時間的入庫事務纔會被做業執行.</p>
     */
    private long maxDeliveryTryDelayMillis = 60  * 1000L;
}

6.3 Elastic-Job 是否必須?

Sharding-JDBC 提供的最大努力送達型異步做業實現( BestEffortsDeliveryJob ),經過與 Elastic-Job 集成,能夠很便捷而且有質量保證的高可用高性能使用。一部分團隊,可能已經引入或自研了相似 Elastic-Job 的分佈式做業中間件解決方案,每多一箇中間件,就是多一個學習與運維成本。那麼是否可使用本身的分佈式做業解決方案?答案是,能夠的。參考 BestEffortsDeliveryJob 的實現,經過調用 TransactionLogStorage 來實現:

// 僞代碼(不考慮性能、異常)
List<TransactionLog> transactionLogs = transactionLogStorage.findEligibleTransactionLogs(....);
for (TransactionLog transactionLog : transactionLogs) {
       transactionLogStorage.processData(conn, log, maxDeliveryTryTimes);
}

固然,我的仍是很推薦 Elastic-Job。

😈 筆者要開始寫《Elastic-Job 源碼分析》


另外,若是有支持事務消息的分佈式隊列系統,能夠經過 TransactionLogStorage 實現存儲事務消息存儲成消息。爲何要支持事務消息?若是 SQL 執行是成功的,須要回滾(刪除)事務消息。

7. 適用場景

《官方文檔 - 事務支持》

8. 開發指南 & 開發示例

《官方文檔 - 事務支持》

666. 彩蛋

哈哈哈

算是堅持把這個系列寫完了,給本身 32 個贊。

知足!

《Elastic-Job 源碼分析》 走起!不 High 不結束!

相關文章
相關標籤/搜索