Tomcat 提供了 JdbcInterceptor,可以用來監控 JDBC 的執行狀況,默認提供了好幾個現成的 interceptor 可以直接使用,SlowQueryReport 以及 SlowQueryReportJmx 就是其中的兩個。
/** * Abstract class that is to be extended for implementations of interceptors. * Everytime an operation is called on the {@link java.sql.Connection} object the * {@link #invoke(Object, Method, Object[])} method on the interceptor will be called. * Interceptors are useful to change or improve behavior of the connection pool.<br> * Interceptors can receive a set of properties. Each sub class is responsible for parsing the properties during runtime when they * are needed or simply override the {@link #setProperties(Map)} method. * Properties arrive in a key-value pair of Strings as they were received through the configuration. * This method is called once per cached connection object when the object is first configured. * * @version 1.0 */ public abstract class JdbcInterceptor implements InvocationHandler { /** * {@link java.sql.Connection#close()} method name */ public static final String CLOSE_VAL = "close"; /** * {@link Object#toString()} method name */ public static final String TOSTRING_VAL = "toString"; /** * {@link java.sql.Connection#isClosed()} method name */ public static final String ISCLOSED_VAL = "isClosed"; /** * {@link javax.sql.PooledConnection#getConnection()} method name */ public static final String GETCONNECTION_VAL = "getConnection"; /** * {@link java.sql.Wrapper#unwrap(Class)} method name */ public static final String UNWRAP_VAL = "unwrap"; /** * {@link java.sql.Wrapper#isWrapperFor(Class)} method name */ public static final String ISWRAPPERFOR_VAL = "isWrapperFor"; /** * {@link java.sql.Connection#isValid(int)} method name */ public static final String ISVALID_VAL = "isValid"; /** * {@link java.lang.Object#equals(Object)} */ public static final String EQUALS_VAL = "equals"; /** * {@link java.lang.Object#hashCode()} */ public static final String HASHCODE_VAL = "hashCode"; /** * Properties for this interceptor. 
*/ protected Map<String,InterceptorProperty> properties = null; /** * The next interceptor in the chain */ private volatile JdbcInterceptor next = null; /** * Property that decides how we do string comparison, default is to use * {@link String#equals(Object)}. If set to <code>false</code> then the * equality operator (==) is used. */ private boolean useEquals = true; /** * Public constructor for instantiation through reflection */ public JdbcInterceptor() { // NOOP } /** * Gets invoked each time an operation on {@link java.sql.Connection} is invoked. * {@inheritDoc} */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (getNext()!=null) return getNext().invoke(proxy,method,args); else throw new NullPointerException(); } /** * Returns the next interceptor in the chain * @return the next interceptor in the chain */ public JdbcInterceptor getNext() { return next; } /** * configures the next interceptor in the chain * @param next The next chain item */ public void setNext(JdbcInterceptor next) { this.next = next; } /** * Performs a string comparison, using references unless the useEquals property is set to true. 
* @param name1 The first name * @param name2 The second name * @return true if name1 is equal to name2 based on {@link #useEquals} */ public boolean compare(String name1, String name2) { if (isUseEquals()) { return name1.equals(name2); } else { return name1==name2; } } /** * Compares a method name (String) to a method (Method) * {@link #compare(String,String)} * Uses reference comparison unless the useEquals property is set to true * @param methodName The method name * @param method The method * @return <code>true</code> if the name matches */ public boolean compare(String methodName, Method method) { return compare(methodName, method.getName()); } /** * Gets called each time the connection is borrowed from the pool * This means that if an interceptor holds a reference to the connection * the interceptor can be reused for another connection. * <br> * This method may be called with null as both arguments when we are closing down the connection. * @param parent - the connection pool owning the connection * @param con - the pooled connection */ public abstract void reset(ConnectionPool parent, PooledConnection con); /** * Called when {@link java.sql.Connection#close()} is called on the underlying connection. * This is to notify the interceptors, that the physical connection has been released. * Implementation of this method should be thought through with care, as no actions should trigger an exception. * @param parent - the connection pool that this connection belongs to * @param con - the pooled connection that holds this connection * @param finalizing - if this connection is finalizing. 
True means that the pooled connection will not reconnect the underlying connection */ public void disconnected(ConnectionPool parent, PooledConnection con, boolean finalizing) { } /** * Returns the properties configured for this interceptor * @return the configured properties for this interceptor */ public Map<String,InterceptorProperty> getProperties() { return properties; } /** * Called during the creation of an interceptor * The properties can be set during the configuration of an interceptor * Override this method to perform type casts between string values and object properties * @param properties The properties */ public void setProperties(Map<String,InterceptorProperty> properties) { this.properties = properties; final String useEquals = "useEquals"; InterceptorProperty p = properties.get(useEquals); if (p!=null) { setUseEquals(Boolean.parseBoolean(p.getValue())); } } /** * @return true if the compare method uses the Object.equals(Object) method * false if comparison is done on a reference level */ public boolean isUseEquals() { return useEquals; } /** * Set to true if string comparisons (for the {@link #compare(String, Method)} and {@link #compare(String, String)} methods) should use the Object.equals(Object) method * The default is false * @param useEquals <code>true</code> if equals will be used for comparisons */ public void setUseEquals(boolean useEquals) { this.useEquals = useEquals; } /** * This method is invoked by a connection pool when the pool is closed. * Interceptor classes can override this method if they keep static * variables or other tracking means around. * <b>This method is only invoked on a single instance of the interceptor, and not on every instance created.</b> * @param pool - the pool that is being closed. */ public void poolClosed(ConnectionPool pool) { // NOOP } /** * This method is invoked by a connection pool when the pool is first started up, usually when the first connection is requested. 
* Interceptor classes can override this method if they keep static * variables or other tracking means around. * <b>This method is only invoked on a single instance of the interceptor, and not on every instance created.</b> * @param pool - the pool that is being closed. */ public void poolStarted(ConnectionPool pool) { // NOOP } }
可以看到它實現了 InvocationHandler 這個接口,也就是使用的是 Java 內置的動態代理技術,主要是由於 JDBC 本身就是面向接口編程的,因此用 Java 內置的動態代理是水到渠成的。
tomcat-jdbc-8.5.11-sources.jar!/org/apache/tomcat/jdbc/pool/ConnectionPool.java
/** * configures a pooled connection as a proxy. * This Proxy implements {@link java.sql.Connection} and {@link javax.sql.PooledConnection} interfaces. * All calls on {@link java.sql.Connection} methods will be propagated down to the actual JDBC connection except for the * {@link java.sql.Connection#close()} method. * @param con a {@link PooledConnection} to wrap in a Proxy * @return a {@link java.sql.Connection} object wrapping a pooled connection. * @throws SQLException if an interceptor can't be configured, if the proxy can't be instantiated */ protected Connection setupConnection(PooledConnection con) throws SQLException { //fetch previously cached interceptor proxy - one per connection JdbcInterceptor handler = con.getHandler(); if (handler==null) { //build the proxy handler handler = new ProxyConnection(this,con,getPoolProperties().isUseEquals()); //set up the interceptor chain PoolProperties.InterceptorDefinition[] proxies = getPoolProperties().getJdbcInterceptorsAsArray(); for (int i=proxies.length-1; i>=0; i--) { try { //create a new instance JdbcInterceptor interceptor = proxies[i].getInterceptorClass().newInstance(); //configure properties interceptor.setProperties(proxies[i].getProperties()); //setup the chain interceptor.setNext(handler); //call reset interceptor.reset(this, con); //configure the last one to be held by the connection handler = interceptor; }catch(Exception x) { SQLException sx = new SQLException("Unable to instantiate interceptor chain."); sx.initCause(x); throw sx; } } //cache handler for the next iteration con.setHandler(handler); } else { JdbcInterceptor next = handler; //we have a cached handler, reset it while (next!=null) { next.reset(this, con); next = next.getNext(); } } try { getProxyConstructor(con.getXAConnection() != null); //create the proxy //TODO possible optimization, keep track if this connection was returned properly, and don't generate a new facade Connection connection = null; if 
(getPoolProperties().getUseDisposableConnectionFacade() ) { connection = (Connection)proxyClassConstructor.newInstance(new Object[] { new DisposableConnectionFacade(handler) }); } else { connection = (Connection)proxyClassConstructor.newInstance(new Object[] {handler}); } //return the connection return connection; }catch (Exception x) { SQLException s = new SQLException(); s.initCause(x); throw s; } }
這裏先判斷連接上有沒有已緩存的 handler:沒有的話,則創建 ProxyConnection,然後構造 interceptor 的鏈並緩存到連接上;有的話,則依次調用鏈上每個 interceptor 的 reset 方法。
public class ProxyConnection extends JdbcInterceptor { protected PooledConnection connection = null; protected ConnectionPool pool = null; public PooledConnection getConnection() { return connection; } public void setConnection(PooledConnection connection) { this.connection = connection; } public ConnectionPool getPool() { return pool; } public void setPool(ConnectionPool pool) { this.pool = pool; } protected ProxyConnection(ConnectionPool parent, PooledConnection con, boolean useEquals) { pool = parent; connection = con; setUseEquals(useEquals); } @Override public void reset(ConnectionPool parent, PooledConnection con) { this.pool = parent; this.connection = con; } public boolean isWrapperFor(Class<?> iface) { if (iface == XAConnection.class && connection.getXAConnection()!=null) { return true; } else { return (iface.isInstance(connection.getConnection())); } } public Object unwrap(Class<?> iface) throws SQLException { if (iface == PooledConnection.class) { return connection; }else if (iface == XAConnection.class) { return connection.getXAConnection(); } else if (isWrapperFor(iface)) { return connection.getConnection(); } else { throw new SQLException("Not a wrapper of "+iface.getName()); } } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (compare(ISCLOSED_VAL,method)) { return Boolean.valueOf(isClosed()); } if (compare(CLOSE_VAL,method)) { if (connection==null) return null; //noop for already closed. PooledConnection poolc = this.connection; this.connection = null; pool.returnConnection(poolc); return null; } else if (compare(TOSTRING_VAL,method)) { return this.toString(); } else if (compare(GETCONNECTION_VAL,method) && connection!=null) { return connection.getConnection(); } else if (method.getDeclaringClass().equals(XAConnection.class)) { try { return method.invoke(connection.getXAConnection(),args); }catch (Throwable t) { if (t instanceof InvocationTargetException) { throw t.getCause() != null ? 
t.getCause() : t; } else { throw t; } } } if (isClosed()) throw new SQLException("Connection has already been closed."); if (compare(UNWRAP_VAL,method)) { return unwrap((Class<?>)args[0]); } else if (compare(ISWRAPPERFOR_VAL,method)) { return Boolean.valueOf(this.isWrapperFor((Class<?>)args[0])); } try { PooledConnection poolc = connection; if (poolc!=null) { return method.invoke(poolc.getConnection(),args); } else { throw new SQLException("Connection has already been closed."); } }catch (Throwable t) { if (t instanceof InvocationTargetException) { throw t.getCause() != null ? t.getCause() : t; } else { throw t; } } } public boolean isClosed() { return connection==null || connection.isDiscarded(); } public PooledConnection getDelegateConnection() { return connection; } public ConnectionPool getParentPool() { return pool; } @Override public String toString() { return "ProxyConnection["+(connection!=null?connection.toString():"null")+"]"; } }
ProxyConnection 本身就是 JdbcInterceptor,包裝了 PooledConnection。
AbstractCreateStatementInterceptor 是 JdbcInterceptor 的一個比較重要的擴展,SlowQueryReport 就是基於這個擴展的。它定義了一些抽象方法供子類實現。
/** * Abstraction interceptor. This component intercepts all calls to create some type of SQL statement. * By extending this class, one can intercept queries and update statements by overriding the {@link #createStatement(Object, Method, Object[], Object, long)} * method. * @version 1.0 */ public abstract class AbstractCreateStatementInterceptor extends JdbcInterceptor { protected static final String CREATE_STATEMENT = "createStatement"; protected static final int CREATE_STATEMENT_IDX = 0; protected static final String PREPARE_STATEMENT = "prepareStatement"; protected static final int PREPARE_STATEMENT_IDX = 1; protected static final String PREPARE_CALL = "prepareCall"; protected static final int PREPARE_CALL_IDX = 2; protected static final String[] STATEMENT_TYPES = {CREATE_STATEMENT, PREPARE_STATEMENT, PREPARE_CALL}; protected static final int STATEMENT_TYPE_COUNT = STATEMENT_TYPES.length; protected static final String EXECUTE = "execute"; protected static final String EXECUTE_QUERY = "executeQuery"; protected static final String EXECUTE_UPDATE = "executeUpdate"; protected static final String EXECUTE_BATCH = "executeBatch"; protected static final String[] EXECUTE_TYPES = {EXECUTE, EXECUTE_QUERY, EXECUTE_UPDATE, EXECUTE_BATCH}; public AbstractCreateStatementInterceptor() { super(); } /** * {@inheritDoc} */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (compare(CLOSE_VAL,method)) { closeInvoked(); return super.invoke(proxy, method, args); } else { boolean process = false; process = isStatement(method, process); if (process) { long start = System.currentTimeMillis(); Object statement = super.invoke(proxy,method,args); long delta = System.currentTimeMillis() - start; return createStatement(proxy,method,args,statement, delta); } else { return super.invoke(proxy,method,args); } } } /** * This method will be invoked after a successful statement creation. 
This method can choose to return a wrapper * around the statement or return the statement itself. * If this method returns a wrapper then it should return a wrapper object that implements one of the following interfaces. * {@link java.sql.Statement}, {@link java.sql.PreparedStatement} or {@link java.sql.CallableStatement} * @param proxy the actual proxy object * @param method the method that was called. It will be one of the methods defined in {@link #STATEMENT_TYPES} * @param args the arguments to the method * @param statement the statement that the underlying connection created * @param time Elapsed time * @return a {@link java.sql.Statement} object */ public abstract Object createStatement(Object proxy, Method method, Object[] args, Object statement, long time); /** * Method invoked when the operation {@link java.sql.Connection#close()} is invoked. */ public abstract void closeInvoked(); /** * Returns true if the method that is being invoked matches one of the statement types. * * @param method the method being invoked on the proxy * @param process boolean result used for recursion * @return returns true if the method name matched */ protected boolean isStatement(Method method, boolean process){ return process(STATEMENT_TYPES, method, process); } /** * Returns true if the method that is being invoked matches one of the execute types. 
* * @param method the method being invoked on the proxy * @param process boolean result used for recursion * @return returns true if the method name matched */ protected boolean isExecute(Method method, boolean process){ return process(EXECUTE_TYPES, method, process); } /* * Returns true if the method that is being invoked matches one of the method names passed in * @param names list of method names that we want to intercept * @param method the method being invoked on the proxy * @param process boolean result used for recursion * @return returns true if the method name matched */ protected boolean process(String[] names, Method method, boolean process) { final String name = method.getName(); for (int i=0; (!process) && i<names.length; i++) { process = compare(names[i],name); } return process; } /** * no-op for this interceptor. no state is stored. */ @Override public void reset(ConnectionPool parent, PooledConnection con) { // NOOP } }
SlowQueryReport 主要實現了 createStatement 方法:
/** * Creates a statement interceptor to monitor query response times */ @Override public Object createStatement(Object proxy, Method method, Object[] args, Object statement, long time) { try { Object result = null; String name = method.getName(); String sql = null; Constructor<?> constructor = null; if (compare(CREATE_STATEMENT,name)) { //createStatement constructor = getConstructor(CREATE_STATEMENT_IDX,Statement.class); }else if (compare(PREPARE_STATEMENT,name)) { //prepareStatement sql = (String)args[0]; constructor = getConstructor(PREPARE_STATEMENT_IDX,PreparedStatement.class); if (sql!=null) { prepareStatement(sql, time); } }else if (compare(PREPARE_CALL,name)) { //prepareCall sql = (String)args[0]; constructor = getConstructor(PREPARE_CALL_IDX,CallableStatement.class); prepareCall(sql,time); }else { //do nothing, might be a future unsupported method //so we better bail out and let the system continue return statement; } result = constructor.newInstance(new Object[] { new StatementProxy(statement,sql) }); return result; }catch (Exception x) { log.warn("Unable to create statement proxy for slow query report.",x); } return statement; }
這裏同樣使用了 JDK 的動態代理,包裝了 statement。
/** * Class to measure query execute time * */ protected class StatementProxy implements InvocationHandler { protected boolean closed = false; protected Object delegate; protected final String query; public StatementProxy(Object parent, String query) { this.delegate = parent; this.query = query; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //get the name of the method for comparison final String name = method.getName(); //was close invoked? boolean close = compare(JdbcInterceptor.CLOSE_VAL,name); //allow close to be called multiple times if (close && closed) return null; //are we calling isClosed? if (compare(JdbcInterceptor.ISCLOSED_VAL,name)) return Boolean.valueOf(closed); //if we are calling anything else, bail out if (closed) throw new SQLException("Statement closed."); boolean process = false; //check to see if we are about to execute a query process = isExecute( method, process); //if we are executing, get the current time long start = (process)?System.currentTimeMillis():0; Object result = null; try { //execute the query result = method.invoke(delegate,args); }catch (Throwable t) { reportFailedQuery(query,args,name,start,t); if (t instanceof InvocationTargetException && t.getCause() != null) { throw t.getCause(); } else { throw t; } } //measure the time long delta = (process)?(System.currentTimeMillis()-start):Long.MIN_VALUE; //see if we meet the requirements to measure if (delta>threshold) { try { //report the slow query reportSlowQuery(query, args, name, start, delta); }catch (Exception t) { if (log.isWarnEnabled()) log.warn("Unable to process slow query",t); } } else if (process) { reportQuery(query, args, name, start, delta); } //perform close cleanup if (close) { closed=true; delegate = null; } return result; } }
這裏記錄了 SQL 的執行耗時,然後跟閾值對比,判斷是否記錄為 slow query。