Batch Processing with Spring Batch (6) - ItemProcessor

ItemProcessor


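During development, we often need to read data, apply a series of business-logic operations to it, and then write the result to a designated persistent store. Spring Batch provides the ItemProcessor interface for this data-processing step.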

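1. ItemProcessor: the data-processing stage in Spring Batch.

2. ItemProcessor is mainly used to implement business logic, validation, filtering, and similar tasks.

3. Spring Batch provides the ItemProcessor<I, O> interface: an item of type I is passed in, and the processor turns it into an item of type O.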

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}
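
As a minimal sketch of the `I` to `O` conversion, the example below turns a raw text line into a hypothetical `Person` object. It returns `null` for blank lines, which Spring Batch treats as filtering the item out, and throws on malformed input as a simple form of validation. To keep it runnable without Spring Batch on the classpath, the `ItemProcessor` interface is redeclared locally with the same signature; `Person` and `LineToPersonProcessor` are illustrative names and not part of the original example.

```java
// Local stand-in with the same shape as
// org.springframework.batch.item.ItemProcessor, declared here only so the
// sketch compiles without Spring Batch.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical output type for this sketch.
class Person {
    final long id;
    final String name;

    Person(long id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Converts an input of type String (I) into a Person (O).
class LineToPersonProcessor implements ItemProcessor<String, Person> {
    @Override
    public Person process(String line) {
        // Filtering: returning null tells Spring Batch to drop the item,
        // so it never reaches the writer.
        if (line == null || line.trim().isEmpty()) {
            return null;
        }
        String[] parts = line.split(",");
        // Validation: reject malformed input by throwing.
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected 'id,name' but got: " + line);
        }
        // Business logic: build the output object from the parsed fields.
        return new Person(Long.parseLong(parts[0].trim()), parts[1].trim());
    }
}
```

Whether a bad record should be filtered (return `null`) or rejected (throw) is a design choice: filtered items are skipped silently, whereas an exception fails the step unless skip handling is configured.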


We can build a CompositeItemProcessor bean so that a single Step uses multiple processors, applied in order. The bean holds the list of processors and is then bound to the Step.

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
 
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
 
    @Autowired
    private DataSource dataSource;
 
    @Autowired
    private ItemProcessor<Customer, Customer> firstNameUpperCaseProcessor;
 
    @Autowired
    private ItemProcessor<Customer, Customer> idFilterProcessor;
 
    @Bean
    public Job processorDemoJob() throws Exception {
        return jobBuilderFactory.get("processorDemoJob")
                .start(processorDemoStep())
                .build();
 
    }
 
    @Bean
    public Step processorDemoStep() throws Exception {
        return stepBuilderFactory.get("processorDemoStep")
                .<Customer,Customer>chunk(100)
                .reader(dbJdbcDemoReader())
                .processor(processorDemoProcessor())
                .writer(flatFileDemoFlatFileWriter())
                .build();
    }
 
    @Bean
    public CompositeItemProcessor<Customer,Customer> processorDemoProcessor(){
        CompositeItemProcessor<Customer,Customer> processor = new CompositeItemProcessor<>();
        // Multiple processors, applied in list order
        List<ItemProcessor<Customer,Customer>> list = new ArrayList<>();
        list.add(firstNameUpperCaseProcessor);
        list.add(idFilterProcessor);
        processor.setDelegates(list);
        
        return processor;
    }
 
    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
 
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(100);
        reader.setRowMapper((rs,rowNum)->{
            return Customer.builder().id(rs.getLong("id"))
                    .firstName(rs.getString("firstName"))
                    .lastName(rs.getString("lastName"))
                    .birthdate(rs.getString("birthdate"))
                    .build();
 
        });
 
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from Customer");
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);
 
        reader.setQueryProvider(queryProvider);
 
        return reader;
 
    }
 
    @Bean
    public FlatFileItemWriter<Customer> flatFileDemoFlatFileWriter() throws Exception {
        FlatFileItemWriter<Customer> itemWriter = new FlatFileItemWriter<>();
        String path = File.createTempFile("customerInfo",".data").getAbsolutePath();
        System.out.println(">> file is created in: " + path);
        itemWriter.setResource(new FileSystemResource(path));
 
        itemWriter.setLineAggregator(new MyCustomerLineAggregator());
        itemWriter.afterPropertiesSet();
 
        return itemWriter;
 
    }
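
The writer above depends on `MyCustomerLineAggregator`, which is not shown in this article. A minimal sketch of what such a class might look like, assuming one comma-separated line per customer, follows. The `LineAggregator` interface and a reduced `Customer` class are redeclared locally so the sketch compiles on its own; in the real project the class would implement `org.springframework.batch.item.file.transform.LineAggregator<Customer>` and use the project's own `Customer`. The field order and the delimiter are assumptions.

```java
// Local stand-in mirroring Spring Batch's LineAggregator<T>,
// which has a single method: aggregate.
interface LineAggregator<T> {
    String aggregate(T item);
}

// Reduced stand-in for the article's Customer class (assumed fields only).
class Customer {
    private final Long id;
    private final String firstName;
    private final String lastName;
    private final String birthdate;

    Customer(Long id, String firstName, String lastName, String birthdate) {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
        this.birthdate = birthdate;
    }

    Long getId() { return id; }
    String getFirstName() { return firstName; }
    String getLastName() { return lastName; }
    String getBirthdate() { return birthdate; }
}

// Hypothetical implementation: turns one Customer into one output line.
class MyCustomerLineAggregator implements LineAggregator<Customer> {
    @Override
    public String aggregate(Customer customer) {
        // Assumed format: id,firstName,lastName,birthdate
        return String.join(",",
                String.valueOf(customer.getId()),
                customer.getFirstName(),
                customer.getLastName(),
                customer.getBirthdate());
    }
}
```

`FlatFileItemWriter` calls `aggregate` once per item and writes the returned string as one line of the output file.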


Concrete ItemProcessor implementations

@Component
public class FirstNameUpperCaseProcessor implements ItemProcessor<Customer,Customer> {
    @Override
    public Customer process(Customer item) throws Exception {
        // Convert firstName to upper case
        return new Customer(item.getId(),item.getFirstName().toUpperCase(),item.getLastName(),
                item.getBirthdate());
    }
}
 

@Component
public class IdFilterProcessor implements ItemProcessor<Customer,Customer> {
    @Override
    public Customer process(Customer item) throws Exception {
        // Filter out records whose id is odd (returning null drops the item)
        if (item.getId() % 2 == 0){
            return item;
        } else {
            return null;
        }
    }
}
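
To see how the two processors behave when chained, the sketch below imitates what `CompositeItemProcessor` does: it passes each item through the delegates in order and stops as soon as one of them returns `null`. `ChainSketch`, the local `ItemProcessor` interface, and the reduced `Customer` class are stand-ins written for this illustration so that it runs without Spring Batch; they are not the framework's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-in with the same shape as Spring Batch's ItemProcessor.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Reduced stand-in for the article's Customer (only the fields used here).
class Customer {
    final long id;
    final String firstName;

    Customer(long id, String firstName) {
        this.id = id;
        this.firstName = firstName;
    }
}

class ChainSketch {
    // Applies the delegates in list order; a null result filters the item
    // and ends the chain early.
    static Customer applyAll(List<ItemProcessor<Customer, Customer>> delegates,
                             Customer item) throws Exception {
        Customer current = item;
        for (ItemProcessor<Customer, Customer> delegate : delegates) {
            current = delegate.process(current);
            if (current == null) {
                return null;
            }
        }
        return current;
    }

    // Runs every input through both processors and collects the survivors.
    static List<Customer> processAll(List<Customer> input) {
        ItemProcessor<Customer, Customer> upperCase =
                c -> new Customer(c.id, c.firstName.toUpperCase());
        ItemProcessor<Customer, Customer> evenIdOnly =
                c -> c.id % 2 == 0 ? c : null;
        List<ItemProcessor<Customer, Customer>> delegates =
                Arrays.asList(upperCase, evenIdOnly);

        List<Customer> output = new ArrayList<>();
        for (Customer c : input) {
            try {
                Customer result = applyAll(delegates, c);
                if (result != null) {
                    output.add(result);
                }
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Customer> input = Arrays.asList(
                new Customer(1, "ann"), new Customer(2, "bob"),
                new Customer(3, "cy"), new Customer(4, "dee"));
        for (Customer c : processAll(input)) {
            System.out.println(c.id + " " + c.firstName);
        }
    }
}
```

With the four sample customers in `main`, only ids 2 and 4 survive, and their first names come out as `BOB` and `DEE`. Putting the filter first in the list would give the same result here while skipping the upper-casing work for items that are dropped anyway.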

Output

[Figure: contents of the generated file]

Reference:

https://blog.csdn.net/wuzhiwei549/article/details/88622281
