在 Spring Batch(1)——數據批處理概念 文中介紹了批處理的概念以及Spring Batch相關的使用場景,後續將會陸續說明在代碼層面如何使用。html
Spring batch的引入很是簡單,只須要引入Spring Framework、Datasource以及Spring Batch。在Spring Boot體系下只需引入spring-boot-starter-batch
便可。他已經涵蓋了以上全部內容。spring
Job
接口有多種多樣的實現類,一般咱們使用configuration類來構建獲取一個Job
:app
@Bean public Job footballJob() { return this.jobBuilderFactory.get("footballJob") //Job名稱 .start(playerLoad()) //Job Step .next(gameLoad()) //Job Step .next(playerSummarization()) //Job Step .end() .build(); }
上面的代碼定義了一個Job
實例,而且在這個實例中包含了三個Step實例框架
批處理的一個核心問題是須要定義重啓(啓動)時的一些行爲。當指定的JobInstance
被JobExecution
執行時候即認爲某個Job
已經重啓(啓動)。理想狀態下,全部的任務都應該能夠從它們以前中斷的位置啓動,可是某些狀況下這樣作是沒法實現的。開發人員能夠關閉重啓機制或認爲每次啓動都是新的JobInstance
:異步
@Bean public Job footballJob() { return this.jobBuilderFactory.get("footballJob") .preventRestart() //防止重啓 ... .build(); }
當任務執行完畢或開始執行時,須要執行一些處理工做。這個時候能夠使用JobExecutionListener
:分佈式
public interface JobExecutionListener { void beforeJob(JobExecution jobExecution); void afterJob(JobExecution jobExecution); }
添加方式:ide
@Bean public Job footballJob() { return this.jobBuilderFactory.get("footballJob") .listener(sampleListener()) //JobExecutionListener的實現類 ... .build(); }
須要注意的是afterJob
方法不管批處理任務成功仍是失敗都會被執行,因此增長如下判斷:spring-boot
public void afterJob(JobExecution jobExecution){ if( jobExecution.getStatus() == BatchStatus.COMPLETED ){ //job success } else if(jobExecution.getStatus() == BatchStatus.FAILED){ //job failure } }
除了直接實現接口還能夠用 @BeforeJob 和 @AfterJob 註解。ui
在Spring Batch 2.2.0版本以後(Spring 3.0+)支持純Java配置。其核心是@EnableBatchProcessing
註解和兩個構造器。@EnableBatchProcessing
的做用相似於Spring中的其餘@Enable*,使用@EnableBatchProcessing
以後會提供一個基本的配置用於執行批處理任務。對應的會有一系列StepScope
實例被注入到Ioc容器中:JobRepository
、JobLauncher
、JobRegistry
、PlatformTransactionManager
、JobBuilderFactory
以及StepBuilderFactory
。this
配置的核心接口是BatchConfigurer
,默認狀況下須要在容器中指定DataSource
,該數據源用於JobRepository相關的表。開發的過程當中能夠使用自定義的BatchConfigurer
實現來提供以上全部的Bean。一般狀況下能夠擴展重載DefaultBatchConfigurer
類中的Getter方法用於實現部分自定義功能:
@Bean public BatchConfigurer batchConfigurer() { return new DefaultBatchConfigurer() { @Override public PlatformTransactionManager getTransactionManager() { return new MyTransactionManager(); } }; }
使用了@EnableBatchProcessing
以後開發人員能夠使用如下的方法來配置一個Job:
@Configuration @EnableBatchProcessing @Import(DataSourceConfiguration.class) public class AppConfig { @Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory steps; @Bean public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2) { return jobs.get("myJob").start(step1).next(step2).build(); } @Bean protected Step step1(ItemReader<Person> reader, ItemProcessor<Person, Person> processor, ItemWriter<Person> writer) { return steps.get("step1") .<Person, Person> chunk(10) .reader(reader) .processor(processor) .writer(writer) .build(); } @Bean protected Step step2(Tasklet tasklet) { return steps.get("step2") .tasklet(tasklet) .build(); } }
一旦使用了@EnableBatchProcessing
註解,JobRepository
即會被注入到IoCs容器中並自動使用容器中的DataSource
。JobRepository
用於處理批處理表的CURD,整個Spring Batch的運行都會使用到它。除了使用容器中默認的DataSoruce
以及其餘組件,還能夠在BatchConfigurer
中進行配置:
@Override protected JobRepository createJobRepository() throws Exception { JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); factory.setDataSource(dataSource); factory.setTransactionManager(transactionManager); factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE"); factory.setTablePrefix("BATCH_"); factory.setMaxVarCharLength(1000); return factory.getObject(); }
在代碼中能夠看到,設置JobRepository
須要DataSource
和TransactionManager
,若是沒有指定將會使用容器中的默認配置。
默認狀況下框架爲JobRepository
提供了默認PlatformTransactionManager
事物管理。它用於確保批處理執行過程當中的元數據正確的寫入到指定數據源中。若是缺少事物,那麼框架產生元數據就沒法和整個處理過程徹底契合。
以下圖,在BatchConfigurer中的setIsolationLevelForCreate
方法中能夠指定事物的隔離等級:
protected JobRepository createJobRepository() throws Exception { JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); factory.setDataSource(dataSource); factory.setTransactionManager(transactionManager); factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ"); return factory.getObject(); }
setIsolationLevelForCreate
方法支持2個值:ISOLATION_SERIALIZABLE、ISOLATION_REPEATABLE_READ,前者是默認配置,相似於@Transactional(isolation = Isolation.SERIALIZABLE)
,表示查詢和寫入都是一次事物,會對事物進行嚴格的鎖定,當事物完成提交後才能進行其餘的讀寫操做,容易死鎖。後者是讀事物開放,寫事物鎖定。任什麼時候候均可以快速的讀取數據,可是寫入事物有嚴格的事物機制。當一個事物掛起某些記錄時,其餘寫操做必須排隊。
默認狀況下,JobRepository
管理的表都以*BATCH_*開頭。須要時能夠修改前綴:
// This would reside in your BatchConfigurer implementation @Override protected JobRepository createJobRepository() throws Exception { JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); factory.setDataSource(dataSource); factory.setTransactionManager(transactionManager); factory.setTablePrefix("SYSTEM.TEST_"); //修改前綴 return factory.getObject(); }
Spring Batch支持將運行時的狀態數據(元數據)僅保存在內存中。重載JobRepository
不設置DataSource
便可:
@Override protected JobRepository createJobRepository() throws Exception { MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean(); factory.setTransactionManager(transactionManager); return factory.getObject(); }
須要注意的是,內存級存儲沒法知足分佈式系統。
啓用了@EnableBatchProcessing
以後JobLauncher
會自動注入到容器中以供使用。此外能夠自行進行配置:
@Override protected JobLauncher createJobLauncher() throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository); jobLauncher.afterPropertiesSet(); return jobLauncher; }
JobLauncher
惟一的必要依賴只有JobRepository
。以下圖,Job的執行一般是一個同步過程:
能夠經過修改TaskExecutor來指定Job的執行過程:
@Bean public JobLauncher jobLauncher() { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository()); jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); //轉換爲異步任務 jobLauncher.afterPropertiesSet(); return jobLauncher; }
這樣執行過程變爲:
以一個Http爲例:
@Controller public class JobLauncherController { @Autowired JobLauncher jobLauncher; @Autowired Job job; @RequestMapping("/jobLauncher.html") public void handle() throws Exception{ jobLauncher.run(job, new JobParameters()); } }
單單是配置好Job
是確定沒法執行的,還須要對Step進行配置。後面會陸續介紹。