快速使用組件-spring batch(3)讀文件數據到數據庫

tags: springbatchhtml


1.引言

上一篇文章《快速瞭解組件-spring batch(2)之helloworld》Spring Batch進行了入門級的開發,也對基本的組件有了必定的瞭解。但實際開發過程當中,更多的是涉及文件及數據庫的操做,以定時後臺運行的方式,實現批處理操做。典型操做是從文本數據(csv/txt等文件)中讀取數據,而後寫入到數據庫存儲。以下圖所示:java

讀文件流程

若須要開發此過程,能夠按照上一篇文章所寫的,自定義ItemReaderItemWriter來實現,可是Spring Batch其實已經提供現成的文件讀取和數據庫寫入的組件,開發人員能夠直接使用,提升開發效率。本文將會對文件讀取和數據庫寫入進行實戰介紹。mysql

2.開發環境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 開發IDE: IDEA
  • 構建工具Maven: 3.3.9
  • 日誌組件logback:1.2.3
  • lombok:1.18.6

3.Spring Batch提供的讀-處理-寫組件一覽

在使用Spring Batch內置的讀寫組件時,首先咱們先弄清楚有哪些組件能夠用,按讀、寫、處理,見下面說明。Spring Batch已提供了比較全面的支持。git

3.1 ItemReader

ItemReader 說明
ListItemReader 讀取List類型數據,只能讀一次
ItemReaderAdapter ItemReader適配器,能夠複用現有的讀操做
FlatFileItemReader 讀Flat類型文件
StaxEventItemReader 讀XML類型文件
JdbcCursorItemReader 基於JDBC遊標方式讀數據庫
HibernateCursorItemReader 基於Hibernate遊標方式讀數據庫
StoredProcedureItemReader 基於存儲過程讀數據庫
JpaPagingItemReader 基於Jpa方式分頁讀數據庫
JdbcPagingItemReader 基於JDBC方式分頁讀數據庫
HibernatePagingItemReader 基於Hibernate方式分頁讀取數據庫
JmsItemReader 讀取JMS隊列
IteratorItemReader 迭代方式的讀組件
MultiResourceItemReader 多文件讀組件
MongoItemReader 基於分佈式文件存儲的數據庫 MongoDB讀組件
Neo4jItemReader 面向網絡的數據庫Neo4j的讀組件
ResourcesItemReader 基於批量資源的讀組件,每次讀取返回資源對象 AmqpItemReader讀取AMQP隊列組件
RepositoryItemReader 基於 Spring Data的讀組件

3.2 ItemWriter

ItemWriter 說明
FlatFileItemWriter 寫Flat類型文件
MultiResourceItemWriter 多文件寫組件
StaxEventItemWriter 寫XML類型文件
AmqpItemWriter 寫AMQP類型消息
ClassifierCompositeItemWriter 根據 Classifier路由不一樣的Item到特定的ItemWriter處理
HiberateItemWriter 基於Hibernate方式寫數據庫
ItemWriterAdapter ItemWriter適配器,能夠複用現有的寫服務
JdbcBatchItemWriter 基於JDBC方式寫數據庫
JmsItemWriter 寫JMS隊列 JpaItemWriter基於Jpa方式寫數據庫
GemfireItemWriter 基於分佈式數據庫Gemfire的寫組件
SpELMappingGemfireItemWriter 基於Spring表達式語言寫分佈式數據庫Gemfire的寫組件
MimeMessageItemWriter 發送郵件的寫組件
MongoItemWriter 基於分佈式文件存儲的數據庫MongoDB寫組件
Neo4jItemWriter 面向網絡的數據庫Neo4j的讀組件
PropertyExtractingDelegatingItemWriter 屬性抽取代理寫組件:經過調用給定的 Spring Bean方法執行寫入,參數由Item中指定的屬性字段獲取做爲參數
RepositoryItemWriter基於 Spring Data的寫組件
SimpleMailMessageItemWriter 發送郵件的寫組件
CompositeItemWriter 條目寫的組合模式,支持組裝多個ItemWriter

3.3 ItemProcessor

ItemProcessor 說明
CompositeItemProcessor 組合處理器,能夠封裝多個業務處理服務
ItemProcessorAdapter ItemProcessor適配器,能夠複用現有的業務處理服務
PassThroughItemProcessor 不作任何業務處理,直接返回讀到的數據
ValidatingItemProcessor 數據校驗處理器,支持對數據的校驗,若是校驗不經過能夠進行過濾掉或者經過skip的方式跳過對記錄的處理

4.開發流程

根據當前示例,從csv文件中讀數據,寫入到mysql數據庫,只須要使用FlatFileItemReaderJdbcBatchItemWriter便可。下面對開發流程做簡要說明。示例工程能夠在這裏獲取,裏面有文件resources/user-data.csv及相應的目標數據庫腳本mytest.sqlgithub

4.1 建立spring batch數據庫

4.1.1 建立數據庫並執行sql腳本

Spring Batch的運行須要數據庫的支持,以保存任務的運行狀態及結果。所以須要先建立數據庫。在mysql中建立名爲my_spring_batch的數據庫。並在此數據庫中執行 Spring Batch的數據庫腳本,腳本位置在spring-batch-core-4.1.2.RELEASE.jar的jar包中的\org\springframework\batch\core\schema-mysql.sql(也能夠在示例工程sql文件夾中獲取)。執行完成後,數據庫表以下圖所示:spring

數據庫

4.1.2 數據庫表說明

數據庫共9張表,以seq結尾的是用於生成主鍵的。其它6張表,以batch_job開頭的是存儲任務的相關信息,batch_step開頭的存儲步驟相關信息。sql

  • jobjob instancejob execution關係 任務job是咱們說的邏輯概念,即完整的一個批處理工做,它的實例就是job instance,此任務信息是存儲在batch_job_instance表中。有點相似java中的類和類實例的概念,是一對多的關係。對於每個job instance,執行的時候會生成記錄存儲在batch_job_execution中,表示每個job執行的實際狀況。注意,這裏job instancejob execution也是一對多的關係,即同一個實例有可能會執行屢次(如上一次執行失敗了,後面從新再執行)。數據庫

  • batch_job_execution_contextbatch_job_execution_params 存儲任務執行時須要用到的上下文(以json格式存儲)及運行時使用的參數。json

  • batch_step_executionbatch_step_execution_context 存儲任務執行過程當中的做業步驟及運行時上下文。bash

4.1.3 建立示例目標數據庫

本示例只涉及一個test_user表。建立mytest數據庫庫,執行mytest.sql腳本便可。

4.2 配置多數據源

通常來講,咱們會把Spring Batch的數據存儲在獨立的數據庫中,而實際的應用使用的則是目標數據庫,所以須要配置多數據源訪問。基於上一篇文章的工程進行開發。

4.2.1 添加mysql數據庫依賴

<!-- 數據庫相關依賴-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<scope>runtime</scope>
</dependency>
複製代碼

4.2.2 配置多數據源訪問

Spring Boot對多數據源的支持比較友好,配置也很簡單,先在配置文件中添加數據庫配置,而後在java配置文件中添加相應的註解便可。以下:

  • application.properties配置內容
# spring batch db
spring.datasource.jdbc-url=jdbc:mysql://localhost:3310/my_spring_batch?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.datasource.username=root
spring.datasource.password=111111
# target db
spring.target-datasource.jdbc-url=jdbc:mysql://localhost:3310/mytest?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.target-datasource.username=root
spring.target-datasource.password=111111
複製代碼
  • DataSourceConfig配置內容 新建DataSourceConfig.java文件,配置多數據源,以下:
@Configuration
public class DataSourceConfig {
    @Bean("datasource")
    @ConfigurationProperties(prefix="spring.datasource")
    @Primary
    public DataSource batchDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean("targetDatasource")
    @ConfigurationProperties(prefix="spring.target-datasource")
    public DataSource targetDatasource() {
        return DataSourceBuilder.create().build();
    }
}
複製代碼

這樣,後面就能夠直接使用datasourcetargetDatasource兩個Bean進行數據庫訪問。

4.3 添加User實體

本實例中,讀取csv文件,轉爲User實體,而後存儲到數據庫,所以須要先把User這個實體做一個定義。使用了lombokjpa的註解,以下:

@Entity
@Data
@Table(name="test_user")
public class User{
    @Id
    @GeneratedValue
    /**
     * id
     */
    private Long id;

    /**
     * 姓名
     */
    private String name;

    /**
     * 手機號
     */
    private String phone;
    ...略
複製代碼

4.4 添加文件讀取組件ItemReader

使用內置的FlatFileItemReader便可。以下:

@Bean
    public ItemReader file2DbItemReader(){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return new FlatFileItemReaderBuilder<User>()
                .name(funcName)
                .resource(new ClassPathResource("user-data.csv"))
// .linesToSkip(1)
                .delimited()
                .names(new String[]{"id","name","phone","title","email","gender","date_of_birth","sys_create_time","sys_create_user","sys_update_time","sys_update_user"})
                .fieldSetMapper(new UserFieldSetMapper())
                .build();
    }
複製代碼

說明:

  • FlatFileItemReaderBuilder用於建立FlatFileItemReader,設置相應的行爲,包括使用它來設置讀取文件的位置(resource),文件分隔符(默認是','),是否跳過前面幾行(linesToSkip),標識每一列對應的列名稱(可與數據庫的字段名一致)。設置文件字段與數據庫實體字段的對應關係。
  • 設置文件字段與數據庫實體字段的對應關係,使用FieldSetMapper來實現,其中FieldSet表明每一行文本數據,返回值即爲實體對象。以下所示:
public class UserFieldSetMapper implements FieldSetMapper<User> {
    @Override
    public User mapFieldSet(FieldSet fieldSet) throws BindException {
        String patternYmd = "yyyy-MM-dd";
        String patternYmdHms = "yyyy-MM-dd HH:mm:ss";
        User user = new User();
        user.setId(fieldSet.readLong("id"));
        user.setName(fieldSet.readString("name"));
        user.setPhone(fieldSet.readString("phone"));
        user.setTitle(fieldSet.readString("title"));
        user.setEmail(fieldSet.readString("email"));
        user.setGender(fieldSet.readString("gender"));
        //此字段有可能爲null
        String dataOfBirthStr = fieldSet.readString("date_of_birth");
        if(SyncConstants.STR_CSV_NULL.equals(dataOfBirthStr)){
            user.setDateOfBirth(null);
        }else{
            DateTime dateTime = DateUtil.parse(dataOfBirthStr, patternYmd);
            user.setDateOfBirth(dateTime.toJdkDate());
        }
        user.setSysCreateTime(fieldSet.readDate("sys_create_time",patternYmdHms));
        user.setSysCreateUser(fieldSet.readString("sys_create_user"));
        user.setSysUpdateTime(fieldSet.readDate("sys_update_time",patternYmdHms));
        user.setSysUpdateUser(fieldSet.readString("sys_update_user"));
        return user;
    }
}
複製代碼

4.5 添加處理組件ItemProcessor

因爲csv文本文件中的數據null值數據標識符爲\N,所以能夠在處理組件中進行處理,把標識符\N設置爲null值。以下所示:

@Slf4j
public class File2DbItemProcessor implements ItemProcessor<User,User> {

    @Override
    public User process(User user) throws Exception {
        user.setPhone(checkStr(user.getPhone()));
        user.setTitle(checkStr(user.getTitle()));
        user.setEmail(checkStr(user.getEmail()));
        user.setGender(checkStr(user.getGender()));
        log.info(LogConstants.LOG_TAG + "item process: " +user.getName());
        return user;
    }

    public String checkStr(String dataToCheck){
        if(SyncConstants.STR_CSV_NULL.equals(dataToCheck)){
            return null;
        }
        return dataToCheck;
    }
}
複製代碼

4.6 添加數據庫寫入組件ItemWriter

數據庫寫入組件使用JdbcBatchItemWriter便可,以下:

@Bean
    public ItemWriter file2DbWriter(@Qualifier("targetDatasource") DataSource datasource){
        return new JdbcBatchItemWriterBuilder<User>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user) " +
                        "VALUES (:id,:name,:phone,:title,:email,:gender,:dateOfBirth,:sysCreateTime,:sysCreateUser,:sysUpdateTime,:sysUpdateUser)")
                .dataSource(datasource)
                .build();
    }
複製代碼

說明:

  • 使用JdbcBatchItemWriterBuilder進行JdbcBatchItemWriter的建立,設置插入數據庫的sql語句,同時指定數據源便可。
  • @Qualifier("targetDatasource") DataSource datasource用於指定數據源
  • 使用BeanPropertyItemSqlParameterSourceProvider能夠直接把讀取的數據實體的屬性數據做爲參數填充到sql語句中,從而實現數據插入操做。

4.7 組裝完整任務

通過上面的操做,可使用一個java配置,把讀、寫、處理組裝成完整的stepjob,以下所示(詳細可見示例工程文件):

File2DbBatchConfig.java

@Bean
public Job file2DbJob(Step file2DbStep,JobExecutionListener file2DbListener){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return jobBuilderFactory.get(funcName)
                .listener(file2DbListener)
                .flow(file2DbStep)
                .end().build();
    }
@Bean
public Step file2DbStep(ItemReader file2DbItemReader , ItemProcessor file2DbProcessor ,ItemWriter file2DbWriter){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return stepBuilderFactory.get(funcName)
                .<User,User>chunk(10)
                .reader(file2DbItemReader)
                .processor(file2DbProcessor)
                .writer(file2DbWriter)
                .build();
    }
複製代碼

4.8 編寫測試

參考上一文章的ConsoleJobTest,編寫File2DbJobTest文件。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,File2DbBatchConfig.class})
@Slf4j
public class File2DbJobTest {

    @Autowired
    private JobLauncherService jobLauncherService;

    @Autowired
    private Job file2DbJob;

    @Test
    public void testFile2DbJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        //構建任務運行參數
        JobParameters jobParameters = JobUtil.makeJobParameters();
        //執行並顯示結果
        Map<String, Object> stringObjectMap = jobLauncherService.startJob(file2DbJob, jobParameters);
        Assert.assertEquals(ExitStatus.COMPLETED,stringObjectMap.get(SyncConstants.STR_RETURN_EXITSTATUS));
    }
}
複製代碼

執行後結果輸出以下(exitCode=COMPLETED):

執行結果

5.總結

本文先對Spring Batch的開箱即用的ItemReaderItemWriterItemProcessor做了一個簡要的概覽,而後以讀csv文件,處理null值,再插入到數據庫的處理邏輯爲案例,介紹了Spring Batch的數據庫腳本,FlatFileItemReaderJdbcBatchItemWriter的使用。但願對你們更深刻的瞭解Spring Batch有幫助,並能用到實踐中。

參考資源

相關文章
相關標籤/搜索