做者 | 朱晉君 來源 | 阿里巴巴雲原生公衆號html
XA 協議是由 X/Open 組織提出的分佈式事務處理規範,主要定義了事務管理器 TM 和局部資源管理器 RM 之間的接口。目前主流的數據庫,好比 oracle、DB2 都是支持 XA 協議的。java
mysql 從 5.0 版本開始,innoDB 存儲引擎已經支持 XA 協議,今天的源碼介紹實驗環境使用的是 mysql 數據庫。mysql
兩階段提交
分佈式事務的兩階段提交是把整個事務提交分爲 prepare 和 commit 兩個階段。以電商系統爲例,分佈式系統中有訂單、帳戶和庫存三個服務,以下圖:git
第一階段,事務協調者向事務參與者發送 prepare 請求,事務參與者收到請求後,若是能夠提交事務,回覆 yes,不然回覆 no。github
第二階段,若是全部事務參與者都回復了 yes,事務協調者向全部事務參與者發送 commit 請求,不然發送 rollback 請求。spring
兩階段提交存在三個問題:sql
- 同步阻塞,本地事務在 prepare 階段鎖定資源,若是有其餘事務也要修改 xiaoming 這個帳戶,就必須等待前面的事務完成。這樣就形成了系統性能降低。
- 協調節點單點故障,若是第一個階段 prepare 成功了,可是第二個階段協調節點發出 commit 指令以前宕機了,全部服務的數據資源處於鎖定狀態,事務將無限期地等待。
- 數據不一致,若是第一階段 prepare 成功了,可是第二階段協調節點向某個節點發送 commit 命令時失敗,就會致使數據不一致。
三階段提交
爲了解決兩階段提交的問題,三階段提交作了改進:數據庫
- 在協調節點和事務參與者都引入了超時機制。
- 第一階段的 prepare 階段分紅了兩步,canCommi 和 preCommit。
以下圖:oracle
引入 preCommit 階段後,協調節點會在 commit 以前再次檢查各個事務參與者的狀態,保證它們的狀態是一致的。可是也存在問題,那就是若是第三階段發出 rollback 請求,有的節點沒有收到,那沒有收到的節點會在超時以後進行提交,形成數據不一致。app
XA 事務語法介紹
xa 事務的語法以下:
- 三階段的第一階段:開啓 xa 事務,這裏 xid 爲全局事務 id:
XA {START|BEGIN} xid [JOIN|RESUME]
結束 xa 事務:
XA END xid [SUSPEND [FOR MIGRATE]]
- 三階段的第二階段,即 prepare:
XA PREPARE xid
- 三階段的第三階段,即 commit/rollback:
XA COMMIT xid [ONE PHASE] XA ROLLBACK xid
- 查看處於 PREPARE 階段的全部事務:
XA RECOVER XA RECOVER [CONVERT XID]
seata XA 簡介
seata 是阿里推出的一款開源分佈式事務解決方案,目前有 AT、TCC、SAGA、XA 四種模式。
seata 的 XA 模式是利用分支事務中數據庫對 XA 協議的支持來實現的。咱們看一下 seata 官網的介紹:[1]
從上面的圖能夠看到,seata XA 模式的流程跟其餘模式同樣:
- TM 開啓全局事務
- RM 向 TC 註冊分支事務
- RM 向 TC 報告分支事務狀態
- TC 向 RM 發送 commit/rollback 請求
- TM 結束全局事務
這裏介紹一下 RM 客戶端初始化關聯的 UML 類圖:[2]
這個圖中有一個類是 AbstractNettyRemotingClient,這個類的內部類 ClientHandler 來處理 TC 發來的請求並委託給父類 AbstractNettyRemoting 的 processMessage 方法來處理。processMessage 方法調用 RmBranchCommitProcessor 類的 process 方法。
須要注意的是,「seata 的 xa 模式對傳統的三階段提交作了優化,改爲了兩階段提交」:
- 第一階段首執行 XA 開啓、執行 sql、XA 結束三個步驟,以後直接執行 XA prepare。
- 第二階段執行 XA commit/rollback。
mysql 目前是支持 seata xa 模式的兩階段優化的。
「可是這個優化對 oracle 不支持,由於 oracle 實現的是標準的 xa 協議,即 xa end 後,協調節點向事務參與者統一發送 prepare,最後再發送 commit/rollback。這也致使了 seata 的 xa 模式對 oracle 支持不太好。」
seata XA 源碼
seata 中的 XA 模式是使用數據源代理來實現的,須要手動配置數據源代理,代碼以下:
@Bean @ConfigurationProperties(prefix = "spring.datasource") public DruidDataSource druidDataSource() { return new DruidDataSource(); } @Bean("dataSourceProxy") public DataSource dataSource(DruidDataSource druidDataSource) { return new DataSourceProxyXA(druidDataSource); }
- 也能夠根據普通 DataSource 來建立 XAConnection,可是這種方式有兼容性問題(好比 oracle),因此 seata 使用了開發者本身配置 XADataSource。
- seata 提供的 XA 數據源代理,要求代碼框架中必須使用 druid 鏈接池。
1. XA 第一階段
當 RM 收到 DML 請求後,seata 會使用 ExecuteTemplateXA來執行,執行方法 execute 中有一個地方很關鍵,就是把 autocommit 屬性改成了 false,而 mysql 默認 autocommit 是 true。事務提交以後,還要把 autocommit 改回默認。
下面咱們看一下 XA 第一階段提交的主要代碼。
1)開啓 XA
上面代碼標註[1]處,調用了 ConnectionProxyXA 類的 setAutoCommit 方法,這個方法的源代碼中,XA start 主要作了三件事:
- 向 TC 註冊分支事務
- 調用數據源的 XA Start
xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
- 把 xaActive 設置爲 true
RM 並無直接使用 TC 返回的 branchId 做爲 xa 數據源的 branchId,而是使用全局事務 id(xid) 和 branchId 從新構建了一個。
2)執行 sql
調用 PreparedStatementProxyXA 的 execute 執行 sql。
3)XA end/prepare
public void commit() throws SQLException { //省略部分源代碼 try { // XA End: Success xaResource.end(xaBranchXid, XAResource.TMSUCCESS); // XA Prepare xaResource.prepare(xaBranchXid); // Keep the Connection if necessary keepIfNecessary(); } catch (XAException xe) { try { // Branch Report to TC: Failed DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), BranchStatus.PhaseOne_Failed, null); } catch (TransactionException te) { //這兒只打印了一個warn級別的日誌 } throw new SQLException( "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe .getMessage(), xe); } finally { cleanXABranchContext(); } }
從這個源碼咱們看到,commit 主要作了三件事:
- 調用數據源的 XA end
- 調用數據源的 XA prepare
- 向 TC 報告分支事務狀態
到這裏咱們就能夠看到,seata 把 xa 協議的前兩個階段合成了一個階段。
2. XA commit
這裏的調用關係用一個時序圖來表示:
看一下 RmBranchCommitProcessor 類的 process 方法,代碼以下:
@Override public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress()); Object msg = rpcMessage.getBody(); if (LOGGER.isInfoEnabled()) { LOGGER.info("rm client handle branch commit process:" + msg); } handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg); }
從調用關係時序圖能夠看出,上面的 handleBranchCommit 方法最終調用了 AbstractRMHandler 的 handle 方法,最後經過 branchCommit 方法調用了 ResourceManagerXA 類的 finishBranch 方法。 ResourceManagerXA 類是 XA 模式的資源管理器,看下面這個類圖,也就是 seata 中資源管理器(RM)的 UML 類圖:
上面的 finishBranch 方法調用了 connectionProxyXA.xaCommit 方法,咱們最後看一下 xaCommit 方法:
public void xaCommit(String xid, long branchId, String applicationData) throws XAException { XAXid xaXid = XAXidBuilder.build(xid, branchId); //由於使用mysql,這裏xaResource是MysqlXAConnection xaResource.commit(xaXid, false); releaseIfNecessary(); }
上面調用了數據源的 commit 方法,提交了 RM 分支事務。
到這裏,整個 RM 分支事務就結束了。Rollback 的代碼邏輯跟 commit 相似。
最後要說明的是,上面的 xaResource,是 mysql-connector-java.jar 包中的 MysqlXAConnection 類實例,它封裝了 mysql 提供的 XA 協議接口。
總結
seata 中 XA 模式的實現是使用數據源代理完成的,底層使用了數據庫對 XA 協議的原生支持。
mysql 的 java 驅動庫中,MysqlXAConnection 類封裝類 XA 協議的底層接口供外部調用。
跟 TCC 和 SAGA 模式須要在業務代碼中實現 prepare/commit/rollback 邏輯相比,XA 模式對業務代碼無侵入。
Reference
[1]:http://seata.io/zh-cn/docs/overview/what-is-seata.html [2]:https://github.com/seata/seata