摘要: 原創出處 http://www.iocoder.cn/TCC-Transaction/transaction-recovery/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!git
本文主要基於 TCC-Transaction 1.2.3.3 正式版github
🙂🙂🙂關注**微信公衆號:【芋道源碼】**有福利:spring
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
- 認真的源碼交流微信羣。
本文分享 TCC 恢復。主要涉及以下二個 package 路徑下的類:數據庫
org.mengyun.tcctransaction.recover
org.mengyun.tcctransaction.spring.recover
:
本文涉及到的類關係以下圖( 打開大圖 ):服務器
在《TCC-Transaction 源碼分析 —— 事務存儲器》中,事務信息被持久化到外部的存儲器中。事務存儲是事務恢復的基礎。經過讀取外部存儲器中的異常事務,定時任務會按照必定頻率對事務進行重試,直到事務完成或超過最大重試次數。微信
你行好事會由於獲得讚揚而愉悅
同理,開源項目貢獻者會由於 Star 而更加有動力
爲 TCC-Transaction 點贊!傳送門架構
ps:筆者假設你已經閱讀過《tcc-transaction 官方文檔 —— 使用指南1.2.x》。併發
org.mengyun.tcctransaction.recover.RecoverConfig
,事務恢復配置接口,實現代碼以下:app
public interface RecoverConfig {
/** * @return 最大重試次數 */
int getMaxRetryCount();
/** * @return 恢復間隔時間,單位:秒 */
int getRecoverDuration();
/** * @return cron 表達式 */
String getCronExpression();
/** * @return 延遲取消異常集合 */
Set<Class<? extends Exception>> getDelayCancelExceptions();
/** * 設置延遲取消異常集合 * * @param delayRecoverExceptions 延遲取消異常集合 */
void setDelayCancelExceptions(Set<Class<? extends Exception>> delayRecoverExceptions);
}
複製代碼
#getMaxRetryCount()
,單個事務恢復最大重試次數。超過最大重試次數後,目前僅打出錯誤日誌,下文會看到實現。#getRecoverDuration()
,單個事務恢復重試的間隔時間,單位:秒。#getCronExpression()
,定時任務 cron 表達式。#getDelayCancelExceptions()
,延遲取消異常集合。org.mengyun.tcctransaction.spring.recover.DefaultRecoverConfig
,默認事務恢復配置實現,實現代碼以下:框架
public class DefaultRecoverConfig implements RecoverConfig {
public static final RecoverConfig INSTANCE = new DefaultRecoverConfig();
/** * 最大重試次數 */
private int maxRetryCount = 30;
/** * 恢復間隔時間,單位:秒 */
private int recoverDuration = 120;
/** * cron 表達式 */
private String cronExpression = "0 */1 * * * ?";
/** * 延遲取消異常集合 */
private Set<Class<? extends Exception>> delayCancelExceptions = new HashSet<Class<? extends Exception>>();
public DefaultRecoverConfig() {
delayCancelExceptions.add(OptimisticLockException.class);
delayCancelExceptions.add(SocketTimeoutException.class);
}
@Override
public void setDelayCancelExceptions(Set<Class<? extends Exception>> delayCancelExceptions) {
this.delayCancelExceptions.addAll(delayCancelExceptions);
}
}
複製代碼
maxRetryCount
,單個事務恢復最大重試次數 爲 30。recoverDuration
,單個事務恢復重試的間隔時間爲 120 秒。cronExpression
,定時任務 cron 表達式爲 "0 */1 * * * ?"
,每分鐘執行一次。若是你但願定時任務執行的更頻繁,能夠修改 cron 表達式,例如 0/30 * * * * ?
,每 30 秒執行一次。delayCancelExceptions
,延遲取消異常集合。在 DefaultRecoverConfig 構造方法裏,預先添加了 OptimisticLockException / SocketTimeoutException 。
org.mengyun.tcctransaction.spring.recover.RecoverScheduledJob
,事務恢復定時任務,基於 Quartz 實現調度,不斷不斷不斷執行事務恢復。實現代碼以下:
public class RecoverScheduledJob {
private TransactionRecovery transactionRecovery;
private TransactionConfigurator transactionConfigurator;
private Scheduler scheduler;
public void init() {
try {
// Quartz JobDetail
MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
jobDetail.setTargetObject(transactionRecovery);
jobDetail.setTargetMethod("startRecover");
jobDetail.setName("transactionRecoveryJob");
jobDetail.setConcurrent(false); // 禁止併發
jobDetail.afterPropertiesSet();
// Quartz CronTriggerFactoryBean
CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
cronTrigger.setBeanName("transactionRecoveryCronTrigger");
cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());
cronTrigger.setJobDetail(jobDetail.getObject());
cronTrigger.afterPropertiesSet();
// 啓動任務調度
scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());
// 啓動 Quartz Scheduler
scheduler.start();
} catch (Exception e) {
throw new SystemException(e);
}
}
}
複製代碼
MethodInvokingJobDetailFactoryBean#setConcurrent(false)
方法,禁用任務併發執行。MethodInvokingJobDetailFactoryBean#setTargetObject(...)
+ MethodInvokingJobDetailFactoryBean#setTargetMethod(...)
方法,設置任務調用 TransactionRecovery#startRecover(...)
方法執行。若是應用集羣部署,會不會相同事務被多個定時任務同時重試?
答案是不會,事務在重試時會樂觀鎖更新,同時只有一個應用節點能更新成功。
官方解釋:多機部署下,全部機器都宕機,從異常中恢復時,全部的機器豈不是均可以查詢到全部的須要恢復的服務?
固然極端狀況下,Socket 調用超時時間大於事務重試間隔,第一個節點在重試某個事務,一直未執行完成,第二個節點已經能夠重試。
ps:建議,Socket 調用超時時間小於事務重試間隔。
是否認時任務和應用服務器解耦?
螞蟻金服的分佈式事務服務 DTS 採用 client-server 模式:
FROM 《螞蟻金融雲 DTS 文檔》
分佈式事務服務 (Distributed Transaction Service, DTS) 是一個分佈式事務框架,用來保障在大規模分佈式環境下事務的最終一致性。DTS 從架構上分爲 xts-client 和 xts-server 兩部分,前者是一個嵌入客戶端應用的 JAR 包,主要負責事務數據的寫入和處理;後者是一個獨立的系統,主要負責異常事務的恢復。
org.mengyun.tcctransaction.recover.TransactionRecovery
,異常事務恢復,實現主體代碼以下:
public class TransactionRecovery {
/** * 啓動恢復事務邏輯 */
public void startRecover() {
// 加載異常事務集合
List<Transaction> transactions = loadErrorTransactions();
// 恢復異常事務集合
recoverErrorTransactions(transactions);
}
}
複製代碼
調用 #loadErrorTransactions()
方法,加載異常事務集合。實現代碼以下:
private List<Transaction> loadErrorTransactions() {
TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();
return transactionRepository.findAllUnmodifiedSince(new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));
}
複製代碼
RecoverConfig#getRecoverDuration()
)。這裏有一點要注意,已完成的事務會從事務存儲器刪除。調用 #recoverErrorTransactions(...)
方法,恢復異常事務集合。實現代碼以下:
private void recoverErrorTransactions(List<Transaction> transactions) {
for (Transaction transaction : transactions) {
// 超過最大重試次數
if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {
logger.error(String.format("recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)));
continue;
}
// 分支事務超過最大可重試時間
if (transaction.getTransactionType().equals(TransactionType.BRANCH)
&& (transaction.getCreateTime().getTime() +
transactionConfigurator.getRecoverConfig().getMaxRetryCount() *
transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000
> System.currentTimeMillis())) {
continue;
}
// Confirm / Cancel
try {
// 增長重試次數
transaction.addRetriedCount();
// Confirm
if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {
transaction.changeStatus(TransactionStatus.CONFIRMING);
transactionConfigurator.getTransactionRepository().update(transaction);
transaction.commit();
transactionConfigurator.getTransactionRepository().delete(transaction);
// Cancel
} else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)
|| transaction.getTransactionType().equals(TransactionType.ROOT)) { // 處理延遲取消的狀況
transaction.changeStatus(TransactionStatus.CANCELLING);
transactionConfigurator.getTransactionRepository().update(transaction);
transaction.rollback();
transactionConfigurator.getTransactionRepository().delete(transaction);
}
} catch (Throwable throwable) {
if (throwable instanceof OptimisticLockException
|| ExceptionUtils.getRootCause(throwable) instanceof OptimisticLockException) {
logger.warn(String.format("optimisticLockException happened while recover. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
} else {
logger.error(String.format("recover failed, txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
}
}
}
}
複製代碼
TransactionManager#commit()
相似。TransactionManager#rollback()
相似。這裏加判斷的事務類型爲根事務,用於處理延遲迴滾異常的事務的回滾。在寫本文的過程當中,無心中翻到螞蟻雲的文檔,分享給看到此處的真愛們。
真愛們,請猛擊《AntCloudPayPublic》跳轉。
胖友,分享一個朋友圈可好?