Spring Batch Quick Start

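  1. Spring Batch is a framework for processing large volumes of data. It is mainly used to read large amounts of data, process them, and write the results out in a specified format.
  2. Main components of Spring Batch
  • JobRepository: the container in which Jobs are registered; it also persists job execution metadata
  • JobLauncher: the interface used to launch a Job
  • Job: the task that is actually executed; it contains one or more Steps
  • Step: a Step contains an ItemReader, an ItemProcessor, and an ItemWriter
  • ItemReader: the interface for reading data
  • ItemProcessor: the interface for processing data
  • ItemWriter: the interface for writing data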

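The Spring Batch components listed above only need to be registered as Spring beans. Add the @EnableBatchProcessing annotation to the configuration class to enable batch processing support: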

package com.springbatch.config;

import com.springbatch.batch.CsvBeanValidator;
import com.springbatch.batch.CsvItemProcessor;
import com.springbatch.batch.CsvJobListener;
import com.springbatch.entity.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.validator.Validator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

/**
 * Batch configuration class.
 * Created by Administrator on 2016/12/8.
 */
@Configuration
// Enable batch processing support
@EnableBatchProcessing
public class CsvBathConfig {

    // ItemReader definition
    @Bean
    public ItemReader<Person> reader() throws Exception {
        // Use FlatFileItemReader to read the file
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        // Set the path of the CSV file
        reader.setResource(new ClassPathResource("people.csv"));
        // Map the columns of the CSV file to the domain model class
        reader.setLineMapper(new DefaultLineMapper<Person>(){{
           setLineTokenizer(new DelimitedLineTokenizer(){{
               setNames(new String[]{"name", "age", "nation", "address"});
           }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>(){{
                setTargetType(Person.class);
            }});
        }});
        return reader;
    }

    // ItemProcessor definition
    @Bean
    public ItemProcessor<Person, Person> processor() {
        // Use the custom ItemProcessor implementation
        CsvItemProcessor processor = new CsvItemProcessor();
        // Set the custom validator
        processor.setValidator(csvBeanValidator());
        return processor;
    }

    // ItemWriter definition
    @Bean
    public ItemWriter<Person> writer(DataSource dataSource) { // dataSource is injected automatically
        // Use JdbcBatchItemWriter to write data to the database with JDBC batching
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
        writer.setItemSqlParameterSourceProvider(new
                BeanPropertyItemSqlParameterSourceProvider<Person>());
        // The SQL statement to execute in batch
        String sql = "insert into person " + "(name,age,nation,address) " +
                "values (:name, :age, :nation, :address)";
        writer.setSql(sql);
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public JobRepository jobRepository(DataSource dataSource,
              PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new
                JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType("mysql");
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource,
               PlatformTransactionManager transactionManager) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs.get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(s1)
                .end()
                .listener(csvJobListener())
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person>
                      reader, ItemWriter<Person> writer,
                      ItemProcessor<Person, Person> processor) {
        return stepBuilderFactory
                .get("step1")
                .<Person, Person>chunk(65000) // commit every 65,000 items
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public CsvJobListener csvJobListener() {
        return new CsvJobListener();
    }

    @Bean
    public Validator<Person> csvBeanValidator() {
        return new CsvBeanValidator<Person>();
    }

}
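
The step1 bean above is chunk-oriented: items are read and processed one at a time, and the writer receives them in groups of the size passed to chunk(...), with one commit per group. The following is a minimal plain-Java sketch of that loop, intended only to illustrate the idea. ChunkSketch and runStep are hypothetical names, not Spring Batch API, and the real framework additionally handles transactions, restarts, and skips.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Simplified stand-in for a chunk-oriented step; NOT Spring Batch's implementation. */
public class ChunkSketch {

    /**
     * Reads items one at a time, processes each, and hands the results to the
     * writer in lists of at most chunkSize items. Returns the number of writes.
     */
    public static <I, O> int runStep(Iterator<I> reader,
                                     Function<I, O> processor,
                                     Consumer<List<O>> writer,
                                     int chunkSize) {
        int writes = 0;
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            O processed = processor.apply(reader.next());
            if (processed != null) {          // a null result filters the item out
                chunk.add(processed);
            }
            if (chunk.size() == chunkSize) {  // chunk is full: write it in one go
                writer.accept(new ArrayList<>(chunk));
                chunk.clear();
                writes++;
            }
        }
        if (!chunk.isEmpty()) {               // flush the final, partially filled chunk
            writer.accept(new ArrayList<>(chunk));
            writes++;
        }
        return writes;
    }

    public static void main(String[] args) {
        List<String> names = List.of("a", "b", "c", "d", "e");
        List<List<String>> written = new ArrayList<>();
        int writes = ChunkSketch.<String, String>runStep(
                names.iterator(), String::toUpperCase, written::add, 2);
        System.out.println(writes + " writes: " + written); // 3 writes: [[A, B], [C, D], [E]]
    }
}
```

Under this model, a chunk size of 65000 applied to a file of 100,000 rows would result in two writes: one of 65,000 items and one of 35,000.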

3. Job listener

    To monitor the execution of a Job, define a class that implements JobExecutionListener and bind the listener on the bean that defines the Job.

Definition:

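import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class CsvJobListener implements JobExecutionListener {

    long startTime;
    long endTime;

    // Called before the Job starts: record the start time
    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        System.out.println("Job started");
    }

    // Called after the Job ends: report the elapsed time
    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        System.out.println("Job finished");
        System.out.println("Elapsed: " + (endTime - startTime) + "ms");
    }
}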

Registration:

@Bean
public Job importJob(JobBuilderFactory jobs, Step s1) {
    return jobs.get("importJob")
            .incrementer(new RunIdIncrementer())
            .flow(s1)
            .end()
            .listener(csvJobListener())
            .build();
}
@Bean
public CsvJobListener csvJobListener() {
    return new CsvJobListener();
}

4. Reading data

    Spring Batch provides a large number of ItemReader implementations.

5. Data processing and validation

  • Done by implementing the ItemProcessor interface
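import com.springbatch.entity.Person;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;

/**
 * Data processing.
 * Created by Administrator on 2016/12/8.
 */
public class CsvItemProcessor extends ValidatingItemProcessor<Person> {

    @Override
    public Person process(Person item) throws ValidationException {
        super.process(item); // invokes the custom validator

        // Simple data processing: "漢族" (Han) becomes code "01", anything else "02".
        // The constant-first equals avoids a NullPointerException when nation is null.
        if ("漢族".equals(item.getNation())) {
            item.setNation("01");
        } else {
            item.setNation("02");
        }
        return item;
    }
}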

 

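import java.util.Set;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;
import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;

/**
 * Data validation.
 * Created by Administrator on 2016/12/8.
 */
public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {

    private javax.validation.Validator validator;

    // Initialize the Bean Validation validator once the bean's properties are set
    @Override
    public void afterPropertiesSet() throws Exception {
        ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
        validator = validatorFactory.usingContext().getValidator();
    }

    @Override
    public void validate(T value) throws ValidationException {
        // Validate with the Bean Validation validator's validate method
        Set<ConstraintViolation<T>> constraintViolations = validator.validate(value);
        if (!constraintViolations.isEmpty()) {
            // Collect every violation message into a single exception
            StringBuilder message = new StringBuilder();
            for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                message.append(constraintViolation.getMessage()).append("\n");
            }
            throw new ValidationException(message.toString());
        }
    }
}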
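
CsvBeanValidator only reports violations of constraints declared on the validated class, and the Person entity itself is not shown in this article. The following is a hypothetical sketch of what it might look like: the field types, the @Size constraint, and its message are all assumptions made for illustration, and only the four property names come from the reader's setNames call.

```java
package com.springbatch.entity;

import javax.validation.constraints.Size;

// Hypothetical sketch: field types and the constraint are assumptions,
// not the entity from the original project.
public class Person {

    // Assumed constraint; a violation's message ends up in the ValidationException
    @Size(min = 2, max = 4, message = "name must be 2 to 4 characters long")
    private String name;
    private int age;
    private String nation;
    private String address;

    // Setters are used by BeanWrapperFieldSetMapper when reading the CSV;
    // getters are used by BeanPropertyItemSqlParameterSourceProvider when writing.
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    public String getNation() { return nation; }
    public void setNation(String nation) { this.nation = nation; }

    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
}
```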

When defining the ItemProcessor, set the custom validator on it:

// ItemProcessor definition
@Bean
public ItemProcessor<Person, Person> processor() {
    // Use the custom ItemProcessor implementation
    CsvItemProcessor processor = new CsvItemProcessor();
    // Set the custom validator
    processor.setValidator(csvBeanValidator());
    return processor;
}
@Bean
public Validator<Person> csvBeanValidator() {
    return new CsvBeanValidator<Person>();
}

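6. Writing data: Spring Batch provides a large number of ItemWriter implementations.

7. Scheduled tasks

    Simply call the JobLauncher's run method from an ordinary scheduled-task method.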
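
As a rough sketch of such a scheduled method, assuming scheduling has been enabled with @EnableScheduling on a configuration class: the class name ScheduledImportTask and the one-minute fixedRate are hypothetical choices for illustration, while importJob is the Job bean defined earlier.

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Hypothetical class name; requires @EnableScheduling somewhere in the configuration.
@Component
public class ScheduledImportTask {

    private final JobLauncher jobLauncher;
    private final Job importJob;

    public ScheduledImportTask(JobLauncher jobLauncher, Job importJob) {
        this.jobLauncher = jobLauncher;
        this.importJob = importJob;
    }

    // Assumed interval: run once every 60 seconds
    @Scheduled(fixedRate = 60000)
    public void runImport() throws Exception {
        // A changing parameter value makes each run a new JobInstance
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(importJob, jobParameters);
    }
}
```

The timestamp parameter matters because a JobInstance is identified by its job name plus its parameters; launching with identical parameters after a completed run would be rejected as already complete.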

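8. Late binding of parameters

    Parameters can be bound in JobParameters. Annotate the bean definition with @StepScope, then inject the parameter through @Value.

Setting the parameter: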

@RequestMapping("/imp")
public String imp(String fileName) throws Exception {
    String path = fileName + ".csv";
    JobParameters jobParameters = new JobParametersBuilder()
            .addLong("time", System.currentTimeMillis())
            .addString("input.file.name", path)
            .toJobParameters();
    jobLauncher.run(importJob, jobParameters);
    return "ok";
}

Bean definition:

@Configuration
@EnableBatchProcessing
public class TriggerBatchConfig {
    // ItemReader definition
    @Bean
    @StepScope
    public FlatFileItemReader<Person> reader(@Value("#{jobParameters['input.file.name']}")
                                                         String pathToFile) throws Exception {
        // Use FlatFileItemReader to read the file
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        // Set the CSV file path from the late-bound job parameter
        reader.setResource(new ClassPathResource(pathToFile));
        // Map the columns of the CSV file to the domain model class
        reader.setLineMapper(new DefaultLineMapper<Person>(){{
            setLineTokenizer(new DelimitedLineTokenizer(){{
                setNames(new String[]{"name", "age", "nation", "address"});
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>(){{
                setTargetType(Person.class);
            }});
        }});
        return reader;
    }
}

Source code: the springbatch project in https://git.oschina.net/NeedLoser/Spring-Boot.git
