JTA 使用 MySQL 分佈式事務

假定在 MySQL 實例 1 上有一張表 person:

-- Demo table; the article assumes an identical table exists on both MySQL instances.
create table person(
    id int,
    name varchar(32)
);

MySQL 實例 2 上也有一張一樣的表。現在要從實例 1 的 person 表中刪除一條數據,並把這條數據插入到實例 2 的表中。這兩個操作必須在同一個事務中完成;由於跨越了數據庫實例,因此涉及到分佈式事務。

MySQL 實現了分佈式事務,查看數據庫是否啓用了 XA 事務:

-- Show the server variable that reports whether InnoDB XA support is enabled.
-- NOTE(review): innodb_support_xa is believed to be deprecated in MySQL 5.7.10 and
-- removed in 8.0 (XA support always on) — confirm against the target server version.
show variables like 'innodb_support_xa';

MySQL 關於 XA 的命令:

-- Start the XA transaction branch identified by 'a'.
xa start 'a';
-- Placeholder: the SQL statements that make up the branch's work go here.
sql 語句;
-- Mark the end of the work performed in branch 'a'.
xa end 'a';
-- Phase 1: ask the server to prepare branch 'a'.
xa prepare 'a';
-- Phase 2: commit the prepared branch 'a'.
xa commit 'a';

與正常事務相比,XA 命令多了 prepare:先詢問各分支是否準備好,事務管理器再根據 prepare 返回的結果決定提交還是回滾。

以上命令是分佈式事務的操作方法,但在一個命令行中輸入上述命令,並非真實的分佈式事務。可以使用 JTA 來控制 MySQL 的 XA:

/**
 * Demonstrates an XA (two-phase commit) transaction that spans two MySQL
 * instances: a row is deleted from {@code person} on the first instance and a
 * row is inserted into {@code person} on the second. Both branches are
 * prepared first and are committed only if both prepares return
 * {@link XAResource#XA_OK}.
 */
public class JTA_MySQL {

    /**
     * Runs the demo transaction.
     *
     * <p>Any branch that was started but not resolved when an error occurs is
     * rolled back, and every statement, connection and XA connection that was
     * opened is closed before the method returns.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        XADataSource xaDs1 = JTA_MySQL.getDataSource(
                "jdbc:mysql://172.30.60.126:3306/db_zhang", "root",
                "root");
        XADataSource xaDs2 = JTA_MySQL.getDataSource(
                "jdbc:mysql://172.30.60.124:3306/db_zhang", "root",
                "root");

        // Both branches share the format id and global transaction id and
        // differ only in their branch qualifier.
        Xid xid1 = new MyXid(100, new byte[] { 0x01 }, new byte[] { 0x02 });
        Xid xid2 = new MyXid(100, new byte[] { 0x01 }, new byte[] { 0x03 });

        XAConnection xaCon1 = null;
        XAResource xaRes1 = null;
        Connection conn1 = null;
        Statement stmt1 = null;

        XAConnection xaCon2 = null;
        XAResource xaRes2 = null;
        Connection conn2 = null;
        Statement stmt2 = null;

        // activeN:     start() succeeded and end() has not completed yet.
        // unresolvedN: the branch was started and neither commit nor rollback
        //              has been attempted, so it must be rolled back on failure.
        boolean active1 = false;
        boolean unresolved1 = false;
        boolean active2 = false;
        boolean unresolved2 = false;

        try {
            xaCon1 = getXAConnetion(xaDs1);
            conn1 = getConnection(xaCon1);
            if (conn1 == null) {
                // The helper already printed the cause; nothing was started.
                return;
            }
            xaCon2 = getXAConnetion(xaDs2);
            conn2 = getConnection(xaCon2);
            if (conn2 == null) {
                return;
            }

            stmt1 = conn1.createStatement();
            xaRes1 = xaCon1.getXAResource();
            stmt2 = conn2.createStatement();
            xaRes2 = xaCon2.getXAResource();

            // Branch 1: delete the row on the first instance.
            xaRes1.start(xid1, XAResource.TMNOFLAGS);
            unresolved1 = true;
            active1 = true;
            stmt1.execute("delete from person where id=1");
            xaRes1.end(xid1, XAResource.TMSUCCESS);
            active1 = false;

            // Branch 2: insert the row on the second instance.
            xaRes2.start(xid2, XAResource.TMNOFLAGS);
            unresolved2 = true;
            active2 = true;
            stmt2.execute("insert into person select 1, 'zhang'");
            xaRes2.end(xid2, XAResource.TMSUCCESS);
            active2 = false;

            // Phase 1: ask both branches whether they can commit.
            int ret1 = xaRes1.prepare(xid1);
            int ret2 = xaRes2.prepare(xid2);

            // Phase 2: commit both branches or roll both back.
            if (XAResource.XA_OK == ret1 && XAResource.XA_OK == ret2) {
                // Once the decision to commit is taken it must not be undone:
                // if a commit fails here, the prepared branch has to be
                // resolved by recovery, not by rolling it back while the other
                // branch may already be committed.
                unresolved1 = false;
                unresolved2 = false;
                xaRes1.commit(xid1, false);
                xaRes2.commit(xid2, false);
                System.out.println("提交分佈式事務");
            } else {
                // Clear each flag before its rollback so that a failing
                // rollback is not retried, while the other branch is still
                // rolled back by the finally block.
                unresolved1 = false;
                xaRes1.rollback(xid1);
                unresolved2 = false;
                xaRes2.rollback(xid2);
                System.out.println("回退分佈式事務");
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (XAException e) {
            e.printStackTrace();
        } finally {
            // Roll back whatever was started but never resolved.
            if (unresolved1) {
                abortBranch(xaRes1, xid1, active1);
            }
            if (unresolved2) {
                abortBranch(xaRes2, xid2, active2);
            }
            closeStatement(stmt1);
            closeConnection(conn1);
            closeXAConnection(xaCon1);
            closeStatement(stmt2);
            closeConnection(conn2);
            closeXAConnection(xaCon2);
        }
    }

    /**
     * Builds a MySQL XA data source for the given connection settings.
     *
     * @param url      JDBC URL of the MySQL instance
     * @param user     user name
     * @param password password
     * @return the configured data source
     */
    private static XADataSource getDataSource(String url, String user,
            String password) {
        MysqlXADataSource dataSource = new MysqlXADataSource();
        dataSource.setUrl(url);
        dataSource.setUser(user);
        dataSource.setPassword(password);
        return dataSource;
    }

    /**
     * Opens an XA connection from the data source.
     *
     * @param dataSource the data source to connect with
     * @return the XA connection, or {@code null} if it could not be opened
     *         (the stack trace of the failure is printed)
     */
    public static XAConnection getXAConnetion(XADataSource dataSource) {
        XAConnection XAConn = null;
        try {
            XAConn = dataSource.getXAConnection();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return XAConn;
    }

    /**
     * Obtains the JDBC connection that belongs to an XA connection.
     *
     * @param XAConn the XA connection; may be {@code null}
     * @return the JDBC connection, or {@code null} if {@code XAConn} is
     *         {@code null} or the connection could not be obtained (the stack
     *         trace of the failure is printed)
     */
    public static Connection getConnection(XAConnection XAConn) {
        if (XAConn == null) {
            return null;
        }
        Connection conn = null;
        try {
            conn = XAConn.getConnection();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Closes a JDBC connection, reporting (but not propagating) a failure.
     *
     * @param conn the connection to close; {@code null} is ignored
     */
    public static void closeConnection(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException e) {
            System.out.println("鏈接關閉失敗");
        }
    }

    /** Closes a statement if it was opened; a failure is only printed. */
    private static void closeStatement(Statement stmt) {
        if (stmt == null) {
            return;
        }
        try {
            stmt.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Closes an XA connection if it was opened; a failure is only printed. */
    private static void closeXAConnection(XAConnection xaCon) {
        if (xaCon == null) {
            return;
        }
        try {
            xaCon.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Rolls back a branch during cleanup. If the branch is still active
     * (started but not ended) it is first ended with
     * {@link XAResource#TMFAIL}. Failures are printed and otherwise ignored
     * because this runs on an error path already.
     */
    private static void abortBranch(XAResource xaRes, Xid xid, boolean active) {
        if (active) {
            try {
                xaRes.end(xid, XAResource.TMFAIL);
            } catch (XAException e) {
                e.printStackTrace();
            }
        }
        try {
            xaRes.rollback(xid);
        } catch (XAException e) {
            e.printStackTrace();
        }
    }
}

MyXid 類:

/**
 * Simple immutable {@link Xid} implementation holding a format id, a global
 * transaction id and a branch qualifier.
 *
 * <p>The byte arrays are copied on the way in and on the way out, so neither
 * the code that created the instance nor a caller of the getters can alter the
 * identifier afterwards.
 */
public class MyXid implements Xid {
    private final int formatId;
    private final byte[] globalTid;
    private final byte[] branchQ;

    /**
     * Creates a transaction identifier.
     *
     * @param formatId  the format identifier
     * @param globalTid the global transaction id; copied, may be {@code null}
     * @param branchQ   the branch qualifier; copied, may be {@code null}
     */
    public MyXid(int formatId, byte[] globalTid, byte[] branchQ) {
        this.formatId = formatId;
        this.globalTid = copyOf(globalTid);
        this.branchQ = copyOf(branchQ);
    }

    /** @return a copy of the branch qualifier, or {@code null} if none was given */
    @Override
    public byte[] getBranchQualifier() {
        return copyOf(this.branchQ);
    }

    /** @return the format identifier */
    @Override
    public int getFormatId() {
        return formatId;
    }

    /** @return a copy of the global transaction id, or {@code null} if none was given */
    @Override
    public byte[] getGlobalTransactionId() {
        return copyOf(this.globalTid);
    }

    /** Returns a copy of the array; {@code null} is passed through unchanged. */
    private static byte[] copyOf(byte[] bytes) {
        return bytes == null ? null : bytes.clone();
    }
}
相關文章
相關標籤/搜索