分佈式事務 TCC-Transaction 源碼分析 —— 事務恢復

摘要: 原創出處 http://www.iocoder.cn/TCC-Transaction/transaction-recovery/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!git

本文主要基於 TCC-Transaction 1.2.3.3 正式版github


🙂🙂🙂關注**微信公衆號:【芋道源碼】**有福利:spring

  1. RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
  3. 您對於源碼的疑問每條留言將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢
  4. 新的源碼解析文章實時收到通知。每週更新一篇左右
  5. 認真的源碼交流微信羣。

1. 概述

本文分享 TCC 恢復。主要涉及以下二個 package 路徑下的類:數據庫

  • org.mengyun.tcctransaction.recover
    • RecoverConfig,事務恢復配置接口
    • TransactionRecovery,事務恢復邏輯
  • org.mengyun.tcctransaction.spring.recover
    • DefaultRecoverConfig,默認事務恢復配置實現
    • RecoverScheduledJob,事務恢復定時任務

本文涉及到的類關係以下圖( 打開大圖 ):服務器

《TCC-Transaction 源碼分析 —— 事務存儲器》中,事務信息被持久化到外部的存儲器中。事務存儲是事務恢復的基礎。經過讀取外部存儲器中的異常事務,定時任務會按照必定頻率對事務進行重試,直到事務完成或超過最大重試次數。微信

你行好事會由於獲得讚揚而愉悅
同理,開源項目貢獻者會由於 Star 而更加有動力
爲 TCC-Transaction 點贊!傳送門架構

ps:筆者假設你已經閱讀過《tcc-transaction 官方文檔 —— 使用指南1.2.x》併發

2. 事務重試配置

org.mengyun.tcctransaction.recover.RecoverConfig,事務恢復配置接口,實現代碼以下:app

public interface RecoverConfig {

    /** * @return 最大重試次數 */
    int getMaxRetryCount();

    /** * @return 恢復間隔時間,單位:秒 */
    int getRecoverDuration();

    /** * @return cron 表達式 */
    String getCronExpression();

    /** * @return 延遲取消異常集合 */
    Set<Class<? extends Exception>> getDelayCancelExceptions();

    /** * 設置延遲取消異常集合 * * @param delayRecoverExceptions 延遲取消異常集合 */
    void setDelayCancelExceptions(Set<Class<? extends Exception>> delayRecoverExceptions);
    
}
複製代碼
  • #getMaxRetryCount(),單個事務恢復最大重試次數。超過最大重試次數後,目前僅打出錯誤日誌,下文會看到實現。
  • #getRecoverDuration(),單個事務恢復重試的間隔時間,單位:秒。
  • #getCronExpression(),定時任務 cron 表達式。
  • #getDelayCancelExceptions(),延遲取消異常集合。

org.mengyun.tcctransaction.spring.recover.DefaultRecoverConfig默認事務恢復配置實現,實現代碼以下:框架

public class DefaultRecoverConfig implements RecoverConfig {

    public static final RecoverConfig INSTANCE = new DefaultRecoverConfig();

    /** * 最大重試次數 */
    private int maxRetryCount = 30;

    /** * 恢復間隔時間,單位:秒 */
    private int recoverDuration = 120;

    /** * cron 表達式 */
    private String cronExpression = "0 */1 * * * ?";

    /** * 延遲取消異常集合 */
    private Set<Class<? extends Exception>> delayCancelExceptions = new HashSet<Class<? extends Exception>>();

    public DefaultRecoverConfig() {
        delayCancelExceptions.add(OptimisticLockException.class);
        delayCancelExceptions.add(SocketTimeoutException.class);
    }
    
    @Override
    public void setDelayCancelExceptions(Set<Class<? extends Exception>> delayCancelExceptions) {
        this.delayCancelExceptions.addAll(delayCancelExceptions);
    }
    
}
複製代碼
  • maxRetryCount,單個事務恢復最大重試次數 爲 30。
  • recoverDuration,單個事務恢復重試的間隔時間爲 120 秒。
  • cronExpression,定時任務 cron 表達式爲 "0 */1 * * * ?",每分鐘執行一次。若是你但願定時任務執行的更頻繁,能夠修改 cron 表達式,例如 0/30 * * * * ?,每 30 秒執行一次。
  • delayCancelExceptions,延遲取消異常集合。在 DefaultRecoverConfig 構造方法裏,預先添加了 OptimisticLockException / SocketTimeoutException 。
    • 針對 SocketTimeoutException :try 階段,本地參與者調用遠程參與者( 遠程服務,例如 Dubbo,Http 服務),遠程參與者 try 階段的方法邏輯執行時間較長,超過 Socket 等待時長,發生 SocketTimeoutException,若是馬上執行事務回滾,遠程參與者 try 的方法未執行完成,可能致使 cancel 的方法實際未執行( try 的方法未執行完成,數據庫事務【非 TCC 事務】未提交,cancel 的方法讀取數據時發現未變動,致使方法實際未執行,最終 try 的方法執行完後,提交數據庫事務【非 TCC 事務】,較爲極端 ),最終引發數據不一致。在事務恢復時,會對這種狀況的事務進行取消回滾,若是此時遠程參與者的 try 的方法還未結束,仍是可能發生數據不一致。
    • 針對 OptimisticLockException :仍是 SocketTimeoutException 的狀況,事務恢復間隔時間小於 Socket 超時時間,此時事務恢復調用遠程參與者取消回滾事務,遠程參與者下次更新事務時,會由於樂觀鎖更新失敗,拋出 OptimisticLockException。若是 CompensableTransactionInterceptor 此時馬上取消回滾,可能會和定時任務的取消回滾衝突,所以統一交給定時任務處理。
      • 官方解釋:事務恢復的疑問
      • 這塊筆者還有一些疑問,若是有別的可能性致使這個狀況,麻煩告知下筆者。謝謝。

3. 事務重試定時任務

org.mengyun.tcctransaction.spring.recover.RecoverScheduledJob,事務恢復定時任務,基於 Quartz 實現調度,不斷不斷不斷執行事務恢復。實現代碼以下:

public class RecoverScheduledJob {

    private TransactionRecovery transactionRecovery;

    private TransactionConfigurator transactionConfigurator;

    private Scheduler scheduler;

    public void init() {
        try {
            // Quartz JobDetail
            MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
            jobDetail.setTargetObject(transactionRecovery);
            jobDetail.setTargetMethod("startRecover");
            jobDetail.setName("transactionRecoveryJob");
            jobDetail.setConcurrent(false); // 禁止併發
            jobDetail.afterPropertiesSet();
            // Quartz CronTriggerFactoryBean
            CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
            cronTrigger.setBeanName("transactionRecoveryCronTrigger");
            cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());
            cronTrigger.setJobDetail(jobDetail.getObject());
            cronTrigger.afterPropertiesSet();
            // 啓動任務調度
            scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());
            // 啓動 Quartz Scheduler
            scheduler.start();
        } catch (Exception e) {
            throw new SystemException(e);
        }
    }
}
複製代碼
  • 調用 MethodInvokingJobDetailFactoryBean#setConcurrent(false) 方法,禁用任務併發執行。
  • 調用 MethodInvokingJobDetailFactoryBean#setTargetObject(...) + MethodInvokingJobDetailFactoryBean#setTargetMethod(...) 方法,設置任務調用 TransactionRecovery#startRecover(...) 方法執行。

若是應用集羣部署,會不會相同事務被多個定時任務同時重試

答案是不會,事務在重試時會樂觀鎖更新,同時只有一個應用節點能更新成功。

官方解釋:多機部署下,全部機器都宕機,從異常中恢復時,全部的機器豈不是均可以查詢到全部的須要恢復的服務?

固然極端狀況下,Socket 調用超時時間大於事務重試間隔,第一個節點在重試某個事務,一直未執行完成,第二個節點已經能夠重試。

ps:建議,Socket 調用超時時間小於事務重試間隔。

是否認時任務和應用服務器解耦

螞蟻金服的分佈式事務服務 DTS 採用 client-server 模式:

  • xts-client :負責事務的建立、提交、回滾、記錄。
  • xts-server :負責異常事務的恢復。

FROM 《螞蟻金融雲 DTS 文檔》
分佈式事務服務 (Distributed Transaction Service, DTS) 是一個分佈式事務框架,用來保障在大規模分佈式環境下事務的最終一致性。DTS 從架構上分爲 xts-client 和 xts-server 兩部分,前者是一個嵌入客戶端應用的 JAR 包,主要負責事務數據的寫入和處理;後者是一個獨立的系統,主要負責異常事務的恢復。

4. 異常事務恢復

org.mengyun.tcctransaction.recover.TransactionRecovery,異常事務恢復,實現主體代碼以下:

public class TransactionRecovery {

   /** * 啓動恢復事務邏輯 */
   public void startRecover() {
       // 加載異常事務集合
       List<Transaction> transactions = loadErrorTransactions();
       // 恢復異常事務集合
       recoverErrorTransactions(transactions);
   }

}
複製代碼

4.1 加載異常事務集合

調用 #loadErrorTransactions() 方法,加載異常事務集合。實現代碼以下:

private List<Transaction> loadErrorTransactions() {
   TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
   long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
   RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();
   return transactionRepository.findAllUnmodifiedSince(new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));
}
複製代碼
  • 異常事務的定義:當前時間超過 - 事務變動時間( 最後執行時間 ) >= 事務恢復間隔( RecoverConfig#getRecoverDuration() )。這裏有一點要注意,已完成的事務會從事務存儲器刪除。

4.2 恢復異常事務集合

調用 #recoverErrorTransactions(...) 方法,恢復異常事務集合。實現代碼以下:

private void recoverErrorTransactions(List<Transaction> transactions) {
   for (Transaction transaction : transactions) {
       // 超過最大重試次數
       if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {
           logger.error(String.format("recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)));
           continue;
       }
       // 分支事務超過最大可重試時間
       if (transaction.getTransactionType().equals(TransactionType.BRANCH)
               && (transaction.getCreateTime().getTime() +
               transactionConfigurator.getRecoverConfig().getMaxRetryCount() *
                       transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000
               > System.currentTimeMillis())) {
           continue;
       }
       // Confirm / Cancel
       try {
           // 增長重試次數
           transaction.addRetriedCount();
           // Confirm
           if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {
               transaction.changeStatus(TransactionStatus.CONFIRMING);
               transactionConfigurator.getTransactionRepository().update(transaction);
               transaction.commit();
               transactionConfigurator.getTransactionRepository().delete(transaction);
           // Cancel
           } else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)
                   || transaction.getTransactionType().equals(TransactionType.ROOT)) { // 處理延遲取消的狀況
               transaction.changeStatus(TransactionStatus.CANCELLING);
               transactionConfigurator.getTransactionRepository().update(transaction);
               transaction.rollback();
               transactionConfigurator.getTransactionRepository().delete(transaction);
           }
       } catch (Throwable throwable) {
           if (throwable instanceof OptimisticLockException
                   || ExceptionUtils.getRootCause(throwable) instanceof OptimisticLockException) {
               logger.warn(String.format("optimisticLockException happened while recover. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
           } else {
               logger.error(String.format("recover failed, txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
           }
       }
   }
}
複製代碼
  • 當單個事務超過最大重試次數時,再也不重試,只打印異常,此時須要人工介入解決。能夠接入 ELK 收集日誌監控報警。
  • 分支事務超過最大可重試時間時,再也不重試。可能有同窗和我一開始理解的是相同的,實際分支事務對應的應用服務器也能夠重試分支事務,不是必須根事務發起重試,從而一塊兒重試分支事務。這點要注意下。
  • 當事務處於 TransactionStatus.CONFIRMING 狀態時,提交事務,邏輯和 TransactionManager#commit() 相似。
  • 當事務處於 TransactionStatus.CONFIRMING 狀態,或者事務類型爲根事務,回滾事務,邏輯和 TransactionManager#rollback() 相似。這裏加判斷的事務類型爲根事務,用於處理延遲迴滾異常的事務的回滾。

666. 彩蛋

在寫本文的過程當中,無心中翻到螞蟻雲的文檔,分享給看到此處的真愛們。

真愛們,請猛擊《AntCloudPayPublic》跳轉。

胖友,分享一個朋友圈可好?

相關文章
相關標籤/搜索