本文主要介紹一下pg jdbc statement的queryTimeout及resultSet的next方法
@Test public void testReadTimeout() throws SQLException { Connection connection = dataSource.getConnection(); //https://jdbc.postgresql.org/documentation/head/query.html connection.setAutoCommit(false); //NOTE 爲了設置fetchSize,必須設置爲false String sql = "select * from demo_table"; PreparedStatement pstmt; try { pstmt = (PreparedStatement)connection.prepareStatement(sql); pstmt.setQueryTimeout(1); //NOTE 設置Statement執行完成的超時時間,前提是socket的timeout比這個大 pstmt.setFetchSize(5000); //NOTE 這樣設置爲了模擬query timeout的異常 System.out.println("ps.getQueryTimeout():" + pstmt.getQueryTimeout()); System.out.println("ps.getFetchSize():" + pstmt.getFetchSize()); System.out.println("ps.getFetchDirection():" + pstmt.getFetchDirection()); System.out.println("ps.getMaxFieldSize():" + pstmt.getMaxFieldSize()); ResultSet rs = pstmt.executeQuery(); //NOTE 設置Statement執行完成的超時時間,前提是socket的timeout比這個大 //NOTE 這裏返回了就表明statement執行完成,默認返回fetchSize的數據 int col = rs.getMetaData().getColumnCount(); System.out.println("============================"); while (rs.next()) { //NOTE 這個的timeout由socket的超時時間設置,oracle.jdbc.ReadTimeout=60000 for (int i = 1; i <= col; i++) { System.out.print(rs.getObject(i)); } System.out.println(""); } System.out.println("============================"); } catch (SQLException e) { e.printStackTrace(); } finally { //close resources } }
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgStatement.java
/**
 * Adjusts the execution flags for the current statement settings, runs the query through the
 * connection's query executor and stores the outcome in {@code result},
 * {@code firstUnclosedResult} and, when generated keys were requested, {@code generatedKeys}.
 *
 * <p>The main execution is bracketed by {@code startTimer()} and {@code killTimerTask()}; the
 * optional describe-only round trip that precedes it is not.
 *
 * @param cachedQuery the cached query whose {@code query} field is executed
 * @param queryParameters parameters handed to the query executor
 * @param flags initial {@code QueryExecutor} flag bits; further bits are added or cleared here
 * @throws SQLException if execution fails
 */
private void executeInternal(CachedQuery cachedQuery, ParameterList queryParameters, int flags)
    throws SQLException {
  closeForNextExecution();

  // Enable cursor-based resultset if possible.
  // This requires a positive fetch size, auto-commit off, and neither a scrollable nor a
  // holdable result set.
  if (fetchSize > 0 && !wantsScrollableResultSet() && !connection.getAutoCommit()
      && !wantsHoldableResultSet()) {
    flags |= QueryExecutor.QUERY_FORWARD_CURSOR;
  }

  if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
    flags |= QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS;

    // If the no results flag is set (from executeUpdate)
    // clear it so we get the generated keys results.
    //
    if ((flags & QueryExecutor.QUERY_NO_RESULTS) != 0) {
      flags &= ~(QueryExecutor.QUERY_NO_RESULTS);
    }
  }

  if (isOneShotQuery(cachedQuery)) {
    flags |= QueryExecutor.QUERY_ONESHOT;
  }
  // Only use named statements after we hit the threshold. Note that only
  // named statements can be transferred in binary format.
  // NOTE(review): the comment above appears to describe the one-shot check, not the
  // auto-commit check that follows it.

  if (connection.getAutoCommit()) {
    flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
  }

  // updateable result sets do not yet support binary updates
  if (concurrency != ResultSet.CONCUR_READ_ONLY) {
    flags |= QueryExecutor.QUERY_NO_BINARY_TRANSFER;
  }

  Query queryToExecute = cachedQuery.query;

  // An empty query also suppresses BEGIN.
  if (queryToExecute.isEmpty()) {
    flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
  }

  if (!queryToExecute.isStatementDescribed() && forceBinaryTransfers
      && (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) {
    // Simple 'Q' execution does not need to know parameter types
    // When binaryTransfer is forced, then we need to know resulting parameter and column types,
    // thus sending a describe request.
    int flags2 = flags | QueryExecutor.QUERY_DESCRIBE_ONLY;
    StatementResultHandler handler2 = new StatementResultHandler();
    connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler2, 0, 0,
        flags2);
    ResultWrapper result2 = handler2.getResults();
    // The describe-only result is not kept; any result set it produced is closed right away.
    if (result2 != null) {
      result2.getResultSet().close();
    }
  }

  StatementResultHandler handler = new StatementResultHandler();
  result = null;
  try {
    // The timer is started right before execution and always torn down afterwards.
    startTimer();
    connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
        fetchSize, flags);
  } finally {
    killTimerTask();
  }
  result = firstUnclosedResult = handler.getResults();

  if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
    // The first result is taken as the generated keys; the remaining chain becomes the result.
    // NOTE(review): result is dereferenced without a null check here - confirm that
    // handler.getResults() cannot return null on this path.
    generatedKeys = result;
    result = result.getNext();

    if (wantsGeneratedKeysOnce) {
      wantsGeneratedKeysOnce = false;
    }
  }
}
注意,這裏在執行前後分別調用了startTimer()和killTimerTask()
private void startTimer() { /* * there shouldn't be any previous timer active, but better safe than sorry. */ cleanupTimer(); STATE_UPDATER.set(this, StatementCancelState.IN_QUERY); if (timeout == 0) { return; } TimerTask cancelTask = new TimerTask() { public void run() { try { if (!CANCEL_TIMER_UPDATER.compareAndSet(PgStatement.this, this, null)) { // Nothing to do here, statement has already finished and cleared // cancelTimerTask reference return; } PgStatement.this.cancel(); } catch (SQLException e) { } } }; CANCEL_TIMER_UPDATER.set(this, cancelTask); connection.addTimerTask(cancelTask, timeout); }
/** * Clears {@link #cancelTimerTask} if any. Returns true if and only if "cancel" timer task would * never invoke {@link #cancel()}. */ private boolean cleanupTimer() { TimerTask timerTask = CANCEL_TIMER_UPDATER.get(this); if (timerTask == null) { // If timeout is zero, then timer task did not exist, so we safely report "all clear" return timeout == 0; } if (!CANCEL_TIMER_UPDATER.compareAndSet(this, timerTask, null)) { // Failed to update reference -> timer has just fired, so we must wait for the query state to // become "cancelling". return false; } timerTask.cancel(); connection.purgeTimerTasks(); // All clear return true; }
注意這裏通過CAS把cancelTimerTask引用置空以後,調用task的cancel,以及connection.purgeTimerTasks()
public void cancel() throws SQLException { if (!STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY, StatementCancelState.CANCELING)) { // Not in query, there's nothing to cancel return; } try { // Synchronize on connection to avoid spinning in killTimerTask synchronized (connection) { connection.cancelQuery(); } } finally { STATE_UPDATER.set(this, StatementCancelState.CANCELLED); synchronized (connection) { connection.notifyAll(); // wake-up killTimerTask } } }
executeQuery超時了則直接調用connection.cancelQuery()
/**
 * Asks the query executor to send a cancel request for the query running on this connection.
 *
 * @throws SQLException if {@code checkClosed()} or the query executor reports an error
 */
public void cancelQuery() throws SQLException {
  // Presumably rejects the call when the connection is already closed - confirm in checkClosed().
  checkClosed();
  queryExecutor.sendQueryCancel();
}
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/QueryExecutorBase.java
public void sendQueryCancel() throws SQLException { if (cancelPid <= 0) { return; } PGStream cancelStream = null; // Now we need to construct and send a cancel packet try { if (logger.logDebug()) { logger.debug(" FE=> CancelRequest(pid=" + cancelPid + ",ckey=" + cancelKey + ")"); } cancelStream = new PGStream(pgStream.getSocketFactory(), pgStream.getHostSpec(), cancelSignalTimeout); if (cancelSignalTimeout > 0) { cancelStream.getSocket().setSoTimeout(cancelSignalTimeout); } cancelStream.sendInteger4(16); cancelStream.sendInteger2(1234); cancelStream.sendInteger2(5678); cancelStream.sendInteger4(cancelPid); cancelStream.sendInteger4(cancelKey); cancelStream.flush(); cancelStream.receiveEOF(); } catch (IOException e) { // Safe to ignore. if (logger.logDebug()) { logger.debug("Ignoring exception on cancel request:", e); } } finally { if (cancelStream != null) { try { cancelStream.close(); } catch (IOException e) { // Ignored. } } } }
向數據庫server發送cancel指令
/**
 * Tears down the cancel timer after a query has finished and returns the statement to the
 * {@code IDLE} state. If a cancel is already under way, this blocks until the state has become
 * {@code CANCELLED} and then moves it to {@code IDLE}.
 *
 * <p>An interrupt received while waiting is remembered and re-asserted on the current thread
 * before returning.
 */
private void killTimerTask() {
  boolean timerTaskIsClear = cleanupTimer();
  // The order is important here: in case we need to wait for the cancel task, the state must be
  // kept StatementCancelState.IN_QUERY, so cancelTask would be able to cancel the query.
  // It is believed that this case is very rare, so "additional cancel and wait below" would not
  // harm it.
  if (timerTaskIsClear && STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY,
      StatementCancelState.IDLE)) {
    // Fast path: no cancel can happen any more and the state went straight back to IDLE.
    return;
  }

  // Being here means someone managed to call .cancel() and our connection did not receive
  // "timeout error"
  // We wait till state becomes "cancelled"
  boolean interrupted = false;
  while (!STATE_UPDATER.compareAndSet(this, StatementCancelState.CANCELLED,
      StatementCancelState.IDLE)) {
    synchronized (connection) {
      try {
        // Note: wait timeout here is irrelevant since synchronized(connection) would block until
        // .cancel finishes
        connection.wait(10);
      } catch (InterruptedException e) { // NOSONAR
        // Either re-interrupt this method or rethrow the "InterruptedException"
        // Here the interrupt is recorded and re-asserted after the loop.
        interrupted = true;
      }
    }
  }
  if (interrupted) {
    // Restore the interrupt status that the catch block above consumed.
    Thread.currentThread().interrupt();
  }
}
這裏先調用cleanupTimer,而後更新statement的狀態
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgConnection.java
/**
 * Returns the timer used for cancel tasks, obtaining it from the driver's shared timer on first
 * use and caching it in {@code cancelTimer}.
 *
 * @return the cached cancel timer
 */
private synchronized Timer getTimer() {
  if (cancelTimer != null) {
    return cancelTimer;
  }
  // First use on this connection: obtain the timer from the driver's shared timer.
  cancelTimer = Driver.getSharedTimer().getTimer();
  return cancelTimer;
}
這裏建立或獲取一個timer
/**
 * Schedules a task on this connection's cancel timer.
 *
 * @param timerTask the task to run
 * @param milliSeconds delay before the task runs, in milliseconds
 */
public void addTimerTask(TimerTask timerTask, long milliSeconds) {
  getTimer().schedule(timerTask, milliSeconds);
}
這個添加timerTask就是直接調度了
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgConnection.java
/**
 * Purges cancelled tasks from the cancel timer. Does nothing when no timer has been obtained.
 */
public void purgeTimerTasks() {
  // Read the field once so the null check and the purge act on the same reference.
  final Timer current = cancelTimer;
  if (current == null) {
    return;
  }
  current.purge();
}
在cleanupTimer中被調用,用來清理已經被cancel掉的timer task
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgResultSet.java
public boolean next() throws SQLException { checkClosed(); if (onInsertRow) { throw new PSQLException(GT.tr("Can''t use relative move methods while on the insert row."), PSQLState.INVALID_CURSOR_STATE); } if (current_row + 1 >= rows.size()) { if (cursor == null || (maxRows > 0 && row_offset + rows.size() >= maxRows)) { current_row = rows.size(); this_row = null; rowBuffer = null; return false; // End of the resultset. } // Ask for some more data. row_offset += rows.size(); // We are discarding some data. int fetchRows = fetchSize; if (maxRows != 0) { if (fetchRows == 0 || row_offset + fetchRows > maxRows) { // Fetch would exceed maxRows, limit it. fetchRows = maxRows - row_offset; } } // Execute the fetch and update this resultset. connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows); current_row = 0; // Test the new rows array. if (rows.isEmpty()) { this_row = null; rowBuffer = null; return false; } } else { current_row++; } initRowBuffer(); return true; }
這裏的fetch沒有像executeQuery那樣加timer
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/v3/QueryExecutorImpl.java
public synchronized void fetch(ResultCursor cursor, ResultHandler handler, int fetchSize) throws SQLException { waitOnLock(); final Portal portal = (Portal) cursor; // Insert a ResultHandler that turns bare command statuses into empty datasets // (if the fetch returns no rows, we see just a CommandStatus..) final ResultHandler delegateHandler = handler; handler = new ResultHandlerDelegate(delegateHandler) { public void handleCommandStatus(String status, int updateCount, long insertOID) { handleResultRows(portal.getQuery(), null, new ArrayList<byte[][]>(), null); } }; // Now actually run it. try { processDeadParsedQueries(); processDeadPortals(); sendExecute(portal.getQuery(), portal, fetchSize); sendSync(); processResults(handler, 0); estimatedReceiveBufferBytes = 0; } catch (IOException e) { abort(); handler.handleError( new PSQLException(GT.tr("An I/O error occurred while sending to the backend."), PSQLState.CONNECTION_FAILURE, e)); } handler.handleCompletion(); }
timeout時間不宜過長,不過正常執行完sql,會調用killTimerTask()方法,裏頭會先調用cleanupTimer取消timerTask,然後調用purgeTimerTasks()清理被cancel掉的task,避免timeout時間過長導致task堆積、最後內存溢出
這裏頭的機制有待深入研究,可能是server端返回timeout error
)