主要實現批量數據的處理,我對batch進行的封裝,提出了jobBase類型,具體job須要實現它便可。Spring Batch 不只提供了統一的讀寫接口、豐富的任務處理方式、靈活的事務管理及併發處理,同時還支持日誌、監控、任務重啓與跳過等特性,大大簡化了批處理應用開發,將開發人員從複雜的任務配置管理過程當中解放出來,使他們能夠更多地去關注核心的業務處理過程。spring
/** * springBatch的job基礎類. */ public abstract class JobBase<T> { /** * 批次. */ protected int chunkCount = 5000; /** * 監聽器. */ private JobExecutionListener jobExecutionListener; /** * 處理器. */ private ValidatingItemProcessor<T> validatingItemProcessor; /** * job名稱. */ private String jobName; /** * 檢驗器. */ private Validator<T> validator; @Autowired private JobBuilderFactory job; @Autowired private StepBuilderFactory step; /** * 初始化. * * @param jobName job名稱 * @param jobExecutionListener 監聽器 * @param validatingItemProcessor 處理器 * @param validator 檢驗 */ public JobBase(String jobName, JobExecutionListener jobExecutionListener, ValidatingItemProcessor<T> validatingItemProcessor, Validator<T> validator) { this.jobName = jobName; this.jobExecutionListener = jobExecutionListener; this.validatingItemProcessor = validatingItemProcessor; this.validator = validator; } /** * job初始化與啓動. */ public Job getJob() throws Exception { return job.get(jobName).incrementer(new RunIdIncrementer()) .start(syncStep()) .listener(jobExecutionListener) .build(); } /** * 執行步驟. * * @return */ public Step syncStep() throws Exception { return step.get("step1") .<T, T>chunk(chunkCount) .reader(reader()) .processor(processor()) .writer(writer()) .build(); } /** * 單條處理數據. * * @return */ public ItemProcessor<T, T> processor() { validatingItemProcessor.setValidator(processorValidator()); return validatingItemProcessor; } /** * 校驗數據. * * @return */ @Bean public Validator<T> processorValidator() { return validator; } /** * 批量讀數據. * * @return * @throws Exception */ public abstract ItemReader<T> reader() throws Exception; /** * 批量寫數據. * * @return */ @Bean public abstract ItemWriter<T> writer(); }
主要規定了公用方法的執行策略,而具體的job名稱,讀,寫仍是須要具體JOB去實現的。sql
@Configuration @EnableBatchProcessing public class SyncPersonJob extends JobBase<Person> { @Autowired private DataSource dataSource; @Autowired @Qualifier("primaryJdbcTemplate") private JdbcTemplate jdbcTemplate; /** * 初始化,規則了job名稱和監視器. */ public SyncPersonJob() { super("personJob", new PersonJobListener(), new PersonItemProcessor(), new BeanValidator<>()); } @Override public ItemReader<Person> reader() throws Exception { StringBuffer sb = new StringBuffer(); sb.append("select * from person"); String sql = sb.toString(); JdbcCursorItemReader<Person> jdbcCursorItemReader = new JdbcCursorItemReader<>(); jdbcCursorItemReader.setSql(sql); jdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class)); jdbcCursorItemReader.setDataSource(dataSource); return jdbcCursorItemReader; } @Override @Bean("personJobWriter") public ItemWriter<Person> writer() { JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>()); String sql = "insert into person_export " + "(id,name,age,nation,address) " + "values(:id, :name, :age, :nation,:address)"; writer.setSql(sql); writer.setDataSource(dataSource); return writer; } }
注意,須要爲每一個job的write啓個名稱,不然在多job時,write將會被打亂api
/** * 批量寫數據. * * @return */ @Override @Bean("personVerson2JobWriter") public ItemWriter<Person> writer() { }
@Autowired SyncPersonJob syncPersonJob; @Autowired JobLauncher jobLauncher; void exec(Job job) throws Exception { JobParameters jobParameters = new JobParametersBuilder() .addLong("time", System.currentTimeMillis()) .toJobParameters(); jobLauncher.run(job, jobParameters); } @RequestMapping("/run1") public String run1() throws Exception { exec(syncPersonJob.getJob()); return "personJob success"; }