摘要: 原創出處 http://www.iocoder.cn/TCC-Transaction/tcc-core 「芋道源碼」歡迎轉載,保留摘要,謝謝!html
本文主要基於 TCC-Transaction 1.2.3.3 正式版java
🙂🙂🙂關注**微信公衆號:【芋道源碼】**有福利:git
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
- 認真的源碼交流微信羣。
本文分享 TCC 實現。主要涉及以下三個 Maven 項目:github
tcc-transaction-core
:tcc-transaction 底層實現。tcc-transaction-api
:tcc-transaction 使用 API。tcc-transaction-spring
:tcc-transaction Spring 支持。你行好事會由於獲得讚揚而愉悅
同理,開源項目貢獻者會由於 Star 而更加有動力
爲 TCC-Transaction 點贊!傳送門算法
OK,開始咱們的第一段 TCC 旅程吧。spring
ps:筆者假設你已經閱讀過《tcc-transaction 官方文檔 —— 使用指南1.2.x》。api
ps2:未特殊說明的狀況下,本文事務指的是 TCC事務。數組
FROM https://support.hwclouds.com/devg-servicestage/zh-cn_topic_0056814426.html
TCC事務
爲了解決在事務運行過程當中大顆粒度資源鎖定的問題,業界提出一種新的事務模型,它是基於業務層面的事務定義。鎖粒度徹底由業務本身控制。它本質是一種補償的思路。它把事務運行過程分紅 Try、Confirm / Cancel 兩個階段。在每一個階段的邏輯由業務代碼控制。這樣就事務的鎖粒度能夠徹底自由控制。業務能夠在犧牲隔離性的狀況下,獲取更高的性能。微信
總體流程以下圖:架構
紅框部分功能由 tcc-transaction-core
實現:
黃框部分功能由 tcc-transaction-http-sample
實現( 官方提供的示例項目 ):
與 2PC協議 比較:
參考資料:
在 TCC 裏,一個業務活動能夠有多個事務,每一個業務操做歸屬於不一樣的事務,即一個事務能夠包含多個業務操做。TCC-Transaction 將每一個業務操做抽象成事務參與者,每一個事務能夠包含多個參與者。
參與者須要聲明 try / confirm / cancel 三個類型的方法,和 TCC 的操做一一對應。在程序裏,經過 @Compensable 註解標記在 try 方法上,並填寫對應的 confirm / cancel 方法,示例代碼以下:
// try
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}
// confirm
public void confirmRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}
// cancel
public void cancelRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}
複製代碼
TCC-Transaction 有兩個攔截器,經過對 @Compensable AOP 切面( 參與者 try 方法 )進行攔截,透明化對參與者 confirm / cancel 方法調用,從而實現 TCC 。簡化流程以下圖:
第一個攔截器,可補償事務攔截器,實現以下功能:
第二個攔截器,資源協調者攔截器,實現以下功能:
實際攔截器對事務的處理會比上圖複雜一些,在本文「6. 事務攔截器」詳細解析。
在 TCC-Transaction 代碼實現上,組件分層以下圖:
本文按照以下順序分享:
內容是自下而上的方式分享,每一個組件能夠更加總體的被認識。固然這可能對你理解時產生一臉悶逼,因此推薦兩種閱讀方式:
事務存儲器在《TCC-Transaction 源碼解析 —— 事務存儲於恢復》詳細解析。
事務恢復在《TCC-Transaction 源碼解析 —— 事務恢復》詳細解析。
在 TCC 裏,一個事務( org.mengyun.tcctransaction.Transaction
) 能夠有多個參與者( org.mengyun.tcctransaction.Participant
)參與業務活動。類圖關係以下( 打開大圖 ):
Transaction 實現代碼以下:
public class Transaction implements Serializable {
private static final long serialVersionUID = 7291423944314337931L;
/** * 事務編號 */
private TransactionXid xid;
/** * 事務狀態 */
private TransactionStatus status;
/** * 事務類型 */
private TransactionType transactionType;
/** * 重試次數 */
private volatile int retriedCount = 0;
/** * 建立時間 */
private Date createTime = new Date();
/** * 最後更新時間 */
private Date lastUpdateTime = new Date();
/** * 版本號 */
private long version = 1;
/** * 參與者集合 */
private List<Participant> participants = new ArrayList<Participant>();
/** * 附帶屬性映射 */
private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();
/** * 添加參與者 * * @param participant 參與者 */
public void enlistParticipant(Participant participant) {
participants.add(participant);
}
/** * 提交 TCC 事務 */
public void commit() {
for (Participant participant : participants) {
participant.commit();
}
}
/** * 回滾 TCC 事務 */
public void rollback() {
for (Participant participant : participants) {
participant.rollback();
}
}
}
複製代碼
xid,事務編號( TransactionXid ),用於惟一標識一個事務。使用 UUID 算法生成,保證惟一性。org.mengyun.tcctransaction.api.TransactionXid
實現 javax.transaction.xa.Xid
接口,實現代碼以下:
public class TransactionXid implements Xid, Serializable {
private static final long serialVersionUID = -6817267250789142043L;
/** * xid 格式標識符 */
private int formatId = 1;
/** * 全局事務編號 */
private byte[] globalTransactionId;
/** * 分支事務編號 */
private byte[] branchQualifier;
}
複製代碼
status,事務狀態( TransactionStatus )。org.mengyun.tcctransaction.api.TransactionStatus
實現代碼以下:
public enum TransactionStatus {
/** * 嘗試中狀態 */
TRYING(1),
/** * 確認中狀態 */
CONFIRMING(2),
/** * 取消中狀態 */
CANCELLING(3);
private int id;
}
複製代碼
transactionType,事務類型( TransactionType )。org.mengyun.tcctransaction.common.TransactionType
實現代碼以下:
public enum TransactionType {
/** * 根事務 */
ROOT(1),
/** * 分支事務 */
BRANCH(2);
int id;
}
複製代碼
retriedCount,重試次數。在 TCC 過程當中,可能參與者異常崩潰,這個時候會進行重試直到成功或超過最大次數。在《TCC-Transaction 源碼解析 —— 事務恢復》詳細解析。
version,版本號,用於樂觀鎖更新事務。在《TCC-Transaction 源碼解析 —— 事務存儲器》詳細解析。
attachments,附帶屬性映射。在《TCC-Transaction 源碼解析 —— Dubbo 支持》詳細解析。
提供 #enlistParticipant()
方法,添加事務參與者。
提供 #commit()
方法,調用參與者們提交事務。
提供 #rollback()
方法,調用參與者回滾事務。
Participant 實現代碼以下:
public class Participant implements Serializable {
private static final long serialVersionUID = 4127729421281425247L;
/** * 事務編號 */
private TransactionXid xid;
/** * 確認執行業務方法調用上下文 */
private InvocationContext confirmInvocationContext;
/** * 取消執行業務方法 */
private InvocationContext cancelInvocationContext;
/** * 執行器 */
private Terminator terminator = new Terminator();
/** * 事務上下文編輯 */
Class<? extends TransactionContextEditor> transactionContextEditorClass;
/** * 提交事務 */
public void commit() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
}
/** * 回滾事務 */
public void rollback() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);
}
}
複製代碼
xid,參與者事務編號。經過 TransactionXid.globalTransactionId
屬性,關聯上其所屬的事務。當參與者進行遠程調用時,遠程的分支事務的事務編號等於該參與者的事務編號。經過事務編號的關聯,TCC Confirm / Cancel 階段,使用參與者的事務編號和遠程的分支事務進行關聯,從而實現事務的提交和回滾,在「5.2 傳播發起分支事務」 + 「6.2 可補償事務攔截器」能夠看到具體實現。
confirmInvocationContext,確認執行業務方法調用上下文( InvocationContext )。org.mengyun.tcctransaction.InvocationContext
實現代碼以下:
public class InvocationContext implements Serializable {
private static final long serialVersionUID = -7969140711432461165L;
/** * 類 */
private Class targetClass;
/** * 方法名 */
private String methodName;
/** * 參數類型數組 */
private Class[] parameterTypes;
/** * 參數數組 */
private Object[] args;
}
複製代碼
org.mengyun.tcctransaction.Terminator
會看到具體的代碼實現。本質上,TCC 經過多個參與者的 try / confirm / cancel 方法,實現事務的最終一致性。cancelInvocationContext,取消執行業務方法調用上下文( InvocationContext )。
terminator,執行器( Terminator )。org.mengyun.tcctransaction.Terminator
實現代碼以下:
public class Terminator implements Serializable {
private static final long serialVersionUID = -164958655471605778L;
public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {
try {
// 得到 參與者對象
Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();
// 得到 方法
Method method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());
// 設置 事務上下文 到方法參數
FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());
// 執行方法
return method.invoke(target, invocationContext.getArgs());
} catch (Exception e) {
throw new SystemException(e);
}
}
return null;
}
}
複製代碼
transactionContextEditorClass,事務上下文編輯,在「6.1 Compensable」詳細解析。
提交 #commit()
方法,提交參與者本身的事務。
提交 #rollback()
方法,回滾參與者本身的事務。
org.mengyun.tcctransaction.TransactionManager
,事務管理器,提供事務的獲取、發起、提交、回滾,參與者的新增等等方法。
提供 begin()
方法,發起根事務。該方法在調用方法類型爲 MethodType.ROOT 而且 事務處於 Try 階段被調用。MethodType 在「6.2 可補償事務攔截器」詳細解析。
實現代碼以下:
// TransactionManager.java
/** * 發起根事務 * * @return 事務 */
public Transaction begin() {
// 建立 根事務
Transaction transaction = new Transaction(TransactionType.ROOT);
// 存儲 事務
transactionRepository.create(transaction);
// 註冊 事務
registerTransaction(transaction);
return transaction;
}
複製代碼
調用 Transaction 構造方法,建立根事務。實現代碼以下:
// Transaction.java
/** * 建立指定類型的事務 * * @param transactionType 事務類型 */
public Transaction(TransactionType transactionType) {
this.xid = new TransactionXid();
this.status = TransactionStatus.TRYING; // 嘗試中狀態
this.transactionType = transactionType;
}
複製代碼
TransactionManager#begin()
在調用,即只建立根事務。調用 TransactionRepository#crete()
方法,存儲事務。
調用 #registerTransaction(...)
方法,註冊事務到當前線程事務隊列。實現代碼以下:
// TransactionManager.java
/** * 當前線程事務隊列 */
private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();
/** * 註冊事務到當前線程事務隊列 * * @param transaction 事務 */
private void registerTransaction(Transaction transaction) {
if (CURRENT.get() == null) {
CURRENT.set(new LinkedList<Transaction>());
}
CURRENT.get().push(transaction); // 添加到頭部
}
複製代碼
org.springframework.transaction.annotation.Propagation.REQUIRES_NEW
。在下文,很快咱們就會看到 TCC-Transaction 本身的 org.mengyun.tcctransaction.api.Propagation
。調用 #propagationNewBegin(...)
方法,傳播發起分支事務。該方法在調用方法類型爲 MethodType.PROVIDER 而且 事務處於 Try 階段被調用。MethodType 在「6.2 可補償事務攔截器」詳細解析。
實現代碼以下:
/** * 傳播發起分支事務 * * @param transactionContext 事務上下文 * @return 分支事務 */
public Transaction propagationNewBegin(TransactionContext transactionContext) {
// 建立 分支事務
Transaction transaction = new Transaction(transactionContext);
// 存儲 事務
transactionRepository.create(transaction);
// 註冊 事務
registerTransaction(transaction);
return transaction;
}
複製代碼
調用 Transaction 構造方法,建立分支事務。實現代碼以下:
/** * 建立分支事務 * * @param transactionContext 事務上下文 */
public Transaction(TransactionContext transactionContext) {
this.xid = transactionContext.getXid(); // 事務上下文的 xid
this.status = TransactionStatus.TRYING; // 嘗試中狀態
this.transactionType = TransactionType.BRANCH; // 分支事務
}
複製代碼
調用 TransactionRepository#crete()
方法,存儲事務。爲何要存儲分支事務,在「6.3 資源協調者攔截器」詳細解析。
調用 #registerTransaction(...)
方法,註冊事務到當前線程事務隊列。
調用 #propagationExistBegin(...)
方法,傳播發起分支事務。該方法在調用方法類型爲 MethodType.PROVIDER 而且 事務處於 Confirm / Cancel 階段被調用。MethodType 在「6.2 可補償事務攔截器」詳細解析。
實現代碼以下:
/** * 傳播獲取分支事務 * * @param transactionContext 事務上下文 * @return 分支事務 * @throws NoExistedTransactionException 當事務不存在時 */
public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {
// 查詢 事務
Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());
if (transaction != null) {
// 設置 事務 狀態
transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus()));
// 註冊 事務
registerTransaction(transaction);
return transaction;
} else {
throw new NoExistedTransactionException();
}
}
複製代碼
TransactionRepository#findByXid()
方法,查詢事務。Transaction#changeStatus(...)
方法,設置事務狀態爲 CONFIRMING 或 CANCELLING。#registerTransaction(...)
方法,註冊事務到當前線程事務隊列。#propagationNewBegin(...)
思考下。調用 #commit(...)
方法,提交事務。該方法在事務處於 Confirm / Cancel 階段被調用。
實現代碼以下:
/** * 提交事務 */
public void commit() {
// 獲取 事務
Transaction transaction = getCurrentTransaction();
// 設置 事務狀態 爲 CONFIRMING
transaction.changeStatus(TransactionStatus.CONFIRMING);
// 更新 事務
transactionRepository.update(transaction);
try {
// 提交 事務
transaction.commit();
// 刪除 事務
transactionRepository.delete(transaction);
} catch (Throwable commitException) {
logger.error("compensable transaction confirm failed.", commitException);
throw new ConfirmingException(commitException);
}
}
複製代碼
調用 #getCurrentTransaction()
方法, 獲取事務。實現代碼以下:
public Transaction getCurrentTransaction() {
if (isTransactionActive()) {
return CURRENT.get().peek(); // 得到頭部元素
}
return null;
}
public boolean isTransactionActive() {
Deque<Transaction> transactions = CURRENT.get();
return transactions != null && !transactions.isEmpty();
}
複製代碼
#registerTransaction(...)
註冊到隊列頭部。調用 Transaction#changeStatus(...)
方法, 設置事務狀態爲 CONFIRMING。
調用 TransactionRepository#update(...)
方法, 更新事務。
調用 Transaction#commit(...)
方法, 提交事務。
調用 TransactionRepository#delete(...)
方法,刪除事務。
調用 #rollback(...)
方法,取消事務,和 #commit()
方法基本相似。該方法在事務處於 Confirm / Cancel 階段被調用。
實現代碼以下:
/** * 回滾事務 */
public void rollback() {
// 獲取 事務
Transaction transaction = getCurrentTransaction();
// 設置 事務狀態 爲 CANCELLING
transaction.changeStatus(TransactionStatus.CANCELLING);
// 更新 事務
transactionRepository.update(transaction);
try {
// 提交 事務
transaction.rollback();
// 刪除 事務
transactionRepository.delete(transaction);
} catch (Throwable rollbackException) {
logger.error("compensable transaction rollback failed.", rollbackException);
throw new CancellingException(rollbackException);
}
}
複製代碼
#getCurrentTransaction()
方法,獲取事務。Transaction#changeStatus(...)
方法, 設置事務狀態爲 CANCELLING。TransactionRepository#update(...)
方法, 更新事務。Transaction#rollback(...)
方法, 回滾事務。TransactionRepository#delete(...)
方法,刪除事務。調用 #enlistParticipant(...)
方法,添加參與者到事務。該方法在事務處於 Try 階段被調用,在「6.3 資源協調者攔截器」有詳細解析。
實現代碼以下:
/** * 添加參與者到事務 * * @param participant 參與者 */
public void enlistParticipant(Participant participant) {
// 獲取 事務
Transaction transaction = this.getCurrentTransaction();
// 添加參與者
transaction.enlistParticipant(participant);
// 更新 事務
transactionRepository.update(transaction);
}
複製代碼
#getCurrentTransaction()
方法,獲取事務。Transaction#enlistParticipant(...)
方法, 添加參與者到事務。TransactionRepository#update(...)
方法, 更新事務。TCC-Transaction 基於 org.mengyun.tcctransaction.api.@Compensable
+ org.aspectj.lang.annotation.@Aspect
註解 AOP 切面實現業務方法的 TCC 事務聲明攔截,同 Spring 的 org.springframework.transaction.annotation.@Transactional
的實現。
TCC-Transaction 有兩個攔截器:
org.mengyun.tcctransaction.interceptor.CompensableTransactionInterceptor
,可補償事務攔截器。org.mengyun.tcctransaction.interceptor.ResourceCoordinatorInterceptor
,資源協調者攔截器。在分享攔截器的實現以前,咱們先一塊兒看看 @Compensable 註解。
@Compensable,標記可補償的方法註解。實現代碼以下:
public @interface Compensable {
/** * 傳播級別 */
Propagation propagation() default Propagation.REQUIRED;
/** * 確認執行業務方法 */
String confirmMethod() default "";
/** * 取消執行業務方法 */
String cancelMethod() default "";
/** * 事務上下文編輯 */
Class<? extends TransactionContextEditor> transactionContextEditor() default DefaultTransactionContextEditor.class;
}
複製代碼
propagation,傳播級別( Propagation ),默認 Propagation.REQUIRED。和 Spring 的 Propagation 除了缺乏幾個屬性,基本一致。實現代碼以下:
public enum Propagation {
/** * 支持當前事務,若是當前沒有事務,就新建一個事務。 */
REQUIRED(0),
/** * 支持當前事務,若是當前沒有事務,就以非事務方式執行。 */
SUPPORTS(1),
/** * 支持當前事務,若是當前沒有事務,就拋出異常。 */
MANDATORY(2),
/** * 新建事務,若是當前存在事務,把當前事務掛起。 */
REQUIRES_NEW(3);
private final int value;
}
複製代碼
confirmMethod,確認執行業務方法名。
cancelMethod,取消執行業務方法名。
TransactionContextEditor,事務上下文編輯器( TransactionContextEditor ),用於設置和得到事務上下文( TransactionContext ),在「6.3 資源協調者攔截器」能夠看到被調用,此處只看它的代碼實現。org.mengyun.tcctransaction.api.TransactionContextEditor
接口代碼以下:
public interface TransactionContextEditor {
/** * 從參數中得到事務上下文 * * @param target 對象 * @param method 方法 * @param args 參數 * @return 事務上下文 */
TransactionContext get(Object target, Method method, Object[] args);
/** * 設置事務上下文到參數中 * * @param transactionContext 事務上下文 * @param target 對象 * @param method 方法 * @param args 參數 */
void set(TransactionContext transactionContext, Object target, Method method, Object[] args);
}
複製代碼
DefaultTransactionContextEditor,默認事務上下文編輯器實現。實現代碼以下:
class DefaultTransactionContextEditor implements TransactionContextEditor {
@Override
public TransactionContext get(Object target, Method method, Object[] args) {
int position = getTransactionContextParamPosition(method.getParameterTypes());
if (position >= 0) {
return (TransactionContext) args[position];
}
return null;
}
@Override
public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) {
int position = getTransactionContextParamPosition(method.getParameterTypes());
if (position >= 0) {
args[position] = transactionContext; // 設置方法參數
}
}
/** * 得到事務上下文在方法參數裏的位置 * * @param parameterTypes 參數類型集合 * @return 位置 */
public static int getTransactionContextParamPosition(Class<?>[] parameterTypes) {
int position = -1;
for (int i = 0; i < parameterTypes.length; i++) {
if (parameterTypes[i].equals(org.mengyun.tcctransaction.api.TransactionContext.class)) {
position = i;
break;
}
}
return position;
}
}
複製代碼
NullableTransactionContextEditor,無事務上下文編輯器實現。實現代碼以下:
class NullableTransactionContextEditor implements TransactionContextEditor {
@Override
public TransactionContext get(Object target, Method method, Object[] args) {
return null;
}
@Override
public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) {
}
}
複製代碼
DubboTransactionContextEditor,Dubbo 事務上下文編輯器實現,經過 Dubbo 隱式傳參方式得到事務上下文,在《TCC-Transaction 源碼解析 —— Dubbo 支持》詳細解析。
先一塊兒來看下可補償事務攔截器對應的切面 org.mengyun.tcctransaction.interceptor.CompensableTransactionAspect
,實現代碼以下:
@Aspect
public abstract class CompensableTransactionAspect {
private CompensableTransactionInterceptor compensableTransactionInterceptor;
public void setCompensableTransactionInterceptor(CompensableTransactionInterceptor compensableTransactionInterceptor) {
this.compensableTransactionInterceptor = compensableTransactionInterceptor;
}
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void compensableService() {
}
@Around("compensableService()")
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
}
public abstract int getOrder();
}
複製代碼
org.aspectj.lang.annotation.@Pointcut
+ org.aspectj.lang.annotation.@Around
註解,配置對 @Compensable 註解的方法進行攔截,調用 CompensableTransactionInterceptor#interceptCompensableMethod(...)
方法進行處理。CompensableTransactionInterceptor 實現代碼以下:
public class CompensableTransactionInterceptor {
private TransactionManager transactionManager;
private Set<Class<? extends Exception>> delayCancelExceptions;
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
// 得到帶 @Compensable 註解的方法
Method method = CompensableMethodUtils.getCompensableMethod(pjp);
//
Compensable compensable = method.getAnnotation(Compensable.class);
Propagation propagation = compensable.propagation();
// 得到 事務上下文
TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
// 當前線程是否在事務中
boolean isTransactionActive = transactionManager.isTransactionActive();
// 判斷事務上下文是否合法
if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {
throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());
}
// 計算方法類型
MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);
// 處理
switch (methodType) {
case ROOT:
return rootMethodProceed(pjp);
case PROVIDER:
return providerMethodProceed(pjp, transactionContext);
default:
return pjp.proceed();
}
}
}
複製代碼
調用 CompensableMethodUtils#getCompensableMethod(...)
方法,得到帶 @Compensable 註解的方法。實現代碼以下:
// CompensableMethodUtils.java
/** * 得到帶 @Compensable 註解的方法 * * @param pjp 切面點 * @return 方法 */
public static Method getCompensableMethod(ProceedingJoinPoint pjp) {
Method method = ((MethodSignature) (pjp.getSignature())).getMethod(); // 代理方法對象
if (method.getAnnotation(Compensable.class) == null) {
try {
method = pjp.getTarget().getClass().getMethod(method.getName(), method.getParameterTypes()); // 實際方法對象
} catch (NoSuchMethodException e) {
return null;
}
}
return method;
}
複製代碼
調用 TransactionContextEditor#get(...)
方法,從參數中得到事務上下文。爲何從參數中能夠得到事務上下文呢?在「6.3 資源協調者攔截器」揭曉答案。
調用 TransactionManager#isTransactionActive()
方法,當前線程是否在事務中。實現代碼以下:
// TransactionManager.java
private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();
public boolean isTransactionActive() {
Deque<Transaction> transactions = CURRENT.get();
return transactions != null && !transactions.isEmpty();
}
複製代碼
調用 TransactionUtils#isLegalTransactionContext(...)
方法,判斷事務上下文是否合法。實現代碼以下:
// TransactionUtils.java
/** * 判斷事務上下文是否合法 * 在 Propagation.MANDATORY 必須有在事務內 * * @param isTransactionActive 是否 * @param propagation 傳播級別 * @param transactionContext 事務上下文 * @return 是否合法 */
public static boolean isLegalTransactionContext(boolean isTransactionActive, Propagation propagation, TransactionContext transactionContext) {
if (propagation.equals(Propagation.MANDATORY) && !isTransactionActive && transactionContext == null) {
return false;
}
return true;
}
複製代碼
調用 CompensableMethodUtils#calculateMethodType(...)
方法,計算方法類型。實現代碼以下:
/** * 計算方法類型 * * @param propagation 傳播級別 * @param isTransactionActive 是否事務開啓 * @param transactionContext 事務上下文 * @return 方法類型 */
public static MethodType calculateMethodType(Propagation propagation, boolean isTransactionActive, TransactionContext transactionContext) {
if ((propagation.equals(Propagation.REQUIRED) && !isTransactionActive && transactionContext == null) // Propagation.REQUIRED:支持當前事務,當前沒有事務,就新建一個事務。
|| propagation.equals(Propagation.REQUIRES_NEW)) { // Propagation.REQUIRES_NEW:新建事務,若是當前存在事務,把當前事務掛起。
return MethodType.ROOT;
} else if ((propagation.equals(Propagation.REQUIRED) // Propagation.REQUIRED:支持當前事務
|| propagation.equals(Propagation.MANDATORY)) // Propagation.MANDATORY:支持當前事務
&& !isTransactionActive && transactionContext != null) {
return MethodType.PROVIDER;
} else {
return MethodType.NORMAL;
}
}
複製代碼
當方法類型爲 MethodType.ROOT 時,調用 #rootMethodProceed(...)
方法,發起 TCC 總體流程。實現代碼以下:
private Object rootMethodProceed(ProceedingJoinPoint pjp) throws Throwable {
Object returnValue;
Transaction transaction = null;
try {
// 發起根事務
transaction = transactionManager.begin();
// 執行方法原邏輯
try {
returnValue = pjp.proceed();
} catch (Throwable tryingException) {
if (isDelayCancelException(tryingException)) { // 是否延遲迴滾
} else {
logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);
// 回滾事務
transactionManager.rollback();
}
throw tryingException;
}
// 提交事務
transactionManager.commit();
} finally {
// 將事務從當前線程事務隊列移除
transactionManager.cleanAfterCompletion(transaction);
}
return returnValue;
}
複製代碼
調用 #transactionManager()
方法,發起根事務,TCC Try 階段開始。
調用 ProceedingJoinPoint#proceed()
方法,執行方法原邏輯( 即 Try 邏輯 )。
當原邏輯執行異常時,TCC Try 階段失敗,調用 TransactionManager#rollback(...)
方法,TCC Cancel 階段,回滾事務。此處 #isDelayCancelException(...)
方法,判斷異常是否爲延遲取消回滾異常,部分異常不適合當即回滾事務,在《TCC-Transaction 源碼分析 —— 事務恢復》詳細解析。
當原邏輯執行成功時,TCC Try 階段成功,調用 TransactionManager#commit(...)
方法,TCC Confirm 階段,提交事務。
調用 TransactionManager#cleanAfterCompletion(...)
方法,將事務從當前線程事務隊列移除,避免線程衝突。實現代碼以下:
// TransactionManager.java
public void cleanAfterCompletion(Transaction transaction) {
if (isTransactionActive() && transaction != null) {
Transaction currentTransaction = getCurrentTransaction();
if (currentTransaction == transaction) {
CURRENT.get().pop();
} else {
throw new SystemException("Illegal transaction when clean after completion");
}
}
}
複製代碼
當方法類型爲 Propagation.PROVIDER 時,服務提供者參與 TCC 總體流程。實現代碼以下:
private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext) throws Throwable {
Transaction transaction = null;
try {
switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
case TRYING:
// 傳播發起分支事務
transaction = transactionManager.propagationNewBegin(transactionContext);
return pjp.proceed();
case CONFIRMING:
try {
// 傳播獲取分支事務
transaction = transactionManager.propagationExistBegin(transactionContext);
// 提交事務
transactionManager.commit();
} catch (NoExistedTransactionException excepton) {
//the transaction has been commit,ignore it.
}
break;
case CANCELLING:
try {
// 傳播獲取分支事務
transaction = transactionManager.propagationExistBegin(transactionContext);
// 回滾事務
transactionManager.rollback();
} catch (NoExistedTransactionException exception) {
//the transaction has been rollback,ignore it.
}
break;
}
} finally {
// 將事務從當前線程事務隊列移除
transactionManager.cleanAfterCompletion(transaction);
}
// 返回空值
Method method = ((MethodSignature) (pjp.getSignature())).getMethod();
return ReflectionUtils.getNullValue(method.getReturnType());
}
複製代碼
當事務處於 TransactionStatus.TRYING 時,調用 TransactionManager#propagationExistBegin(...)
方法,傳播發起分支事務。發起分支事務完成後,調用 ProceedingJoinPoint#proceed()
方法,執行方法原邏輯( 即 Try 邏輯 )。
當事務處於 TransactionStatus.CONFIRMING 時,調用 TransactionManager#commit()
方法,提交事務。
當事務處於 TransactionStatus.CANCELLING 時,調用 TransactionManager#rollback()
方法,提交事務。
調用 TransactionManager#cleanAfterCompletion(...)
方法,將事務從當前線程事務隊列移除,避免線程衝突。
當事務處於 TransactionStatus.CONFIRMING / TransactionStatus.CANCELLING 時,調用 ReflectionUtils#getNullValue(...)
方法,返回空值。爲何返回空值?Confirm / Cancel 相關方法,是經過 AOP 切面調用,只調用,不處理返回值,可是又不能沒有返回值,所以直接返回空。實現代碼以下:
public static Object getNullValue(Class type) {
// 處理基本類型
if (boolean.class.equals(type)) {
return false;
} else if (byte.class.equals(type)) {
return 0;
} else if (short.class.equals(type)) {
return 0;
} else if (int.class.equals(type)) {
return 0;
} else if (long.class.equals(type)) {
return 0;
} else if (float.class.equals(type)) {
return 0;
} else if (double.class.equals(type)) {
return 0;
}
// 處理對象
return null;
}
複製代碼
當方法類型爲 Propagation.NORMAL 時,執行方法原邏輯,不進行事務處理。
先一塊兒來看下資源協調者攔截器 對應的切面 org.mengyun.tcctransaction.interceptor.CompensableTransactionAspect
,實現代碼以下:
@Aspect
public abstract class ResourceCoordinatorAspect {
private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor;
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void transactionContextCall() {
}
@Around("transactionContextCall()")
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
}
public void setResourceCoordinatorInterceptor(ResourceCoordinatorInterceptor resourceCoordinatorInterceptor) {
this.resourceCoordinatorInterceptor = resourceCoordinatorInterceptor;
}
public abstract int getOrder();
}
複製代碼
org.aspectj.lang.annotation.@Pointcut
+ org.aspectj.lang.annotation.@Around
註解,配置對 @Compensable 註解的方法進行攔截,調用 ResourceCoordinatorInterceptor#interceptTransactionContextMethod(...)
方法進行處理。ResourceCoordinatorInterceptor 實現代碼以下:
public class ResourceCoordinatorInterceptor {
private TransactionManager transactionManager;
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
Transaction transaction = transactionManager.getCurrentTransaction();
if (transaction != null) {
switch (transaction.getStatus()) {
case TRYING:
// 添加事務參與者
enlistParticipant(pjp);
break;
case CONFIRMING:
break;
case CANCELLING:
break;
}
}
// 執行方法原邏輯
return pjp.proceed(pjp.getArgs());
}
}
複製代碼
#enlistParticipant(...)
方法,添加事務參與者。ProceedingJoinPoint#proceed(...)
方法,執行方法原邏輯。ResourceCoordinatorInterceptor#enlistParticipant()
實現代碼以下:
private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {
// 得到 @Compensable 註解
Method method = CompensableMethodUtils.getCompensableMethod(pjp);
if (method == null) {
throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));
}
Compensable compensable = method.getAnnotation(Compensable.class);
// 得到 確認執行業務方法 和 取消執行業務方法
String confirmMethodName = compensable.confirmMethod();
String cancelMethodName = compensable.cancelMethod();
// 獲取 當前線程事務第一個(頭部)元素
Transaction transaction = transactionManager.getCurrentTransaction();
// 建立 事務編號
TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());
// TODO
if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {
FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());
}
// 得到類
Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());
// 建立 確認執行方法調用上下文 和 取消執行方法調用上下文
InvocationContext confirmInvocation = new InvocationContext(targetClass,
confirmMethodName,
method.getParameterTypes(), pjp.getArgs());
InvocationContext cancelInvocation = new InvocationContext(targetClass,
cancelMethodName,
method.getParameterTypes(), pjp.getArgs());
// 建立 事務參與者
Participant participant =
new Participant(
xid,
confirmInvocation,
cancelInvocation,
compensable.transactionContextEditor());
// 添加 事務參與者 到 事務
transactionManager.enlistParticipant(participant);
}
複製代碼
調用 CompensableMethodUtils#getCompensableMethod(...)
方法,得到帶 @Compensable 註解的方法。
調用 #getCurrentTransaction()
方法, 獲取事務。
調用 TransactionXid 構造方法,建立分支事務編號。實現代碼以下:
/** * 全局事務編號 */
private byte[] globalTransactionId;
/** * 分支事務編號 */
private byte[] branchQualifier;
public TransactionXid(byte[] globalTransactionId) {
this.globalTransactionId = globalTransactionId;
branchQualifier = uuidToByteArray(UUID.randomUUID()); // 生成 分支事務編號
}
複製代碼
branchQualifier
) 須要生成。TODO TransactionContext 和 Participant 的關係。
調用 ReflectionUtils#getDeclaringType(...)
方法,得到聲明 @Compensable 方法的實際類。實現代碼以下:
public static Class getDeclaringType(Class aClass, String methodName, Class<?>[] parameterTypes) {
Method method;
Class findClass = aClass;
do {
Class[] clazzes = findClass.getInterfaces();
for (Class clazz : clazzes) {
try {
method = clazz.getDeclaredMethod(methodName, parameterTypes);
} catch (NoSuchMethodException e) {
method = null;
}
if (method != null) {
return clazz;
}
}
findClass = findClass.getSuperclass();
} while (!findClass.equals(Object.class));
return aClass;
}
複製代碼
調用 InvocationContext 構造方法,分別建立確認執行方法調用上下文和取消執行方法調用上下文。實現代碼以下:
/** * 類 */
private Class targetClass;
/** * 方法名 */
private String methodName;
/** * 參數類型數組 */
private Class[] parameterTypes;
/** * 參數數組 */
private Object[] args;
public InvocationContext(Class targetClass, String methodName, Class[] parameterTypes, Object... args) {
this.methodName = methodName;
this.parameterTypes = parameterTypes;
this.targetClass = targetClass;
this.args = args;
}
複製代碼
調用 Participant 構造方法,建立事務參與者。實現代碼以下:
public class Participant implements Serializable {
private static final long serialVersionUID = 4127729421281425247L;
/** * 事務編號 */
private TransactionXid xid;
/** * 確認執行業務方法調用上下文 */
private InvocationContext confirmInvocationContext;
/** * 取消執行業務方法 */
private InvocationContext cancelInvocationContext;
/** * 執行器 */
private Terminator terminator = new Terminator();
/** * 事務上下文編輯 */
Class<? extends TransactionContextEditor> transactionContextEditorClass;
public Participant() {
}
public Participant(TransactionXid xid, InvocationContext confirmInvocationContext, InvocationContext cancelInvocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
this.xid = xid;
this.confirmInvocationContext = confirmInvocationContext;
this.cancelInvocationContext = cancelInvocationContext;
this.transactionContextEditorClass = transactionContextEditorClass;
}
}
複製代碼
調用 TransactionManager#enlistParticipant(...)
方法,添加事務參與者到事務。
受限於本人的能力,蠻多處表達不夠清晰或者易懂,很是抱歉。若是你對任何地方有任何疑問,歡迎添加本人微信號( wangwenbin-server ),期待與你的交流。不限於 TCC,也能夠是分佈式事務,也能夠是微服務,以及等等。
外送一本武林祕籍:帶中文註釋的 TCC-Transaction 倉庫地址,目前正在慢慢完善。傳送門:github.com/YunaiV/tcc-…。
再送一本葵花寶典:《TCC型分佈式事務原理和實現》系列。
胖友,分享一個朋友圈可好?