java mysql大數據量批量插入與流式讀取分析


總結下這周幫助客戶解決報表生成操做的mysql 驅動的使用上的一些問題,與解決方案。因爲生成報表邏輯要從數據庫讀取大量數據並在內存中加工處理後在java

生成大量的彙總數據而後寫入到數據庫。基本流程是 讀取->處理->寫入。mysql

1 讀取操做開始遇到的問題是當sql查詢數據量比較大時候基本讀不出來。開始覺得是server端處理太慢。可是在控制檯是能夠當即返回數據的。因而在應用sql

這邊抓包,發現也是發送sql後當即有數據返回。可是執行ResultSet的next方法確實阻塞的。查文檔翻代碼原來mysql驅動默認的行爲是須要把整個結果所有讀取到數據庫

內存中才開始容許應用讀取結果。顯然與指望的行爲不一致,指望的行爲是流的方式讀取,當結果從myql服務端返回後當即仍是讀取處理。這樣應用就不須要大量內存網絡

來存儲這個結果集。正確的流式讀取方式代碼示例:性能

 

PreparedStatement ps = connection.prepareStatement("select .. from ..", 
            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); 

//forward only read only也是mysql 驅動的默認值,因此不指定也是能夠的 好比: PreparedStatement ps = connection.prepareStatement("select .. from .."); 

ps.setFetchSize(Integer.MIN_VALUE); //也能夠修改jdbc url經過defaultFetchSize參數來設置,這樣默認因此的返回結果都是經過流方式讀取.
ResultSet rs = ps.executeQuery();

while (rs.next()) {
  System.out.println(rs.getString("fieldName"));
}

 


代碼分析:下面是mysql判斷是否開啓流式讀取結果的方法,有三個條件forward-only,read-only,fatch size是Integer.MIN_VALUE測試

/**
 * We only stream result sets when they are forward-only, read-only, and the
 * fetch size has been set to Integer.MIN_VALUE
 *
 * @return true if this result set should be streamed row at-a-time, rather
 * than read all at once.
 */
protected boolean createStreamingResultSet() {
    try { synchronized(checkClosed().getConnectionMutex()) { return ((this.resultSetType == java.sql.ResultSet.TYPE_FORWARD_ONLY) && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY) && (this.fetchSize == Integer.MIN_VALUE)); } } catch (SQLException e) { // we can't break the interface, having this be no-op in case of error is ok return false; } }

 

 

2 批量寫入問題。開始時應用程序是一條一條的執行insert來寫入報表結果。寫入也是比較慢的。主要緣由是單條寫入時候須要應用於db之間大量的
請求響應交互。每一個請求都是一個獨立的事務提交。這樣網絡延遲大的狀況下屢次請求會有大量的時間消耗的網絡延遲上。第二個是因爲每一個事務db都會
有刷新磁盤操做寫事務日誌,保證事務的持久性。因爲每一個事務只是寫入一條數據 因此磁盤io利用率不高,由於對於磁盤io是按塊來的,因此連續寫入大量數據效率
更好。因此必須改爲批量插入的方式,減小請求數與事務數。下面是批量插入的例子:還有jdbc鏈接串必須加下rewriteBatchedStatements=truefetch

int batchSize = 1000;
PreparedStatement ps = connection.prepareStatement("insert into tb1 (c1,c2,c3...) values (?,?,?...)"); for (int i = 0; i < list.size(); i++) { ps.setXXX(list.get(i).getC1()); ps.setYYY(list.get(i).getC2()); ps.setZZZ(list.get(i).getC3()); ps.addBatch(); if ((i + 1) % batchSize == 0) { ps.executeBatch(); } } if (list.size() % batchSize != 0) { ps.executeBatch(); }

 

上面代碼示例是每1000條數據發送一次請求。mysql驅動內部在應用端會把屢次addBatch()的參數合併成一條multi value的insert語句發送給db去執行
好比insert into tb1(c1,c2,c3) values (v1,v2,v3),(v4,v5,v6),(v7,v8,v9)...
這樣能夠比每條一個insert 明顯少不少請求。減小了網絡延遲消耗時間與磁盤io時間,從而提升了tps。

代碼分析: 從代碼能夠看出,
1 rewriteBatchedStatements=true,insert是參數化語句且不是insert ... select 或者 insert... on duplicate key update with an id=last_insert_id(...)的話會執行 
executeBatchedInserts,也就是muti value的方式ui

2 rewriteBatchedStatements=true 語句是都是參數化(沒有addbatch(sql)方式加入的)的並且mysql server版本在4.1以上 語句超過三條,則執行executePreparedBatchAsMultiStatement
就是將多個語句經過;分隔一次提交多條sql。好比 "insert into tb1(c1,c2,c3) values (v1,v2,v3);insert into tb1(c1,c2,c3) values (v1,v2,v3)..."this

3 其他的執行executeBatchSerially,也就是仍是一條條處理

public void addBatch(String sql)throws SQLException {
    synchronized(checkClosed().getConnectionMutex()) { this.batchHasPlainStatements = true; super.addBatch(sql); } } public int[] executeBatch()throws SQLException { //... if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements()) { if (canRewriteAsMultiValueInsertAtSqlLevel()) { return executeBatchedInserts(batchTimeout); } if (this.connection.versionMeetsMinimum(4, 1, 0) && !this.batchHasPlainStatements && this.batchedArgs != null && this.batchedArgs.size() > 3 /* cost of option setting rt-wise */ ) { return executePreparedBatchAsMultiStatement(batchTimeout); } } return executeBatchSerially(batchTimeout); //..... }

 


executeBatchedInserts相比executePreparedBatchAsMultiStatement的方式傳輸效率更好,由於一次請求只重複一次前面的insert table (c1,c2,c3)

mysql server 對請求報文的最大長度有限制,若是batch size 太大形成請求報文超過最大限制,mysql 驅動會內部按最大報文限制查分紅多個報文。因此要真正減小提交次數

還要檢查下mysql server的max_allowed_packet 不然batch size 再大也沒用.

mysql> show VARIABLES like '%max_allowed_packet%';
+--------------------+-----------+
| Variable_name | Value |
+--------------------+-----------+
| max_allowed_packet | 167772160 |
+--------------------+-----------+
1 row in set (0.00 sec)

 

 要想驗證mysql 發送了正確的sql 有兩種方式

1 抓包,下圖是wireshark在 應用端抓包mysql的報文

 

2 另外一個辦法是在mysql server端開啓general log 能夠查看mysql收到的全部sql

 

3 在jdbc url上加上參數traceProtocol=true 或者profileSQL=true or autoGenerateTestcaseScript=true

 

性能測試對比

import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.SQLException; import com.alibaba.druid.pool.DruidDataSource; public class BatchInsert { public static void main(String[] args) throws SQLException { int batchSize = 1000; int insertCount = 1000; testDefault(batchSize, insertCount); testRewriteBatchedStatements(batchSize,insertCount); } private static void testDefault(int batchSize, int insertCount) throws SQLException { long start = System.currentTimeMillis(); doBatchedInsert(batchSize, insertCount,""); long end = System.currentTimeMillis(); System.out.println("default:" + (end -start) + "ms"); } private static void testRewriteBatchedStatements(int batchSize, int insertCount) throws SQLException { long start = System.currentTimeMillis(); doBatchedInsert(batchSize, insertCount, "rewriteBatchedStatements=true"); long end = System.currentTimeMillis(); System.out.println("rewriteBatchedStatements:" + (end -start) + "ms"); } private static void doBatchedInsert(int batchSize, int insertCount, String mysqlProperties) throws SQLException { DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl("jdbc:mysql://ip:3306/test?" + mysqlProperties); dataSource.setUsername("name"); dataSource.setPassword("password"); dataSource.init(); Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement("insert into Test (name,gmt_created,gmt_modified) values (?,now(),now())"); for (int i = 0; i < insertCount; i++) { preparedStatement.setString(1, i+" "); preparedStatement.addBatch(); if((i+1) % batchSize == 0) { preparedStatement.executeBatch(); } } preparedStatement.executeBatch(); connection.close(); dataSource.close(); } }
 

網絡環境ping測試延遲是35ms ,測試結果:

default:75525msrewriteBatchedStatements:914ms

相關文章
相關標籤/搜索