本文主要介紹 MySQL JDBC Statement 的 queryTimeout 機制,以及 ResultSet 的 next 方法。
mysql-connector-java-5.1.43-sources.jar!/com/mysql/jdbc/PreparedStatement.java
/** * A Prepared SQL query is executed and its ResultSet is returned * * @return a ResultSet that contains the data produced by the query - never * null * * @exception SQLException * if a database access error occurs */ public java.sql.ResultSet executeQuery() throws SQLException { synchronized (checkClosed().getConnectionMutex()) { MySQLConnection locallyScopedConn = this.connection; checkForDml(this.originalSql, this.firstCharOfStmt); this.batchedGeneratedKeys = null; resetCancelledState(); implicitlyCloseAllOpenResults(); clearWarnings(); if (this.doPingInstead) { doPingInstead(); return this.results; } setupStreamingTimeout(locallyScopedConn); Buffer sendPacket = fillSendPacket(); String oldCatalog = null; if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) { oldCatalog = locallyScopedConn.getCatalog(); locallyScopedConn.setCatalog(this.currentCatalog); } // // Check if we have cached metadata for this query... // CachedResultSetMetaData cachedMetadata = null; if (locallyScopedConn.getCacheResultSetMetadata()) { cachedMetadata = locallyScopedConn.getCachedMetaData(this.originalSql); } Field[] metadataFromCache = null; if (cachedMetadata != null) { metadataFromCache = cachedMetadata.fields; } locallyScopedConn.setSessionMaxRows(this.maxRows); this.results = executeInternal(this.maxRows, sendPacket, createStreamingResultSet(), true, metadataFromCache, false); if (oldCatalog != null) { locallyScopedConn.setCatalog(oldCatalog); } if (cachedMetadata != null) { locallyScopedConn.initializeResultsMetadataFromCache(this.originalSql, cachedMetadata, this.results); } else { if (locallyScopedConn.getCacheResultSetMetadata()) { locallyScopedConn.initializeResultsMetadataFromCache(this.originalSql, null /* will be created */, this.results); } } this.lastInsertId = this.results.getUpdateID(); return this.results; } }
results 爲 JDBC42ResultSet,這裏是通過 executeInternal 來執行查詢的。
/** * Actually execute the prepared statement. This is here so server-side * PreparedStatements can re-use most of the code from this class. * * @param maxRowsToRetrieve * the max number of rows to return * @param sendPacket * the packet to send * @param createStreamingResultSet * should a 'streaming' result set be created? * @param queryIsSelectOnly * is this query doing a SELECT? * @param unpackFields * * @return the results as a ResultSet * * @throws SQLException * if an error occurs. */ protected ResultSetInternalMethods executeInternal(int maxRowsToRetrieve, Buffer sendPacket, boolean createStreamingResultSet, boolean queryIsSelectOnly, Field[] metadataFromCache, boolean isBatch) throws SQLException { synchronized (checkClosed().getConnectionMutex()) { try { MySQLConnection locallyScopedConnection = this.connection; this.numberOfExecutions++; ResultSetInternalMethods rs; CancelTask timeoutTask = null; try { if (locallyScopedConnection.getEnableQueryTimeouts() && this.timeoutInMillis != 0 && locallyScopedConnection.versionMeetsMinimum(5, 0, 0)) { timeoutTask = new CancelTask(this); locallyScopedConnection.getCancelTimer().schedule(timeoutTask, this.timeoutInMillis); } if (!isBatch) { statementBegins(); } rs = locallyScopedConnection.execSQL(this, null, maxRowsToRetrieve, sendPacket, this.resultSetType, this.resultSetConcurrency, createStreamingResultSet, this.currentCatalog, metadataFromCache, isBatch); if (timeoutTask != null) { timeoutTask.cancel(); locallyScopedConnection.getCancelTimer().purge(); if (timeoutTask.caughtWhileCancelling != null) { throw timeoutTask.caughtWhileCancelling; } timeoutTask = null; } synchronized (this.cancelTimeoutMutex) { if (this.wasCancelled) { SQLException cause = null; if (this.wasCancelledByTimeout) { cause = new MySQLTimeoutException(); } else { cause = new MySQLStatementCancelledException(); } resetCancelledState(); throw cause; } } } finally { if (!isBatch) { this.statementExecuting.set(false); } if (timeoutTask != null) { 
timeoutTask.cancel(); locallyScopedConnection.getCancelTimer().purge(); } } return rs; } catch (NullPointerException npe) { checkClosed(); // we can't synchronize ourselves against async connection-close due to deadlock issues, so this is the next best thing for // this particular corner case. throw npe; } } }
// Arm the query timeout: create a CancelTask for this statement and schedule it on the connection's cancel timer with a delay of timeoutInMillis.
timeoutTask = new CancelTask(this);
locallyScopedConnection.getCancelTimer().schedule(timeoutTask, this.timeoutInMillis);
// Disarm the query timeout once execution is over: cancel the task and purge it from the connection's cancel timer.
timeoutTask.cancel();
locallyScopedConnection.getCancelTimer().purge();
最後是通過 MysqlIO 的 getResultSet 方法來獲取結果集。
mysql-connector-java-5.1.43-sources.jar!/com/mysql/jdbc/MysqlIO.java
protected ResultSetImpl getResultSet(StatementImpl callingStatement, long columnCount, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, boolean isBinaryEncoded, Field[] metadataFromCache) throws SQLException { Buffer packet; // The packet from the server Field[] fields = null; // Read in the column information if (metadataFromCache == null /* we want the metadata from the server */) { fields = new Field[(int) columnCount]; for (int i = 0; i < columnCount; i++) { Buffer fieldPacket = null; fieldPacket = readPacket(); fields[i] = unpackField(fieldPacket, false); } } else { for (int i = 0; i < columnCount; i++) { skipPacket(); } } // There is no EOF packet after fields when CLIENT_DEPRECATE_EOF is set if (!isEOFDeprecated() || // if we asked to use cursor then there should be an OK packet here (this.connection.versionMeetsMinimum(5, 0, 2) && callingStatement != null && isBinaryEncoded && callingStatement.isCursorRequired())) { packet = reuseAndReadPacket(this.reusablePacket); readServerStatusForResultSets(packet); } // // Handle cursor-based fetch first // if (this.connection.versionMeetsMinimum(5, 0, 2) && this.connection.getUseCursorFetch() && isBinaryEncoded && callingStatement != null && callingStatement.getFetchSize() != 0 && callingStatement.getResultSetType() == ResultSet.TYPE_FORWARD_ONLY) { ServerPreparedStatement prepStmt = (com.mysql.jdbc.ServerPreparedStatement) callingStatement; boolean usingCursor = true; // // Server versions 5.0.5 or newer will only open a cursor and set this flag if they can, otherwise they punt and go back to mysql_store_results() // behavior // if (this.connection.versionMeetsMinimum(5, 0, 5)) { usingCursor = (this.serverStatus & SERVER_STATUS_CURSOR_EXISTS) != 0; } if (usingCursor) { RowData rows = new RowDataCursor(this, prepStmt, fields); ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, fields, rows, resultSetType, resultSetConcurrency, isBinaryEncoded); if 
(usingCursor) { rs.setFetchSize(callingStatement.getFetchSize()); } return rs; } } RowData rowData = null; if (!streamResults) { rowData = readSingleRowSet(columnCount, maxRows, resultSetConcurrency, isBinaryEncoded, (metadataFromCache == null) ? fields : metadataFromCache); } else { rowData = new RowDataDynamic(this, (int) columnCount, (metadataFromCache == null) ? fields : metadataFromCache, isBinaryEncoded); this.streamingData = rowData; } ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, (metadataFromCache == null) ? fields : metadataFromCache, rowData, resultSetType, resultSetConcurrency, isBinaryEncoded); return rs; }
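這裏要注意 this.connection.getUseCursorFetch():只有它爲 true(並且同時滿足 isBinaryEncoded、fetchSize 不爲 0、結果集類型爲 TYPE_FORWARD_ONLY 等條件),usingCursor 才會是 true,然後 rows 對象才會是 RowDataCursor;否則在非 streaming 的情況下,會由 readSingleRowSet 一口氣拉取全部數據。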
mysql-connector-java-5.1.43-sources.jar!/com/mysql/jdbc/StatementImpl.java
/** * Thread used to implement query timeouts...Eventually we could be more * efficient and have one thread with timers, but this is a straightforward * and simple way to implement a feature that isn't used all that often. */ class CancelTask extends TimerTask { SQLException caughtWhileCancelling = null; StatementImpl toCancel; Properties origConnProps = null; String origConnURL = ""; long origConnId = 0; CancelTask(StatementImpl cancellee) throws SQLException { this.toCancel = cancellee; this.origConnProps = new Properties(); Properties props = StatementImpl.this.connection.getProperties(); Enumeration<?> keys = props.propertyNames(); while (keys.hasMoreElements()) { String key = keys.nextElement().toString(); this.origConnProps.setProperty(key, props.getProperty(key)); } this.origConnURL = StatementImpl.this.connection.getURL(); this.origConnId = StatementImpl.this.connection.getId(); } @Override public void run() { Thread cancelThread = new Thread() { @Override public void run() { Connection cancelConn = null; java.sql.Statement cancelStmt = null; try { MySQLConnection physicalConn = StatementImpl.this.physicalConnection.get(); if (physicalConn != null) { if (physicalConn.getQueryTimeoutKillsConnection()) { CancelTask.this.toCancel.wasCancelled = true; CancelTask.this.toCancel.wasCancelledByTimeout = true; physicalConn.realClose(false, false, true, new MySQLStatementCancelledException(Messages.getString("Statement.ConnectionKilledDueToTimeout"))); } else { synchronized (StatementImpl.this.cancelTimeoutMutex) { if (CancelTask.this.origConnURL.equals(physicalConn.getURL())) { // All's fine cancelConn = physicalConn.duplicate(); cancelStmt = cancelConn.createStatement(); cancelStmt.execute("KILL QUERY " + physicalConn.getId()); } else { try { cancelConn = (Connection) DriverManager.getConnection(CancelTask.this.origConnURL, CancelTask.this.origConnProps); cancelStmt = cancelConn.createStatement(); cancelStmt.execute("KILL QUERY " + CancelTask.this.origConnId); } 
catch (NullPointerException npe) { // Log this? "Failed to connect to " + origConnURL + " and KILL query" } } CancelTask.this.toCancel.wasCancelled = true; CancelTask.this.toCancel.wasCancelledByTimeout = true; } } } } catch (SQLException sqlEx) { CancelTask.this.caughtWhileCancelling = sqlEx; } catch (NullPointerException npe) { // Case when connection closed while starting to cancel. // We can't easily synchronize this, because then one thread can't cancel() a running query. // Ignore, we shouldn't re-throw this, because the connection's already closed, so the statement has been timed out. } finally { if (cancelStmt != null) { try { cancelStmt.close(); } catch (SQLException sqlEx) { throw new RuntimeException(sqlEx.toString()); } } if (cancelConn != null) { try { cancelConn.close(); } catch (SQLException sqlEx) { throw new RuntimeException(sqlEx.toString()); } } CancelTask.this.toCancel = null; CancelTask.this.origConnProps = null; CancelTask.this.origConnURL = null; } } }; cancelThread.start(); } }
如果 queryTimeoutKillsConnection 爲 true,則直接關閉(kill)連接;否則通過另一條連接發送 KILL QUERY 命令。兩種情況下都會標記取消狀態:
// Flag the target statement as cancelled, and record that the cancellation was caused by the timeout.
CancelTask.this.toCancel.wasCancelled = true;
CancelTask.this.toCancel.wasCancelledByTimeout = true;
executeInternal 裏會檢查這兩個標記,然後拋出對應的異常:
// Under the cancel mutex, turn a recorded cancellation into the matching exception and clear the flags before throwing.
synchronized (this.cancelTimeoutMutex) {
    if (this.wasCancelled) {
        // Timeout-driven cancellations surface as MySQLTimeoutException, all others as MySQLStatementCancelledException.
        SQLException cancellation = this.wasCancelledByTimeout ? new MySQLTimeoutException() : new MySQLStatementCancelledException();

        resetCancelledState();

        throw cancellation;
    }
}
mysql-connector-java-5.1.43-sources.jar!/com/mysql/jdbc/ResultSetImpl.java
/**
 * Advances the cursor by one row. A ResultSet starts out positioned before its first row: the first call makes the first row current, the second call
 * makes the second row current, and so on.
 *
 * <p>
 * If an input stream from the previous row is open, it is implicitly closed. The ResultSet's warning chain is cleared when a new row is read
 * </p>
 *
 * @return true if the new current is valid; false if there are no more rows
 *
 * @exception SQLException
 *                if a database access error occurs
 */
public boolean next() throws SQLException {
    synchronized (checkClosed().getConnectionMutex()) {

        // Moving the cursor leaves insert-row / update mode.
        if (this.onInsertRow) {
            this.onInsertRow = false;
        }

        if (this.doingUpdates) {
            this.doingUpdates = false;
        }

        if (!reallyResult()) {
            throw SQLError.createSQLException(Messages.getString("ResultSet.ResultSet_is_from_UPDATE._No_Data_115"), SQLError.SQL_STATE_GENERAL_ERROR,
                    getExceptionInterceptor());
        }

        if (this.thisRow != null) {
            this.thisRow.closeOpenStreams();
        }

        boolean movedToRow = false;

        // Row data reporting a size of zero is never asked for a next row.
        if (this.rowData.size() != 0) {
            this.thisRow = this.rowData.next();

            if (this.thisRow != null) {
                clearWarnings();
                movedToRow = true;
            }
        }

        setRowPositionValidity();

        return movedToRow;
    }
}
這裏調用了 this.rowData.next() 來獲取數據;如果開啓了 cursorFetch 模式,則這個 rowData 是 RowDataCursor 對象。
mysql-connector-java-5.1.43-sources.jar!/com/mysql/jdbc/RowDataCursor.java
/** * Returns the next row. * * @return the next row value * @throws SQLException * if a database error occurs */ public ResultSetRow next() throws SQLException { if (this.fetchedRows == null && this.currentPositionInEntireResult != BEFORE_START_OF_ROWS) { throw SQLError.createSQLException(Messages.getString("ResultSet.Operation_not_allowed_after_ResultSet_closed_144"), SQLError.SQL_STATE_GENERAL_ERROR, this.mysql.getExceptionInterceptor()); } if (!hasNext()) { return null; } this.currentPositionInEntireResult++; this.currentPositionInFetchedRows++; // Catch the forced scroll-passed-end if (this.fetchedRows != null && this.fetchedRows.size() == 0) { return null; } if ((this.fetchedRows == null) || (this.currentPositionInFetchedRows > (this.fetchedRows.size() - 1))) { fetchMoreRows(); this.currentPositionInFetchedRows = 0; } ResultSetRow row = this.fetchedRows.get(this.currentPositionInFetchedRows); row.setMetadata(this.metadata); return row; }
注意hasNext()方法
/** * Returns true if another row exists. * * @return true if more rows * @throws SQLException * if a database error occurs */ public boolean hasNext() throws SQLException { if (this.fetchedRows != null && this.fetchedRows.size() == 0) { return false; } if (this.owner != null && this.owner.owningStatement != null) { int maxRows = this.owner.owningStatement.maxRows; if (maxRows != -1 && this.currentPositionInEntireResult + 1 > maxRows) { return false; } } if (this.currentPositionInEntireResult != BEFORE_START_OF_ROWS) { // Case, we've fetched some rows, but are not at end of fetched block if (this.currentPositionInFetchedRows < (this.fetchedRows.size() - 1)) { return true; } else if (this.currentPositionInFetchedRows == this.fetchedRows.size() && this.lastRowFetched) { return false; } else { // need to fetch to determine fetchMoreRows(); return (this.fetchedRows.size() > 0); } } // Okay, no rows _yet_, so fetch 'em fetchMoreRows(); return this.fetchedRows.size() > 0; }
注意fetchMoreRows()方法
private void fetchMoreRows() throws SQLException { if (this.lastRowFetched) { this.fetchedRows = new ArrayList<ResultSetRow>(0); return; } synchronized (this.owner.connection.getConnectionMutex()) { boolean oldFirstFetchCompleted = this.firstFetchCompleted; if (!this.firstFetchCompleted) { this.firstFetchCompleted = true; } int numRowsToFetch = this.owner.getFetchSize(); if (numRowsToFetch == 0) { numRowsToFetch = this.prepStmt.getFetchSize(); } if (numRowsToFetch == Integer.MIN_VALUE) { // Handle the case where the user used 'old' streaming result sets numRowsToFetch = 1; } this.fetchedRows = this.mysql.fetchRowsViaCursor(this.fetchedRows, this.statementIdOnServer, this.metadata, numRowsToFetch, this.useBufferRowExplicit); this.currentPositionInFetchedRows = BEFORE_START_OF_ROWS; if ((this.mysql.getServerStatus() & SERVER_STATUS_LAST_ROW_SENT) != 0) { this.lastRowFetched = true; if (!oldFirstFetchCompleted && this.fetchedRows.size() == 0) { this.wasEmpty = true; } } } }
這裏會先把 firstFetchCompleted 標記爲 true,然後通過 this.mysql.fetchRowsViaCursor 去拉取下一批數據。
mysql-connector-java-5.1.43-sources.jar!/com/mysql/jdbc/MysqlIO.java
/**
 * Sends a COM_FETCH for the given server-side statement and collects the rows returned by nextRow() until it yields null.
 *
 * @param fetchedRows
 *            list to reuse for the rows (it is cleared first); when null a new list is created
 * @param statementId
 *            id of the statement on the server
 * @param columnTypes
 *            column metadata used to decode the rows
 * @param fetchSize
 *            number of rows requested in the COM_FETCH packet
 * @param useBufferRowExplicit
 *            forwarded to nextRow()
 *
 * @return the list holding the fetched rows
 *
 * @throws SQLException
 *             if sending the command or reading a row fails
 */
protected List<ResultSetRow> fetchRowsViaCursor(List<ResultSetRow> fetchedRows, long statementId, Field[] columnTypes, int fetchSize,
        boolean useBufferRowExplicit) throws SQLException {

    if (fetchedRows != null) {
        fetchedRows.clear();
    } else {
        fetchedRows = new ArrayList<ResultSetRow>(fetchSize);
    }

    // Build the COM_FETCH packet: command byte, statement id, number of rows wanted.
    this.sharedSendPacket.clear();
    this.sharedSendPacket.writeByte((byte) MysqlDefs.COM_FETCH);
    this.sharedSendPacket.writeLong(statementId);
    this.sharedSendPacket.writeLong(fetchSize);

    // skipCheck is true and timeoutMillis is 0: sendCommand only sends the packet here.
    sendCommand(MysqlDefs.COM_FETCH, null, this.sharedSendPacket, true, null, 0);

    while (true) {
        ResultSetRow fetched = nextRow(columnTypes, columnTypes.length, true, ResultSet.CONCUR_READ_ONLY, false, useBufferRowExplicit, false, null);

        if (fetched == null) {
            break;
        }

        fetchedRows.add(fetched);
    }

    return fetchedRows;
}
這裏注意 sharedSendPacket 中指定了 fetchSize,然後調用 sendCommand 方法發送 COM_FETCH 命令(skipCheck 爲 true,因此 sendCommand 本身不讀取響應),之後由 nextRow 方法把這批數據讀入 reusablePacket 並逐行解析。
/** * Send a command to the MySQL server If data is to be sent with command, * it should be put in extraData. * * Raw packets can be sent by setting queryPacket to something other * than null. * * @param command * the MySQL protocol 'command' from MysqlDefs * @param extraData * any 'string' data for the command * @param queryPacket * a packet pre-loaded with data for the protocol (i.e. * from a client-side prepared statement). * @param skipCheck * do not call checkErrorPacket() if true * @param extraDataCharEncoding * the character encoding of the extraData * parameter. * * @return the response packet from the server * * @throws SQLException * if an I/O error or SQL error occurs */ final Buffer sendCommand(int command, String extraData, Buffer queryPacket, boolean skipCheck, String extraDataCharEncoding, int timeoutMillis) throws SQLException { this.commandCount++; // // We cache these locally, per-command, as the checks for them are in very 'hot' sections of the I/O code and we save 10-15% in overall performance by // doing this... // this.enablePacketDebug = this.connection.getEnablePacketDebug(); this.readPacketSequence = 0; int oldTimeout = 0; if (timeoutMillis != 0) { try { oldTimeout = this.mysqlConnection.getSoTimeout(); this.mysqlConnection.setSoTimeout(timeoutMillis); } catch (SocketException e) { throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, e, getExceptionInterceptor()); } } try { checkForOutstandingStreamingData(); // Clear serverStatus...this value is guarded by an external mutex, as you can only ever be processing one command at a time this.oldServerStatus = this.serverStatus; this.serverStatus = 0; this.hadWarnings = false; this.warningCount = 0; this.queryNoIndexUsed = false; this.queryBadIndexUsed = false; this.serverQueryWasSlow = false; // // Compressed input stream needs cleared at beginning of each command execution... 
// if (this.useCompression) { int bytesLeft = this.mysqlInput.available(); if (bytesLeft > 0) { this.mysqlInput.skip(bytesLeft); } } try { clearInputStream(); // // PreparedStatements construct their own packets, for efficiency's sake. // // If this is a generic query, we need to re-use the sending packet. // if (queryPacket == null) { int packLength = HEADER_LENGTH + COMP_HEADER_LENGTH + 1 + ((extraData != null) ? extraData.length() : 0) + 2; if (this.sendPacket == null) { this.sendPacket = new Buffer(packLength); } this.packetSequence = -1; this.compressedPacketSequence = -1; this.readPacketSequence = 0; this.checkPacketSequence = true; this.sendPacket.clear(); this.sendPacket.writeByte((byte) command); if ((command == MysqlDefs.INIT_DB) || (command == MysqlDefs.QUERY) || (command == MysqlDefs.COM_PREPARE)) { if (extraDataCharEncoding == null) { this.sendPacket.writeStringNoNull(extraData); } else { this.sendPacket.writeStringNoNull(extraData, extraDataCharEncoding, this.connection.getServerCharset(), this.connection.parserKnowsUnicode(), this.connection); } } send(this.sendPacket, this.sendPacket.getPosition()); } else { this.packetSequence = -1; this.compressedPacketSequence = -1; send(queryPacket, queryPacket.getPosition()); // packet passed by PreparedStatement } } catch (SQLException sqlEx) { // don't wrap SQLExceptions throw sqlEx; } catch (Exception ex) { throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ex, getExceptionInterceptor()); } Buffer returnPacket = null; if (!skipCheck) { if ((command == MysqlDefs.COM_EXECUTE) || (command == MysqlDefs.COM_RESET_STMT)) { this.readPacketSequence = 0; this.packetSequenceReset = true; } returnPacket = checkErrorPacket(command); } return returnPacket; } catch (IOException ioEx) { preserveOldTransactionState(); throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ioEx, 
getExceptionInterceptor()); } catch (SQLException e) { preserveOldTransactionState(); throw e; } finally { if (timeoutMillis != 0) { try { this.mysqlConnection.setSoTimeout(oldTimeout); } catch (SocketException e) { throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, e, getExceptionInterceptor()); } } } }
注意這裏的 mysqlConnection:如果 timeoutMillis 不爲 0,則會調用 setSoTimeout;但是 fetchRowsViaCursor 方法傳入的是 0,即不改變原來的設置。那麼生效的就是連接參數中設定的值(即 socketTimeout)。
對於 MySQL JDBC 來說:
如果 queryTimeoutKillsConnection 爲 true,則直接關閉(kill)連接;否則發送 KILL QUERY 命令。同時標記取消狀態,然後由 executeInternal 拋出 MySQLTimeoutException 異常。