基於 XA 協議實現一個分佈式事務處理框架

引言

現目前處理分佈式事務的方案有不少,好比java

  • 基於 XA 協議的方案
  • 基於 TCC 協議方案
  • 基於 SAGA 協議的方案

而實現了對應的協議的有mysql

  • 在 Java 中基於 XA 協議實現的框架有 Atomikos,JOTM 等框架
  • TCC 協議的框架有 EasyTransaction、tcc-transaction 等框架
  • Sharding Sphere 能夠經過 SPI 注入 Atomikos 等來實現 XA 分佈式事務

對於 XA 協議來講,它使用的是 2PC 協議的方式,是阻塞式的,而且它還依賴於數據庫自身提供的 XA 接口的可靠性,對於大部分商業數據庫來講作的都還蠻不錯,在 Mysql 中只有 InnoDB 引擎支持 XA 分佈式事務。git

基於 XA 的分佈式事務方案更適用於單機應用中跨庫事務,它也能夠作到遠程調用事務支持github

XA 協議

XA 協議由 Tuxedo 首先提出的,並交給X/Open組織,這個組織隨即定義了一套分佈式事務的標準即 X/Open DTP(X/Open Distributed Transaction Processing Reference Model) 即一些列的接口各個廠商須要遵循這個標準來實現sql

在 DTP 模型中定義了五個組成元素數據庫

  • AP(Application Program):也就是應用程序,能夠理解爲使用 DTP 的程序
  • TM(Transaction Manager):事務管理器,負責協調和管理事務,提供給 AP 編程接口以及管理資源管理器
  • RM(Resource Manager):資源管理器,能夠理解爲一個 DBMS 系統,或者消息服務器管理系統,應用程序經過資源管理器對資源進行控制,資源必須實現 XA 定義的接口(好比 Mysql 就實現了對 XA 協議的支持)
  • 通訊資源管理器(Communication Resource Manager):它規定了,對於須要跨應用的分佈式事務,事務管理器彼此之間須要進行通訊,就須要經過這個組件來完成
  • 通訊協議(Communication Protocol)
    • XA 協議:應用或應用服務器與事務管理之間通訊的協議
    • TX 協議:全局事務管理器與資源管理器之間通訊的接口

其中在 DTP 中又定義瞭如下幾個概念編程

  • 全局事務:資源管理器會操做多個分支事務,根據分支事務的準備狀況決定是提交仍是回滾
  • 分支事務:一個全局事務中會有多個分支事務,統一由資源管理器來管理,好比購買商品這個方法,可能涉及到扣款事務方法,生成訂單事務方法,增長財務流水事務方法調用等
  • 控制線程:須要表示全局事務以及分支事務的關係

DTP 模型主要是經過 2PC 來控制事務管理器和資源管理之間的交互的api

  • 第一階段:準備階段,事務管理器詢問資源管理器分支事務是否正常執行完畢,資源管理返回是或者否
  • 第二階段:提交階段,事務管理器根據上一階段全部資源管理器的反饋結果,若是都是那麼提交事務,若是否一個失敗,那麼回滾事務

XA 優缺點

優勢:能夠作到對業務無侵入,可是對事務管理器和資源管理器要求比較高服務器

缺點:網絡

  • 同步阻塞:在二階段提交的過程當中直到 commit 結束爲止,全部節點都須要等到其它節點完成後纔可以釋放事務資源,這種同步阻塞極大的限制了分佈式系統的性能
    • 好比本來在一個購買商品的場景中,一個扣款服務能夠直接完成提交事務釋放鎖資源,如今須要等到訂單生成,帳戶流水記錄,積分增長扣減等操做完成後才能釋放
    • 而且若是說在這個操做中,資源管理器由於網絡等緣由沒有辦法收到事務管理器的指令,那麼它會一直處於阻塞狀態狀態而不釋放鎖資源
  • 單點問題:協調者(事務管理器)若是一旦出現故障那麼整個服務就會不可用,因此須要實現對應的協調者(資源管理器)選舉操做
  • 數據不一致:若是說全部資源管理器都反饋的準備好了,這時候進入 commit 階段,可是因爲網絡問題或者說某個資源管理器掛了,那麼就會存在一部分機器執行了事務,另一部分沒有執行致使數據不一致
    • 固然這種狀況若是說存在協調者選舉的話能夠必定程度的避免,好比說協調者同時向 A、B、C 三個服務器發出 commit,這時 A 和 B commit 成功,可是 C 在收到命令前就掛了,那麼這個時候 C 若是不須要恢復了就不須要管它了(之後啓用的時候能夠經過好比 mysql binlog 等同步數據)
    • 若是說 C 過一會就恢復了的話,協調者能夠保存以前提交的狀態,C 去詢問協調者該 commit 仍是 rollback,協調者就去檢查本身的記錄去看一下 A 和 B 上次是 commit 成功了仍是失敗了,而後給 C 發送指令去執行完成這個事務,最終讓數據保持一致
    • 可是說,若是說協調者再發送指令給 A、B、C 後它本身立馬就掛了,這個時候正好 C 正好執行完了操做後(多是 commit 多是 rollback)後也掛了,碰巧的是,協調者立刻選舉完成了而後 A 和 B 返回成功了,這個時候若是 A 和 B 都 commit 了,而 C rollback 了那麼就會形成數據不一致了(由於 C 已經執行完了這個事務,不像上一個場景沒有執行事務或者事務執行失敗,它能夠再執行或者回滾了)
  • 容錯性很差:若是說在提交詢問階段,參與者掛了那麼這個時候協調者就只能依靠超時機制來處理是否須要中斷事務

能夠看出 2PC 協議存在阻塞低效和數據不一致的問題,因此在大型應用須要較高的吞吐量的應用是不多使用這種方案的

JTA 事務規範

JTA(Java Transaction API)即 Java 事務 API 和 JTS(Java Transaction Service)即 Java 事務服務,他們爲 JAVA 平臺提供了分佈式事務服務,能夠把 JTA 理解爲是 XA 協議規範的 Java 版本。它也採用的是 DTP 模型

咱們知道 JTA 它只是一個規範,定義瞭如何去於實現了 XA 協議的資源管理器交互的接口,可是並無對應的實現,官方推薦 Atomikos 來實現 XA 分佈式事務,感謝的能夠直接取研究 Atomikos 源碼。

基於 XA 協議實現一個分佈式事務處理框架

能夠經過對 XA 協議的實際應用來加深咱們的理解,代碼以下

public static void test() {
        try {
            // 開啓全局事務
            transactionManager.begin();
            // 向服務器 A 數據庫寫入數據
            saveDB1();
            // 向服務器 B 數據庫寫入數據
            saveDB2();
            // 詢問 RM 分支事務是否準備就緒
            boolean prepareSuccess = transactionManager.prepare();
            // 目前沒有涉及到遠程事務的支持,在本地都是同步的方式調用因此此處沒有作作阻塞等待而是返回馬上知道是否成功
            // 若是涉及到遠程事務的支持,那麼此處應該就有一個阻塞喚醒機制
            if (prepareSuccess) {
                // 開始提交分支事務
                transactionManager.commit();
            } else {
                // 回滾
                transactionManager.rollback();
            }
        } catch (Exception e) {
            e.printStackTrace();
            // 若是出錯了就進行回滾各分支事務
            try {
                transactionManager.rollback();
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        } finally {
            // 資源回收
            GlobalInfo.remove();
        }
    }
    
    private static void saveDB1() throws XAException, SQLException {
        // 由於存在多個數據源,因此須要指定是使用哪個數據源
        XAResourceManager xaResourceManager = RMUtil.getResourceManager(dbPool1);
        // 分支事務開啓
        xaResourceManager.begin();
        xaResourceManager.execute("insert into test1(name, age) values('pt', 21)");
        // 事務執行完畢處於準備階段等待 TM 下達 commit 指令
        xaResourceManager.prepare();
    }

    private static void saveDB2() throws XAException, SQLException {
        XAResourceManager xaResourceManager = RMUtil.getResourceManager(dbPool2);
        // 分支事務開啓
        xaResourceManager.begin();
        xaResourceManager.execute("insert into test2(name, age) values('tom', 22)");
        // 事務執行完畢處於準備階段等待 TM 下達 commit 指令
        xaResourceManager.prepare();
        // 測試回滾
        // throw new RuntimeException("xx");
    }
複製代碼

包結構

  • db:中存放了數據庫鏈接池的實現,能夠實現多數據源的切換
  • rm:中存放了資源管理器的實現,主要是針對分支事務的處理與隔離
  • tm:中存放了事務管理器的實現,它主要是去操做 rm 包來完成全局事務的提交或者回滾

代碼放在了 github 上面感興趣的能夠了解一下

github 代碼連接

參考:

相關文章
相關標籤/搜索