NC Series: A Deep Dive into How Single-Node NC Implements Transaction Management

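In the NC series, once we publish an EJB interface, commit and rollback for calls to that interface are fully automatic, and unless there is a special requirement we never have to handle them by hand. So how does NC make transaction handling transparent to the caller?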

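The previous article mentioned that the transaction proxy is the nc.bs.framework.ejb.CMTEJBServiceHandler class, which forwards calls to a delegate. Let's debug it and see how it begins and ends transactions, opens sub-transactions, and so on: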

Let's step into nc.itf.framework.ejb.CMTProxy_Local:

The first method opens a brand-new sub-transaction; the second joins an existing transaction, or opens a new one if none exists.

protected void beforeCallMethod(int methodId) {
        Logger.info("Begin Transaction(" + methodId + ")");
        MwTookit.setThreadState("nc.bs.mw.naming.BeanBase.beforeCallMethod");
        this.setLastCallTime(System.currentTimeMillis());
        boolean isCmt = ((HomeBase)this.getEJBLocalHome()).getEJBBeanDescriptor().isCmt();
        if (isCmt) {
            try {
                this.currentMethodTransectionType = this.getMethodTransectionType(methodId);
                int isolateLevel = this.getMethodIsolateLevelType(methodId);
                this.setIerpTransactionManagerProxy(TransactionFactory.getTMProxy());
                this.getIerpTransactionManagerProxy().begin(this.currentMethodTransectionType, isolateLevel);
            } catch (Exception var4) {
                Logger.error("BeforeCallMethod", var4);
            }
        } else {
            if (this.getIerpUserTransaction() == null) {
                this.setIerpTransactionManagerProxy((IContainerTransProxy)null);
                this.setIerpUserTransaction(TransactionFactory.getUTransaction());
            }

            this.getIerpUserTransaction().bindToCurrentThread();
        }

        MwTookit.setThreadState("nc.bs.mw.naming.BeanBase.beforeCallMethod over");
    }

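Both methods call the method above on their first line: the one that opens a new sub-transaction passes 200, and the one that joins an existing transaction passes 201. Let's see what its core call, this.getIerpTransactionManagerProxy().begin(this.currentMethodTransectionType, isolateLevel);, actually does: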

public void begin(int transType, int isolateLevel) throws NotSupportedException, SystemException, NamingException, TransactionRequiredException {
        IUAPTransactionManager m_tranManager = (IUAPTransactionManager)tm_local.get();
        if (m_tranManager == null) {
            m_tranManager = new UAPTransactionManager();
            tm_local.set(m_tranManager);
        }

        ((IUAPTransactionManager)m_tranManager).begin(transType);
    }
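
The lookup at the top of begin is a per-thread lazy-initialisation pattern. Here is a minimal sketch of it, assuming nothing about NC beyond what the listing shows; PerThreadManagerSketch, Manager, and current() are hypothetical names, not NC classes.

```java
// Minimal sketch of a per-thread, lazily created manager.
// All names here are illustrative; they are not NC classes.
class PerThreadManagerSketch {

    /** Stand-in for a transaction manager; it only counts begin() calls. */
    static class Manager {
        int beginCount;

        void begin() {
            beginCount++;
        }
    }

    // One slot per thread, playing the role of tm_local in the listing.
    private static final ThreadLocal<Manager> TM_LOCAL = new ThreadLocal<>();

    /** Returns this thread's manager, creating it on first use. */
    static Manager current() {
        Manager m = TM_LOCAL.get();
        if (m == null) {
            m = new Manager();
            TM_LOCAL.set(m);
        }
        return m;
    }

    public static void main(String[] args) {
        current().begin(); // the outer call creates the manager
        current().begin(); // a nested call reuses the same instance
        System.out.println(current().beginCount); // prints 2
    }
}
```

Because the slot is a ThreadLocal, nested calls on one thread always see the same manager, while a different thread gets its own.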

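begin fetches the transaction manager from the current thread; if there is none, it creates a new one, stores it on the thread, and then calls begin on it directly. The reason for the lookup is sub-transactions: if a transaction already exists, the call has to pick up the earlier transaction manager and call begin on that one, so that each thread works with exactly one transaction manager. Let's continue: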

public void begin(int transType) throws NotSupportedException, SystemException {
        switch(transType) {
        case 0:
            this.createTransaction(TransactionContextType.NULL);
            break;
        case 1:
            if (this.tranStack.isEmpty()) {
                this.createTransaction(TransactionContextType.SOURCE);
            } else {
                this.createTransaction(TransactionContextType.JOINED);
            }
            break;
        case 2:
            if (!this.tranStack.isEmpty()) {
                this.createTransaction(TransactionContextType.NULL);
            } else {
                this.createTransaction(TransactionContextType.JOINED);
            }
            break;
        case 3:
            this.createTransaction(TransactionContextType.SOURCE);
            break;
        case 4:
            if (this.tranStack.isEmpty()) {
                throw new SystemException();
            }

            this.createTransaction(TransactionContextType.JOINED);
            break;
        case 5:
            if (!this.tranStack.isEmpty()) {
                throw new SystemException();
            }

            this.createTransaction(TransactionContextType.NULL);
            break;
        case 6:
        case 7:
        case 8:
        case 9:
        case 10:
        default:
            throw new NotSupportedException("trans type error!");
        case 11:
            this.createTransaction(TransactionContextType.JOINED);

            try {
                this.setCurInvokeSavePoint();
            } catch (SQLException var3) {
                throw new NotSupportedException("savePoint error!");
            }
        }

    }
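
Read as a decision table, the switch above depends on only two inputs: the requested type and whether the stack is empty. The sketch below restates the branches exactly as they appear in the decompiled listing, as a pure function. ContextType and resolve are illustrative names, and IllegalStateException and UnsupportedOperationException stand in for SystemException and NotSupportedException; the savepoint that case 11 sets is left out.

```java
// Restates the branches of begin(int) as a pure function.
// ContextType and resolve are illustrative names, not NC APIs.
class BeginDecisionSketch {

    enum ContextType { NULL, SOURCE, JOINED }

    /** Maps (transType, stackEmpty) to the context type the listing creates. */
    static ContextType resolve(int transType, boolean stackEmpty) {
        switch (transType) {
            case 0:
                return ContextType.NULL;
            case 1:
                return stackEmpty ? ContextType.SOURCE : ContextType.JOINED;
            case 2:
                // Mirrors the decompiled branch as printed.
                return stackEmpty ? ContextType.JOINED : ContextType.NULL;
            case 3:
                return ContextType.SOURCE;
            case 4:
                if (stackEmpty) {
                    throw new IllegalStateException("no enclosing transaction");
                }
                return ContextType.JOINED;
            case 5:
                if (!stackEmpty) {
                    throw new IllegalStateException("enclosing transaction present");
                }
                return ContextType.NULL;
            case 11:
                return ContextType.JOINED; // the listing also sets a savepoint here
            default:
                throw new UnsupportedOperationException("trans type error!");
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(1, true));  // prints SOURCE
        System.out.println(resolve(1, false)); // prints JOINED
    }
}
```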

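Different logic runs depending on the transaction type requested. There are generally three context types: NULL (no transaction), JOINED (another transactional interface method is called, so the transaction is nested), and SOURCE (the first, new transaction). In the end, transaction management is still handed over to the uap.mw.trans.UAPTransaction class.

This class holds a key Map whose values are uap.mw.ds.DBConnection objects. uap.mw.ds.DBConnection is NC's wrapper around the lowest-level JDBC Connection object: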

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package uap.mw.ds;

import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import nc.bs.logging.Log;
import nc.bs.logging.Logger;
import nc.bs.mw.sql.validator.ValidateTool;
import uap.mw.ds.exception.DBRunTimeException;
import uap.mw.ds.monitor.ConnectionExeInfo;
import uap.mw.trans.UAPTransactionManagerProxy;
import uap.mw.trans.util.ConnectionStatus;

public class DBConnection implements Connection {
    private static Log log = Log.getInstance(DBConnection.class);
    private volatile ConnectionStatus status;
    private Connection realConnection;
    private UAPDataSource ds;
    public Exception trace;
    public Thread lastInUseThread;
    private long activeTime;
    private long lastInUseTime;
    private ConnectionExeInfo lastRunTimeInfo;
    private ConnectionExeInfo curRunTimeInfo;
    private String connID;
    private int prepareStatementNum;
    private Map<String, UAPPreparedStatement> statementCached;
    private Map<Integer, UAPPreparedStatement> stmtTracksMap;

    private Connection getRealConnection() {
        if (this.isClosed()) {
            throw new DBRunTimeException(" connection is Closed");
        } else {
            return this.realConnection;
        }
    }

    public Connection getPhysicalConnection() {
        return this.realConnection;
    }

    DBConnection(Connection realConnection, final UAPDataSource ds) {
        this.status = ConnectionStatus.FREE;
        this.realConnection = null;
        this.ds = null;
        this.activeTime = -1L;
        this.lastInUseTime = -1L;
        this.connID = "";
        this.prepareStatementNum = 0;
        this.statementCached = null;
        this.stmtTracksMap = new HashMap();
        this.realConnection = realConnection;
        this.ds = ds;
        this.connID = UUID.randomUUID().toString();
        this.statementCached = new LinkedHashMap<String, UAPPreparedStatement>(ds.getPreparedStatementCacheSize(), 1.0F, true) {
            public UAPPreparedStatement remove(Object sql) {
                UAPPreparedStatement stmt = (UAPPreparedStatement)super.remove(sql);
                if (stmt != null) {
                    stmt.isPooled = false;
                }

                return stmt;
            }

            protected boolean removeEldestEntry(Entry<String, UAPPreparedStatement> eldest) {
                boolean needRemove = this.size() > ds.getPreparedStatementCacheSize();
                if (needRemove) {
                    ((UAPPreparedStatement)eldest.getValue()).isPooled = false;
                    if (!((UAPPreparedStatement)eldest.getValue()).inUse) {
                        DBConnection.this.closeStmt((Statement)eldest.getValue());
                    }
                }

                return needRemove;
            }
        };
    }

    public ConnectionStatus getStatus() {
        return this.status;
    }

    public void setAutoCommit(boolean b) throws SQLException {
        this.getRealConnection().setAutoCommit(b);
    }

    void setStatus(ConnectionStatus status) {
        this.status = status;
    }

    public void setLastUseMeThread(Thread newLastUseMeThread) {
        if (newLastUseMeThread == null) {
            this.trace = null;
            this.lastInUseTime = -1L;
        } else {
            this.trace = new Exception();
            this.lastInUseTime = System.currentTimeMillis();
        }

        this.lastInUseThread = newLastUseMeThread;
    }

    public void commit() throws SQLException {
        if (!UAPTransactionManagerProxy.isTrans()) {
            this.realCommit();
        }

    }

    public void realCommit() throws SQLException {
        this.getRealConnection().commit();
        this.clearStatements();
    }

    public void rollback() throws SQLException {
        if (!UAPTransactionManagerProxy.isTrans()) {
            throw new SQLException("no Transaction ,can not ROLLBACK!");
        }
    }

    public void rollback(Savepoint savepoint) throws SQLException {
        if (!UAPTransactionManagerProxy.isTrans()) {
            throw new SQLException("no Transaction ,can not ROLLBACK!");
        }
    }

    public void realRollback() throws SQLException {
        this.getRealConnection().rollback();
    }

    public void realRollback(Savepoint savepoint) throws SQLException {
        this.getRealConnection().rollback(savepoint);
    }

    public void close() throws SQLException {
        if (!UAPTransactionManagerProxy.isTrans()) {
            if (this.getStatus() != ConnectionStatus.INUSE) {
                throw new SQLException("Connection state Error:the connection state is " + this.getStatus());
            }

            if (!this.getAutoCommit()) {
                this.commit();
            }

            this.backPool();
        }

    }

    public void backPool() throws SQLException {
        this.clearStatements();
        this.ds.closeConnection(this);
    }

    public <T> T unwrap(Class<T> iface) throws SQLException {
        return this.getRealConnection().unwrap(iface);
    }

    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return this.getRealConnection().isWrapperFor(iface);
    }

    public Statement createStatement() throws SQLException {
        this.updateActivetime();
        return new UAPStatement(this.getRealConnection().createStatement(), this.curRunTimeInfo);
    }

    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
        return new UAPStatement(this.getRealConnection().createStatement(resultSetType, resultSetConcurrency), this.curRunTimeInfo);
    }

    private UAPPreparedStatement prepareCachedStatement(String sql) throws SQLException {
        UAPPreparedStatement ps = new UAPPreparedStatement(this, this.getRealConnection().prepareStatement(sql), this.curRunTimeInfo, true, this.prepareStatementNum++);
        this.statementCached.put(sql, ps);
        return ps;
    }

    public PreparedStatement prepareStatement(String sql) throws SQLException {
        UAPPreparedStatement ps = null;
        if (this.ds.getPreparedStatementCacheSize() <= 0) {
            ps = new UAPPreparedStatement(this, this.getRealConnection().prepareStatement(sql), this.curRunTimeInfo, false, 0);
        } else {
            ps = (UAPPreparedStatement)this.statementCached.get(sql);
            if (ps == null) {
                ps = this.prepareCachedStatement(sql);
            } else if (ps.inUse) {
                this.statementCached.remove(sql);
                ps = this.prepareCachedStatement(sql);
            } else {
                try {
                    ps.clearParameters();
                } catch (SQLException var4) {
                    this.statementCached.remove(sql);
                    this.closeStmt(ps);
                    ps = this.prepareCachedStatement(sql);
                }
            }
        }

        ps.inUse = true;
        this.stmtTracksMap.put(ps.getNum(), ps);
        return ps;
    }

    private void clearStatements() {
        UAPPreparedStatement s;
        try {
            for(Iterator i$ = this.stmtTracksMap.values().iterator(); i$.hasNext(); this.closeStmt(s)) {
                s = (UAPPreparedStatement)i$.next();
                if (s.inUse) {
                    Logger.warn("statement in use!");
                }
            }
        } finally {
            this.prepareStatementNum = 0;
            this.stmtTracksMap.clear();
        }

    }

    private void closeStmt(Statement stmt) {
        try {
            if (stmt != null) {
                stmt.close();
                stmt = null;
            }
        } catch (SQLException var3) {
        }

    }

    public void removeStatementFromIndex(Integer statementNum) {
        this.stmtTracksMap.remove(statementNum);
    }

    /** @deprecated */
    @Deprecated
    public CallableStatement prepareCall(String sql) throws SQLException {
        return this.getRealConnection().prepareCall(sql);
    }

    public String nativeSQL(String sql) throws SQLException {
        return this.getRealConnection().nativeSQL(sql);
    }

    public boolean getAutoCommit() throws SQLException {
        return this.getRealConnection().getAutoCommit();
    }

    public boolean isClosed() {
        return this.status != ConnectionStatus.INUSE;
    }

    public DatabaseMetaData getMetaData() throws SQLException {
        return this.getRealConnection().getMetaData();
    }

    public void setReadOnly(boolean readOnly) throws SQLException {
        this.getRealConnection().setReadOnly(readOnly);
    }

    public boolean isReadOnly() throws SQLException {
        return this.getRealConnection().isReadOnly();
    }

    public void setCatalog(String catalog) throws SQLException {
        this.getRealConnection().setCatalog(catalog);
    }

    public String getCatalog() throws SQLException {
        return this.getRealConnection().getCatalog();
    }

    public void setTransactionIsolation(int level) throws SQLException {
        this.getRealConnection().setTransactionIsolation(level);
    }

    public int getTransactionIsolation() throws SQLException {
        return this.getRealConnection().getTransactionIsolation();
    }

    public SQLWarning getWarnings() throws SQLException {
        return this.getRealConnection().getWarnings();
    }

    public void clearWarnings() throws SQLException {
        this.updateActivetime();
        this.getRealConnection().clearWarnings();
    }

    /** @deprecated */
    @Deprecated
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        throw new UnsupportedOperationException();
    }

    /** @deprecated */
    @Deprecated
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        throw new UnsupportedOperationException();
    }

    public Map<String, Class<?>> getTypeMap() throws SQLException {
        return this.getRealConnection().getTypeMap();
    }

    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
        this.getRealConnection().setTypeMap(map);
    }

    public void setHoldability(int holdability) throws SQLException {
        this.getRealConnection().setHoldability(holdability);
    }

    public int getHoldability() throws SQLException {
        return this.getRealConnection().getHoldability();
    }

    public Savepoint setSavepoint() throws SQLException {
        return this.getRealConnection().setSavepoint();
    }

    public Savepoint setSavepoint(String name) throws SQLException {
        return this.getRealConnection().setSavepoint(name);
    }

    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
        this.getRealConnection().releaseSavepoint(savepoint);
        this.updateActivetime();
    }

    /** @deprecated */
    @Deprecated
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        throw new UnsupportedOperationException();
    }

    /** @deprecated */
    @Deprecated
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        throw new UnsupportedOperationException();
    }

    /** @deprecated */
    @Deprecated
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        throw new UnsupportedOperationException();
    }

    /** @deprecated */
    @Deprecated
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        throw new UnsupportedOperationException();
    }

    /** @deprecated */
    @Deprecated
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
        throw new UnsupportedOperationException();
    }

    /** @deprecated */
    @Deprecated
    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
        throw new UnsupportedOperationException();
    }

    public Clob createClob() throws SQLException {
        this.updateActivetime();
        return this.getRealConnection().createClob();
    }

    public Blob createBlob() throws SQLException {
        this.updateActivetime();
        return this.getRealConnection().createBlob();
    }

    public NClob createNClob() throws SQLException {
        this.updateActivetime();
        return this.getRealConnection().createNClob();
    }

    public SQLXML createSQLXML() throws SQLException {
        this.updateActivetime();
        return this.getRealConnection().createSQLXML();
    }

    public boolean isValid(int timeout) throws SQLException {
        return this.getRealConnection().isValid(timeout);
    }

    public void setClientInfo(String name, String value) throws SQLClientInfoException {
        this.getRealConnection().setClientInfo(name, value);
    }

    public void setClientInfo(Properties properties) throws SQLClientInfoException {
        this.getRealConnection().setClientInfo(properties);
    }

    public Properties getClientInfo() throws SQLException {
        return this.getRealConnection().getClientInfo();
    }

    public String getClientInfo(String name) throws SQLException {
        return this.getRealConnection().getClientInfo(name);
    }

    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
        this.updateActivetime();
        return this.getRealConnection().createArrayOf(typeName, elements);
    }

    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
        this.updateActivetime();
        return this.getRealConnection().createStruct(typeName, attributes);
    }

    public long getActiveTime() {
        return this.activeTime;
    }

    public void setActiveTime(long activeTime) {
        this.activeTime = activeTime;
    }

    private void updateActivetime() {
        this.activeTime = System.currentTimeMillis();
    }

    public boolean validate() {
        boolean m_useAble = false;

        try {
            if (ValidateTool.validate(this, this.ds.getDatabaseType())) {
                m_useAble = true;
            } else {
                m_useAble = false;
            }
        } catch (Exception var3) {
            log.warn("This connection:\"" + this.realConnection + "\" is not usable, it will be removed!", var3);
            m_useAble = false;
        }

        return m_useAble;
    }

    public String getDataSourceName() {
        return this.ds.getDataSourceName();
    }

    public long getLastInUseTime() {
        return this.lastInUseTime;
    }

    public void changeStates(ConnectionStatus newStatus) {
        StatusHandlerFactory.getStatusHandler(this.getStatus(), this).changeStatus(newStatus);
    }

    ConnectionExeInfo getLastConnectionExeInfo() {
        return this.lastRunTimeInfo;
    }

    ConnectionExeInfo getCurConnectionExeInfo() {
        return this.curRunTimeInfo;
    }

    void setLastRunTimeInfo(ConnectionExeInfo lastRunTimeInfo) {
        this.lastRunTimeInfo = lastRunTimeInfo;
    }

    void setCurRunTimeInfo(ConnectionExeInfo curRunTimeInfo) {
        this.curRunTimeInfo = curRunTimeInfo;
        if (curRunTimeInfo != null) {
            curRunTimeInfo.setConnID(this.connID);
        }

    }

    public String toString() {
        return this.ds.getDataSourceName() + "::" + this.connID;
    }

    public String getCurConnectionExeInfoInfo() {
        return this.getCurConnectionExeInfo() != null ? this.getCurConnectionExeInfo().getInfo() : null;
    }

    long getFreeTime() {
        return this.getStatus() != ConnectionStatus.FREE ? 0L : this.curRunTimeInfo.getFreeTime();
    }

    void realDestroyConn() throws SQLException {
        this.getRealConnection().close();
    }

    public void setSchema(String schema) throws SQLException {
    }

    public String getSchema() throws SQLException {
        return null;
    }

    public void abort(Executor executor) throws SQLException {
    }

    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
    }

    public int getNetworkTimeout() throws SQLException {
        return 0;
    }
}
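
The commit-related overrides in the listing above boil down to one idea: the public commit() is ignored while a managed transaction is active, and only realCommit() reaches the physical connection. A minimal sketch of that idea follows. Every name in it is illustrative, PhysicalConn is a stand-in for the real JDBC connection, and the IN_TRANS flag merely plays the role of UAPTransactionManagerProxy.isTrans().

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a wrapper whose commit() is deferred while a transaction is active.
// Every name is illustrative; IN_TRANS stands in for UAPTransactionManagerProxy.isTrans().
class DeferredCommitSketch {

    /** Stand-in for the physical JDBC connection; it counts real commits. */
    static class PhysicalConn {
        final AtomicInteger commits = new AtomicInteger();

        void commit() {
            commits.incrementAndGet();
        }
    }

    static final ThreadLocal<Boolean> IN_TRANS = ThreadLocal.withInitial(() -> Boolean.FALSE);

    static class WrappedConn {
        private final PhysicalConn real;

        WrappedConn(PhysicalConn real) {
            this.real = real;
        }

        /** What application code calls: ignored inside a managed transaction. */
        void commit() {
            if (!IN_TRANS.get()) {
                realCommit();
            }
        }

        /** What the transaction manager calls when the transaction really ends. */
        void realCommit() {
            real.commit();
        }
    }

    public static void main(String[] args) {
        PhysicalConn physical = new PhysicalConn();
        WrappedConn conn = new WrappedConn(physical);
        IN_TRANS.set(Boolean.TRUE);
        conn.commit();     // swallowed: a transaction is active
        conn.realCommit(); // reaches the physical connection
        System.out.println(physical.commits.get()); // prints 1
    }
}
```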

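As the listing shows, DBConnection holds the real connection object and overrides methods such as commit so that no real commit happens while a transaction is in progress. It also records SPR information, such as the SQL history: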

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package uap.mw.ds.monitor;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.LinkedList;
import javax.transaction.Transaction;
import nc.bs.logging.Logger;
import uap.mw.ds.util.DSUtil;
import uap.mw.trans.UAPTransaction;
import uap.mw.trans.util.TransUtil;

public class ConnectionExeInfo {
    private Thread curThread = null;
    private String transactionID;
    private long startTime = -1L;
    private long endTime = -1L;
    private LinkedList<SQLExeInfo> historySql = new ConnectionExeInfo.SQLExeList();
    private String preSQL = "";
    private boolean isClosed = false;
    private String connID = null;
    private Throwable connTrace = null;

    public ConnectionExeInfo() {
        this.connTrace = new Exception();
    }

    public void startConn() {
        this.startTime = System.currentTimeMillis();
        Transaction trans = TransUtil.getCurTransactionInfo();
        if (trans instanceof UAPTransaction) {
            this.transactionID = ((UAPTransaction)trans).getKey();
        }

        this.curThread = Thread.currentThread();
    }

    public void startSQL(String sql, String[] params) {
        this.historySql.add(new SQLExeInfo(sql, params));
    }

    public void endSQL(String sql, String[] params) {
        ((SQLExeInfo)this.historySql.peek()).endSql(sql);
    }

    public void preSQL(String sql) {
        this.preSQL = sql;
    }

    public void addBatch(String[] params) {
        ((SQLExeInfo)this.historySql.peek()).addBatch(params);
    }

    public void startPreSQL() {
        this.historySql.add(new SQLExeInfo(this.preSQL, true));
    }

    public void endPreSQL() {
        ((SQLExeInfo)this.historySql.peek()).endSql(this.preSQL);
    }

    public void endConn() {
        this.endTime = System.currentTimeMillis();
        this.isClosed = true;
        this.curThread = null;
    }

    public long getConnUseTime() {
        return this.endTime < 0L ? System.currentTimeMillis() - this.startTime : this.endTime - this.startTime;
    }

    public Thread getUseThread() {
        return this.curThread;
    }

    public String getTransactionID() {
        return this.transactionID;
    }

    public String getConnID() {
        return this.connID;
    }

    public void setConnID(String connID) {
        this.connID = connID;
    }

    public String getInfo() {
        StringBuffer info = new StringBuffer("");
        info.append("connID::" + this.getConnID() + DSUtil.getBrTag());
        info.append("curThread::" + this.getUseThread() + DSUtil.getBrTag());
        Logger.error("", this.connTrace);
        info.append("ThreadDump::" + this.getStackTrace(this.connTrace));
        info.append("transactionID::" + this.getTransactionID() + DSUtil.getBrTag());
        info.append("isClosed::" + this.isClosed + DSUtil.getBrTag());
        info.append("ConnUseTime::" + this.getConnUseTime() + DSUtil.getBrTag());
        Iterator i$ = this.historySql.iterator();

        while(i$.hasNext()) {
            SQLExeInfo sqlInfo = (SQLExeInfo)i$.next();
            info.append(sqlInfo.toString());
        }

        return info.toString();
    }

    private String getStackTrace(Throwable e) {
        StringWriter sWriter = new StringWriter();
        PrintWriter writer = new PrintWriter(sWriter);

        String var4;
        try {
            e.printStackTrace(writer);
            var4 = sWriter.toString();
        } finally {
            writer.close();
        }

        return var4;
    }

    public long getFreeTime() {
        return this.isClosed ? System.currentTimeMillis() - this.endTime : 0L;
    }

    public Throwable getConnTrace() {
        return this.connTrace;
    }

    public void setConnTrace(Throwable connTrace) {
        this.connTrace = connTrace;
    }

    class SQLExeList<T> extends LinkedList<T> {
        private static final long serialVersionUID = 1L;

        SQLExeList() {
        }

        public boolean add(T e) {
            if (this.size() > 30) {
                this.remove();
            }

            return super.add(e);
        }
    }
}

So how does SPR record the SQL history?

In NC's lowest-level JDBC query implementation, the method nc.jdbc.framework.JdbcSession#executeQuery(java.lang.String, nc.jdbc.framework.processor.ResultSetProcessor), we find one key call: rs = statement.executeQuery(sql);. The statement here is the uap.mw.ds.UAPStatement object created by uap.mw.ds.DBConnection#createStatement():

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package uap.mw.ds;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import uap.mw.ds.monitor.ConnectionExeInfo;

public class UAPStatement extends UAPStatementSkelecton {
    private ConnectionExeInfo runTimeInfo = null;

    UAPStatement(Statement ps, ConnectionExeInfo runTimeInfo) {
        super(ps);
        this.runTimeInfo = runTimeInfo;
    }

    public int executeUpdate(String sql) throws SQLException {
        this.runTimeInfo.startSQL(sql, (String[])null);
        boolean var2 = false;

        int num;
        try {
            num = super.executeUpdate(sql);
        } finally {
            this.runTimeInfo.endSQL(sql, (String[])null);
        }

        return num;
    }

    public ResultSet executeQuery(String sql) throws SQLException {
        this.runTimeInfo.startSQL(sql, (String[])null);
        ResultSet result = null;

        try {
            result = super.executeQuery(sql);
        } finally {
            this.runTimeInfo.endSQL(sql, (String[])null);
        }

        return result;
    }
}

That solves the puzzle: this class calls this.runTimeInfo.startSQL(sql, (String[])null); directly to record your SQL, and the other SQL paths work the same way.
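
The recording side is a bounded history list. Two details of the ConnectionExeInfo listing are worth noticing: SQLExeList.add evicts only when size() > 30, so the list can hold 31 entries, and java.util.LinkedList.peek() returns the head of the list, while add appends to the tail. The sketch below shows the same start/end bracketing with a bounded deque. It deliberately deviates from the listing by marking the most recently started entry through peekLast(), and all of its names are illustrative.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of a bounded SQL history with start/end bracketing.
// Names and the Entry shape are illustrative, not NC classes.
class SqlHistorySketch {

    static class Entry {
        final String sql;
        boolean finished;

        Entry(String sql) {
            this.sql = sql;
        }
    }

    private final int capacity;
    private final Deque<Entry> history = new ArrayDeque<>();

    SqlHistorySketch(int capacity) {
        this.capacity = capacity;
    }

    /** Records a statement, evicting the oldest entry once the list is full. */
    void startSql(String sql) {
        if (history.size() >= capacity) {
            history.removeFirst();
        }
        history.addLast(new Entry(sql));
    }

    /** Marks the most recently started statement as finished. */
    void endSql() {
        Entry last = history.peekLast();
        if (last != null) {
            last.finished = true;
        }
    }

    int size() {
        return history.size();
    }

    String oldestSql() {
        Entry first = history.peekFirst();
        return first == null ? null : first.sql;
    }

    boolean lastFinished() {
        Entry last = history.peekLast();
        return last != null && last.finished;
    }

    public static void main(String[] args) {
        SqlHistorySketch h = new SqlHistorySketch(3);
        for (int i = 1; i <= 5; i++) {
            h.startSql("q" + i);
            h.endSql();
        }
        System.out.println(h.size());      // prints 3
        System.out.println(h.oldestSql()); // prints q3
    }
}
```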

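Let's keep tracing how a transaction is opened. Picking up from above, opening a transaction ends with a call to this.tranStack.push(tranText);. tranStack is a linked list that stores the transaction context of every interface method call, implemented as an anonymous subclass with overridden methods. Let's look at its push method: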

class UAPTransactionManager implements IUAPTransactionManager, TransactionManager {
    private static Log log = Log.getInstance(IerpTransactionManager.class);
    private Thread curThread;
    private int m_nTranTimeOut;
    private int m_tranProp;
    private LinkedList<UAPTransactionContext> tranStack = new LinkedList<UAPTransactionContext>() {
        public UAPTransactionContext pop() {
            UAPTransactionContext curContext = (UAPTransactionContext)super.pop();
            if (curContext.getTransType() == TransactionContextType.SOURCE || curContext.getTransType() == TransactionContextType.NULL) {
                UAPTransactionManager.this.resumeCurTrans();
            }

            return curContext;
        }

        public void push(UAPTransactionContext item) {
            if (item.getTransType() == TransactionContextType.SOURCE || item.getTransType() == TransactionContextType.NULL) {
                UAPTransactionManager.this.suspendCurTrans();
            }

            super.push(item);
        }
    };

Next, UAPTransactionManager.this.suspendCurTrans():

private void suspendCurTrans() {
        if (!this.tranStack.isEmpty()) {
            UAPTransactionContext transContext = (UAPTransactionContext)this.tranStack.peek();
            UAPTransaction uapTran = (UAPTransaction)transContext.getTransaction();
            uapTran.suspendAllConnection();
        }

    }

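Because the most recent transaction context was added with push, it sits at the top of the stack, so peek here retrieves the top element without removing it. This is the last-in, first-out principle.

With that most recent transaction context in hand, it suspends every database connection the context holds.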

public void suspendAllConnection() {
        Iterator i$ = this.getConnectionMap().values().iterator();

        while(i$.hasNext()) {
            DBConnection conn = (DBConnection)i$.next();
            conn.changeStates(ConnectionStatus.INUSE_SUSPEND);
        }

    }
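
Taken together, push, pop, and suspendCurTrans form a stack that pauses whatever is on top before an independent context goes in, and resumes it when that context comes out. A minimal sketch of that behaviour follows, under a simplifying assumption: the real code changes the state of the connections held by the top context's transaction, whereas the sketch just flips a flag on the context itself. All names are illustrative.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of a context stack that suspends the current top on push
// and resumes it on pop. Names are illustrative, not NC classes.
class SuspendStackSketch {

    enum Type { NULL, SOURCE, JOINED }

    static class Context {
        final Type type;
        boolean suspended;

        Context(Type type) {
            this.type = type;
        }
    }

    private final Deque<Context> stack = new ArrayDeque<>();

    /** SOURCE and NULL contexts are independent, so the current top is paused first. */
    void push(Context item) {
        if (item.type != Type.JOINED && !stack.isEmpty()) {
            stack.peek().suspended = true;
        }
        stack.push(item);
    }

    /** When an independent context leaves, the context below it is resumed. */
    Context pop() {
        Context top = stack.pop();
        if (top.type != Type.JOINED && !stack.isEmpty()) {
            stack.peek().suspended = false;
        }
        return top;
    }

    public static void main(String[] args) {
        SuspendStackSketch s = new SuspendStackSketch();
        Context outer = new Context(Type.SOURCE);
        s.push(outer);
        s.push(new Context(Type.SOURCE));
        System.out.println(outer.suspended); // prints true
        s.pop();
        System.out.println(outer.suspended); // prints false
    }
}
```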

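suspendAllConnection simply takes every connection and records a status change on it. The real question is how this connection collection is maintained. Once we know when a database connection object is put into this map, we understand how the transaction management works, because in the end all it does is use the thread's single transaction manager to get every transaction context and, depending on whether a context is a sub-transaction, loop over its database connection collection (the DBConnection objects above) and commit or roll them back as a group. It does this by calling realCommit or realRollback, which in turn call the corresponding method on the real JDBC connection object.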

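Debugging shows that uap.mw.trans.UAPTransaction#enlistConnResource is the method through which the current transaction context (the one at the top of the stack) stores a database connection in the connection collection above. The call is triggered from: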

/**
	 * Constructs a default JdbcSession, which by default obtains its connection from the currently accessed DataSource
	 */
	public JdbcSession() throws DbException {
		try {
			Connection con = ConnectionFactory.getConnection();
			dbType = DBUtil.getDbType(con);
			// dbType = DataSourceCenter.getInstance().getDatabaseType();
			this.conn = con;
		} catch (SQLException e) {
			throw ExceptionFactory.getException(dbType, e.getMessage(), e);
		}

	}

Obtaining the connection goes through nc.jdbc.framework.DataSourceCenter: Connection dummy = DataSourceCenter.getInstance().getConnection();

This reaches the uap.mw.ds.UAPDataSource#getTransConnection method:

private DBConnection getTransConnection() throws SQLException {
        IUAPTransactionManager curTranManager = UAPTransactionManagerProxy.getCurTransManager();
        UAPTransaction curTrans = (UAPTransaction)curTranManager.getTransaction();
        DBConnection conn = curTrans.getConnectionFromCurTrans(this.dataSourceName);
        if (conn == null) {
            conn = this.connPool.getConnection();
            if (conn != null) {
                this.setConnToUse(conn, true, curTrans, curTranManager);
            }
        }

        return conn;
    }

Then:

private void setConnToUse(DBConnection conn, boolean isTrans, UAPTransaction curTrans, IUAPTransactionManager curTranManager) throws SQLException {
        try {
            conn.changeStates(ConnectionStatus.INUSE);
            if (isTrans) {
                curTrans.enlistConnResource(this.dataSourceName, conn);
                conn.setAutoCommit(false);
            }

        } catch (Exception var6) {
            conn.setStatus(ConnectionStatus.NEED_DESTROY);
            conn.realDestroyConn();
            this.connPool.releasePhore();
            DBLogger.error("set Conn State error!", var6);
            throw new SQLException("set Conn State error!!");
        }
    }
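
The two methods above amount to "one connection per data source per transaction": reuse the connection already enlisted under that data source name, otherwise take one from the pool, switch off auto-commit, and enlist it. Here is a minimal sketch under that reading; Conn, Pool, Tx, and connectionFor are illustrative names, and the pool is a counter rather than a real connection pool.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of per-transaction connection enlistment keyed by data source name.
// All names are illustrative, not NC classes.
class EnlistSketch {

    static class Conn {
        final String dataSource;
        boolean autoCommit = true;

        Conn(String dataSource) {
            this.dataSource = dataSource;
        }
    }

    /** Stand-in pool that only counts how many connections it handed out. */
    static class Pool {
        int handedOut;

        Conn take(String dataSource) {
            handedOut++;
            return new Conn(dataSource);
        }
    }

    static class Tx {
        private final Map<String, Conn> conns = new HashMap<>();

        /** Reuses the enlisted connection, or enlists a fresh one from the pool. */
        Conn connectionFor(String dataSource, Pool pool) {
            Conn c = conns.get(dataSource);
            if (c == null) {
                c = pool.take(dataSource);
                c.autoCommit = false; // the transaction now decides when to commit
                conns.put(dataSource, c);
            }
            return c;
        }

        int enlisted() {
            return conns.size();
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool();
        Tx tx = new Tx();
        Conn a = tx.connectionFor("design", pool);
        Conn b = tx.connectionFor("design", pool);
        System.out.println(a == b);         // prints true
        System.out.println(pool.handedOut); // prints 1
    }
}
```

Since every statement in the transaction runs on the same enlisted connection, committing or rolling back that one connection covers all of them.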

 

 

At this point we understand all the groundwork behind NC's transaction implementation. What remains is the transaction manager's commit and rollback code:

// uap.mw.trans.UAPTransactionManagerProxy#end
public void end(Exception ex) {
        IUAPTransactionManager m_tranManager = (IUAPTransactionManager)tm_local.get();

        try {
            if (ex != null) {
                if (m_tranManager.getTranContext().needRBPoint()) {
                    if (!((UAPTransaction)((UAPTransaction)m_tranManager.getTranContext().getTransaction())).getRollbackOnly()) {
                        m_tranManager.rollBackToCurInvokePoint();
                        return;
                    }
                } else {
                    m_tranManager.setCurTransRollBack();
                }
            }

            m_tranManager.commit();
        } catch (Exception var4) {
            log.error("", var4);
        }

    }

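So end() decides, based on whether an exception was thrown, to either commit directly or roll back to a savepoint, that is, to the new independent sub-transaction on top of the stack. When the current call does not need a rollback point, the transaction is only marked rollback-only, and the commit() call that follows performs the actual rollback.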

// uap.mw.trans.UAPTransactionManager#rollBackToCurInvokePoint
public void rollBackToCurInvokePoint() throws SQLException {
        UAPTransactionContext curTransContext = (UAPTransactionContext)this.tranStack.pop();
        UAPTransaction trans = (UAPTransaction)this.getTransaction();
        Map<String, Savepoint> savePointMap = curTransContext.getSavePointMap();
        Map<String, DBConnection> connMap = trans.getConnectionMap();
        Iterator i$ = connMap.keySet().iterator();

        while(i$.hasNext()) {
            String dsName = (String)i$.next();
            DBConnection conn = (DBConnection)connMap.get(dsName);
            Savepoint sPoint = (Savepoint)savePointMap.get(dsName);
            if (sPoint != null) {
                conn.realRollback(sPoint);
            } else {
                conn.realRollback();
            }
        }

    }

public void setCurTransRollBack() throws IllegalStateException, SystemException {
        UAPTransactionContext context = this.getTranContext();
        Transaction trans = context.getTransaction();
        if (trans != null) {
            trans.setRollbackOnly();
        }

    }
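
The two failure paths shown above can be illustrated with a toy model. This is only a sketch under simplifying assumptions: a "connection" is reduced to a list of pending writes, a savepoint is just a position in that list, and the class NestedTxSketch with all of its methods is hypothetical rather than part of NC:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of savepoint rollback versus rollback-only marking. */
class NestedTxSketch {
    private final List<String> pending = new ArrayList<>();
    private final Deque<Integer> savepoints = new ArrayDeque<>();
    private boolean rollbackOnly;

    void write(String row) {
        pending.add(row);
    }

    /** A call that needs its own rollback point records the current position. */
    void beginWithSavepoint() {
        savepoints.push(pending.size());
    }

    /** Failure in a call with a rollback point: undo only that call's writes. */
    void rollbackToSavepoint() {
        int mark = savepoints.pop();
        pending.subList(mark, pending.size()).clear();
    }

    /** Failure in a call that joined the outer transaction: poison the whole transaction. */
    void setRollbackOnly() {
        rollbackOnly = true;
    }

    /** Outermost end: a rollback-only transaction discards everything instead of committing. */
    List<String> commit() {
        if (rollbackOnly) {
            pending.clear();
        }
        List<String> committed = new ArrayList<>(pending);
        pending.clear();
        return committed;
    }
}
```

In the first path the outer caller can still commit its own work after the nested call failed. In the second path nothing survives, because the failed call shared the outer transaction.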

The final commit implementation:

// uap.mw.trans.UAPTransactionManager#commit
public void commit() throws RollbackException, IllegalStateException, SystemException, HeuristicRollbackException, SecurityException, HeuristicMixedException {
        UAPTransactionContext curTransContext = (UAPTransactionContext)this.tranStack.pop();
        if (curTransContext != null && !curTransContext.isNullTrans() && !curTransContext.isJoined()) {
            try {
                Transaction curTran = curTransContext.getTransaction();
                List<Synchronization> synList = ((UAPTransaction)curTran).getSynchronization();
                Iterator i$;
                Synchronization s;
                if (synList != null && synList.size() > 0) {
                    i$ = synList.iterator();

                    while(i$.hasNext()) {
                        s = (Synchronization)i$.next();
                        s.beforeCompletion();
                    }
                }

                try {
                    curTran.commit();
                } catch (Exception var9) {
                    log.error("commit error", var9);
                }

                if (synList != null && synList.size() > 0) {
                    i$ = synList.iterator();

                    while(i$.hasNext()) {
                        s = (Synchronization)i$.next();
                        s.afterCompletion(curTran.getStatus());
                    }
                }
            } finally {
                if (this.tranStack.isEmpty()) {
                    this.removeCurTransManager();
                }

            }
        }

    }

// uap.mw.trans.UAPTransaction#commit
public void commit() throws SecurityException, SystemException, RollbackException, HeuristicMixedException, HeuristicRollbackException {
        if (this.m_bRollbackOnly) {
            this.rollback();
        } else {
            this.m_status = 8;
            boolean hasException = false;

            for(Iterator i$ = this.connecionMap.values().iterator(); i$.hasNext(); this.m_status = 3) {
                DBConnection conn = (DBConnection)i$.next();

                try {
                    conn.realCommit();
                } catch (Exception var13) {
                    DBLogger.error(var13.getMessage() + ":" + conn.getDataSourceName(), var13);
                    hasException = true;
                } finally {
                    try {
                        conn.backPool();
                    } catch (SQLException var12) {
                        DBLogger.error(var12.getMessage(), var12);
                    }

                }
            }

            this.connecionMap.clear();
            this.transEnd();
            if (hasException) {
                throw new SystemException("commit error,Please check log");
            }
        }

    }
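
Judging from the code shown above, UAPTransaction#commit commits each enlisted connection in turn, always returns it to the pool, and reports a failure only after the loop has finished. The following sketch reproduces that pattern in isolation. SketchResource, RecordingResource and MultiCommitSketch are hypothetical names introduced for illustration, not NC classes:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an enlisted connection. */
interface SketchResource {
    void commit() throws Exception;
    void release();
}

/** Test double that can be told to fail on commit. */
class RecordingResource implements SketchResource {
    private final boolean failOnCommit;
    boolean committed;
    boolean released;

    RecordingResource(boolean failOnCommit) {
        this.failOnCommit = failOnCommit;
    }

    public void commit() throws Exception {
        if (failOnCommit) {
            throw new Exception("simulated commit failure");
        }
        committed = true;
    }

    public void release() {
        released = true;
    }
}

/** Commits every enlisted resource independently and reports failure at the end. */
class MultiCommitSketch {
    private final Map<String, SketchResource> resources = new LinkedHashMap<>();
    final List<String> events = new ArrayList<>();

    void enlist(String name, SketchResource resource) {
        resources.put(name, resource);
    }

    void commitAll() throws Exception {
        boolean failed = false;
        for (Map.Entry<String, SketchResource> entry : resources.entrySet()) {
            try {
                entry.getValue().commit();
                events.add("committed " + entry.getKey());
            } catch (Exception ex) {
                // Remember the failure but keep going with the remaining resources.
                events.add("failed " + entry.getKey());
                failed = true;
            } finally {
                // The resource goes back to the pool whether or not the commit worked.
                entry.getValue().release();
            }
        }
        resources.clear();
        if (failed) {
            throw new Exception("commit error, please check log");
        }
    }
}
```

One consequence of this design is that a transaction spanning several data sources is not atomic across them: if one connection fails to commit, the others may already have committed. This is a best-effort loop, not a two-phase commit.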