JTA(Java Transaction API)容許應用程序執行分佈式事務處理--在兩個或多個網絡計算機資源上訪問而且更新數據。JDBC驅動程序的JTA支持極大地加強了數據訪問能力。
本文的目的是要提供一個關於的Java事務處理API(JTA)的高級的概述,以及與分佈式事務相關的內容。一個事務處理定義了一個工做邏輯單元,要麼完全成功要麼不產生任何結果。 一個分佈式事務處理只是一個在兩個或更多網絡資源上訪問和更新數據的事務處理,所以它在那些資源之間必然是等價的。在本文中,咱們主要關心的是如何處理關係數據庫系統。
咱們要討論的分佈式事務處理(DTP)模型中包含的組件是:
應用程序
應用程序服務器
事務管理程序
資源適配器
資源管理程序
在之後的內容中,咱們將描述這些組件以及它們與JTA和數據庫訪問的關係。
訪問數據庫
最好把分佈式事務處理中包含的組件看做是獨立的過程,而不是考慮它們在一個特定的電腦中的位置。這些組件中的一些能夠保存在單機中,或者也可在好幾臺機器之間分佈。 下面例子中的圖表能夠顯示在一臺特定的電腦上的組件,可是這些操做之間的關係是必須首要考慮的。
最簡單的例子:用於本地數據庫事務處理的應用程序
關係數據庫訪問的最簡單的形式僅僅包括應用程序、資源管理程序和資源適配器。應用程序只不過是發送請求到數據庫而且從數據庫中獲取數據的最終用戶訪問點
咱們討論的資源管理程序是一個關係數據庫管理系統(RDBMS),好比Oracle或者SQL Server。全部的實際數據庫管理都是由這個組件處理的。
資源適配器是外部空間之間的通訊管道組件,或者是請求翻譯器,在本例中,是應用程序和資源管理程序。在咱們的討論中,這是一個JDBC驅動程序。
下面的描述是資源管理程序本地事務處理的一個描述,也就是說,一個事務處理被被限制在一個特定的企業數據庫。
java
應用程序發送一個用於JDBC驅動程序數據的請求,而後翻譯這個請求並把它經過網絡發送到數據庫中。 數據庫把數據發送回驅動程序,而後把翻譯的結果發送迴應用程序,以下圖所示:sql
這個例子說明了在一個簡化的系統中的基本的信息流;然而,今天的企業使用的應用程序服務器都添加了其餘的組件到這個過程處理中。
應用程序服務器
應用程序服務器是事務處理操做的另外一個組件。應用程序服務器處理大部分的應用程序操做而且得到最終用戶應用程序的一些負載。基於前面的例子,咱們能夠看出應用程序服務器在事務處理上添加了另外一個操做層:
到目前爲止,咱們的例子說明了單個的本地事務處理,而且描述了分佈式事務處理模型的五個組件中的四個。第五個組件,事務管理程序只有當事務將要被分配的時候纔會開始被考慮。
分佈式事務處理和事務管理程序
像咱們前面所提到的,一個分佈式事務處理是一個在兩個或更多網絡資源上訪問和更新數據的事務處理。
這些資源能夠由好幾個位於一個單獨服務器上的不一樣的關係型數據庫管理系統組成,好比說Oracle、SQL Server和Sybase;它們也能夠包含存在於若干不一樣的服務器上的同一種數據庫的若干個實例。在任何狀況下,一個分佈式事務處理包括各類的資源管理程序之間的協同做用。這個協同做用是事務管理函數。
事務管理程序負責做出要麼提交(commit)要麼退回(rollback)任何分佈式事務處理的決定。一個提交決定應該致使一個成功的事務處理;而退回操做則是保持數據庫中的數據不變。 JTA指定一個分佈式事務處理中的事務管理程序和另外一個組件之間的標準Java接口:應用程序,應用程序服務器和資源管理程序。 這個關係被顯示在下面的圖表中: 數據庫
在事務管理程序周圍的數字框框相應於JTA的三個接口部分:
1—UserTransaction—javax.transaction.UserTransaction接口提供可以編程地控制事務處理範圍的應用程序。 javax.transaction.UserTransaction方法開啓一個全局事務而且使用調用線程與事務處理關聯。
2—Transaction Manager—javax.transaction.TransactionManager接口容許應用程序服務器來控制表明正在管理的應用程序的事務範圍。
3—XAResource—javax.transaction.xa.XAResource接口是一個基於X/Open CAE Specification的行業標準XA接口的Java映射。
注意,一個限制性環節是經過JDBC驅動程序的XAResource接口的支持。JDBC驅動程序必須支持兩個正常的JDBC交互做用:應用程序和/或應用程序服務器,並且以及JTA的XAResource部分。
編寫應用程序水平代碼的開發者不會關心分佈式事務處理管理的細節。 這是分佈式事務處理基本結構的工做—應用程序服務器、事務管理程序和JDBC驅動程序。應用程序代碼中惟一的須要注意的就是當鏈接處於一個分佈式事務範圍內的時候,不該該調用一個會影響事務邊界的方法。特別的是,一個應用程序不該該調用Connection方法commit、rollback和setAutoCommit(true),由於它們將破壞分佈式事務的基本結構管理。
分佈式事務處理
事務管理程序是分佈式事務基本結構的基本組件;然而JDBC驅動程序和應用程序服務器組件應該具有下面的特徵:
驅動程序應該實現JDBC 2.0應用程序接口,包括Optional Package接口XADataSource和XAConnection以及JTA接口XAResource。
應用程序服務器應該提供一個DataSource類,用來實現與分佈式事務基本結的交互以及一個鏈接池模塊(用於改善性能)。
分佈式事務處理的第一步就是應用程序要發送一個事務請求到事務管理程序。雖然最後的commit/rollback決定把事務做爲一個簡單的邏輯單元來對待,可是仍然可能會包括許多事務分支。一個事務分支與一個到包含在分佈式事務中的每一個資源管理程序相關聯。所以,到三個不一樣的關係數據庫管理的請求須要三個事務分支。每一個事務分支必須由本地資源管理程序提交或者返回。事務管理程序控制事務的邊界,而且負責最後決定應該提交或者返回的所有事務。 這個決定由兩個步驟組成,稱爲Two - Phase Commit Protocol。
在第一步驟中,事務管理程序輪詢全部包含在分佈式事務中的資源管理程序(關係數據庫管理)來看看哪一個能夠準備提交。若是一個資源管理程序不能提交,它將不響應,而且把事務的特定部分返回,以便數據不被修改。
在第二步驟中,事務管理程序判斷否認響應的資源管理程序中是否有可以返回整個事務的。若是沒有否認響應的話,翻譯管理程序提交整個事務而且返回結果到應用程序中。
開發事項管理程序代碼的開發者必須與全部三個JTA接口有關:UserTransaction、TransactionManager和XAResource,這三個接口都被描述在
Sun JTA specification中。JDBC驅動程序開發者只須要關心XAResource接口。這個接口是容許一個資源管理程序參與事務的行業標準X/Open XA協議的Java映射。鏈接XAResource接口的驅動程序組件負責在事務管理程序和資源管理程序之間擔任"翻譯"的任務。下面的章節提供了XAResource調用的例子。
JDBC驅動程序和XAResource
爲了簡化XAResource的說明,這些例子說明了一個應用程序在不包含應用程序服務器和事項管理程序的狀況下應該如何使用JTA。 基本上,這些例子中的應用程序也擔任應用程序服務器和事項管理程序的任務。大部分的企業使用事務管理程序和應用程序服務器,由於它們可以比一個應用程序更可以高效地管理分佈式事務。 然而遵循這些例子,一個應用程序開發者能夠測試在JDBC驅動程序中的JTA支持的健壯性。並且有一些例子可能不是工做在某個特定的數據庫上,這是由於關聯在數據庫上的一些內在的問題。
在使用JTA以前,你必須首先實現一個Xid類用來標識事務(在普通狀況下這將由事務管理程序來處理)。Xid包含三個元素:formatID、gtrid(全局事務標識符)和bqual(分支修飾詞標識符)。
formatID一般是零,這意味着你將使用OSI CCR(Open Systems Interconnection Commitment, Concurrency和Recovery 標準)來命名。若是你要是用另一種格式,那麼formatID應該大於零。-1值意味着Xid爲無效。
gtrid和bqual能夠包含64個字節二進制碼來分別標識全局事務和分支事務。惟一的要求是gtrid和bqual必須是全局惟一的。此外,這能夠經過使用指定在OSI CCR中的命名規則規範來完成。
編程
下面的例子說明Xid的實現: 服務器
import javax.transaction.xa.*;
public class MyXid implements Xid
{
protected int formatId;
protected byte gtrid[];
protected byte bqual[];
public MyXid()
{
}
public MyXid(int formatId, byte gtrid[], byte bqual[])
{
this.formatId = formatId;
this.gtrid = gtrid;
this.bqual = bqual;
}
public int getFormatId()
{
return formatId;
}
public byte[] getBranchQualifier()
{
return bqual;
}
public byte[] getGlobalTransactionId()
{
return gtrid;
}
}
其次,你須要建立一個你要使用的數據庫的數據源:
public DataSource getDataSource()
throws SQLException
{
SQLServerDataSource xaDS = new
com.merant.datadirect.jdbcx.sqlserver.SQLServerDataSource();
xaDS.setDataSourceName("SQLServer");
xaDS.setServerName("server");
xaDS.setPortNumber(1433);
xaDS.setSelectMethod("cursor");
return xaDS;
}
例1—這個例子是用「兩步提交協議」來提交一個事務分支:
XADataSource xaDS;
XAConnection xaCon;
XAResource xaRes;
Xid xid;
Connection con;
Statement stmt;
int ret;
xaDS = getDataSource();
xaCon = xaDS.getXAConnection("jdbc_user", "jdbc_password");
xaRes = xaCon.getXAResource();
con = xaCon.getConnection();
stmt = con.createStatement();
xid = new MyXid(100, new byte[]{0x01}, new byte[]{0x02});
try {
xaRes.start(xid, XAResource.TMNOFLAGS);
stmt.executeUpdate("insert into test_table values (100)");
xaRes.end(xid, XAResource.TMSUCCESS);
ret = xaRes.prepare(xid);
if (ret == XAResource.XA_OK) {
xaRes.commit(xid, false);
}
}
catch (XAException e) {
e.printStackTrace();
}
finally {
stmt.close();
con.close();
xaCon.close();
}
由於全部這些例子中的初始化代碼相同或者很是類似,僅僅是一些重要的地方的代碼由不一樣。
例2—這個例子,與例1類似,說明了一個返回過程:
xaRes.start(xid, XAResource.TMNOFLAGS);
stmt.executeUpdate("insert into test_table values (100)");
xaRes.end(xid, XAResource.TMSUCCESS);
ret = xaRes.prepare(xid);
if (ret == XAResource.XA_OK) {
xaRes.rollback(xid);
}
例3—這個例子說明一個分佈式事務分支如何停止,讓相同的鏈接作本地事務處理,以及它們稍後該如何繼續這個分支。 分佈式事務的兩步提交做用不影響本地事務。
xid = new MyXid(100, new byte[]{0x01}, new byte[]{0x02});
xaRes.start(xid, XAResource.TMNOFLAGS);
stmt.executeUpdate("insert into test_table values (100)");
xaRes.end(xid, XAResource.TMSUSPEND);
∥這個更新在事務範圍以外完成,因此它不受XA返回影響。
stmt.executeUpdate("insert into test_table2 values (111)");
xaRes.start(xid, XAResource.TMRESUME);
stmt.executeUpdate("insert into test_table values (200)");
xaRes.end(xid, XAResource.TMSUCCESS);
ret = xaRes.prepare(xid);
if (ret == XAResource.XA_OK) {
xaRes.rollback(xid);
}
例4—這個例子說明一個XA資源如何分擔不一樣的事務。 建立了兩個事務分支,可是它們不屬於相同的分佈式事務。 JTA容許XA資源在第一個分支上作一個兩步提交,雖然這個資源仍然與第二個分支相關聯。
xid1 = new MyXid(100, new byte[]{0x01}, new byte[]{0x02});
xid2 = new MyXid(100, new byte[]{0x11}, new byte[]{0x22});
xaRes.start(xid1, XAResource.TMNOFLAGS);
stmt.executeUpdate("insert into test_table1 values (100)");
xaRes.end(xid1, XAResource.TMSUCCESS);
xaRes.start(xid2, XAResource.TMNOFLAGS);
ret = xaRes.prepare(xid1);
if (ret == XAResource.XA_OK) {
xaRes.commit(xid2, false);
}
stmt.executeUpdate("insert into test_table2 values (200)");
xaRes.end(xid2, XAResource.TMSUCCESS);
ret = xaRes.prepare(xid2);
if (ret == XAResource.XA_OK) {
xaRes.rollback(xid2);
}
例5—這個例子說明不一樣的鏈接上的事務分支如何鏈接成爲一個單獨的分支,若是它們鏈接到相同的資源管理程序。這個特色改善了分佈式事務的效率,由於它減小了兩步提交處理的數目。兩個鏈接到數據庫服務器上的XA將被建立。每一個鏈接建立它本身的XA資源,正規的JDBC鏈接和語句。在第二個XA資源開始一個事務分支以前,它將察看是否使用和第一個XA資源使用的是同一個資源管理程序。若是這是實例,它將加入在第一個XA鏈接上建立的第一個分支,而不是建立一個新的分支。 稍後,這個事務分支使用XA資源來準備和提交。
xaDS = getDataSource();
xaCon1 = xaDS.getXAConnection("jdbc_user", "jdbc_password");
xaRes1 = xaCon1.getXAResource();
con1 = xaCon1.getConnection();
stmt1 = con1.createStatement();
xid1 = new MyXid(100, new byte[]{0x01}, new byte[]{0x02});
xaRes1.start(xid1, XAResource.TMNOFLAGS);
stmt1.executeUpdate("insert into test_table1 values (100)");
xaRes1.end(xid, XAResource.TMSUCCESS);
xaCon2 = xaDS.getXAConnection("jdbc_user", "jdbc_password");
xaRes2 = xaCon1.getXAResource();
con2 = xaCon1.getConnection();
stmt2 = con1.createStatement();
if (xaRes2.isSameRM(xaRes1)) {
xaRes2.start(xid1, XAResource.TMJOIN);
stmt2.executeUpdate("insert into test_table2 values (100)");
xaRes2.end(xid1, XAResource.TMSUCCESS);
}
else {
xid2 = new MyXid(100, new byte[]{0x01}, new byte[]{0x03});
xaRes2.start(xid2, XAResource.TMNOFLAGS);
stmt2.executeUpdate("insert into test_table2 values (100)");
xaRes2.end(xid2, XAResource.TMSUCCESS);
ret = xaRes2.prepare(xid2);
if (ret == XAResource.XA_OK) {
xaRes2.commit(xid2, false);
}
}
ret = xaRes1.prepare(xid1);
if (ret == XAResource.XA_OK) {
xaRes1.commit(xid1, false);
}
例6—這個例子說明在錯誤恢復的階段,如何恢復準備好的或者快要完成的事務分支。 它首先試圖返回每一個分支;若是它失敗了,它嘗試着讓資源管理程序丟掉關於事務的消息。
MyXid[] xids;
xids = xaRes.recover(XAResource.TMSTARTRSCAN | XAResource.TMENDRSCAN);
for (int i=0; xids!=null && i
try {
xaRes.rollback(xids[i]);
}
catch (XAException ex) {
try {
xaRes.forget(xids[i]);
}
catch (XAException ex1) {
System.out.println("rollback/forget failed: " + ex1.errorCode);
}
}
} 網絡