Spring Batch(2)——Job配置與運行

Spring Batch(1)——數據批處理概念 文中介紹了批處理的概念以及Spring Batch相關的使用場景,後續將會陸續說明在代碼層面如何使用。html

引入

Spring batch的引入很是簡單,只須要引入Spring Framework、Datasource以及Spring Batch。在Spring Boot體系下只需引入spring-boot-starter-batch 便可。他已經涵蓋了以上全部內容。spring

Job配置

Job接口有多種多樣的實現類,一般咱們使用configuration類來構建獲取一個Jobapp

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob") //Job名稱
                     .start(playerLoad()) //Job Step
                     .next(gameLoad()) //Job Step
                     .next(playerSummarization()) //Job Step
                     .end()
                     .build();
}

上面的代碼定義了一個Job實例,而且在這個實例中包含了三個Step實例框架

重啓(啓動)配置

批處理的一個核心問題是須要定義重啓(啓動)時的一些行爲。當指定的JobInstanceJobExecution執行時候即認爲某個Job已經重啓(啓動)。理想狀態下,全部的任務都應該能夠從它們以前中斷的位置啓動,可是某些狀況下這樣作是沒法實現的。開發人員能夠關閉重啓機制或認爲每次啓動都是新的JobInstance異步

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .preventRestart() //防止重啓
                     ...
                     .build();
}

監聽Job Execution

當任務執行完畢或開始執行時,須要執行一些處理工做。這個時候能夠使用JobExecutionListener分佈式

public interface JobExecutionListener {
    void beforeJob(JobExecution jobExecution);
    void afterJob(JobExecution jobExecution);
}

添加方式:ide

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .listener(sampleListener()) //JobExecutionListener的實現類
                     ...
                     .build();
}

須要注意的是afterJob方法不管批處理任務成功仍是失敗都會被執行,因此增長如下判斷:spring-boot

public void afterJob(JobExecution jobExecution){
    if( jobExecution.getStatus() == BatchStatus.COMPLETED ){
        //job success
    }
    else if(jobExecution.getStatus() == BatchStatus.FAILED){
        //job failure
    }
}

除了直接實現接口還能夠用 @BeforeJob 和 @AfterJob 註解。ui

Java配置

在Spring Batch 2.2.0版本以後(Spring 3.0+)支持純Java配置。其核心是@EnableBatchProcessing註解和兩個構造器。@EnableBatchProcessing的做用相似於Spring中的其餘@Enable*,使用@EnableBatchProcessing以後會提供一個基本的配置用於執行批處理任務。對應的會有一系列StepScope實例被注入到Ioc容器中:JobRepositoryJobLauncherJobRegistryPlatformTransactionManagerJobBuilderFactory以及StepBuilderFactorythis

配置的核心接口是BatchConfigurer,默認狀況下須要在容器中指定DataSource,該數據源用於JobRepository相關的表。開發的過程當中能夠使用自定義的BatchConfigurer實現來提供以上全部的Bean。一般狀況下能夠擴展重載DefaultBatchConfigurer類中的Getter方法用於實現部分自定義功能:

@Bean
public BatchConfigurer batchConfigurer() {
    return new DefaultBatchConfigurer() {
        @Override
        public PlatformTransactionManager getTransactionManager() {
            return new MyTransactionManager();
        }
    };
}

使用了@EnableBatchProcessing以後開發人員能夠使用如下的方法來配置一個Job:

@Configuration
@EnableBatchProcessing
@Import(DataSourceConfiguration.class)
public class AppConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2) {
        return jobs.get("myJob").start(step1).next(step2).build();
    }

    @Bean
    protected Step step1(ItemReader<Person> reader,
                         ItemProcessor<Person, Person> processor,
                         ItemWriter<Person> writer) {
        return steps.get("step1")
            .<Person, Person> chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
    }

    @Bean
    protected Step step2(Tasklet tasklet) {
        return steps.get("step2")
            .tasklet(tasklet)
            .build();
    }
}

JobRepository配置

一旦使用了@EnableBatchProcessing 註解,JobRepository即會被注入到IoCs容器中並自動使用容器中的DataSourceJobRepository用於處理批處理表的CURD,整個Spring Batch的運行都會使用到它。除了使用容器中默認的DataSoruce以及其餘組件,還能夠在BatchConfigurer中進行配置:

@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
    factory.setTablePrefix("BATCH_");
    factory.setMaxVarCharLength(1000);
    return factory.getObject();
}

在代碼中能夠看到,設置JobRepository須要DataSourceTransactionManager,若是沒有指定將會使用容器中的默認配置。

JobRepository的事物配置

默認狀況下框架爲JobRepository提供了默認PlatformTransactionManager事物管理。它用於確保批處理執行過程當中的元數據正確的寫入到指定數據源中。若是缺少事物,那麼框架產生元數據就沒法和整個處理過程徹底契合。

以下圖,在BatchConfigurer中的setIsolationLevelForCreate方法中能夠指定事物的隔離等級:

protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
    return factory.getObject();
}

setIsolationLevelForCreate方法支持2個值:ISOLATION_SERIALIZABLEISOLATION_REPEATABLE_READ,前者是默認配置,相似於@Transactional(isolation = Isolation.SERIALIZABLE),表示查詢和寫入都是一次事物,會對事物進行嚴格的鎖定,當事物完成提交後才能進行其餘的讀寫操做,容易死鎖。後者是讀事物開放,寫事物鎖定。任什麼時候候均可以快速的讀取數據,可是寫入事物有嚴格的事物機制。當一個事物掛起某些記錄時,其餘寫操做必須排隊。

修改表名稱

默認狀況下,JobRepository管理的表都以*BATCH_*開頭。須要時能夠修改前綴:

// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setTablePrefix("SYSTEM.TEST_"); //修改前綴
    return factory.getObject();
}

內存級存儲

Spring Batch支持將運行時的狀態數據(元數據)僅保存在內存中。重載JobRepository不設置DataSource便可:

@Override
protected JobRepository createJobRepository() throws Exception {
    MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
    factory.setTransactionManager(transactionManager);
    return factory.getObject();
}

須要注意的是,內存級存儲沒法知足分佈式系統。

JobLauncher配置

啓用了@EnableBatchProcessing以後JobLauncher會自動注入到容器中以供使用。此外能夠自行進行配置:

@Override
protected JobLauncher createJobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

JobLauncher惟一的必要依賴只有JobRepository。以下圖,Job的執行一般是一個同步過程: Job同步執行

能夠經過修改TaskExecutor來指定Job的執行過程:

@Bean
public JobLauncher jobLauncher() {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository());
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); //轉換爲異步任務
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

這樣執行過程變爲:

Job異步過程

運行一個Job

以一個Http爲例:

@Controller
public class JobLauncherController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @RequestMapping("/jobLauncher.html")
    public void handle() throws Exception{
        jobLauncher.run(job, new JobParameters());
    }
}

單單是配置好Job是確定沒法執行的,還須要對Step進行配置。後面會陸續介紹。

相關文章
相關標籤/搜索