Seata 分佈式事務框架 TCC 模式源碼分析

Seata 是什麼

Seata 是阿里近期開源的分佈式事務框架,地址:github.com/seata/seata。框架包括了集團的 TXC(雲版本叫 GTS)和螞蟻金服的 TCC 兩種模式,短短數月 Github 上的 star 數已經接近一萬,算是目前惟一有大廠背書的分佈式事務解決方案。java

TXCSeata 中又叫 AT 模式,意爲補償方法是框架自動生成的,對用戶徹底屏蔽,用戶能夠向使用本地事務那樣使用分佈式事務,缺點是僅支持關係型數據庫(目前支持 MySQL),引入 Seata AT 的服務須要本地建表存儲 rollback_info,隔離級別默認 RU 適用場景有限。git

TCC 不算是新概念,很早就有了,用戶經過定義 try/confirm/cancel 三個方法在應用層面模擬兩階段提交,區別在於 TCC 中 try 方法也須要操做數據庫進行資源鎖定,後續兩個補償方法由框架自動調用,分別進行資源提交和回滾,這點同單純的存儲層 2PC 不太同樣。螞蟻金服向 Seata 貢獻了本身的 TCC 實現,聽說已經演化了十多年,大量應用在在金融、交易、倉儲等領域。程序員

分佈式事務的誕生背景

早期應用都是單一架構,例如支付服務涉及到的帳戶、金額、訂單系統等都由單一應用負責,底層訪問同一個數據庫實例,天然事務操做也是本地事務,藉助 Spring 能夠輕鬆實現;可是因爲量級愈來愈大,單一服務須要進行職責拆分變爲三個獨立的服務,經過 RPC 進行調用,數據也存在不一樣的數據庫實例中,因爲這時一次業務操做涉及對多個數據庫數據的修改,沒法再依靠本地事務,只能經過分佈式事務框架來解決。github

TCC 就是分佈式事務的一種解決方案,屬於柔性補償型,優勢在於理解簡單、僅 try 階段加鎖併發性能較好,缺點在於代碼改形成本。spring

什麼是 TCC 本文就再也不贅述了,TCC 的概念自己並不複雜數據庫

Seata TCC 使用方法

在分析源碼以前,咱們先簡要說起下 Seata TCC 模式的使用方法,有助於後續理解整個 TCC 流程。session

Seata TCC 參與方

Seata 中的 TCC 模式要求 TCC 服務的參與方在接口上增長 @TwoPhaseBusinessAction 註解,註明 TCC 接口的名稱(全局惟一),TCC 接口的 confirmcancel 方法的名稱,用於後續框架反射調用,下面是一個 TCC 接口的案例:架構

public interface TccAction {
    @TwoPhaseBusinessAction(name = "yourTccActionName", commitMethod = "confirm", rollbackMethod = "cancel")
    public boolean try(BusinessActionContext businessActionContext, int a, int b);
    public boolean confirm(BusinessActionContext businessActionContext);
    public boolean cancel(BusinessActionContext businessActionContext);
}
複製代碼

緊接着定義實現類 Impl 實現這個接口,爲三個方法提供具體實現。最後將參與方服務進行發佈,註冊到遠端,主要爲了後續能讓 Seata 框架調用到參與方的 confirm 或者 cancel 方法閉環整個 TCC 事務。併發

Seata TCC 發起方

Seata TCC 的發起方相似於咱們上圖中的 payment service,參與方須要在業務方法上增長 @GlobalTransactional 註解,用於開啓切面註冊全局事務,業務方法中調用 TCC 參與方的若干 try 方法,一旦業務方法調用成功,Seata 框架會通知 TC 回調這些參與方的 confirmcancel 方法。app

源碼分析

SeataTCC 模式的源碼並不複雜,主要集中於:

module class 功能
seata-spring GlobalTransactionalInterceptor.class 全局事務切面邏輯,包括註冊全局事務,拿到 xid
seata-spring TccActionInterceptor.class TCC 參與方切面邏輯
seata-tcc TCCResourceManager.class 解析 TCC Bean,保存 TCC Resources,便於後續回調
seata-tcc ActionInterceptorHandler.class TCC 分支事務註冊實現
seata-server DefaultCoordinator.class、FileTransactionStoreManager.class 主要是 TC 的實現、事務存儲等實現

註冊 TCC Resources

Seata 中一個 TCC 接口被稱做一個 TCC Resources,其結構以下:

public class TCCResource implements Resource {

    private String resourceGroupId = "DEFAULT";

    private String appName;

    private String actionName; // TCC 接口名稱 

    private Object targetBean; // TCC Bean

    private Method prepareMethod; // try 方法

    private String commitMethodName;

    private Method commitMethod; // confirm 方法

    private String rollbackMethodName;

    private Method rollbackMethod; // cancel 方法

    // …… 省略
}
複製代碼

Seata 解析到應用中存在 TCC Bean,則經過 parserRemotingServiceInfo 方法生成一個 TCCResource 對象,進而調用 TCCResourceManager 類的 registerResource 方法,將 TCCResource 對象保存到本地的 tccResourceCache 中,它是一個 ConcurrentHashMap 結構,同時經過 RmRpcClient 將該 TCCResourceresourceIdaddress 等信息註冊到服務端,便於後續 TC 經過 RPC 回調到正確的地址。

// 解析 TCCResource 的部分代碼
Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
Method[] methods = interfaceClass.getMethods();
if(isService(bean, beanName)){
    try {
        // 若是是 TCC service Bean,解析並註冊該 resource
        Object targetBean = remotingBeanDesc.getTargetBean();
        for(Method m : methods){
            TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
            if(twoPhaseBusinessAction != null){
                // 若是有 TCC 參與方註解,定義一個 TCCResource,
                TCCResource tccResource = new TCCResource();
                tccResource.setActionName(twoPhaseBusinessAction.name());
                // TCC Bean
                tccResource.setTargetBean(targetBean); 
                // try 方法
                tccResource.setPrepareMethod(m); 
                // confirm 方法名稱
                tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                // confirm 方法對象
                tccResource.setCommitMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(), new Class[]{BusinessActionContext.class}));
                // cancel 方法名稱
                tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                // cancel 方法對象
                tccResource.setRollbackMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(), new Class[]{BusinessActionContext.class}));
                // 調用到 TCCResourceManager 的 registerResource 方法
                DefaultResourceManager.get().registerResource(tccResource);
            }
        }
    }catch (Throwable t){
        throw new FrameworkException(t, "parser remting service error");
    }
}
複製代碼

咱們看一下 TCCResourceManagerregisterResource 方法的實現:

// 內存中保存的 resourceId 和 TCCResource 的映射關係
private Map<String, Resource> tccResourceCache = new ConcurrentHashMap<String, Resource>();

@Override
public void registerResource(Resource resource) {
    TCCResource tccResource = (TCCResource) resource;
    tccResourceCache.put(tccResource.getResourceId(), tccResource);
    // 調用父類的方法經過 RPC 註冊到遠端
    super.registerResource(tccResource);
}
複製代碼

咱們看下 TCCResource 是如何註冊到服務端的:

public void registerResource(Resource resource) {
    // 拿到 RmRpcClient 實例,調用其 registerResource 方法
    RmRpcClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
}

public void registerResource(String resourceGroupId, String resourceId) {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("register to RM resourceId:" + resourceId);
    }
    synchronized (channels) {
        for (Map.Entry<String, Channel> entry : channels.entrySet()) {
            String serverAddress = entry.getKey();
            Channel rmChannel = entry.getValue();
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("register resource, resourceId:" + resourceId);
            }
            // 註冊 resourceId,遠端將其解析爲一個 RpcContext 保存在內存中
            sendRegisterMessage(serverAddress, rmChannel, resourceId);
        }
    }
}
複製代碼

GlobalTransaction 註冊全局事務

GlobalTransaction 註解是全局事務的入口,其切面邏輯實如今 GlobalTransactionalInterceptor 類中。若是判斷進入 @GlobalTransaction 修飾的方法,會調用 handleGlobalTransaction 方法進入切面邏輯,其中關鍵方法是 transactionalTemplateexecute 方法。

public Object execute(TransactionalExecutor business) throws Throwable {
    
    // 若是上游已經有 xid 傳過來講明本身是下游,直接參與到這個全局事務中就能夠,沒必要新開一個,角色是 Participant
    // 若是上游沒有 xid 傳遞過來,說明本身是發起方,新開啓一個全局事務,角色是 Launcher
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    // …… …… 省略 

    try {

        // 開啓全局事務
        beginTransaction(txInfo, tx);

        Object rs = null;
        try {

            // 調用業務方法
            rs = business.execute();

        } catch (Throwable ex) {

            // 若是拋異常,通知 TC 回滾全局事務
            completeTransactionAfterThrowing(txInfo,tx,ex);
            throw ex;
        }

        // 若是不拋異常,通知 TC 提交全局事務
        commitTransaction(tx);

        return rs;
    } 

    // …… …… 省略
}
複製代碼

beginTransaction 方法調用了 transactionManagerbegin 方法:

// 客戶端
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // 發送 RPC,獲取 TC 下發的 xid
    GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
    return response.getXid();
}

// 服務端
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
    // 全局事務用 GlobalSession 來表示
    GlobalSession session = GlobalSession.createGlobalSession(
        applicationId, transactionServiceGroup, name, timeout);
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // 將 GlobalSession 寫入文件存儲
    session.begin();
    // 返回 UUID 做爲全局事務 ID
    return XID.generateXID(session.getTransactionId());
}
複製代碼

TwoPhaseBusinessAction 註冊分支事務

全局事務調用業務方法時,會進入 TCC 參與方的切面邏輯,主要實如今 TccActionInterceptor 類中,關鍵方法是 actionInterceptorHandlerproceed 方法。

public Map<String, Object> proceed(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, Callback<Object> targetCallback) throws Throwable {
    
    // …… …… 省略

    // 建立分支事務
    String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
    actionContext.setBranchId(branchId);
    
    // 記錄方法參數
    Class<?>[] types = method.getParameterTypes();
    int argIndex = 0;
    for (Class<?> cls : types) {
        if (cls.getName().equals(BusinessActionContext.class.getName())) {
            arguments[argIndex] = actionContext;
            break;
        }
        argIndex++;
    }
    
    // …… …… 省略
}
複製代碼

doTccActionLogStore 方法負責註冊分支事務:

// 客戶端
protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, BusinessActionContext actionContext) {
    String actionName = actionContext.getActionName();
    // 拿到全局事務 ID
    String xid = actionContext.getXid();
    
    // …… …… 省略

    try {
        // resourceManager 經過 RPC 向 TC 註冊分支事務
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid, applicationContextStr, null);
        // 拿到 TC 返回的分支事務 ID
        return String.valueOf(branchId);
    }

    // …… …… 省略
}

// 服務端
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
    GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);
    // 分支事務用 BranchSession 表示,新建一個 BranchSession
    BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
        applicationData, lockKeys, clientId);

    if (!branchSession.lock()) {
        throw new TransactionException(LockKeyConflict);
    }
    try {
        // 將分支事務加入全局事務中,也會寫文件
        globalSession.addBranch(branchSession);
    } catch (RuntimeException ex) {
        throw new TransactionException(FailedToAddBranch);
    }
    // 返回分支事務 ID
    return branchSession.getBranchId();
}
複製代碼

TC 回調參與方補償方法

分支事務註冊完畢,業務方法調用成功則通知 TC 提交全局事務。

@Override
public void commit() throws TransactionException {
    // 若是是參與者,無需發起提交請求
    if (role == GlobalTransactionRole.Participant) {
        return;
    }
    // 由 TM 向 TC 發出提交全局事務的請求
    status = transactionManager.commit(xid);
}
複製代碼

TC 收到客戶端 TMcommit 請求後:

@Override
public GlobalStatus commit(String xid) throws TransactionException {
    // 根據 xid 找出 GlobalSession
    GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    GlobalStatus status = globalSession.getStatus();

    // 關閉這個 GlobalSession,不讓後續的分支事務再註冊上來
    globalSession.closeAndClean(); 

    if (status == GlobalStatus.Begin) {
        // 修改狀態爲提交進行中
        globalSession.changeStatus(GlobalStatus.Committing);
        // 一旦分支事務中存在 TCC,作同步提交,其實 TCC 分支也能夠異步提交,要求高性能時能夠選擇異步
        if (globalSession.canBeCommittedAsync()) {
            asyncCommit(globalSession);
        } else {
            doGlobalCommit(globalSession, false);
        }
    }
    return globalSession.getStatus();
}
複製代碼

doGlobalCommit 是咱們關注的關鍵方法,咱們忽略其中的次要邏輯:

@Override
public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    for (BranchSession branchSession : globalSession.getSortedBranches()) {
        
        // …… …… 省略

        try {
            // 調用 DefaultCoordinator 的 branchCommit 方法作分支提交
            // 參數有分支事務 id,resourceId 用來尋找對應的 TCCResource 和補償方法參數信息
            BranchStatus branchStatus = resourceManagerInbound.branchCommit(branchSession.getBranchType(),
                XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
                branchSession.getResourceId(), branchSession.getApplicationData());
        }
    }

    // …… …… 省略
}
複製代碼

服務端的 DefaultCoordinator 類中的 branchCommit 方法發出 RPC 請求,調用對應 TCCResource 提供方:

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    
    // …… …… 省略
    // 獲取全局事務和分支事務
    GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
        BranchSession branchSession = globalSession.getBranch(branchId);
    // 根據 resourceId 找到對應的 channel 和 RpcContext 
    BranchCommitResponse response = (BranchCommitResponse)messageSender.sendSyncRequest(resourceId,
        branchSession.getClientId(), request);
    // 返回分支事務提交狀態
    return response.getBranchStatus();

    // …… …… 省略
}
複製代碼

客戶端天然是接收到分支提交的 RPC 請求,而後本地找出以前解析並保持下來的 TCCResource 進行補償方法的反射調用,下面咱們截取其中的關鍵步驟進行分析。

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    // 根據 resourceId 找出內存中保留的 TCCResource 對象
    TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
    if(tccResource == null){
        throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
    }
    // 獲取 targetBean 和相應的 method 對象
    Object targetTCCBean = tccResource.getTargetBean();
    Method commitMethod = tccResource.getCommitMethod();
    try {
        boolean result = false;
        // 取出補償方法參數信息
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, applicationData);
        // 反射調用補償方法
        Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
        // 返回狀態
        return result ? BranchStatus.PhaseTwo_Committed:BranchStatus.PhaseTwo_CommitFailed_Retryable;
    }
    // …… …… 省略
}
複製代碼

事務存儲

關於 Seata TC 模塊如何進行事務存儲,網上有的文章已經講得很詳細,例如 深度剖析一站式分佈式事務方案 Seata-Server,所以這裏再也不贅述。

須要說起的一點是,TC 有可能成爲整個分佈式事務服務的性能瓶頸,所以如何作到高性能高可用很重要,目前的存儲方式是 File,代碼中也有關於 DB Store ModeTODO 項,文件相比於 DB 性能確定好一些可是可用性會差一點,這塊怎麼保證要等到後續 HA Cluster 發佈以後再看。

總結

整個 Seata 框架中關於 TCC 部分的源碼並不複雜,本文只選取了部分類中的關鍵代碼進行展現,忽略了一些判斷邏輯和異常處理,筆者認爲 Seata TCC 中關於 TCC 異常的封裝和自定義處理、還有各類用戶擴展埋點的設計也值得一看。

螞蟻 SOFA Channel 以前作過一個關於 Seata TCC Seata TCC 分享 的講解裏也提到,TCC 框架的難點不在於自己,而在於如何寫好一個 TCC 接口,若是對這部份內容感興趣,能夠點擊連接進行詳細瞭解。

寫在最後

這是一個不定時更新的、披着程序員外衣的文青小號。既分享極客技術,也記錄人間煙火。

相關文章
相關標籤/搜索