以上Spring Batch 主要組成部分只須要註冊成Spring的Bean便可,在配置類上使用@EnableBatchProcessing註解開啓批處理支持
package com.springbatch.config; import com.springbatch.batch.CsvBeanValidator; import com.springbatch.batch.CsvItemProcessor; import com.springbatch.batch.CsvJobListener; import com.springbatch.entity.Person; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.batch.item.validator.Validator; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.transaction.PlatformTransactionManager; import javax.sql.DataSource; /**Batch配置類 * Created by Administrator on 2016/12/8. 
*/ @Configuration //開啓批處理支持 @EnableBatchProcessing public class CsvBathConfig { //ItemReader定義 @Bean public ItemReader<Person> reader() throws Exception { //使用FlatFileItemReader讀取文件 FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>(); //設置csv文件路徑 reader.setResource(new ClassPathResource("people.csv")); //對csv文件的數據和領域模型類作對應映射 reader.setLineMapper(new DefaultLineMapper<Person>(){{ setLineTokenizer(new DelimitedLineTokenizer(){{ setNames(new String[]{"name", "age", "nation", "address"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>(){{ setTargetType(Person.class); }}); }}); return reader; } //ItemProcessor定義 @Bean public ItemProcessor<Person, Person> processor() { //採用自定義的ItemProcessor的實現 CsvItemProcessor processor = new CsvItemProcessor(); //指定自定義檢驗器 processor.setValidator(csvBeanValidator()); return processor; } //ItemWriter定義 @Bean public ItemWriter<Person> writer(DataSource dataSource) {//自動注入dataSource //使用jdbc批處理的JdbcBatchItemWriter寫數據到數據庫 JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>()); //要執行批處理的sql語句 String sql = "insert into person " + "(name,age,nation,address) " + "values (:name, :age, :nation, :address)"; writer.setSql(sql); writer.setDataSource(dataSource); return writer; } @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType("mysql"); return jobRepositoryFactoryBean.getObject(); } @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); 
jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); return jobLauncher; } @Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return jobs.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) .end() .listener(csvJobListener()) .build(); } @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) { return stepBuilderFactory .get("step1") .<Person, Person>chunk(65000)//每次提交65000條數據 .reader(reader) .processor(processor) .writer(writer) .build(); } @Bean public CsvJobListener csvJobListener() { return new CsvJobListener(); } @Bean public Validator<Person> csvBeanValidator() { return new CsvBeanValidator<Person>(); } }
3.Job監聽
用於監聽Job的執行狀況,需定義一個類實現JobExecutionListener,並在定義Job的Bean上綁定該監聽器
定義:
/**
 * Job listener that prints a start marker before the job, and an end marker plus the
 * elapsed time in milliseconds after it.
 */
public class CsvJobListener implements JobExecutionListener {

    /** Timestamp (ms) captured in {@link #beforeJob}. */
    long startTime;
    /** Timestamp (ms) captured in {@link #afterJob}. */
    long endTime;

    /**
     * Records the start timestamp and prints the start marker.
     *
     * @param jobExecution the current job execution (not used)
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        this.startTime = System.currentTimeMillis();
        System.out.println("任務處理開始");
    }

    /**
     * Records the end timestamp, then prints the end marker and the elapsed time.
     *
     * @param jobExecution the current job execution (not used)
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        this.endTime = System.currentTimeMillis();
        long elapsedMillis = this.endTime - this.startTime;
        System.out.println("任務處理結束");
        System.out.println("耗時:" + elapsedMillis + "ms");
    }
}
註冊:
/**
 * Builds the "importJob" job: a single-step flow with a run-id incrementer and the
 * job listener bound to it.
 *
 * @param jobs the job builder factory
 * @param s1   the step the job runs
 * @return the built job
 */
@Bean
public Job importJob(JobBuilderFactory jobs, Step s1) {
    RunIdIncrementer runIdIncrementer = new RunIdIncrementer();
    // Bind the listener bean so it is called around the job execution
    CsvJobListener jobListener = csvJobListener();
    return jobs.get("importJob")
            .incrementer(runIdIncrementer)
            .flow(s1)
            .end()
            .listener(jobListener)
            .build();
}
/**
 * Registers the job listener as a Spring bean.
 *
 * @return a new {@link CsvJobListener}
 */
@Bean
public CsvJobListener csvJobListener() {
    CsvJobListener jobListener = new CsvJobListener();
    return jobListener;
}
4.數據讀取
Spring Batch 提供了大量的ItemReader實現
5.數據處理和校驗
/** * 數據處理 * Created by Administrator on 2016/12/8. */ public class CsvItemProcessor extends ValidatingItemProcessor<Person> { @Override public Person process(Person item) throws ValidationException { super.process(item);//調用自定義校驗器 //簡單數據處理 if (item.getNation().equals("漢族")) { item.setNation("01"); } else { item.setNation("02"); } return item; } }
/** * 數據校驗 * Created by Administrator on 2016/12/8. */ public class CsvBeanValidator<T> implements Validator<T>, InitializingBean { private javax.validation.Validator validator; //Validator初始化 @Override public void afterPropertiesSet() throws Exception { ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); validator = validatorFactory.usingContext().getValidator(); } @Override public void validate(T value) throws ValidationException { //使用Validator的validate方法校驗 Set<ConstraintViolation<T>> constraintViolations = validator.validate(value); if (constraintViolations.size() > 0 ) { StringBuilder message = new StringBuilder(); for (ConstraintViolation<T> constraintViolation : constraintViolations) { message.append(constraintViolation.getMessage() + "\n"); } throw new ValidationException(message.toString()); } } }
定義ItemProcessor時把自定義校驗器設置進去
//ItemProcessor定義 @Bean public ItemProcessor<Person, Person> processor() { //採用自定義的ItemProcessor的實現 CsvItemProcessor processor = new CsvItemProcessor(); //指定自定義檢驗器 processor.setValidator(csvBeanValidator()); return processor; }
/**
 * Registers the custom validator as a Spring bean.
 *
 * @return a new {@link CsvBeanValidator} for Person items
 */
@Bean
public Validator<Person> csvBeanValidator() {
    CsvBeanValidator<Person> beanValidator = new CsvBeanValidator<Person>();
    return beanValidator;
}
6.數據輸出,Spring Batch 提供了大量的ItemWriter的實現
7.計劃任務
只需在普通計劃任務方法中執行JobLauncher的run方法便可
8.參數後置綁定
能夠在JobParameters中綁定參數,定義Bean時使用@StepScope註解,而後經過@Value注入此參數
參數設置:
/**
 * Launches the import job for the given file.
 *
 * Builds job parameters holding the current timestamp ("time") and the file name with
 * ".csv" appended ("input.file.name"), then runs the job through the launcher.
 *
 * NOTE(review): jobParameters, jobLauncher and importJob are declared outside this
 * snippet, presumably as fields of the enclosing controller; confirm.
 *
 * @param fileName file name without the ".csv" extension
 * @return the literal "ok" once the launcher call returns
 * @throws Exception if launching the job fails
 */
@RequestMapping("/imp")
public String imp(String fileName) throws Exception {
    String path = fileName + ".csv";

    JobParametersBuilder parametersBuilder = new JobParametersBuilder();
    parametersBuilder.addLong("time", System.currentTimeMillis());
    parametersBuilder.addString("input.file.name", path);
    jobParameters = parametersBuilder.toJobParameters();

    jobLauncher.run(importJob, jobParameters);
    return "ok";
}
定義Bean:
// Batch configuration whose reader takes its input file from the job parameters.
// NOTE(review): the closing brace of this class is not part of this excerpt.
@Configuration
@EnableBatchProcessing
public class TriggerBatchConfig {
    /**
     * ItemReader definition: step-scoped reader whose file path is injected from the
     * job parameter "input.file.name".
     *
     * @param pathToFile classpath-relative path of the csv file, taken from the job parameters
     * @return a flat-file reader mapping the columns name, age, nation and address onto Person
     * @throws Exception declared for callers
     */
    @Bean
    @StepScope
    public FlatFileItemReader<Person> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) throws Exception {
        // Use FlatFileItemReader to read the file
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        // Location of the csv file
        reader.setResource(new ClassPathResource(pathToFile));
        // Map the csv columns onto the domain model class
        reader.setLineMapper(new DefaultLineMapper<Person>(){{
            setLineTokenizer(new DelimitedLineTokenizer(){{
                setNames(new String[]{"name", "age", "nation", "address"});
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>(){{
                setTargetType(Person.class);
            }});
        }});
        return reader;
    }
源碼:https://git.oschina.net/NeedLoser/Spring-Boot.git 的springbatch項目