批處理任務的主要業務邏輯都是在Step
中去完成的。能夠將Job
理解爲運行Step
的框架,而Step
理解爲業務功能。html
Step
是Job
中的工做單元,每個Step
涵蓋了單行記錄的處理閉環。下圖是一個Step
的簡要結構:spring
一個Step
一般涵蓋三個部分:讀數據(Reader)、處理數據(Processor)和寫數據(Writer)。可是並非全部的Step
都須要自身來完成數據的處理,好比存儲過程等方式是經過外部功能來完成,所以Spring Batch提供了2種Step的處理方式:1)面向分片的ChunkStep
,2)面向過程的TaskletStep
。可是基本上大部分狀況下都是使用面向分片的方式來解決問題。數據庫
在Step
中數據是按記錄(按行)處理的,可是每條記錄處理完畢以後立刻提交事物反而會致使IO的巨大壓力。所以Spring Batch提供了數據處理的分片功能。設置了分片以後,一次工做會從Read開始,而後交由給Processor處理。處理完畢後會進行聚合,待聚合到必定的數量的數據以後一次性調用Write將數據提交到物理數據庫。其過程大體爲:緩存
在Spring Batch中所謂的事物和數據事物的概念同樣,就是一次性提交多少數據。若是在聚合數據期間出現任何錯誤,全部的這些數據都將不執行寫入。安全
@Bean public Job sampleJob(JobRepository jobRepository, Step sampleStep) { return this.jobBuilderFactory.get("sampleJob") .repository(jobRepository) .start(sampleStep) .build(); } @Bean public Step sampleStep(PlatformTransactionManager transactionManager) { return this.stepBuilderFactory.get("sampleStep") .transactionManager(transactionManager) .<String, String>chunk(10) //分片配置 .reader(itemReader()) //reader配置 .writer(itemWriter()) //write配置 .build(); }
觀察sampleStep方法:app
PlatformTransactionManager
對事物進行管理。當配置好事物以後Spring Batch會自動對事物進行管理,無需開發人員顯示操做。是否使用ItemProcessor
是一個可選項。若是沒有Processor能夠將數據視爲讀取並直接寫入。框架
Step
使用PlatformTransactionManager
管理事物。每次事物提交的間隔根據chunk
方法中配置的數據執行。若是設置爲1,那麼在每一條數據處理完以後都會調用ItemWrite
進行提交。提交間隔設置過小,那麼會浪費須要多沒必要要的資源,提交間隔設置的太長,會致使事物鏈太長佔用空間,而且出現失敗會致使大量數據回滾。所以設定一個合理的間隔是很是必要的,這須要根據實際業務狀況、性能要求、以及數據安全程度來設定。若是沒有明確的評估目標,設置爲10~20較爲合適。異步
前文介紹了Job
的重啓,可是每次重啓對Step
也是有很大的影響的,所以須要特定的配置。jvm
某些Step
可能用於處理一些先決的任務,因此當Job再次重啓時這Step
就不必再執行,能夠經過設置startLimit來限定某個Step
重啓的次數。當設置爲1時候表示僅僅運行一次,而出現重啓時將再也不執行:ide
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(itemReader()) .writer(itemWriter()) .startLimit(1) .build(); }
在單個JobInstance
的上下文中,若是某個Step
已經處理完畢(COMPLETED)那麼在默認狀況下重啓以後這個Step
並不會再執行。能夠經過設置allow-start-if-complete
爲true告知框架每次重啓該Step
都要執行:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(itemReader()) .writer(itemWriter()) .allowStartIfComplete(true) .build(); }
某些時候在任務處理單個記錄時中出現失敗並不該該中止任務,而應該跳過繼續處理下一條數據。是否跳過須要根據業務來斷定,所以框架提供了跳過機制交給開發人員使用。如何配置跳過機制:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(flatFileItemReader()) .writer(itemWriter()) .faultTolerant() .skipLimit(10) .skip(FlatFileParseException.class) .build(); }
代碼的含義是當處理過程當中拋出FlatFileParseException
異常時就跳過該條記錄的處理。skip-limit
(skipLimit方法)配置的參數表示當跳過的次數超過數值時則會致使整個Step
失敗,從而中止繼續運行。還能夠經過反向配置的方式來忽略某些異常:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(flatFileItemReader()) .writer(itemWriter()) .faultTolerant() .skipLimit(10) .skip(Exception.class) .noSkip(FileNotFoundException.class) .build(); }
skip
表示要當捕捉到Exception異常就跳過。可是Exception有不少繼承類,此時可使用noSkip
方法指定某些異常不能跳過。
當處理記錄出個異常以後並不但願他當即跳過或者中止運行,而是但願能夠屢次嘗試執行直到失敗:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(itemWriter()) .faultTolerant() .retryLimit(3) .retry(DeadlockLoserDataAccessException.class) .build(); }
retry(DeadlockLoserDataAccessException.class)
表示只有捕捉到該異常纔會重試,retryLimit(3)
表示最多重試3次,faultTolerant()
表示啓用對應的容錯功能。
默認狀況下,不管是設置了重試(retry)仍是跳過(skip),只要從Writer
拋出一個異常都會致使事物回滾。若是配置了skip機制,那麼在Reader
中拋出的異常不會致使回滾。有些從Writer
拋出一個異常並不須要回滾數據,noRollback
屬性爲Step
提供了沒必要進行事物回滾的異常配置:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(itemWriter()) .faultTolerant() .noRollback(ValidationException.class) //沒必要回滾的異常 .build(); }
一次Setp
分爲Reader
、Processor
和Writer
三個階段,這些階段統稱爲Item
。默認狀況下若是錯誤不是發生在Reader階段,那麼不必再去從新讀取一次數據。可是某些場景下須要Reader部分也須要從新執行,好比Reader是從一個JMS隊列中消費消息,當發生回滾的時候消息也會在隊列上重放,所以也要將Reader歸入到回滾的事物中,根據這個場景可使用readerIsTransactionalQueue
來配置數據重讀:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(itemWriter()) .readerIsTransactionalQueue() //數據重讀 .build(); }
事物的屬性包括隔離等級(isolation)、傳播方式(propagation)以及過時時間(timeout)。關於事物的控制詳見Spring Data Access的說明,下面是相關配置的方法:
@Bean public Step step1() { //配置事物屬性 DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); attribute.setPropagationBehavior(Propagation.REQUIRED.value()); attribute.setIsolationLevel(Isolation.DEFAULT.value()); attribute.setTimeout(30); return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(itemWriter()) .transactionAttribute(attribute) //設置事物屬性 .build(); }
ItemStream
是用於每個階段(Reader、Processor、Writer)的「生命週期回調數據處理器」,後續的文章會詳細介紹ItemStream
。在4.×版本以後默認注入註冊了通用的ItemStream
。
有2種方式將ItemStream
註冊到Step
中,一是使用stream
方法:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(compositeItemWriter()) .stream(fileItemWriter1()) .stream(fileItemWriter2()) .build(); }
二是使用相關方法的代理:
@Bean public CompositeItemWriter compositeItemWriter() { List<ItemWriter> writers = new ArrayList<>(2); writers.add(fileItemWriter1()); writers.add(fileItemWriter2()); CompositeItemWriter itemWriter = new CompositeItemWriter(); itemWriter.setDelegates(writers); return itemWriter; }
在Step
執行的過程當中會產生各類各樣的事件,開發人員能夠利用各類Listener
接口對Step
及Item
進行監聽。一般在建立一個Step的時候添加攔截器:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(reader()) .writer(writer()) .listener(chunkListener()) //添加攔截器 .build(); }
Spring Batch提供了多個接口以知足不一樣事件的監聽。
StepExecutionListener
能夠看作一個通用的Step
攔截器,他的做用是在Step開始以前和結束以後進行攔截處理:
public interface StepExecutionListener extends StepListener { void beforeStep(StepExecution stepExecution); //Step執行以前 ExitStatus afterStep(StepExecution stepExecution); //Step執行完畢以後 }
在結束的時候開發人員能夠本身定義返回的ExitStatus
,用於配合流程控制(見後文)實現對整個Step執行過程的控制。
ChunkListener
是在數據事物發生的兩端被觸發。chunk
的配置決定了處理多少項記錄才進行一次事物提交,ChunkListener
的做用就是對一次事物開始以後或事物提交以後進行攔截:
public interface ChunkListener extends StepListener { void beforeChunk(ChunkContext context); //事物開始以後,ItemReader調用以前 void afterChunk(ChunkContext context); //事物提交以後 void afterChunkError(ChunkContext context); //事物回滾以後 }
若是沒有設定chunk也可使用ChunkListener
,它會被TaskletStep
調用(TaskletStep
見後文)。
該接口用於對Reader
相關的事件進行監控:
public interface ItemReadListener<T> extends StepListener { void beforeRead(); void afterRead(T item); void onReadError(Exception ex); }
beforeRead
在每次Reader
調用以前被調用,afterRead
在每次Reader
成功返回以後被調用,而onReadError
會在出現異常以後被調用,能夠將其用於記錄異常日誌。
ItemProcessListener
和ItemReadListener
相似,是圍繞着ItemProcessor
進行處理的:
public interface ItemProcessListener<T, S> extends StepListener { void beforeProcess(T item); //processor執行以前 void afterProcess(T item, S result); //processor直線成功以後 void onProcessError(T item, Exception e); //processor執行出現異常 }
ItemWriteListener
的功能和ItemReadListener
、ItemReadListener
相似,可是須要注意的是它接收和處理的數據對象是一個List
。List
的長度與chunk配置相關。
public interface ItemWriteListener<S> extends StepListener { void beforeWrite(List<? extends S> items); void afterWrite(List<? extends S> items); void onWriteError(Exception exception, List<? extends S> items); }
ItemReadListener
、ItemProcessListener
和ItemWriteListener
都提供了錯誤攔截處理的機制,可是沒有處理跳過(skip)的數據記錄。所以框架提供了SkipListener
來專門處理那麼被跳過的記錄:
public interface SkipListener<T,S> extends StepListener { void onSkipInRead(Throwable t); //Read期間致使跳過的異常 void onSkipInProcess(T item, Throwable t); //Process期間致使跳過的異常 void onSkipInWrite(S item, Throwable t); //Write期間致使跳過的異常 }
SkipListener
的價值是能夠將那些未能成功處理的記錄在某個位置保存下來,而後交給其餘批處理進一步解決,或者人工來處理。Spring Batch保證如下2個特徵:
SkipListener
始終在事物提交以前被調用,這樣能夠保證監聽器使用的事物資源不會被業務事物影響。面向分片(Chunk-oriented processing )的過程並非Step的惟一執行方式。好比用數據庫的存儲過程來處理數據,這個時候使用標準的Reader、Processor、Writer會很奇怪,針對這些狀況框架提供了TaskletStep
。
TaskletStep
是一個很是簡單的接口,僅有一個方法——execute
。TaskletStep
會反覆的調用這個方法直到獲取一個RepeatStatus.FINISHED
返回或者拋出一個異常。全部的Tasklet
調用都會包裝在一個事物中。
註冊一個TaskletStep
很是簡單,只要添加一個實現了Tasklet
接口的類便可:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .tasklet(myTasklet()) //注入Tasklet的實現 .build(); }
TaskletStep
還支持適配器處理等,詳見官網說明。
默認狀況下。Step與Step之間是順序執行的,以下圖:
順序執行經過next
方法來標記:
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(stepA()) .next(stepB()) //順序執行 .next(stepC()) .build(); }
在順序執行的過程當中,在整個執行鏈條中有一個Step
執行失敗則整個Job
就會中止。可是經過條件執行,能夠指定各類狀況下的執行分支:
爲了實現更加複雜的控制,能夠經過Step
執行後的退出命名來定義條件分之。先看一個簡單的代碼:
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(stepA()) //啓動時執行的step .on("*").to(stepB()) //默認跳轉到stepB .from(stepA()).on("FAILED").to(stepC()) //當返回的ExitStatus爲"FAILED"時,執行。 .end() .build(); }
這裏使用*來表示默認處理,*是一個通配符表示處理任意字符串,對應的還可使用?表示匹配任意字符。在Spring Batch(1)——數據批處理概念一文中介紹了Step的退出都會有ExitStatus
,命名都來源於它。下面是一個更加全面的代碼。
public class SkipCheckingListener extends StepExecutionListenerSupport { public ExitStatus afterStep(StepExecution stepExecution) { String exitCode = stepExecution.getExitStatus().getExitCode(); if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) && stepExecution.getSkipCount() > 0) { //當Skip的Item大於0時,則指定ExitStatus的內容 return new ExitStatus("COMPLETED WITH SKIPS"); } else { return null; } } }
攔截器指示當有一個以上被跳過的記錄時,返回的ExitStatus
爲"COMPLETED WITH SKIPS"。對應的控制流程:
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()).on("FAILED").end() //執行失敗直接退出 .from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1()) //有跳過元素執行 errorPrint1() .from(step1()).on("*").to(step2()) //默認(成功)狀況下執行 Step2 .end() .build(); }
Spring Batch爲Job
提供了三種退出機制,這些機制爲批處理的執行提供了豐富的控制方法。在介紹退出機制以前須要回顧一下 數據批處理概念一文中關於StepExecution
的內容。在StepExecution
中有2個表示狀態的值,一個名爲status
,另一個名爲exitStatus
。前者也被稱爲BatchStatus
。
前面以及介紹了ExitStatus
的使用,他能夠控制Step執行鏈條的條件執行過程。除此以外BatchStatus
也會參與到過程的控制。
默認狀況下(沒有使用end
、fail
方法結束),Job
要順序執行直到退出,這個退出稱爲end
。這個時候,BatchStatus
=COMPLETED
、ExitStatus
=COMPLETED
,表示成功執行。
除了Step
鏈式處理天然退出,也能夠顯示調用end
來退出系統。看下面的例子:
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()) //啓動 .next(step2()) //順序執行 .on("FAILED").end() .from(step2()).on("*").to(step3()) //條件執行 .end() .build(); }
上面的代碼,step1
到step2
是順序執行,當step2
的exitStatus
返回"FAILED"時則直接End退出。其餘狀況執行Step3
。
除了end
還可使用fail
退出,這個時候,BatchStatus
=FAILED
、ExitStatus
=EARLY TERMINATION
,表示執行失敗。這個狀態與End
最大的區別是Job
會嘗試重啓執行新的JobExecution
。看下面代碼的例子:
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()) //執行step1 .next(step2()).on("FAILED").fail() //step2的ExitStatus=FAILED 執行fail .from(step2()).on("*").to(step3()) //不然執行step3 .end() .build(); }
Spring Batch還支持在指定的節點退出,退出後下次重啓會從中斷的點繼續執行。中斷的做用是某些批處理到某個步驟後須要人工干預,當干預完以後又接着處理:
@Bean public Job job() { return this.jobBuilderFactory.get("job") //若是step1的ExitStatus=COMPLETED則在step2中斷 .start(step1()).on("COMPLETED").stopAndRestart(step2()) //不然直接退出批處理 .end() .build(); }
能夠直接進行編碼來控制Step
之間的扭轉,Spring Batch提供了JobExecutionDecider
接口來協助分支管理:
public class MyDecider implements JobExecutionDecider { public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { String status; if (someCondition()) { status = "FAILED"; } else { status = "COMPLETED"; } return new FlowExecutionStatus(status); } }
接着將MyDecider
做爲過濾器添加到配置過程當中:
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()) .next(decider()).on("FAILED").to(step2()) .from(decider()).on("COMPLETED").to(step3()) .end() .build(); }
在線性處理過程當中,流程都是一個接着一個執行的。可是爲了知足某些特殊的須要,Spring Batch提供了執行的過程分裂並行Step
的方法。參看下面的Job
配置:
@Bean public Job job() { Flow flow1 = new FlowBuilder<SimpleFlow>("flow1") .start(step1()) .next(step2()) .build();//並行流程1 Flow flow2 = new FlowBuilder<SimpleFlow>("flow2") .start(step3()) .build();//並行流程2 return this.jobBuilderFactory.get("job") .start(flow1) .split(new SimpleAsyncTaskExecutor()) //建立一個異步執行任務 .add(flow2) .next(step4()) //2個分支執行完畢以後再執行step4。 .end() .build(); }
這裏表示flow1和flow2會並行執行,待2者執行成功後執行step4。
在Job
或Step
的任何位置,均可以獲取到統一配置的數據。好比使用標準的Spring Framework方式:
@Bean public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
當咱們經過配置文件(application.properties中 input.file.name=filepath
)或者jvm參數(-Dinput.file.name=filepath
)指定某些數據時,均可以經過這種方式獲取到對應的配置參數。
此外,也能夠從JobParameters
從獲取到Job
運行的上下文參數:
@StepScope @Bean public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
不管是JobExecution
仍是StepExecution
,其中的內容均可以經過這種方式去獲取參數,例如:
@StepScope @Bean public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
或者
@StepScope @Bean public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
注意看上面的代碼例子,都有一個@StepScope
註解。這是爲了進行後期綁定進行的標識。由於在Spring的IoCs容器進行初始化的階段並無任何的*Execution
在執行,進而也不存在任何*ExecutionContext
,因此這個時候根本沒法注入標記的數據。因此須要使用註解顯式的告訴容器直到Step
執行的階段才初始化這個@Bean
。
Job Scope的概念和 Step Scope相似,都是用於標識在到了某個執行時間段再添加和注入Bean。@JobScope
用於告知框架知道JobInstance
存在時候才初始化對應的@Bean
:
@JobScope @Bean // 初始化獲取 jobParameters中的參數 public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
@JobScope @Bean // 初始化獲取jobExecutionContext中的參數 public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext中的參數['input.name']}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }