持續更新:https://github.com/dchack/Mybatis-source-code-learnjava
有這麼個定律,有鏈接的地方就有池。
在市面上,能夠適配Mybatis DateSource的鏈接池有很對,好比:git
Mybatis也自帶來鏈接池的功能,先學習下Mybatis的,相對簡單的實現。
涉及的類:
github
public class PoolState { protected PooledDataSource dataSource; // 空閒鏈接集合 protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>(); // 正在使用的鏈接集合 protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>(); // 請求次數,每次獲取鏈接,都會自增,用於 protected long requestCount = 0; // 累計請求耗時,每次獲取鏈接時計算累加,除以requestCount能夠得到平均耗時 protected long accumulatedRequestTime = 0; // 累計鏈接使用時間 protected long accumulatedCheckoutTime = 0; // 過時鏈接次數 protected long claimedOverdueConnectionCount = 0; protected long accumulatedCheckoutTimeOfOverdueConnections = 0; // 累計等待獲取鏈接時間 protected long accumulatedWaitTime = 0; // 等待獲取鏈接的次數 protected long hadToWaitCount = 0; // 鏈接已關閉的次數 protected long badConnectionCount = 0; public PoolState(PooledDataSource dataSource) { this.dataSource = dataSource; } public synchronized long getRequestCount() { return requestCount; } public synchronized long getAverageRequestTime() { return requestCount == 0 ? 0 : accumulatedRequestTime / requestCount; } public synchronized long getAverageWaitTime() { return hadToWaitCount == 0 ? 0 : accumulatedWaitTime / hadToWaitCount; } public synchronized long getHadToWaitCount() { return hadToWaitCount; } public synchronized long getBadConnectionCount() { return badConnectionCount; } public synchronized long getClaimedOverdueConnectionCount() { return claimedOverdueConnectionCount; } public synchronized long getAverageOverdueCheckoutTime() { return claimedOverdueConnectionCount == 0 ? 0 : accumulatedCheckoutTimeOfOverdueConnections / claimedOverdueConnectionCount; } public synchronized long getAverageCheckoutTime() { return requestCount == 0 ? 0 : accumulatedCheckoutTime / requestCount; } public synchronized int getIdleConnectionCount() { return idleConnections.size(); } public synchronized int getActiveConnectionCount() { return activeConnections.size(); } }
注意代碼中的字段都是用protected修飾的,表示pooled包內均可訪問,在寫這份代碼的時候必然默認這個包下實現一個獨立的功能,內部字段均可以共享使用,不然都寫set,get方法太麻煩了。
在PoolState
類中,不少指標好比requestCount
,claimedOverdueConnectionCount
等都不和鏈接池核心邏輯相關,純粹只是表示鏈接池的一些指標而已。
做爲鏈接池,在這裏最重要的就是兩個List:sql
synchronized
關鍵字來處理併發場景的。組成池的兩個List中存儲的是PooledConnection
,而PooledConnection
經過java動態代理機制實現代理真正Connection。
PooledConnection
繼承InvocationHandler
,因此實現了invoke
方法:數據庫
/* * Required for InvocationHandler implementation. * * @param proxy - not used * @param method - the method to be executed * @param args - the parameters to be passed to the method * @see java.lang.reflect.InvocationHandler#invoke(Object, java.lang.reflect.Method, Object[]) */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) { dataSource.pushConnection(this); return null; } else { try { if (!Object.class.equals(method.getDeclaringClass())) { // issue #579 toString() should never fail // throw an SQLException instead of a Runtime checkConnection(); } return method.invoke(realConnection, args); } catch (Throwable t) { throw ExceptionUtil.unwrapThrowable(t); } } } private void checkConnection() throws SQLException { if (!valid) { throw new SQLException("Error accessing PooledConnection. Connection is invalid."); } }
主要看到這個代理實現處理了close
方法,就是將鏈接從使用列表中彈出。
對於其餘方法,會判斷方法是否屬於Object中的方法,若是不是則進行鏈接合法的校驗,而後執行真正Connection
即realConnection
中對應的方法。
得到一個代理類的代碼,即調用Proxy.newProxyInstance
方法,在PooledConnection
中的構造函數中:緩存
/* * Constructor for SimplePooledConnection that uses the Connection and PooledDataSource passed in * * @param connection - the connection that is to be presented as a pooled connection * @param dataSource - the dataSource that the connection is from */ public PooledConnection(Connection connection, PooledDataSource dataSource) { this.hashCode = connection.hashCode(); this.realConnection = connection; this.dataSource = dataSource; this.createdTimestamp = System.currentTimeMillis(); this.lastUsedTimestamp = System.currentTimeMillis(); this.valid = true; this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this); }
咱們能夠看到realConnection
是在構造函數時就傳入的了。mybatis
而配置這個池的參數都是在PooledDataSource
中:併發
官方文檔:
poolMaximumActiveConnections – 在任意時間能夠存在的活動(也就是正在使用)鏈接數量,默認值:10
poolMaximumIdleConnections – 任意時間可能存在的空閒鏈接數。
poolMaximumCheckoutTime – 在被強制返回以前,池中鏈接被檢出(checked out)時間,默認值:20000 毫秒(即 20 秒)
poolTimeToWait – 這是一個底層設置,若是獲取鏈接花費了至關長的時間,鏈接池會打印狀態日誌並從新嘗試獲取一個鏈接(避免在誤配置的狀況下一直安靜的失敗),默認值:20000 毫秒(即 20 秒)。
poolMaximumLocalBadConnectionTolerance – 這是一個關於壞鏈接容忍度的底層設置, 做用於每個嘗試從緩存池獲取鏈接的線程。 若是這個線程獲取到的是一個壞的鏈接,那麼這個數據源容許這個線程嘗試從新獲取一個新的鏈接,可是這個從新嘗試的次數不該該超過 poolMaximumIdleConnections 與 poolMaximumLocalBadConnectionTolerance 之和。 默認值:3 (新增於 3.4.5)
poolPingQuery – 發送到數據庫的偵測查詢,用來檢驗鏈接是否正常工做並準備接受請求。默認是「NO PING QUERY SET」,這會致使多數數據庫驅動失敗時帶有一個恰當的錯誤消息。
poolPingEnabled – 是否啓用偵測查詢。若開啓,須要設置 poolPingQuery 屬性爲一個可執行的 SQL 語句(最好是一個速度很是快的 SQL 語句),默認值:false。
poolPingConnectionsNotUsedFor – 配置 poolPingQuery 的頻率。能夠被設置爲和數據庫鏈接超時時間同樣,來避免沒必要要的偵測,默認值:0(即全部鏈接每一時刻都被偵測 — 固然僅當 poolPingEnabled 爲 true 時適用)。app
PooledDataSource
完成池功能的類,直接看拿鏈接的popConnection
方法:ide
private PooledConnection popConnection(String username, String password) throws SQLException { boolean countedWait = false; PooledConnection conn = null; // 觸發獲取鏈接的當前時間 long t = System.currentTimeMillis(); int localBadConnectionCount = 0; while (conn == null) { // 同步 synchronized (state) { // 判斷空閒列表中是否能夠提供鏈接 if (!state.idleConnections.isEmpty()) { // Pool has available connection conn = state.idleConnections.remove(0); if (log.isDebugEnabled()) { log.debug("Checked out connection " + conn.getRealHashCode() + " from pool."); } } else { // Pool does not have available connection // 判斷是否達到最大鏈接數限制 if (state.activeConnections.size() < poolMaximumActiveConnections) { // Can create new connection conn = new PooledConnection(dataSource.getConnection(), this); if (log.isDebugEnabled()) { log.debug("Created connection " + conn.getRealHashCode() + "."); } } else { // Cannot create new connection PooledConnection oldestActiveConnection = state.activeConnections.get(0); long longestCheckoutTime = oldestActiveConnection.getCheckoutTime(); // 判斷最老一個鏈接使用時間是否超過最大值 if (longestCheckoutTime > poolMaximumCheckoutTime) { // Can claim overdue connection state.claimedOverdueConnectionCount++; state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime; state.accumulatedCheckoutTime += longestCheckoutTime; state.activeConnections.remove(oldestActiveConnection); if (!oldestActiveConnection.getRealConnection().getAutoCommit()) { try { oldestActiveConnection.getRealConnection().rollback(); } catch (SQLException e) { /* Just log a message for debug and continue to execute the following statement like nothing happend. Wrap the bad connection with a new PooledConnection, this will help to not intterupt current executing thread and give current thread a chance to join the next competion for another valid/good database connection. At the end of this loop, bad {@link @conn} will be set as null. */ log.debug("Bad connection. Could not roll back"); } } // 這裏看到將包裝在oldestActiveConnection中的RealConnection從新用PooledConnection包裝出來直接使用,看前面操做是將鏈接進行回滾,可是可能失敗,卻不關心,註釋解釋是,在後面的代碼中會進行isValid的判斷,其中就會判斷鏈接是否可用。 conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this); conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp()); conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp()); // 將老鏈接設置成invalid oldestActiveConnection.invalidate(); if (log.isDebugEnabled()) { log.debug("Claimed overdue connection " + conn.getRealHashCode() + "."); } } else { // Must wait try { if (!countedWait) { state.hadToWaitCount++; countedWait = true; } if (log.isDebugEnabled()) { log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection."); } long wt = System.currentTimeMillis(); // 線程等待,也釋放了鎖 state.wait(poolTimeToWait); state.accumulatedWaitTime += System.currentTimeMillis() - wt; } catch (InterruptedException e) { break; } } } } if (conn != null) { // ping to server and check the connection is valid or not if (conn.isValid()) { if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password)); conn.setCheckoutTimestamp(System.currentTimeMillis()); conn.setLastUsedTimestamp(System.currentTimeMillis()); state.activeConnections.add(conn); state.requestCount++; state.accumulatedRequestTime += System.currentTimeMillis() - t; } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection."); } state.badConnectionCount++; localBadConnectionCount++; // 不可用的鏈接會被設置成null,被回收器回收 conn = null; if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) { if (log.isDebugEnabled()) { log.debug("PooledDataSource: Could not get a good connection to the database."); } throw new SQLException("PooledDataSource: Could not get a good connection to the database."); } } } } } if (conn == null) { if (log.isDebugEnabled()) { log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection."); } throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection."); } return conn; }
popConnection
方法實如今一個池中獲取鏈接的基本邏輯,依賴最大鏈接數,獲取等待時間,鏈接使用超時時間等來完成一個池的核心能力。
注意這裏使用wait
方法來等待,在java線程池中使用阻塞隊列來出來暫時拿不到資源的請求。
前面提到,在使用Connection
時,調用close
方法,會調用到dataSource.pushConnection(this);
,就是將這個鏈接使用完了還回池的動做:
protected void pushConnection(PooledConnection conn) throws SQLException { // 同樣加鎖 synchronized (state) { // 從使用線程列表中刪除 state.activeConnections.remove(conn); if (conn.isValid()) { // 判斷空閒鏈接列表是否超過最大值 if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) { state.accumulatedCheckoutTime += conn.getCheckoutTime(); if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this); // 加入到空閒鏈接列表中 state.idleConnections.add(newConn); newConn.setCreatedTimestamp(conn.getCreatedTimestamp()); newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp()); conn.invalidate(); if (log.isDebugEnabled()) { log.debug("Returned connection " + newConn.getRealHashCode() + " to pool."); } // 通知等待線程 state.notifyAll(); } else { state.accumulatedCheckoutTime += conn.getCheckoutTime(); if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } conn.getRealConnection().close(); if (log.isDebugEnabled()) { log.debug("Closed connection " + conn.getRealHashCode() + "."); } conn.invalidate(); } } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection."); } state.badConnectionCount++; } } }
歸還鏈接時,須要查看空閒列表中的線程數量是否已經到到設置的最大值,若是已經達到,就不須要歸還了,凡是須要加入空閒列表的都須要進行notifyAll
操做,來通知那些等待的線程來搶這個歸還的鏈接,可是若是此時鏈接池中空閒鏈接充足,並無線程等待,這個操做也就浪費了,因此能夠思考前面popConnection
中的wait
和這裏的notifyAll
是能夠用等待隊列來完成。
另一個方法,用於判斷鏈接是否可用:
protected boolean pingConnection(PooledConnection conn) { boolean result = true; try { // 先用isClosed來獲取結果 result = !conn.getRealConnection().isClosed(); } catch (SQLException e) { if (log.isDebugEnabled()) { log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage()); } result = false; } if (result) { // 能夠經過poolPingEnabled配置來決定是否使用自定義sql if (poolPingEnabled) { if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) { try { if (log.isDebugEnabled()) { log.debug("Testing connection " + conn.getRealHashCode() + " ..."); } Connection realConn = conn.getRealConnection(); Statement statement = realConn.createStatement(); // 執行poolPingQuery ResultSet rs = statement.executeQuery(poolPingQuery); rs.close(); statement.close(); if (!realConn.getAutoCommit()) { realConn.rollback(); } result = true; if (log.isDebugEnabled()) { log.debug("Connection " + conn.getRealHashCode() + " is GOOD!"); } } catch (Exception e) { log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage()); try { conn.getRealConnection().close(); } catch (Exception e2) { //ignore } result = false; if (log.isDebugEnabled()) { log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage()); } } } } } return result; }
從代碼中能夠看到isClosed
方法並不可靠,最終仍是經過執行sql來判斷鏈接是否可用,這個在不少涉及判斷數據庫鏈接是否有效的地方都是這麼作的,詳細能夠看一下isClosed
方法的註釋。
繼承UnpooledDataSourceFactory,直接返回PooledDataSource對象
public class PooledDataSourceFactory extends UnpooledDataSourceFactory { public PooledDataSourceFactory() { this.dataSource = new PooledDataSource(); } }
在整個線程池的實現代碼中,能夠學習到一個池的實現的要素有哪些,以及錄用基礎代碼如何實現一個池。對於那些封裝成高層次的池的代碼來講,這個實現顯得又些單薄和不夠全面,但是不管鏈接池如何實現核心池的實現邏輯是不會變的。