本文主要研究一下jdbc statement的fetchSizehtml
這裏以postgres jdbc driver爲例,主要是由於postgres的jdbc driver有公開源碼,並且命名比較規範。以前看oracle jdbc,因爲沒有源碼,反編譯出來一大堆var1,var2等的變量命名,很是晦澀。默認狀況下pgjdbc driver會一次性拉取全部結果集,也就是在executeQuery的時候。對於大數據量的查詢來講,很是容易形成OOM。這種場景就須要設置fetchSize,執行query的時候先返回第一批數據,以後next完一批數據以後再去拉取下一批。java
可是這個有幾個要求:git
@Test public void testReadTimeout() throws SQLException { Connection connection = dataSource.getConnection(); //https://jdbc.postgresql.org/documentation/head/query.html connection.setAutoCommit(false); //NOTE 爲了設置fetchSize,必須設置爲false String sql = "select * from demo_table"; PreparedStatement pstmt; try { pstmt = (PreparedStatement)connection.prepareStatement(sql); pstmt.setFetchSize(50); System.out.println("ps.getQueryTimeout():" + pstmt.getQueryTimeout()); System.out.println("ps.getFetchSize():" + pstmt.getFetchSize()); System.out.println("ps.getFetchDirection():" + pstmt.getFetchDirection()); System.out.println("ps.getMaxFieldSize():" + pstmt.getMaxFieldSize()); ResultSet rs = pstmt.executeQuery(); //NOTE 這裏返回了就表明statement執行完成,默認返回fetchSize的數據 int col = rs.getMetaData().getColumnCount(); System.out.println("============================"); while (rs.next()) { for (int i = 1; i <= col; i++) { System.out.print(rs.getObject(i)); } System.out.println(""); } System.out.println("============================"); } catch (SQLException e) { e.printStackTrace(); } finally { //close resources } }
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgPreparedStatement.javagithub
/* * A Prepared SQL query is executed and its ResultSet is returned * * @return a ResultSet that contains the data produced by the * query - never null * * @exception SQLException if a database access error occurs */ public java.sql.ResultSet executeQuery() throws SQLException { if (!executeWithFlags(0)) { throw new PSQLException(GT.tr("No results were returned by the query."), PSQLState.NO_DATA); } if (result.getNext() != null) { throw new PSQLException(GT.tr("Multiple ResultSets were returned by the query."), PSQLState.TOO_MANY_RESULTS); } return result.getResultSet(); }
executeQuery首先調用executeWithFlags方法,源碼裏頭直接寫在if裏頭的,這個不是推薦的方式,由於放在if比較容易忽略。
public boolean executeWithFlags(int flags) throws SQLException { try { checkClosed(); if (connection.getPreferQueryMode() == PreferQueryMode.SIMPLE) { flags |= QueryExecutor.QUERY_EXECUTE_AS_SIMPLE; } execute(preparedQuery, preparedParameters, flags); return (result != null && result.getResultSet() != null); } finally { defaultTimeZone = null; } } protected final void execute(CachedQuery cachedQuery, ParameterList queryParameters, int flags) throws SQLException { try { executeInternal(cachedQuery, queryParameters, flags); } catch (SQLException e) { // Don't retry composite queries as it might get partially executed if (cachedQuery.query.getSubqueries() != null || !connection.getQueryExecutor().willHealOnRetry(e)) { throw e; } cachedQuery.query.close(); // Execute the query one more time executeInternal(cachedQuery, queryParameters, flags); } }
這裏又調用execute方法,在調用executeInternal
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgPreparedStatement.javasql
private void executeInternal(CachedQuery cachedQuery, ParameterList queryParameters, int flags) throws SQLException { closeForNextExecution(); // Enable cursor-based resultset if possible. if (fetchSize > 0 && !wantsScrollableResultSet() && !connection.getAutoCommit() && !wantsHoldableResultSet()) { flags |= QueryExecutor.QUERY_FORWARD_CURSOR; } if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) { flags |= QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS; // If the no results flag is set (from executeUpdate) // clear it so we get the generated keys results. // if ((flags & QueryExecutor.QUERY_NO_RESULTS) != 0) { flags &= ~(QueryExecutor.QUERY_NO_RESULTS); } } if (isOneShotQuery(cachedQuery)) { flags |= QueryExecutor.QUERY_ONESHOT; } // Only use named statements after we hit the threshold. Note that only // named statements can be transferred in binary format. if (connection.getAutoCommit()) { flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN; } // updateable result sets do not yet support binary updates if (concurrency != ResultSet.CONCUR_READ_ONLY) { flags |= QueryExecutor.QUERY_NO_BINARY_TRANSFER; } Query queryToExecute = cachedQuery.query; if (queryToExecute.isEmpty()) { flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN; } if (!queryToExecute.isStatementDescribed() && forceBinaryTransfers && (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) { // Simple 'Q' execution does not need to know parameter types // When binaryTransfer is forced, then we need to know resulting parameter and column types, // thus sending a describe request. int flags2 = flags | QueryExecutor.QUERY_DESCRIBE_ONLY; StatementResultHandler handler2 = new StatementResultHandler(); connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler2, 0, 0, flags2); ResultWrapper result2 = handler2.getResults(); if (result2 != null) { result2.getResultSet().close(); } } StatementResultHandler handler = new StatementResultHandler(); result = null; try { startTimer(); connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows, fetchSize, flags); } finally { killTimerTask(); } result = firstUnclosedResult = handler.getResults(); if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) { generatedKeys = result; result = result.getNext(); if (wantsGeneratedKeysOnce) { wantsGeneratedKeysOnce = false; } } }
主要看這段
connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows, fetchSize, flags);
經過把fetchSize傳遞進去,拉取指定大小的result最後調用sendExecute以及processResults方法來拉取數據
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/v3/QueryExecutorImpl.java數據庫
private void sendExecute(SimpleQuery query, Portal portal, int limit) throws IOException { // // Send Execute. // if (logger.logDebug()) { logger.debug(" FE=> Execute(portal=" + portal + ",limit=" + limit + ")"); } byte[] encodedPortalName = (portal == null ? null : portal.getEncodedPortalName()); int encodedSize = (encodedPortalName == null ? 0 : encodedPortalName.length); // Total size = 4 (size field) + 1 + N (source portal) + 4 (max rows) pgStream.sendChar('E'); // Execute pgStream.sendInteger4(4 + 1 + encodedSize + 4); // message size if (encodedPortalName != null) { pgStream.send(encodedPortalName); // portal name } pgStream.sendChar(0); // portal name terminator pgStream.sendInteger4(limit); // row limit pendingExecuteQueue.add(new ExecuteRequest(query, portal, false)); } protected void processResults(ResultHandler handler, int flags) throws IOException { boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0; boolean bothRowsAndStatus = (flags & QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS) != 0; List<byte[][]> tuples = null; int c; boolean endQuery = false; // At the end of a command execution we have the CommandComplete // message to tell us we're done, but with a describeOnly command // we have no real flag to let us know we're done. We've got to // look for the next RowDescription or NoData message and return // from there. boolean doneAfterRowDescNoData = false; while (!endQuery) { c = pgStream.receiveChar(); switch (c) { case 'A': // Asynchronous Notify receiveAsyncNotify(); break; case '1': // Parse Complete (response to Parse) pgStream.receiveInteger4(); // len, discarded SimpleQuery parsedQuery = pendingParseQueue.removeFirst(); String parsedStatementName = parsedQuery.getStatementName(); //... } } }
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgResultSet.javaoracle
public boolean next() throws SQLException { checkClosed(); if (onInsertRow) { throw new PSQLException(GT.tr("Can''t use relative move methods while on the insert row."), PSQLState.INVALID_CURSOR_STATE); } if (current_row + 1 >= rows.size()) { if (cursor == null || (maxRows > 0 && row_offset + rows.size() >= maxRows)) { current_row = rows.size(); this_row = null; rowBuffer = null; return false; // End of the resultset. } // Ask for some more data. row_offset += rows.size(); // We are discarding some data. int fetchRows = fetchSize; if (maxRows != 0) { if (fetchRows == 0 || row_offset + fetchRows > maxRows) { // Fetch would exceed maxRows, limit it. fetchRows = maxRows - row_offset; } } // Execute the fetch and update this resultset. connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows); current_row = 0; // Test the new rows array. if (rows.isEmpty()) { this_row = null; rowBuffer = null; return false; } } else { current_row++; } initRowBuffer(); return true; }
next方法能夠看到,首先判斷current_row + 1是否小於rows.size(),小於的話,那就current_row++;不然表示這一批fetchSize的數據被消費完了,須要判斷是否結束或者拉取下一批數據,以後更新current_row
connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);
這個方法拉取fetchRows條數的下一批數據
private void initRowBuffer() { this_row = rows.get(current_row); // We only need a copy of the current row if we're going to // modify it via an updatable resultset. if (resultsetconcurrency == ResultSet.CONCUR_UPDATABLE) { rowBuffer = new byte[this_row.length][]; System.arraycopy(this_row, 0, rowBuffer, 0, this_row.length); } else { rowBuffer = null; } }
這就是next移動以後,把要消費的這行數據放到rowBuffer裏頭。
對於查詢數據量大的場景下,很是有必要設置fetchSize,不然全量拉取很容易OOM,可是使用fetchSize的時候,要求數據可以在遍歷resultSet的時候及時處理,而不是收集完全部數據返回回去再去處理。app