Spring Batch(6)——數據庫批數據讀寫

前序文章陸續介紹了批處理的基本概念Job使用Step控制Item的結構以及扁平文件的讀寫。本文將接着前面的內容說明數據庫如何進行批處理讀寫。git

數據讀取

數據庫是絕大部分系統要用到的數據存儲工具,所以針對數據庫執行批量數據處理任務也是很常見的需求。數據的批量處理與常規業務開發不一樣,若是一次性讀取百萬條,對於任何系統而言確定都是不可取的。爲了解決這個問題Spring Batch提供了2套數據讀取方案:github

  • 基於遊標讀取數據
  • 基於分頁讀取數據

遊標讀取數據

對於有經驗大數據工程師而言數據庫遊標的操做應該是很是熟悉的,由於這是從數據庫讀取數據流標準方法,並且在Java中也封裝了ResultSet這種面向遊標操做的數據結構。spring

ResultSet一直都會指向結果集中的某一行數據,使用next方法可讓遊標跳轉到下一行數據。Spring Batch一樣使用這個特性來控制數據的讀取:數據庫

  1. 在初始化時打開遊標。
  2. 每一次調用ItemReader::read方法就從ResultSet獲取一行數據並執行next
  3. 返回可用於數據處理的映射結構(map、dict)。

在一切都執行完畢以後,框架會使用回調過程調用ResultSet::close來關閉遊標。因爲全部的業務過程都綁定在一個事物之上,因此知道到Step執行完畢或異常退出調用執行close。下圖展現了數據讀取的過程:緩存

SQL語句的查詢結果稱爲數據集(對於大部分數據庫而言,其SQL執行結果會產生臨時的表空間索引來存放數據集)。遊標開始會停滯在ID=2的位置,一次ItemReader執行完畢後會產生對應的實體FOO2,而後遊標下移直到最後的ID=6。最後關閉遊標。session

JdbcCursorItemReader

JdbcCursorItemReader是使用遊標讀取數據集的ItemReader實現類之一。它使用JdbcTemplate中的DataSource控制ResultSet,其過程是將ResultSet的每行數據轉換爲所須要的實體類。數據結構

JdbcCursorItemReader的執行過程有三步:app

  1. 經過DataSource建立JdbcTemplate
  2. 設定數據集的SQL語句。
  3. 建立ResultSet到實體類的映射。 大體以下:
//隨風溜達的向日葵 chkui.com
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());

除了上面的代碼,JdbcCursorItemReader還有其餘屬性:框架

屬性名稱 說明
ignoreWarnings 標記當執行SQL語句出現警告時,是輸出日誌仍是拋出異常,默認爲true——輸出日誌
fetchSize 預通知JDBC驅動全量數據的個數
maxRows 設置ResultSet從數據庫中一次讀取記錄的上限
queryTimeout 設置執行SQL語句的等待超時時間,單位秒。當超過這個時間會拋出DataAccessException
verifyCursorPosition 對遊標位置進行校驗。因爲在RowMapper::mapRow方法中ResultSet是直接暴露給使用者的,所以有可能在業務代碼層面調用了ResultSet::next方法。將這個屬性設置爲true,在框架中會有一個位置計數器與ResultSet保持一致,當執行完Reader後位置不一致會拋出異常。
saveState 標記讀取的狀態是否被存放到ExecutionContext中。默認爲true
driverSupportsAbsolute 告訴框架是指直接使用ResultSet::absolute方法來指定遊標位置,使用這個屬性須要數據庫驅動支持。建議在支持absolute特性的數據庫上開啓這個特性,可以明顯的提高性能。默認爲false
setUseSharedExtendedConnection 標記讀取數據的遊標是否與Step其餘過程綁定成同一個事物。默認爲false,表示讀取數據的遊標是單獨創建鏈接的,具備自身獨立的事物。若是設定爲true須要用ExtendedConnectionDataSourceProxy包裝DataSource用於管理事物過程。此時遊標的建立標記爲'READ_ONLY'、'HOLD_CURSORS_OVER_COMMIT'。須要注意的是該屬性須要數據庫支持3.0以上的JDBC驅動。

可執行源碼

源碼在下列地址的items子項目:less

執行JdbcCursorItemReader的代碼在org.chenkui.spring.batch.sample.items.JdbcReader。啓動位置是org.chenkui.spring.batch.sample.database.cursor.JdbcCurosrApplication

在運行代碼以前請先在數據庫中執行如下DDL語句,並添加部分測試數據。

CREATE TABLE `tmp_test_weather` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `siteid` varchar(64) NOT NULL COMMENT '業務主鍵',
  `month` varchar(64) NOT NULL COMMENT '日期',
  `type` varchar(64) NOT NULL COMMENT '氣象類型',
  `value` int(11) NOT NULL COMMENT '值',
  `ext` varchar(255) DEFAULT NULL COMMENT '擴展數據',
  PRIMARY KEY (`id`)
) ;

運行代碼:

//隨風溜達的向日葵 chkui.com
public class JdbcReader {

    @Bean
    public RowMapper<WeatherEntity> weatherEntityRowMapper() {

        return new RowMapper<WeatherEntity>() {
            public static final String SITEID_COLUMN = "siteId"; // 設置映射字段
            public static final String MONTH_COLUMN = "month";
            public static final String TYPE_COLUMN = "type";
            public static final String VALUE_COLUMN = "value";
            public static final String EXT_COLUMN = "ext";

            @Override
            // 數據轉換
            public WeatherEntity mapRow(ResultSet resultSet, int rowNum) throws SQLException {
                WeatherEntity weatherEntity = new WeatherEntity();
                weatherEntity.setSiteId(resultSet.getString(SITEID_COLUMN));
                weatherEntity.setMonth(resultSet.getString(MONTH_COLUMN));
                weatherEntity.setType(WeatherEntity.Type.valueOf(resultSet.getString(TYPE_COLUMN)));
                weatherEntity.setValue(resultSet.getInt(VALUE_COLUMN));
                weatherEntity.setExt(resultSet.getString(EXT_COLUMN));
                return weatherEntity;
            }
        };
    }

    @Bean
    public ItemReader<WeatherEntity> jdbcCursorItemReader(
        @Qualifier("weatherEntityRowMapper") RowMapper<WeatherEntity> rowMapper, DataSource datasource) {
        JdbcCursorItemReader<WeatherEntity> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(datasource); //設置DataSource
        //設置讀取的SQL
        itemReader.setSql("SELECT siteId, month, type, value, ext from TMP_TEST_WEATHER"); 
        itemReader.setRowMapper(rowMapper); //設置轉換
        return itemReader;
    }
}

HibernateCursorItemReader

在Java體系中數據庫操做常見的規範有JPAORM,Spring Batch提供了HibernateCursorItemReader來實現HibernateTemplate,它能夠經過Hibernate框架進行遊標的控制。

須要注意的是:使用Hibernate框架來處理批量數據到目前爲止一直都有爭議,核心緣由是Hibernate最初是爲在線聯機事物型系統開發的。不過這並不意味着不能使用它來處理批數據,解決此問題就是讓Hibernate使用StatelessSession用來保持遊標,而不是standard session一次讀寫,這將致使Hibernate的緩存機制和數據髒讀檢查失效,進而影響批處理的過程。關於Hibernate的狀態控制機制請閱讀官方文檔。

HibernateCursorItemReader使用過程與JdbcCursorItemReader沒多大差別都是逐條讀取數據而後控制狀態連接關閉。只不過他提供了Hibernate所使用的HSQL方案。

@Bean
public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
    HibernateCursorItemReader<WeatherEntity> itemReader = new HibernateCursorItemReader<>();
    itemReader.setName("hibernateCursorItemReader");
    itemReader.setQueryString("from WeatherEntity tmp_test_weather");
    itemReader.setSessionFactory(sessionFactory);
    return itemReader;
}

public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
    return new HibernateCursorItemReaderBuilder<CustomerCredit>()
            .name("creditReader")
            .sessionFactory(sessionFactory)
            .queryString("from CustomerCredit")
            .build();
}

若是沒有特別的須要,不推薦使用Hibernate

StoredProcedureItemReader

存儲過程是在同一個數據庫中處理大量數據的經常使用方法。StoredProcedureItemReader的執行過程和JdbcCursorItemReader一致,可是底層邏輯是先執行存儲過程,而後返回存儲過程執行結果遊標。不一樣的數據庫存儲過程遊標返回會有一些差別:

  1. 做爲一個ResultSet返回。(SQL Server, Sybase, DB2, Derby以及MySQL)
  2. 參數返回一個 ref-cursor實例。好比Oracle、PostgreSQL數據庫,這類數據庫存儲過程是不會直接return任何內容的,須要從傳參獲取。
  3. 返回存儲過程調用後的返回值。

針對以上3個類型,配置上有一些差別:

//隨風溜達的向日葵 chkui.com
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
    StoredProcedureItemReader reader = new StoredProcedureItemReader();

    reader.setDataSource(dataSource);
    reader.setProcedureName("sp_processor_weather");
    reader.setRowMapper(new weatherEntityRowMapper());
	
    reader.setRefCursorPosition(1);//第二種類型須要指定ref-cursor的參數位置

    reader.setFunction(true);//第三種類型須要明確的告知reader經過返回獲取

    return reader;
}

使用存儲過程處理數據的好處是能夠實現針對庫內的數據進行合併、分割、排序等處理。若是數據在同一個數據庫,性能也明顯好於經過Java處理。

分頁讀取數據

相對於遊標,還有一個辦法是進行分頁查詢。分頁查詢意味着再進行批處理的過程當中同一個SQL會屢次執行。在聯機型事物系統中分頁查詢經常使用於列表功能,每一次查詢須要指定開始位置和結束位置。

JdbcPagingItemReader

分頁查詢的默認實現類是JdbcPagingItemReader,它的核心功能是用分頁器PagingQueryProvider進行分頁控制。因爲不一樣的數據庫分頁方法差異很大,因此針對不一樣的數據庫有不一樣的實現類。框架提供了SqlPagingQueryProviderFactoryBean用於檢查當前數據庫並自動注入對應的PagingQueryProvider

JdbcPagingItemReader會從數據庫中一次性讀取一整頁的數據,可是調用Reader的時候仍是會一行一行的返回數據。框架會自行根據運行狀況肯定何時須要執行下一個分頁的查詢。

分頁讀取數據執行源碼

執行JdbcPagingItemReader的代碼在org.chenkui.spring.batch.sample.items.pageReader。啓動位置是org.chenkui.spring.batch.sample.database.paging.JdbcPagingApplication

//隨風溜達的向日葵 chkui.com
public class pageReader {
    final private boolean wrapperBuilder = false;
    @Bean
    //設置 queryProvider
    public SqlPagingQueryProviderFactoryBean queryProvider(DataSource dataSource) {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        provider.setDataSource(dataSource);
        provider.setSelectClause("select id, siteid, month, type, value, ext");
        provider.setFromClause("from tmp_test_weather");
        provider.setWhereClause("where id>:start");
        provider.setSortKey("id");

        return provider;
    }

    @Bean
    public ItemReader<WeatherEntity> jdbcPagingItemReader(DataSource dataSource,
            PagingQueryProvider queryProvider,
            RowMapper<WeatherEntity> rowMapper) {

        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("start", "1");
        JdbcPagingItemReader<WeatherEntity> itemReader;
        if (wrapperBuilder) {
            itemReader = new JdbcPagingItemReaderBuilder<WeatherEntity>()
                    .name("creditReader")
                    .dataSource(dataSource)
                    .queryProvider(queryProvider)
                    .parameterValues(parameterValues)
                    .rowMapper(rowMapper)
                    .pageSize(1000)
                    .build();
        } else {
            itemReader = new JdbcPagingItemReader<>();
            itemReader.setName("weatherEntityJdbcPagingItemReader");
            itemReader.setDataSource(dataSource);
            itemReader.setQueryProvider(queryProvider);
            itemReader.setParameterValues(parameterValues);
            itemReader.setRowMapper(rowMapper);
            itemReader.setPageSize(1000);
        }
        return itemReader;
    }
}

數據寫入

Spring Batch爲不一樣類型的文件的寫入提供了多個實現類,但並無爲數據庫的寫入提供任何實現類,而是交由開發者本身去實現接口。理由是:

  1. 數據庫的寫入與文件寫入有巨大的差異。對於一個Step而言,在寫入一份文件時須要保持對文件的打開狀態從而可以高效的向隊尾添加數據。若是每次都從新打開文件,從開始位置移動到隊尾會耗費大量的時間(不少文件流沒法在open時就知道長度)。當整個Step結束時才能關閉文件的打開狀態,框架提供的文件讀寫類都實現了這個控制過程。

  2. 另外不管使用何種方式將數據寫入文件都是"逐行進行"的(流數據寫入、字符串逐行寫入)。所以當數據寫入與整個Step綁定爲事物時還須要實現一個控制過程是:在寫入數據的過程當中出現異常時要擦除本次事物已經寫入的數據,這樣才能和整個Step的狀態保持一致。框架中的類一樣實現了這個過程。

  3. 可是向數據庫寫入數據並不須要相似於文件的尾部寫入控制,由於數據庫的各類連接池自己就保證了連接->寫入->釋放的高效執行,也不存在向隊尾添加數據的問題。並且幾乎全部的數據庫驅動都提供了事物能力,在任什麼時候候出現異常都會自動回退,不存在擦除數據的問題。

所以,對於數據庫的寫入操做只要按照常規的批量數據寫入的方式便可,開發者使用任何工具均可以完成這個過程。

寫入數據一個簡單的實現

實現數據寫入方法不少,這和常規的聯機事務系統沒任何區別。下面直接用JdbcTemplate實現了一個簡單的數據庫寫入過程。

執行數據庫寫入的核心代碼在org.chenkui.spring.batch.sample.items.JdbcWriter。啓動位置是org.chenkui.spring.batch.sample.database.output.JdbcWriterApplication

//隨風溜達的向日葵 chkui.com
public class JdbcWriter {

    @Bean
    public ItemWriter<WeatherEntity> jdbcBatchWriter(JdbcTemplate template) {

        return new ItemWriter<WeatherEntity>() {
            final private static String INSERt_SQL = 
                      "INSERT INTO tmp_test_weather(siteid, month, type, value, ext) VALUES(?,?,?,?,?)";
            @Override
            public void write(List<? extends WeatherEntity> items) throws Exception {
                List<Object[]> batchArgs = new ArrayList<>();
                for (WeatherEntity entity : items) {
                    Object[] objects = new Object[5];
                    objects[0] = entity.getSiteId();
                    objects[1] = entity.getMonth();
                    objects[2] = entity.getType().name();
                    objects[3] = entity.getValue();
                    objects[4] = entity.getExt();
                    batchArgs.add(objects);
                }
                template.batchUpdate(INSERt_SQL, batchArgs);
            }
        };
    }
}

組合使用案例

下面是一些組合使用過程,簡單實現了文件到數據庫、數據庫到文件的過程。文件讀寫的過程已經在文件讀寫中介紹過,這裏會重複使用以前介紹的文件讀寫的功能。

下面的案例是將data.csv中的數據寫入到數據庫,而後再將數據寫入到out-data.csv。案例組合使用已有的item完成任務:flatFileReaderjdbcBatchWriterjdbcCursorItemReadersimpleProcessorflatFileWriter。這種ReaderProcessorWriter組合的方式也是完成一個批處理工程的常見開發方式。

案例的運行代碼在org.chenkui.spring.batch.sample.database.complex包中,使用了2個Step來完成任務,一個將數據讀取到數據庫,一個將數據進行過濾,而後再寫入到文件:

//隨風溜達的向日葵 chkui.com
public class FileComplexProcessConfig {
    @Bean
    // 配置Step1
    public Step file2DatabaseStep(StepBuilderFactory builder,
            @Qualifier("flatFileReader") ItemReader<WeatherEntity> reader,
            @Qualifier("jdbcBatchWriter") ItemWriter<WeatherEntity> writer) {
        return builder.get("file2DatabaseStep") // 建立
                .<WeatherEntity, WeatherEntity>chunk(50) // 分片
                .reader(reader) // 讀取
                .writer(writer) // 寫入
                .faultTolerant() // 開啓容錯處理
                .skipLimit(20) // 跳過設置
                .skip(Exception.class) // 跳過異常
                .build();
    }

    @Bean
    // 配置Step2
    public Step database2FileStep(StepBuilderFactory builder, 
            @Qualifier("jdbcCursorItemReader") ItemReader<WeatherEntity> reader,
            @Qualifier("simpleProcessor") ItemProcessor<WeatherEntity, MaxTemperatureEntiry> processor,
            @Qualifier("flatFileWriter") ItemWriter<MaxTemperatureEntiry> writer) {
        return builder.get("database2FileStep") // 建立
                .<WeatherEntity, MaxTemperatureEntiry>chunk(50) // 分片
                .reader(reader) // 讀取
                .processor(processor) //
                .writer(writer) // 寫入
                .faultTolerant() // 開啓容錯處理
                .skipLimit(20) // 跳過設置
                .skip(Exception.class) // 跳過異常
                .build();
    }

    @Bean
    public Job file2DatabaseJob(@Qualifier("file2DatabaseStep") Step step2Database,
            @Qualifier("database2FileStep") Step step2File, JobBuilderFactory builder) {
        return builder.get("File2Database").start(step2Database).next(step2File).build();
    }
}
相關文章
相關標籤/搜索