ItemProcessor
<br/>
在開發過程當中,我們經常需要在讀取數據後,經過一系列業務邏輯的操作,進而將數據寫入指定的持久化目標。Spring Batch爲我們提供了ItemProcessor接口進行數據處理。
1.ItemProcessor:Spring Batch中數據處理的過程
2.ItemProcessor主要用於實現業務邏輯、驗證、過濾等
3.Spring Batch爲我們提供ItemProcessor<I,O>這個接口,傳入一個類型I,然後由Processor處理成爲類型O
/**
 * Contract for turning one input item into one output item.
 *
 * <p>NOTE(review): in Spring Batch a {@code null} return value appears to
 * signal that the item should be filtered out; confirm against the official
 * Spring Batch documentation.
 *
 * @param <I> type of the item handed to the processor
 * @param <O> type of the item the processor produces
 */
public interface ItemProcessor<I, O> {

    /**
     * Processes a single item.
     *
     * @param item the item to process
     * @return the processed result
     * @throws Exception if processing fails
     */
    O process(I item) throws Exception;
}
<br/> <br/>
在一個Step中可以使用多個Processor來按照順序處理業務。我們可以構建CompositeItemProcessor的Bean,存儲多個Processor,再與Step綁定。 <br/>
@Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private DataSource dataSource; @Autowired private ItemProcessor<Customer, Customer> firstNameUpperCaseProcessor; @Autowired private ItemProcessor<Customer, Customer> idFilterProcessor; @Bean public Job processorDemoJob() throws Exception { return jobBuilderFactory.get("processorDemoJob") .start(processorDemoStep()) .build(); } @Bean public Step processorDemoStep() throws Exception { return stepBuilderFactory.get("processorDemoStep") .<Customer,Customer>chunk(100) .reader(dbJdbcDemoReader()) .processor(processorDemoProcessor()) .writer(flatFileDemoFlatFileWriter()) .build(); } @Bean public CompositeItemProcessor<Customer,Customer> processorDemoProcessor(){ CompositeItemProcessor<Customer,Customer> processor = new CompositeItemProcessor<>(); // 多種處理方式 List<ItemProcessor<Customer,Customer>> list = new ArrayList<>(); list.add(firstNameUpperCaseProcessor); list.add(idFilterProcessor); processor.setDelegates(list); return processor; } @Bean @StepScope public JdbcPagingItemReader<Customer> dbJdbcDemoReader() { JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>(); reader.setDataSource(this.dataSource); reader.setFetchSize(100); reader.setRowMapper((rs,rowNum)->{ return Customer.builder().id(rs.getLong("id")) .firstName(rs.getString("firstName")) .lastName(rs.getString("lastName")) .birthdate(rs.getString("birthdate")) .build(); }); MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider(); queryProvider.setSelectClause("id, firstName, lastName, birthdate"); queryProvider.setFromClause("from Customer"); Map<String, Order> sortKeys = new HashMap<>(1); sortKeys.put("id", Order.ASCENDING); queryProvider.setSortKeys(sortKeys); reader.setQueryProvider(queryProvider); return reader; } @Bean public FlatFileItemWriter<Customer> flatFileDemoFlatFileWriter() throws Exception { FlatFileItemWriter<Customer> itemWriter = new 
FlatFileItemWriter<>(); String path = File.createTempFile("customerInfo",".data").getAbsolutePath(); System.out.println(">> file is created in: " + path); itemWriter.setResource(new FileSystemResource(path)); itemWriter.setLineAggregator(new MyCustomerLineAggregator()); itemWriter.afterPropertiesSet(); return itemWriter; }
<br/> <br/>
具體ItemProcessor實現
@Component public class FirstNameUpperCaseProcessor implements ItemProcessor<Customer,Customer> { @Override public Customer process(Customer item) throws Exception { // FirstName轉大寫 return new Customer(item.getId(),item.getFirstName().toUpperCase(),item.getLastName(), item.getBirthdate()); } } @Component public class IdFilterProcessor implements ItemProcessor<Customer,Customer> { @Override public Customer process(Customer item) throws Exception { // 過濾id爲基數的數據 if (item.getId() % 2 == 0){ return item; } else { return null; } } }
<br/> <br/> 輸出結果
<br/> <br/> <br/>
參考: