TCC 開源項目源碼學習(一)

TCC 開源項目源碼學習(一)

學習TCC分佈式事務的知識是使用了GIT上的一個開源項目,以前有簡單的看過一些,有了一個大概的瞭解,可是隨着時間的‘清洗’,又開始變得‘渾濁不清’了,此次索性把這份源碼從頭看了下,又把流程推演了好幾回,以爲已經懂了,想把理解的東西經過博客寫出來。在這個過程當中,再次梳理一遍,加深理解,同時也能夠發現一些理解誤差和錯誤的地方。
GIT: https://github.com/1755728288...java

概要

在分佈式系統中,爲了保證執行指令的正確性,引入了事務的概念。通常意義上,事務主要突出的是ACID,即原子性,一致性,隔離性和持久性。而在分佈式事務中,最主要的原子性的保證,要保證一個業務操做在其分佈式系統中的全部指令所有執行或不執行。由於各個指令的操做耗時以及順序,因此原子性都對應了必定的時間窗口,好比單機系統下,這個時間窗口很是短,並且其原子性也由數據庫事務來保證。而在分佈式系統中,原子性就依賴於具體的應用設計了,主要的依據是業務上容許的時間窗口的長短。目前通常來講,若容許的時間窗口較短,就用TCC,若容許的時間窗口較長,則使用最終一致性。最終一致性一般使用消息機制來設計,其核心是消息的安全送達與消費。git

TCC能夠說是一種設計模式,將傳統單機系統中的大事務進行拆分,經過小事務來拼裝,並協調總體的事務提交和回滾。按字面意思理解,TCC主要分Try,Confirm,Cancel三個階段,Try預留資源,Confirm使用預留資源執行業務操做,Cancel則是釋放資源。貌似簡單,可是在實際的業務場景中,每每會困惑咱們在這三個階段分別要作什麼,這個是業務設計的一部份內容了,在此不展開敘述。不過要注意的一點時,任何一個階段的結果都是可見的,好比一個庫存子服務的入庫方法,在try階段就直接加到庫存上去了,回滾的時候發現剛加上的庫存已經有買家下單準備出庫了,那就GG了。github

代碼結構

先放個圖,這是項目的組件,上面綠色區域是框架的子模塊,下面黃色的是樣例模塊。不要被黃色區域嚇到,TCC的代碼很少,主要是tcc-transaction-core.
圖片描述spring

組件名 說明
tcc-transaction-core 【重要】核心代碼,主要的切面和job
tcc-transaction-api 註解和常量
tcc-transaction-spring spring使用的擴展
tcc-transaction-dubbo dubbo使用時的擴展
tcc-transaction-server tcc事務管理控制檯
tcc-transaction-unit-test 單元測試
tcc-transaction-tutorial-sample 訂單場景示例工程,分dubbo和spring版本

因此重點只有一個模塊,就是tcc-transaction-core,代碼量很少,主要分紅下面的模塊數據庫

模塊名 說明
interceptor 【重要】2個切面,一個是根據事務狀態進行tcc階段方法的選擇,一個是組織事務的參與者,用XID將各個參與者綁起來,以便事務的提交和回滾
recover 恢復在同步調用失敗事務的定時處理
repository 事務對象持久DAO對象,提供了多種選擇:緩存,文件系統,jdbc,zookeeper
context 傳遞事務上下文的方法類,通常會在遠程方法調用的切面中,將上下文加入到參數列表中
common 方法枚舉和事務枚舉
serializer 事務對象的序列化器,使用了kyto序列化工具
support 實現了工廠類,管理TCC的類的實例化
utils 工具類

最重要的是interceptor,是主要業務邏輯所在。設計模式

題外話,有人初看了一遍TCC,拿來和數據庫事務來比較,會說‘TCC不就是業務補償麼,沒什麼難點啊’,貌似有道理,其實否則。還記得數據庫事務的ACID嗎?由於數據庫提供了事務特性,ACID由數據庫保證,應用只須要一個簡單的@Transactional註解就都搞定了。而分佈式事務,就把ACID的特性從數據庫裏面拿了出來,由應用程序來保證。固然ACID也有了和以前不同的含義。api

特性 數據庫 分佈式事務
A[Atomicity] 全部數據庫更新一塊兒提交和回滾,數據庫經過日誌來實現 經過協調各個事務的參與方,經過框架來處理異常
C[Consistncy] 經過數據庫約束來實現,好比非空,惟一,外鍵等
I[Isolation] 各個數據庫實現方式不同,主要分讀已提交,讀未提交等 TCC經過try階段鎖定資源來實現隔離,即業務資源的隔離
D[Durability] 數據庫實現 數據庫實現

工做流程

以項目中的例子來講明整個TCC的處理流程,這個例子是相似電商下單的場景,在支付的時候能夠選擇紅包或本金,好比一個筆記本4000,能夠選擇使用3000的本金和1000的紅包來支付,有三個服務:訂單服務(order),本金服務(capital)和紅包服務(redPacket)。主調方是Order,被調方Capital和RedPacket.緩存

下面是絞盡腦汁畫的調用流程圖,爲了圖的簡便直觀,省去了cancel的處理,只有try和confirm的調用(cancel的處理和confirm基本一致)。紅線是try的調用,藍線是confirm的調用。粗紅線是開始,粗藍線是結束。
在try階段的makePayment,方法調用了兩個子事務的api方法,其餘的方法的調用均是全局事務切面決定的。業務方法只要按TCC框架的要求實現便可,其餘的交給TCC框架。
圖片描述安全

主要代碼分析

咱們從TCC事務註解開始,先介紹幾個基本的類。框架

/**
屬性主要是事務傳播屬性,提交方法,回滾方法,上下文傳遞類,是否異步提交,是否異步回滾
propagation屬性比較重要,值有required,support,mandatory和requireNEW,具體的含義和spring的事務傳播相似。
**/
public @interface Compensable {
    public Propagation propagation() default Propagation.REQUIRED;
    public String confirmMethod() default "";
    public String cancelMethod() default "";
    public Class<? extends TransactionContextEditor> transactionContextEditor() default DefaultTransactionContextEditor.class;
    public boolean asyncConfirm() default false;
    public boolean asyncCancel() default false;
/**
事務狀態,在TCC的不通階段進行事務狀態的變動。
**/
public enum TransactionStatus {
    TRYING(1), CONFIRMING(2), CANCELLING(3);
/**
    ROOT:事務啓動方法,好比例子中order的makePayment方法
    PROVIDER:事務協同方法,好比例子中redpacket的record方法
    Normal:普通方法,好比API裏面的record方法
**/
public enum  MethodType {
    ROOT, PROVIDER, NORMAL;
}
/**
    ROOT:主事務,通常MethodType爲ROOT的啓動
    BRANCH:分支事務
**/
public enum TransactionType {
    ROOT(1),
    BRANCH(2);

下面咱們看一下事務持久的內容,能更好的幫助理解。每一個服務都會有一張事務表,會記錄全部的ROOT和BRANCH事務,在事務完成以後,會自動刪除。

TRANSACTION_ID DOMAIN GLOBAL_TX_ID BRANCH_QUALIFIER CONTENT STATUS TRANSACTION_TYPE RETRIED_COUNT VERSION
2 ORDER 事務編號001 分支標識A 事務對象A 2 1 0 11
2 CAPITAL 事務編號001 分支標識B 事務對象B 2 2 0 11
2 REDPACKET 事務編號001 分支標識C 事務對象C 2 2 0 11
domain區分了服務
global_tx_id 串起了全部的事務
transaction_type 1爲主事務,2爲分支事務
content 用於恢復事務,主事務的content中包含了參與者

代碼部分的最後是兩個切面和恢復的job,不進行特別的解釋了,將個人理解放在代碼的註釋裏面。

//CompensableTransactionInterceptor.java
    public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
        //獲取所執行事務方法的設置屬性
        Method method = CompensableMethodUtils.getCompensableMethod(pjp);
        Compensable compensable = method.getAnnotation(Compensable.class);
        Propagation propagation = compensable.propagation();
        TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
        boolean asyncConfirm = compensable.asyncConfirm();
        boolean asyncCancel = compensable.asyncCancel();
        
        //當前是否存在事務,
        boolean isTransactionActive = transactionManager.isTransactionActive();

        /**
         * 判斷方法類型
         * MethodType一共有三種,Root,Provider,Normal. 主要經過propagation和isTransactionActive來肯定
         */
        MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);
        switch (methodType) {
            case ROOT:
            //ROOT:(1)propagation爲require_new (2)propataion爲require,且以前沒有有事務存在
            //主事務處理方法
                return rootMethodProceed(pjp, asyncConfirm, asyncCancel);
            case PROVIDER:
            //PROVIDER:(1) propation爲require或者mandatory,且以前有事務
            //從事務處理方法
                return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);
            default:
                //普通方法執行
                return pjp.proceed();
        }
    }
    
    private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
        Object returnValue = null;
        Transaction transaction = null;
        try {
            //開啓新的主事務,使用新生成的xid,狀態爲trying,事務類型爲ROOT
            transaction = transactionManager.begin();
            try {
                returnValue = pjp.proceed();
            } catch (Throwable tryingException) {
                if (!isDelayCancelException(tryingException)) {
                    logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);
                    //異常回滾,將事務狀態更新爲CANCELLING
                    transactionManager.rollback(asyncCancel);
                }
                throw tryingException;
            }
            //執行confirm方法,將事務狀態更新爲CONFIRMING。若是是異步,則TM會使用線程池異步執行,不然直接調用,會協調全部的參與者進行提交。並將事務記錄刪除
            transactionManager.commit(asyncConfirm);
        } finally {
            //清理事務數據
           transactionManager.cleanAfterCompletion(transaction);
        }
        return returnValue;
    }

    private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
        Transaction transaction = null;
        try {
            switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
                case TRYING:
                    //開啓一個子事務,並調用TCC的try方法
                    transaction = transactionManager.propagationNewBegin(transactionContext);
                    return pjp.proceed();
                case CONFIRMING:
                    //獲取子事務TRY階段的事務,並調用TCC的commit方法
                    try {
                        transaction = transactionManager.propagationExistBegin(transactionContext);
                          transactionManager.commit(asyncConfirm);
                    } catch (NoExistedTransactionException excepton) {
                        //the transaction has been commit,ignore it.
                    }
                    break;
                case CANCELLING:
                    //獲取子事務保存的事務數據,執行cancel方法
                    
                    try {
                        transaction = transactionManager.propagationExistBegin(transactionContext);
                        transactionManager.rollback(asyncCancel);
                    } catch (NoExistedTransactionException exception) {
                        //the transaction has been rollback,ignore it.
                    }
                    break;
            }

        } finally {
            //清理事務數據
           transactionManager.cleanAfterCompletion(transaction);
        }
        Method method = ((MethodSignature) (pjp.getSignature())).getMethod();

        return ReflectionUtils.getNullValue(method.getReturnType());
    }

    private boolean isDelayCancelException(Throwable throwable) {

        if (delayCancelExceptions != null) {
            for (Class delayCancelException : delayCancelExceptions) {

                Throwable rootCause = ExceptionUtils.getRootCause(throwable);

                if (delayCancelException.isAssignableFrom(throwable.getClass())
                        || (rootCause != null && delayCancelException.isAssignableFrom(rootCause.getClass()))) {
                    return true;
                }
            }
        }

        return false;
    }

總結

代碼分析的比較少,這份代碼仍是有不少值得稱道的地方,工廠類,緩存,模板方法等設計模式的使用,下次能夠從設計模式的角度來進行分析。除了TCC,最近項目中還涉及了安全消息,等弄清楚了再來一發。

相關文章
相關標籤/搜索