分佈式事務(三)mysql對XA協議的支持

系列目錄

分佈式事務(一)原理概覽html

分佈式事務(二)JTA規範java

分佈式事務(三)mysql對XA協議的支持mysql

分佈式事務(四)簡單樣例sql

分佈式事務(五)源碼詳解數據庫

分佈式事務(六)總結提升api

引子

從Mysql5開始,innoDB引擎支持XA協議的分佈式事務。DTP模型中,一個TM(事務管理器管理)管理多個RM(資源管理器),每一個RM維護本身的事務分支。在看源碼以前咱們看一下底層DB mysql對XA事務的支持。分佈式

1. XA語法

官網:13.3.8.1 XA Transaction SQL Syntaxpost

 1 XA {START|BEGIN} xid [JOIN|RESUME] 開啓XA事務,使用begin才能使用join/resume,start不支持  2  3 XA END xid [SUSPEND [FOR MIGRATE]] 不支持SUSPEND [FOR MIGRATE]  4  5 XA PREPARE xid 二階段提交的準備階段  6  7 XA COMMIT xid [ONE PHASE] 二階段提交的提交階段,ONE PHASE表明一階段提交,若是隻有一個rm參與者,那麼二階段提交優化爲一階段提交  8  9 XA ROLLBACK xid 回滾 10 11 XA RECOVER [CONVERT XID] 列出全部處於prepared狀態的事務

上面的語法中都有xid官方解釋以下:測試

xid: gtrid [, bqual [, formatID ]]優化

其中,

gtrid:全局事務ID,不得超過64,建議使用十六進制數。

bqual:分支限定符(branch qualifier),若是沒有提供bqual,那麼默認值爲空字符串'',長度不超過64,建議使用十六進制數。

formatID:是一個無符號整數,用於標記gtrid和bqual值的格式,默認爲1,長度不超過64.

對應java接口:

 1 public interface Xid {  2 int MAXGTRIDSIZE = 64;  3 int MAXBQUALSIZE = 64;  4  5 int getFormatId();  6  7 byte[] getGlobalTransactionId();  8  9 byte[] getBranchQualifier(); 10 }

2. XA狀態

官網:13.3.8.2 XA Transaction States

XA事務的狀態,按照以下步驟進行展開

1.    使用XA START來啓動一個XA事務,並把它置於ACTIVE狀態。

2.    對於一個ACTIVE狀態的 XA事務,咱們能夠執行構成事務的SQL語句,而後發佈一個XA END語句。XA END使事務進入IDLE狀態。

3.    對於一個IDLE 狀態XA事務,能夠執行一個XA PREPARE語句或一個XA COMMIT…ONE PHASE語句:

  • XA PREPARE把事務放入PREPARED狀態。在此點上的XA RECOVER語句將在其輸出中包括事務的xid值,由於XA RECOVER會列出處於PREPARED狀態的全部XA事務。

  • XA COMMIT…ONE PHASE(優化成一階段提交)用於預備和提交事務。xid值將不會被XA RECOVER列出,由於事務終止。

4.    對於一個PREPARED狀態的 XA事務,執行XA COMMIT語句來提交和終止事務,或者執行XA ROLLBACK回滾並終止事務。 

 

 注意:

同一個客戶端數據庫鏈接,XA事務和非XA事務(即本地事務)是互斥的。例如,已經執行了」XA START」命令來開啓一個XA事務,則本地事務不會被啓動,直到XA事務已經被提交或被 回滾爲止。相反的,若是已經使用START TRANSACTION啓動一個本地事務,則XA語句不能被使用,直到該事務被提交或被 回滾爲止。

3. 測試

 1 package study.xa;
 2 
 3 import com.mysql.jdbc.jdbc2.optional.MysqlXAConnection;
 4 import com.mysql.jdbc.jdbc2.optional.MysqlXid;
 5 
 6 import javax.sql.XAConnection;
 7 import javax.transaction.xa.XAException;
 8 import javax.transaction.xa.XAResource;
 9 import javax.transaction.xa.Xid;
10 import java.sql.Connection;
11 import java.sql.DriverManager;
12 import java.sql.PreparedStatement;
13 import java.sql.SQLException;
14 
15 /***
16  * @Description mysql分佈式事務XAConnection模擬
17  * @author denny
18  * @date 2019/4/3 上午9:15
19  */
20 public class MysqlXaConnectionTest {
21 
22     public static void main(String[] args) throws SQLException {
23         //true表示打印XA語句,,用於調試
24         boolean logXaCommands = true;
25         // 得到資源管理器操做接口實例 RM1
26         Connection conn1 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "12345");
27         XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection)conn1, logXaCommands);
28         XAResource rm1 = xaConn1.getXAResource();
29 
30         // 得到資源管理器操做接口實例 RM2
31         Connection conn2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test2", "root", "12345");
32         XAConnection xaConn2 = new MysqlXAConnection((com.mysql.jdbc.Connection)conn2, logXaCommands);
33         XAResource rm2 = xaConn2.getXAResource();
34         // AP請求TM執行一個分佈式事務,TM生成全局事務id
35         byte[] gtrid = "g12345".getBytes();
36         int formatId = 1;
37         try {
38             // ==============分別執行RM1和RM2上的事務分支====================
39             // TM生成rm1上的事務分支id
40             byte[] bqual1 = "b00001".getBytes();
41             Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
42             // 執行rm1上的事務分支 One of TMNOFLAGS, TMJOIN, or TMRESUME.
43             rm1.start(xid1, XAResource.TMNOFLAGS);
44             // 業務1:插入user表
45             PreparedStatement ps1 = conn1.prepareStatement("INSERT into user VALUES ('99', 'user99')");
46             ps1.execute();
47             rm1.end(xid1, XAResource.TMSUCCESS);
48 
49             // TM生成rm2上的事務分支id
50             byte[] bqual2 = "b00002".getBytes();
51             Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
52             // 執行rm2上的事務分支
53             rm2.start(xid2, XAResource.TMNOFLAGS);
54             // 業務2:插入user_msg表
55             PreparedStatement ps2 = conn2.prepareStatement("INSERT into user_msg VALUES ('88', '99', 'user99的備註')");
56             ps2.execute();
57             rm2.end(xid2, XAResource.TMSUCCESS);
58 
59             // ===================兩階段提交================================
60             // phase1:詢問全部的RM 準備提交事務分支
61             int rm1Prepare = rm1.prepare(xid1);
62             int rm2Prepare = rm2.prepare(xid2);
63             // phase2:提交全部事務分支
64             boolean onePhase = false;
65             //TM判斷有2個事務分支,因此不能優化爲一階段提交
66             if (rm1Prepare == XAResource.XA_OK
67                 && rm2Prepare == XAResource.XA_OK
68                 ) {
69                 //全部事務分支都prepare成功,提交全部事務分支
70                 rm1.commit(xid1, onePhase);
71                 rm2.commit(xid2, onePhase);
72             } else {
73                 //若是有事務分支沒有成功,則回滾
74                 rm1.rollback(xid1);
75                 rm1.rollback(xid2);
76             }
77         } catch (XAException e) {
78             // 若是出現異常,也要進行回滾
79             e.printStackTrace();
80         }
81     }
82 }

打印日誌:

1 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA START 0x673132333435,0x623030303031,0x1
2 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA END 0x673132333435,0x623030303031,0x1
3 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA START 0x673132333435,0x623030303032,0x1
4 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA END 0x673132333435,0x623030303032,0x1
5 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA PREPARE 0x673132333435,0x623030303031,0x1
6 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA PREPARE 0x673132333435,0x623030303032,0x1
7 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA COMMIT 0x673132333435,0x623030303031,0x1
8 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA COMMIT 0x673132333435,0x623030303032,0x1

 

 

=====參考======

http://www.tianshouzhi.com/api/tutorials/distributed_transaction/384

相關文章
相關標籤/搜索