在 NC 系列中,我們發佈了一個 EJB 接口後,對該接口的調用,其事務的 commit 和 rollback 是全自動的,通常在沒有特殊需求的情況下無需手工處理。那麼 NC 是如何實現這種「事務無感知」處理的呢?
上文說到,事務的代理是由 nc.bs.framework.ejb.CMTEJBServiceHandler 類轉發給 delegate 的。我們調試一下,看看它是如何實現事務的開啓、結束以及子事務等功能的:
我們進入 nc.itf.framework.ejb.CMTProxy_Local 看看:
第一個方法是開啓一個全新的子事務;第二個方法是繼承已有的事務,如果沒有已有事務就新開一個。
/**
 * Hook invoked before a proxied bean method runs.
 *
 * <p>Records the call time and thread state, then either begins a
 * transaction through the transaction-manager proxy (when the bean
 * descriptor reports CMT) or binds the bean's user transaction to the
 * current thread (otherwise).
 *
 * <p>Any exception raised while beginning the transaction is logged and
 * NOT rethrown, so the business method still runs afterwards.
 *
 * @param methodId numeric id of the business method about to be called;
 *                 used to look up its transaction type and isolation level
 */
protected void beforeCallMethod(int methodId) {
    Logger.info("Begin Transaction(" + methodId + ")");
    MwTookit.setThreadState("nc.bs.mw.naming.BeanBase.beforeCallMethod");
    this.setLastCallTime(System.currentTimeMillis());

    HomeBase localHome = (HomeBase) this.getEJBLocalHome();
    if (!localHome.getEJBBeanDescriptor().isCmt()) {
        // Non-CMT bean: lazily create the user transaction, then bind it to this thread.
        if (this.getIerpUserTransaction() == null) {
            this.setIerpTransactionManagerProxy((IContainerTransProxy) null);
            this.setIerpUserTransaction(TransactionFactory.getUTransaction());
        }
        this.getIerpUserTransaction().bindToCurrentThread();
    } else {
        // CMT bean: resolve the per-method settings and begin through the proxy.
        try {
            this.currentMethodTransectionType = this.getMethodTransectionType(methodId);
            int isolation = this.getMethodIsolateLevelType(methodId);
            this.setIerpTransactionManagerProxy(TransactionFactory.getTMProxy());
            this.getIerpTransactionManagerProxy().begin(this.currentMethodTransectionType, isolation);
        } catch (Exception beginFailure) {
            Logger.error("BeforeCallMethod", beginFailure);
        }
    }

    MwTookit.setThreadState("nc.bs.mw.naming.BeanBase.beforeCallMethod over");
}
這兩個方法開頭的第一行都調用了 beforeCallMethod 這個方法:新開子事務時傳入的是 200,繼承事務時傳入的是 201。我們看看這個方法的核心 this.getIerpTransactionManagerProxy().begin(this.currentMethodTransectionType, isolateLevel); 做了什麼:
/**
 * Begins a transaction of the given type on the calling thread's
 * transaction manager, creating and registering that manager in
 * {@code tm_local} on first use.
 *
 * @param transType    transaction type code forwarded to the manager's {@code begin(int)}
 * @param isolateLevel accepted for interface compatibility; this method does not read it
 */
public void begin(int transType, int isolateLevel) throws NotSupportedException, SystemException, NamingException, TransactionRequiredException {
    IUAPTransactionManager threadManager = (IUAPTransactionManager) tm_local.get();
    if (threadManager == null) {
        // First transactional call on this thread: install a fresh manager.
        threadManager = new UAPTransactionManager();
        tm_local.set(threadManager);
    }
    threadManager.begin(transType);
}
從當前線程中取出事務管理器,沒有就 new 一個新的放入線程,然後直接調用 begin。這裏先取一次的原因是:對於子事務,如果之前已經有事務,就需要取得之前的事務管理器,然後用它來 begin,因為每個線程的事務管理器必須統一、只有一個!另外注意,傳入的 isolateLevel 參數在這個方法裏並沒有被使用。我們繼續:
/**
 * Pushes a new transaction context whose kind depends on {@code transType}
 * and on whether a context is already on {@code tranStack}.
 *
 * <ul>
 *   <li>0 - always a NULL context</li>
 *   <li>1 - SOURCE when the stack is empty, otherwise JOINED</li>
 *   <li>2 - NULL when the stack is non-empty, otherwise JOINED</li>
 *   <li>3 - always SOURCE</li>
 *   <li>4 - JOINED; fails when the stack is empty</li>
 *   <li>5 - NULL; fails when the stack is non-empty</li>
 *   <li>11 - JOINED plus a savepoint for the current invocation</li>
 * </ul>
 *
 * @param transType transaction type code
 * @throws SystemException       for type 4 with an empty stack or type 5 with a non-empty stack
 * @throws NotSupportedException for any other type code, or when the savepoint for
 *                               type 11 cannot be created (the SQLException is attached as cause)
 */
public void begin(int transType) throws NotSupportedException, SystemException {
    boolean noActiveContext = this.tranStack.isEmpty();
    switch (transType) {
        case 0:
            this.createTransaction(TransactionContextType.NULL);
            break;
        case 1:
            this.createTransaction(noActiveContext ? TransactionContextType.SOURCE : TransactionContextType.JOINED);
            break;
        case 2:
            // NOTE(review): this joins when the stack is EMPTY and creates a NULL
            // context when one exists, which looks inverted for a "supports"-style
            // attribute. Kept as decompiled - confirm against the real class.
            this.createTransaction(noActiveContext ? TransactionContextType.JOINED : TransactionContextType.NULL);
            break;
        case 3:
            this.createTransaction(TransactionContextType.SOURCE);
            break;
        case 4:
            if (noActiveContext) {
                throw new SystemException();
            }
            this.createTransaction(TransactionContextType.JOINED);
            break;
        case 5:
            if (!noActiveContext) {
                throw new SystemException();
            }
            this.createTransaction(TransactionContextType.NULL);
            break;
        case 11:
            this.createTransaction(TransactionContextType.JOINED);
            try {
                this.setCurInvokeSavePoint();
            } catch (SQLException savePointFailure) {
                // Keep the database error as the cause instead of discarding it.
                NotSupportedException wrapped = new NotSupportedException("savePoint error!");
                wrapped.initCause(savePointFailure);
                throw wrapped;
            }
            break;
        default:
            // Codes 6-10 and any unknown value.
            throw new NotSupportedException("trans type error!");
    }
}
這裏根據事務開啓類型的不同進入不同的邏輯,事務上下文類型一般有 NULL(無事務)、JOINED(加入已有事務,即支持事務的接口方法被嵌套調用)、SOURCE(新開的獨立事務,例如第一個事務)這 3 種。我們發現,最終事務管理還是交給了 uap.mw.trans.UAPTransaction 類,
這個類裏有一個關鍵的、以 uap.mw.ds.DBConnection 為 Value 的 Map 對象。uap.mw.ds.DBConnection 是 NC 對 JDBC 最底層 Connection 對象的包裝對象:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package uap.mw.ds;

import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import nc.bs.logging.Log;
import nc.bs.logging.Logger;
import nc.bs.mw.sql.validator.ValidateTool;
import uap.mw.ds.exception.DBRunTimeException;
import uap.mw.ds.monitor.ConnectionExeInfo;
import uap.mw.trans.UAPTransactionManagerProxy;
import uap.mw.trans.util.ConnectionStatus;

/**
 * Pooled wrapper around a physical JDBC {@link Connection}.
 *
 * <p>While a transaction is active (as reported by
 * {@link UAPTransactionManagerProxy#isTrans()}), {@link #commit()},
 * {@link #rollback()} and {@link #close()} do not touch the physical
 * connection; the physical operations are exposed separately as
 * {@link #realCommit()}, {@link #realRollback()} and {@link #backPool()}.
 *
 * <p>The wrapper also keeps an access-ordered cache of prepared statements
 * keyed by SQL text and hands the current {@link ConnectionExeInfo} to the
 * statements it creates.
 */
public class DBConnection implements Connection {
    private static Log log = Log.getInstance(DBConnection.class);

    private volatile ConnectionStatus status = ConnectionStatus.FREE;
    private Connection realConnection;
    private UAPDataSource ds;
    public Exception trace;
    public Thread lastInUseThread;
    private long activeTime = -1L;
    private long lastInUseTime = -1L;
    private ConnectionExeInfo lastRunTimeInfo;
    private ConnectionExeInfo curRunTimeInfo;
    private String connID;
    private int prepareStatementNum = 0;
    /** Access-ordered cache of pooled statements, keyed by SQL text. */
    private Map<String, UAPPreparedStatement> statementCached;
    /** Statements handed out since the last {@link #clearStatements()}, keyed by statement number. */
    private Map<Integer, UAPPreparedStatement> stmtTracksMap = new HashMap<Integer, UAPPreparedStatement>();

    /**
     * Returns the physical connection, refusing when this wrapper is not in use.
     *
     * @throws DBRunTimeException when {@link #isClosed()} is true
     */
    private Connection getRealConnection() {
        if (this.isClosed()) {
            throw new DBRunTimeException(" connection is Closed");
        }
        return this.realConnection;
    }

    /** Returns the physical connection without any status check. */
    public Connection getPhysicalConnection() {
        return this.realConnection;
    }

    /**
     * Wraps {@code realConnection} for data source {@code ds}; the wrapper
     * starts in status FREE with a random UUID as connection id.
     */
    DBConnection(Connection realConnection, final UAPDataSource ds) {
        this.realConnection = realConnection;
        this.ds = ds;
        this.connID = UUID.randomUUID().toString();
        this.statementCached = new LinkedHashMap<String, UAPPreparedStatement>(ds.getPreparedStatementCacheSize(), 1.0F, true) {
            // A statement removed from the cache is no longer pooled.
            public UAPPreparedStatement remove(Object sql) {
                UAPPreparedStatement removed = (UAPPreparedStatement) super.remove(sql);
                if (removed != null) {
                    removed.isPooled = false;
                }
                return removed;
            }

            // Evict the least recently used statement once the configured size is
            // exceeded; it is closed right away only if nobody is using it.
            protected boolean removeEldestEntry(Entry<String, UAPPreparedStatement> eldest) {
                boolean overCapacity = this.size() > ds.getPreparedStatementCacheSize();
                if (overCapacity) {
                    UAPPreparedStatement evicted = eldest.getValue();
                    evicted.isPooled = false;
                    if (!evicted.inUse) {
                        DBConnection.this.closeStmt(evicted);
                    }
                }
                return overCapacity;
            }
        };
    }

    public ConnectionStatus getStatus() {
        return this.status;
    }

    public void setAutoCommit(boolean b) throws SQLException {
        this.getRealConnection().setAutoCommit(b);
    }

    void setStatus(ConnectionStatus status) {
        this.status = status;
    }

    /**
     * Records which thread last took this connection. A non-null thread also
     * captures a stack trace and timestamp; {@code null} clears both.
     */
    public void setLastUseMeThread(Thread newLastUseMeThread) {
        if (newLastUseMeThread == null) {
            this.trace = null;
            this.lastInUseTime = -1L;
        } else {
            this.trace = new Exception();
            this.lastInUseTime = System.currentTimeMillis();
        }
        this.lastInUseThread = newLastUseMeThread;
    }

    /** Commits physically only when no transaction is active; otherwise does nothing. */
    public void commit() throws SQLException {
        if (!UAPTransactionManagerProxy.isTrans()) {
            this.realCommit();
        }
    }

    /** Commits on the physical connection and closes the tracked statements. */
    public void realCommit() throws SQLException {
        this.getRealConnection().commit();
        this.clearStatements();
    }

    /**
     * Never rolls back by itself: outside a transaction it throws, inside a
     * transaction it is a no-op (see {@link #realRollback()}).
     */
    public void rollback() throws SQLException {
        if (!UAPTransactionManagerProxy.isTrans()) {
            throw new SQLException("no Transaction ,can not ROLLBACK!");
        }
    }

    /** Same contract as {@link #rollback()}; the savepoint is not used here. */
    public void rollback(Savepoint savepoint) throws SQLException {
        if (!UAPTransactionManagerProxy.isTrans()) {
            throw new SQLException("no Transaction ,can not ROLLBACK!");
        }
    }

    public void realRollback() throws SQLException {
        this.getRealConnection().rollback();
    }

    public void realRollback(Savepoint savepoint) throws SQLException {
        this.getRealConnection().rollback(savepoint);
    }

    /**
     * Outside a transaction: commits if auto-commit is off and returns the
     * connection to the pool. Inside a transaction this is a no-op.
     *
     * @throws SQLException when called outside a transaction on a connection that is not INUSE
     */
    public void close() throws SQLException {
        if (UAPTransactionManagerProxy.isTrans()) {
            return;
        }
        if (this.getStatus() != ConnectionStatus.INUSE) {
            throw new SQLException("Connection state Error:the connection state is " + this.getStatus());
        }
        if (!this.getAutoCommit()) {
            this.commit();
        }
        this.backPool();
    }

    /** Closes tracked statements and hands this connection back to its data source. */
    public void backPool() throws SQLException {
        this.clearStatements();
        this.ds.closeConnection(this);
    }

    public <T> T unwrap(Class<T> iface) throws SQLException {
        return this.getRealConnection().unwrap(iface);
    }

    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return this.getRealConnection().isWrapperFor(iface);
    }

    public Statement createStatement() throws SQLException {
        this.updateActivetime();
        return new UAPStatement(this.getRealConnection().createStatement(), this.curRunTimeInfo);
    }

    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
        return new UAPStatement(this.getRealConnection().createStatement(resultSetType, resultSetConcurrency), this.curRunTimeInfo);
    }

    /** Prepares a new pooled statement, numbers it and stores it in the cache under {@code sql}. */
    private UAPPreparedStatement prepareCachedStatement(String sql) throws SQLException {
        UAPPreparedStatement created = new UAPPreparedStatement(this, this.getRealConnection().prepareStatement(sql), this.curRunTimeInfo, true, this.prepareStatementNum++);
        this.statementCached.put(sql, created);
        return created;
    }

    /**
     * Returns a prepared statement for {@code sql}, reusing a cached one when
     * caching is enabled and the cached statement is idle and can be reset.
     */
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        UAPPreparedStatement ps;
        if (this.ds.getPreparedStatementCacheSize() <= 0) {
            // Caching disabled: always a fresh, non-pooled statement numbered 0.
            // NOTE(review): all such statements share key 0 in stmtTracksMap, so only
            // the latest one is tracked - confirm whether that is intended.
            ps = new UAPPreparedStatement(this, this.getRealConnection().prepareStatement(sql), this.curRunTimeInfo, false, 0);
        } else {
            ps = this.statementCached.get(sql);
            if (ps == null) {
                ps = this.prepareCachedStatement(sql);
            } else if (ps.inUse) {
                // The cached statement is busy: replace the cache entry with a new one.
                this.statementCached.remove(sql);
                ps = this.prepareCachedStatement(sql);
            } else {
                try {
                    ps.clearParameters();
                } catch (SQLException resetFailure) {
                    // The cached statement cannot be reset: discard it and prepare again.
                    this.statementCached.remove(sql);
                    this.closeStmt(ps);
                    ps = this.prepareCachedStatement(sql);
                }
            }
        }
        ps.inUse = true;
        this.stmtTracksMap.put(ps.getNum(), ps);
        return ps;
    }

    /** Closes every tracked statement (warning about those still in use) and resets the tracking state. */
    private void clearStatements() {
        try {
            for (UAPPreparedStatement tracked : this.stmtTracksMap.values()) {
                if (tracked.inUse) {
                    Logger.warn("statement in use!");
                }
                this.closeStmt(tracked);
            }
        } finally {
            this.prepareStatementNum = 0;
            this.stmtTracksMap.clear();
        }
    }

    /** Best-effort close: a failure is logged and never propagated. */
    private void closeStmt(Statement stmt) {
        if (stmt == null) {
            return;
        }
        try {
            stmt.close();
        } catch (SQLException closeFailure) {
            log.warn("close statement error", closeFailure);
        }
    }

    public void removeStatementFromIndex(Integer statementNum) {
        this.stmtTracksMap.remove(statementNum);
    }

    /** @deprecated */
    @Deprecated
    public CallableStatement prepareCall(String sql) throws SQLException {
        return this.getRealConnection().prepareCall(sql);
    }

    public String nativeSQL(String sql) throws SQLException {
        return this.getRealConnection().nativeSQL(sql);
    }

    public boolean getAutoCommit() throws SQLException {
        return this.getRealConnection().getAutoCommit();
    }

    /** A connection counts as closed whenever its status is anything other than INUSE. */
    public boolean isClosed() {
        return this.status != ConnectionStatus.INUSE;
    }

    public DatabaseMetaData getMetaData() throws SQLException {
        return this.getRealConnection().getMetaData();
    }

    public void setReadOnly(boolean readOnly) throws SQLException {
        this.getRealConnection().setReadOnly(readOnly);
    }

    public boolean isReadOnly() throws SQLException {
        return this.getRealConnection().isReadOnly();
    }

    public void setCatalog(String catalog) throws SQLException {
        this.getRealConnection().setCatalog(catalog);
    }

    public String getCatalog() throws SQLException {
        return this.getRealConnection().getCatalog();
    }

    public void setTransactionIsolation(int level) throws SQLException {
        this.getRealConnection().setTransactionIsolation(level);
    }

    public int getTransactionIsolation() throws SQLException {
        return this.getRealConnection().getTransactionIsolation();
    }

    public SQLWarning getWarnings() throws SQLException {
        return this.getRealConnection().getWarnings();
    }

    public void clearWarnings() throws SQLException {
        this.updateActivetime();
        this.getRealConnection().clearWarnings();
    }

    /** @deprecated */
    @Deprecated
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        throw new UnsupportedOperationException();
    }

    /** @deprecated */
    @Deprecated
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        throw new UnsupportedOperationException();
    }

    public Map<String, Class<?>> getTypeMap() throws SQLException {
        return this.getRealConnection().getTypeMap();
    }

    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
        this.getRealConnection().setTypeMap(map);
    }

    public void setHoldability(int holdability) throws SQLException {
        this.getRealConnection().setHoldability(holdability);
    }

    public int getHoldability() throws SQLException {
        return this.getRealConnection().getHoldability();
    }

    public Savepoint setSavepoint() throws SQLException {
        return this.getRealConnection().setSavepoint();
    }

    public Savepoint setSavepoint(String name) throws SQLException {
        return this.getRealConnection().setSavepoint(name);
    }

    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
        this.getRealConnection().releaseSavepoint(savepoint);
        this.updateActivetime();
    }

    /** @deprecated */
    @Deprecated
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        throw new UnsupportedOperationException();
    }

    /** @deprecated */
    @Deprecated
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        throw new UnsupportedOperationException();
    }

    /** @deprecated */
    @Deprecated
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        throw new UnsupportedOperationException();
    }

    /** @deprecated */
    @Deprecated
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        throw new UnsupportedOperationException();
    }

    /** @deprecated */
    @Deprecated
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
        throw new UnsupportedOperationException();
    }

    /** @deprecated */
    @Deprecated
    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
        throw new UnsupportedOperationException();
    }

    public Clob createClob() throws SQLException {
        this.updateActivetime();
        return this.getRealConnection().createClob();
    }

    public Blob createBlob() throws SQLException {
        this.updateActivetime();
        return this.getRealConnection().createBlob();
    }

    public NClob createNClob() throws SQLException {
        this.updateActivetime();
        return this.getRealConnection().createNClob();
    }

    public SQLXML createSQLXML() throws SQLException {
        this.updateActivetime();
        return this.getRealConnection().createSQLXML();
    }

    public boolean isValid(int timeout) throws SQLException {
        return this.getRealConnection().isValid(timeout);
    }

    public void setClientInfo(String name, String value) throws SQLClientInfoException {
        this.getRealConnection().setClientInfo(name, value);
    }

    public void setClientInfo(Properties properties) throws SQLClientInfoException {
        this.getRealConnection().setClientInfo(properties);
    }

    public Properties getClientInfo() throws SQLException {
        return this.getRealConnection().getClientInfo();
    }

    public String getClientInfo(String name) throws SQLException {
        return this.getRealConnection().getClientInfo(name);
    }

    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
        this.updateActivetime();
        return this.getRealConnection().createArrayOf(typeName, elements);
    }

    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
        this.updateActivetime();
        return this.getRealConnection().createStruct(typeName, attributes);
    }

    public long getActiveTime() {
        return this.activeTime;
    }

    public void setActiveTime(long activeTime) {
        this.activeTime = activeTime;
    }

    private void updateActivetime() {
        this.activeTime = System.currentTimeMillis();
    }

    /**
     * Checks the connection with {@link ValidateTool}; any exception is logged
     * and reported as "not usable".
     */
    public boolean validate() {
        try {
            return ValidateTool.validate(this, this.ds.getDatabaseType());
        } catch (Exception validationFailure) {
            log.warn("This connection:\"" + this.realConnection + "\" is not usable, it will be removed!", validationFailure);
            return false;
        }
    }

    public String getDataSourceName() {
        return this.ds.getDataSourceName();
    }

    public long getLastInUseTime() {
        return this.lastInUseTime;
    }

    /** Delegates the transition to the status handler registered for the current status. */
    public void changeStates(ConnectionStatus newStatus) {
        StatusHandlerFactory.getStatusHandler(this.getStatus(), this).changeStatus(newStatus);
    }

    ConnectionExeInfo getLastConnectionExeInfo() {
        return this.lastRunTimeInfo;
    }

    ConnectionExeInfo getCurConnectionExeInfo() {
        return this.curRunTimeInfo;
    }

    void setLastRunTimeInfo(ConnectionExeInfo lastRunTimeInfo) {
        this.lastRunTimeInfo = lastRunTimeInfo;
    }

    void setCurRunTimeInfo(ConnectionExeInfo curRunTimeInfo) {
        this.curRunTimeInfo = curRunTimeInfo;
        if (curRunTimeInfo != null) {
            curRunTimeInfo.setConnID(this.connID);
        }
    }

    public String toString() {
        return this.ds.getDataSourceName() + "::" + this.connID;
    }

    public String getCurConnectionExeInfoInfo() {
        ConnectionExeInfo current = this.getCurConnectionExeInfo();
        return current != null ? current.getInfo() : null;
    }

    /**
     * Returns how long this connection has been idle, or 0 when it is not
     * FREE or has no run-time info yet (for example right after construction).
     */
    long getFreeTime() {
        if (this.getStatus() != ConnectionStatus.FREE) {
            return 0L;
        }
        ConnectionExeInfo current = this.curRunTimeInfo;
        return current == null ? 0L : current.getFreeTime();
    }

    /**
     * Closes the physical connection.
     *
     * <p>This deliberately bypasses {@link #getRealConnection()}: that accessor
     * throws for every status other than INUSE, which would make it impossible
     * to destroy a connection already marked NEED_DESTROY or sitting FREE.
     */
    void realDestroyConn() throws SQLException {
        if (this.realConnection != null) {
            this.realConnection.close();
        }
    }

    public void setSchema(String schema) throws SQLException {
    }

    public String getSchema() throws SQLException {
        return null;
    }

    public void abort(Executor executor) throws SQLException {
    }

    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
    }

    public int getNetworkTimeout() throws SQLException {
        return 0;
    }
}
可以看到,裏面記錄了真正的 conn 對象,並重寫了 commit 等方法的邏輯:處於事務中時不進行真實提交。同時它還記錄了 SPR 的信息,比如 SQL 記錄:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package uap.mw.ds.monitor;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.LinkedList;
import javax.transaction.Transaction;
import nc.bs.logging.Logger;
import uap.mw.ds.util.DSUtil;
import uap.mw.trans.UAPTransaction;
import uap.mw.trans.util.TransUtil;

/**
 * Run-time record of one use of a pooled connection: the owning thread and
 * transaction id, start/end timestamps, the stack trace captured at
 * construction, and a bounded history of the SQL statements executed.
 */
public class ConnectionExeInfo {
    private Thread curThread = null;
    private String transactionID;
    private long startTime = -1L;
    private long endTime = -1L;
    /** SQL history, oldest first; new entries are appended at the tail. */
    private LinkedList<SQLExeInfo> historySql = new SQLExeList<SQLExeInfo>();
    private String preSQL = "";
    private boolean isClosed = false;
    private String connID = null;
    private Throwable connTrace = null;

    /** Captures the creating call stack for later diagnostics. */
    public ConnectionExeInfo() {
        this.connTrace = new Exception();
    }

    /** Marks the start of use: records time, current thread and, if available, the transaction key. */
    public void startConn() {
        this.startTime = System.currentTimeMillis();
        Transaction trans = TransUtil.getCurTransactionInfo();
        if (trans instanceof UAPTransaction) {
            this.transactionID = ((UAPTransaction) trans).getKey();
        }
        this.curThread = Thread.currentThread();
    }

    /** Appends a history entry for a statement that is about to run. */
    public void startSQL(String sql, String[] params) {
        this.historySql.add(new SQLExeInfo(sql, params));
    }

    /**
     * Marks the most recently started statement as finished.
     * Does nothing when the history is empty.
     */
    public void endSQL(String sql, String[] params) {
        SQLExeInfo latest = this.latestSql();
        if (latest != null) {
            latest.endSql(sql);
        }
    }

    /** Remembers the SQL text of a prepared statement for {@link #startPreSQL()}. */
    public void preSQL(String sql) {
        this.preSQL = sql;
    }

    /** Adds a batch parameter set to the most recently started statement, if any. */
    public void addBatch(String[] params) {
        SQLExeInfo latest = this.latestSql();
        if (latest != null) {
            latest.addBatch(params);
        }
    }

    /** Appends a history entry for the remembered prepared SQL. */
    public void startPreSQL() {
        this.historySql.add(new SQLExeInfo(this.preSQL, true));
    }

    /** Marks the most recently started statement as finished using the remembered prepared SQL. */
    public void endPreSQL() {
        SQLExeInfo latest = this.latestSql();
        if (latest != null) {
            latest.endSql(this.preSQL);
        }
    }

    /**
     * Returns the newest history entry, or {@code null} when there is none.
     *
     * <p>Entries are appended with {@code add}, i.e. at the tail, so the
     * newest one is the LAST element; {@code LinkedList.peek()} would return
     * the head, which is the oldest entry.
     */
    private SQLExeInfo latestSql() {
        return this.historySql.peekLast();
    }

    /** Marks the end of use and detaches the thread. */
    public void endConn() {
        this.endTime = System.currentTimeMillis();
        this.isClosed = true;
        this.curThread = null;
    }

    /** Elapsed use time; measured up to "now" while the connection is still in use. */
    public long getConnUseTime() {
        long until = this.endTime < 0L ? System.currentTimeMillis() : this.endTime;
        return until - this.startTime;
    }

    public Thread getUseThread() {
        return this.curThread;
    }

    public String getTransactionID() {
        return this.transactionID;
    }

    public String getConnID() {
        return this.connID;
    }

    public void setConnID(String connID) {
        this.connID = connID;
    }

    /**
     * Builds a diagnostic dump of this record. As a side effect the captured
     * creation stack trace is also written to the error log on every call.
     */
    public String getInfo() {
        StringBuilder info = new StringBuilder("");
        info.append("connID::" + this.getConnID() + DSUtil.getBrTag());
        info.append("curThread::" + this.getUseThread() + DSUtil.getBrTag());
        Logger.error("", this.connTrace);
        info.append("ThreadDump::" + this.getStackTrace(this.connTrace));
        info.append("transactionID::" + this.getTransactionID() + DSUtil.getBrTag());
        info.append("isClosed::" + this.isClosed + DSUtil.getBrTag());
        info.append("ConnUseTime::" + this.getConnUseTime() + DSUtil.getBrTag());
        for (SQLExeInfo sqlInfo : this.historySql) {
            info.append(sqlInfo.toString());
        }
        return info.toString();
    }

    /** Renders a throwable's stack trace as a string. */
    private String getStackTrace(Throwable e) {
        StringWriter buffer = new StringWriter();
        PrintWriter writer = new PrintWriter(buffer);
        try {
            e.printStackTrace(writer);
            return buffer.toString();
        } finally {
            writer.close();
        }
    }

    /** Time since {@link #endConn()}, or 0 while the connection is still in use. */
    public long getFreeTime() {
        return this.isClosed ? System.currentTimeMillis() - this.endTime : 0L;
    }

    public Throwable getConnTrace() {
        return this.connTrace;
    }

    public void setConnTrace(Throwable connTrace) {
        this.connTrace = connTrace;
    }

    /**
     * Bounded list: when it already holds more than 30 entries, the oldest
     * (head) entry is dropped before the new one is appended.
     */
    class SQLExeList<T> extends LinkedList<T> {
        private static final long serialVersionUID = 1L;

        SQLExeList() {
        }

        public boolean add(T e) {
            if (this.size() > 30) {
                this.remove();
            }
            return super.add(e);
        }
    }
}
那麼 SPR 是如何記錄 SQL 歷史的呢:
在 NC 最底層的 JDBC 查詢實現 nc.jdbc.framework.JdbcSession#executeQuery(java.lang.String, nc.jdbc.framework.processor.ResultSetProcessor) 方法中,我們發現了一個關鍵調用:rs = statement.executeQuery(sql); 其中 statement 是由 uap.mw.ds.DBConnection#createStatement() 創建的 uap.mw.ds.UAPStatement 對象:
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package uap.mw.ds; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import uap.mw.ds.monitor.ConnectionExeInfo; public class UAPStatement extends UAPStatementSkelecton { private ConnectionExeInfo runTimeInfo = null; UAPStatement(Statement ps, ConnectionExeInfo runTimeInfo) { super(ps); this.runTimeInfo = runTimeInfo; } public int executeUpdate(String sql) throws SQLException { this.runTimeInfo.startSQL(sql, (String[])null); boolean var2 = false; int num; try { num = super.executeUpdate(sql); } finally { this.runTimeInfo.endSQL(sql, (String[])null); } return num; } public ResultSet executeQuery(String sql) throws SQLException { this.runTimeInfo.startSQL(sql, (String[])null); ResultSet result = null; try { result = super.executeQuery(sql); } finally { this.runTimeInfo.endSQL(sql, (String[])null); } return result; } }
至此謎題解開:這個類裏直接調用了 this.runTimeInfo.startSQL(sql, (String[])null); 方法來記錄你的 SQL,其他 SQL 的執行同理。
我們繼續研究事務開啓的軌跡。接着上面,我們發現開啓事務時調用了 this.tranStack.push(tranText); 方法。tranStack 是一個鏈表,存儲的是每一次接口方法調用的事務上下文;它是一個重寫了 pop / push 的匿名內部類,我們看看 push 方法:
/**
 * Excerpt of the transaction manager (the class body continues beyond this
 * snippet). Shown here: its fields and the context stack.
 */
class UAPTransactionManager implements IUAPTransactionManager, TransactionManager {
    // NOTE(review): the logger is created for IerpTransactionManager, not this class - confirm intent.
    private static Log log = Log.getInstance(IerpTransactionManager.class);
    private Thread curThread;
    private int m_nTranTimeOut;
    private int m_tranProp;
    // Stack of transaction contexts. push/pop are overridden so that entering a
    // SOURCE or NULL context suspends the current transaction and leaving one
    // resumes it; for other context types nothing is suspended or resumed.
    private LinkedList<UAPTransactionContext> tranStack = new LinkedList<UAPTransactionContext>() {
        // Removes the top context first, then resumes if the removed context was SOURCE or NULL.
        public UAPTransactionContext pop() {
            UAPTransactionContext curContext = (UAPTransactionContext)super.pop();
            if (curContext.getTransType() == TransactionContextType.SOURCE || curContext.getTransType() == TransactionContextType.NULL) {
                UAPTransactionManager.this.resumeCurTrans();
            }
            return curContext;
        }

        // Suspends BEFORE pushing, so the suspension applies to the context that
        // was on top until now, not to the new item.
        public void push(UAPTransactionContext item) {
            if (item.getTransType() == TransactionContextType.SOURCE || item.getTransType() == TransactionContextType.NULL) {
                UAPTransactionManager.this.suspendCurTrans();
            }
            super.push(item);
        }
    };
UAPTransactionManager.this.suspendCurTrans(); 繼續看。。。
/**
 * Suspends every connection held by the transaction of the context that is
 * currently on top of {@code tranStack}. Does nothing when the stack is empty.
 */
private void suspendCurTrans() {
    if (this.tranStack.isEmpty()) {
        return;
    }
    // peek() reads the top context without removing it.
    UAPTransactionContext topContext = (UAPTransactionContext) this.tranStack.peek();
    UAPTransaction topTransaction = (UAPTransaction) topContext.getTransaction();
    topTransaction.suspendAllConnection();
}
因為最新的事務上下文是通過 push 壓入棧頂的,所以這裏用 peek 獲取(但不移除)棧頂的第一個元素,也就是後進先出原則。
拿到後 用最後一次事務上下文 暫停上下文持有的全部數據庫連接。
/**
 * Moves every connection enlisted in this transaction to the
 * {@code INUSE_SUSPEND} status.
 */
public void suspendAllConnection() {
    for (Object enlisted : this.getConnectionMap().values()) {
        ((DBConnection) enlisted).changeStates(ConnectionStatus.INUSE_SUSPEND);
    }
}
其實就是拿到所有連接,把它們的狀態標記為 INUSE_SUSPEND。那麼問題就是:這個連接集合是如何維護的?只要知道它何時把 DB 的連接對象放入這個 Map,我們就知道了它事務管理的實現原理。因為它最後無非就是:通過線程唯一的事務管理器拿到事務上下文,根據事務上下文是否為子事務,依次遍歷其中的 DB 連接集合(上面的 DBConnection 對象),對它們統一進行提交或回滾(調用 realCommit / realRollback 方法,進而觸發真正的 JDBC conn 對象的對應方法):
經過調試我們發現,uap.mw.trans.UAPTransaction#enlistConnResource 這個方法負責把 DB 連接存入當前事務上下文(棧頂的事務上下文)所對應的、上面提到的 DB 連接集合裏。觸發這個方法調用的地方在:
/**
 * Creates a default JdbcSession that obtains its connection from the data
 * source currently being accessed.
 *
 * <p>If the database type cannot be determined after the connection was
 * obtained, the connection is closed again before the failure is reported,
 * because the caller never receives a session it could close.
 *
 * @throws DbException when obtaining the connection or detecting the database type fails
 */
public JdbcSession() throws DbException {
    Connection con = null;
    try {
        con = ConnectionFactory.getConnection();
        dbType = DBUtil.getDbType(con);
        // dbType = DataSourceCenter.getInstance().getDatabaseType();
        this.conn = con;
    } catch (SQLException e) {
        if (con != null) {
            // The constructor is failing, so nobody else can release this connection.
            try {
                con.close();
            } catch (SQLException closeFailure) {
                // Chain the secondary failure instead of losing it.
                e.setNextException(closeFailure);
            }
        }
        throw ExceptionFactory.getException(dbType, e.getMessage(), e);
    }
}
獲取連接調用的是 nc.jdbc.framework.DataSourceCenter 的 Connection dummy = DataSourceCenter.getInstance().getConnection();
走到了 uap.mw.ds.UAPDataSource#getTransConnection 方法 後調用
/**
 * Returns the connection the current transaction already holds for this
 * data source; otherwise takes one from the pool and prepares it for
 * transactional use via {@code setConnToUse}.
 *
 * @return the connection, or {@code null} when the pool returned none
 */
private DBConnection getTransConnection() throws SQLException {
    IUAPTransactionManager manager = UAPTransactionManagerProxy.getCurTransManager();
    UAPTransaction transaction = (UAPTransaction) manager.getTransaction();

    DBConnection enlisted = transaction.getConnectionFromCurTrans(this.dataSourceName);
    if (enlisted != null) {
        // One connection per data source per transaction: reuse it.
        return enlisted;
    }

    DBConnection pooled = this.connPool.getConnection();
    if (pooled != null) {
        this.setConnToUse(pooled, true, transaction, manager);
    }
    return pooled;
}
而後:
/**
 * Switches a pooled connection to INUSE and, for transactional use, enlists
 * it in the current transaction and turns auto-commit off.
 *
 * <p>On any failure the connection is marked NEED_DESTROY, its physical
 * connection is closed, the pool permit is released, and an
 * {@link SQLException} carrying the original failure as cause is thrown.
 *
 * @param conn           connection taken from the pool
 * @param isTrans        whether the connection is used inside a transaction
 * @param curTrans       transaction to enlist the connection in when {@code isTrans}
 * @param curTranManager not read by this method
 * @throws SQLException when the connection could not be prepared
 */
private void setConnToUse(DBConnection conn, boolean isTrans, UAPTransaction curTrans, IUAPTransactionManager curTranManager) throws SQLException {
    try {
        conn.changeStates(ConnectionStatus.INUSE);
        if (isTrans) {
            curTrans.enlistConnResource(this.dataSourceName, conn);
            conn.setAutoCommit(false);
        }
    } catch (Exception cause) {
        DBLogger.error("set Conn State error!", cause);
        conn.setStatus(ConnectionStatus.NEED_DESTROY);
        try {
            // Close the physical connection directly: DBConnection.realDestroyConn()
            // goes through getRealConnection(), which throws for any status other
            // than INUSE and so can never succeed once NEED_DESTROY is set.
            if (conn.getPhysicalConnection() != null) {
                conn.getPhysicalConnection().close();
            }
        } catch (Exception closeFailure) {
            DBLogger.error("destroy Conn error!", closeFailure);
        } finally {
            // Always give the pool permit back, even when closing failed.
            this.connPool.releasePhore();
        }
        throw new SQLException("set Conn State error!!", cause);
    }
}
至此,我們就已經瞭解了整個 NC 事務實現的準備工作。那麼最後,就是事務管理器的 commit 和 rollback 的代碼了:
public void uap.mw.trans.UAPTransactionManagerProxy#end(Exception ex) { IUAPTransactionManager m_tranManager = (IUAPTransactionManager)tm_local.get(); try { if (ex != null) { if (m_tranManager.getTranContext().needRBPoint()) { if (!((UAPTransaction)((UAPTransaction)m_tranManager.getTranContext().getTransaction())).getRollbackOnly()) { m_tranManager.rollBackToCurInvokePoint(); return; } } else { m_tranManager.setCurTransRollBack(); } } m_tranManager.commit(); } catch (Exception var4) { log.error("", var4); } }
然後根據是否有異常來決定:沒有異常就直接 commit;有異常時,如果當前棧頂的上下文設有回滾點(savepoint),就 rollback 到這個保存點,也就是當前這次方法調用開始時的狀態;否則把當前事務標記為 rollback-only,再交給 commit 去處理。
public void uap.mw.trans.UAPTransactionManager#rollBackToCurInvokePoint() throws SQLException { UAPTransactionContext curTransContext = (UAPTransactionContext)this.tranStack.pop(); UAPTransaction trans = (UAPTransaction)this.getTransaction(); Map<String, Savepoint> savePointMap = curTransContext.getSavePointMap(); Map<String, DBConnection> connMap = trans.getConnectionMap(); Iterator i$ = connMap.keySet().iterator(); while(i$.hasNext()) { String dsName = (String)i$.next(); DBConnection conn = (DBConnection)connMap.get(dsName); Savepoint sPoint = (Savepoint)savePointMap.get(dsName); if (sPoint != null) { conn.realRollback(sPoint); } else { conn.realRollback(); } } }
/**
 * Marks the transaction of the current context as rollback-only.
 * Does nothing when the context has no transaction.
 */
public void setCurTransRollBack() throws IllegalStateException, SystemException {
    Transaction current = this.getTranContext().getTransaction();
    if (current == null) {
        return;
    }
    current.setRollbackOnly();
}
最終的提交實現:
public void uap.mw.trans.UAPTransactionManager#commit() throws RollbackException, IllegalStateException, SystemException, HeuristicRollbackException, SecurityException, HeuristicMixedException { UAPTransactionContext curTransContext = (UAPTransactionContext)this.tranStack.pop(); if (curTransContext != null && !curTransContext.isNullTrans() && !curTransContext.isJoined()) { try { Transaction curTran = curTransContext.getTransaction(); List<Synchronization> synList = ((UAPTransaction)curTran).getSynchronization(); Iterator i$; Synchronization s; if (synList != null && synList.size() > 0) { i$ = synList.iterator(); while(i$.hasNext()) { s = (Synchronization)i$.next(); s.beforeCompletion(); } } try { curTran.commit(); } catch (Exception var9) { log.error("commit error", var9); } if (synList != null && synList.size() > 0) { i$ = synList.iterator(); while(i$.hasNext()) { s = (Synchronization)i$.next(); s.afterCompletion(curTran.getStatus()); } } } finally { if (this.tranStack.isEmpty()) { this.removeCurTransManager(); } } } }
public void uap.mw.trans.UAPTransaction#commit() throws SecurityException, SystemException, RollbackException, HeuristicMixedException, HeuristicRollbackException { if (this.m_bRollbackOnly) { this.rollback(); } else { this.m_status = 8; boolean hasException = false; for(Iterator i$ = this.connecionMap.values().iterator(); i$.hasNext(); this.m_status = 3) { DBConnection conn = (DBConnection)i$.next(); try { conn.realCommit(); } catch (Exception var13) { DBLogger.error(var13.getMessage() + ":" + conn.getDataSourceName(), var13); hasException = true; } finally { try { conn.backPool(); } catch (SQLException var12) { DBLogger.error(var12.getMessage(), var12); } } } this.connecionMap.clear(); this.transEnd(); if (hasException) { throw new SystemException("commit error,Please check log"); } } }