Spring Batch(3)——Step控制

批處理任務的主要業務邏輯都是在Step中去完成的。能夠將Job理解爲運行Step的框架,而Step理解爲業務功能。html

Step配置

StepJob中的工做單元,每個Step涵蓋了單行記錄的處理閉環。下圖是一個Step的簡要結構:spring

Step

一個Step一般涵蓋三個部分:讀數據(Reader)、處理數據(Processor)和寫數據(Writer)。可是並非全部的Step都須要自身來完成數據的處理,好比存儲過程等方式是經過外部功能來完成,所以Spring Batch提供了2種Step的處理方式:1)面向分片的ChunkStep,2)面向過程的TaskletStep。可是基本上大部分狀況下都是使用面向分片的方式來解決問題。數據庫

面向分片的處理過程

Step中數據是按記錄(按行)處理的,可是每條記錄處理完畢以後立刻提交事物反而會致使IO的巨大壓力。所以Spring Batch提供了數據處理的分片功能。設置了分片以後,一次工做會從Read開始,而後交由給Processor處理。處理完畢後會進行聚合,待聚合到必定的數量的數據以後一次性調用Write將數據提交到物理數據庫。其過程大體爲:緩存

Step批處理過程

在Spring Batch中所謂的事物和數據事物的概念同樣,就是一次性提交多少數據。若是在聚合數據期間出現任何錯誤,全部的這些數據都將不執行寫入。安全

面向對象配置Step

@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
    return this.jobBuilderFactory.get("sampleJob")
    			.repository(jobRepository)
                .start(sampleStep)
                .build();
}

@Bean
public Step sampleStep(PlatformTransactionManager transactionManager) {
	return this.stepBuilderFactory.get("sampleStep")
				.transactionManager(transactionManager)
				.<String, String>chunk(10) //分片配置
				.reader(itemReader()) //reader配置
				.writer(itemWriter()) //write配置
				.build();
}

觀察sampleStep方法:app

  1. reader: 使用ItemReader提供讀數據的方法。
  2. write:ItemWrite提供寫數據的方法。
  3. transactionManager:使用默認的 PlatformTransactionManager 對事物進行管理。當配置好事物以後Spring Batch會自動對事物進行管理,無需開發人員顯示操做
  4. chunk:指定一次性數據提交的記錄數,由於任務是基於Step分次處理的,當累計到chunk配置的次數則進行一次提交。提交的內容除了業務數據,還有批處理任務運行相關的元數據。

是否使用ItemProcessor是一個可選項。若是沒有Processor能夠將數據視爲讀取並直接寫入。框架

提交間隔

Step使用PlatformTransactionManager管理事物。每次事物提交的間隔根據chunk方法中配置的數據執行。若是設置爲1,那麼在每一條數據處理完以後都會調用ItemWrite進行提交。提交間隔設置過小,那麼會浪費須要多沒必要要的資源,提交間隔設置的太長,會致使事物鏈太長佔用空間,而且出現失敗會致使大量數據回滾。所以設定一個合理的間隔是很是必要的,這須要根據實際業務狀況、性能要求、以及數據安全程度來設定。若是沒有明確的評估目標,設置爲10~20較爲合適。異步

配置Step重啓

前文介紹了Job的重啓,可是每次重啓對Step也是有很大的影響的,所以須要特定的配置。jvm

限定重啓次數

某些Step可能用於處理一些先決的任務,因此當Job再次重啓時這Step就不必再執行,能夠經過設置startLimit來限定某個Step重啓的次數。當設置爲1時候表示僅僅運行一次,而出現重啓時將再也不執行:ide

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.startLimit(1)
				.build();
}

重啓已經完成任務的Step

在單個JobInstance的上下文中,若是某個Step已經處理完畢(COMPLETED)那麼在默認狀況下重啓以後這個Step並不會再執行。能夠經過設置allow-start-if-complete爲true告知框架每次重啓該Step都要執行:

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.allowStartIfComplete(true)
				.build();
}

配置略過邏輯

某些時候在任務處理單個記錄時中出現失敗並不該該中止任務,而應該跳過繼續處理下一條數據。是否跳過須要根據業務來斷定,所以框架提供了跳過機制交給開發人員使用。如何配置跳過機制:

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(flatFileItemReader())
				.writer(itemWriter())
				.faultTolerant()
				.skipLimit(10)
				.skip(FlatFileParseException.class)
				.build();
}

代碼的含義是當處理過程當中拋出FlatFileParseException異常時就跳過該條記錄的處理。skip-limit(skipLimit方法)配置的參數表示當跳過的次數超過數值時則會致使整個Step失敗,從而中止繼續運行。還能夠經過反向配置的方式來忽略某些異常:

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(flatFileItemReader())
				.writer(itemWriter())
				.faultTolerant()
				.skipLimit(10)
				.skip(Exception.class)
				.noSkip(FileNotFoundException.class)
				.build();
}

skip表示要當捕捉到Exception異常就跳過。可是Exception有不少繼承類,此時可使用noSkip方法指定某些異常不能跳過。

設置重試邏輯

當處理記錄出個異常以後並不但願他當即跳過或者中止運行,而是但願能夠屢次嘗試執行直到失敗:

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.faultTolerant()
				.retryLimit(3)
				.retry(DeadlockLoserDataAccessException.class)
				.build();
}

retry(DeadlockLoserDataAccessException.class)表示只有捕捉到該異常纔會重試,retryLimit(3)表示最多重試3次,faultTolerant()表示啓用對應的容錯功能。

事物回滾控制

默認狀況下,不管是設置了重試(retry)仍是跳過(skip),只要從Writer拋出一個異常都會致使事物回滾。若是配置了skip機制,那麼在Reader中拋出的異常不會致使回滾。有些從Writer拋出一個異常並不須要回滾數據,noRollback屬性爲Step提供了沒必要進行事物回滾的異常配置:

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.faultTolerant()
				.noRollback(ValidationException.class) //沒必要回滾的異常
				.build();
}

事物數據讀取的緩存

一次Setp分爲ReaderProcessorWriter三個階段,這些階段統稱爲Item。默認狀況下若是錯誤不是發生在Reader階段,那麼不必再去從新讀取一次數據。可是某些場景下須要Reader部分也須要從新執行,好比Reader是從一個JMS隊列中消費消息,當發生回滾的時候消息也會在隊列上重放,所以也要將Reader歸入到回滾的事物中,根據這個場景可使用readerIsTransactionalQueue來配置數據重讀:

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.readerIsTransactionalQueue() //數據重讀
				.build();
}

事物屬性

事物的屬性包括隔離等級(isolation)傳播方式(propagation)以及過時時間(timeout)。關於事物的控制詳見Spring Data Access的說明,下面是相關配置的方法:

@Bean
public Step step1() {
	//配置事物屬性
	DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
	attribute.setPropagationBehavior(Propagation.REQUIRED.value());
	attribute.setIsolationLevel(Isolation.DEFAULT.value());
	attribute.setTimeout(30);

	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.transactionAttribute(attribute) //設置事物屬性
				.build();
}

向Step註冊 ItemStream

ItemStream是用於每個階段(Reader、Processor、Writer)的「生命週期回調數據處理器」,後續的文章會詳細介紹ItemStream。在4.×版本以後默認注入註冊了通用的ItemStream

有2種方式將ItemStream註冊到Step中,一是使用stream方法:

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(compositeItemWriter())
				.stream(fileItemWriter1())
				.stream(fileItemWriter2())
				.build();
}

二是使用相關方法的代理:

@Bean
public CompositeItemWriter compositeItemWriter() {
	List<ItemWriter> writers = new ArrayList<>(2);
	writers.add(fileItemWriter1());
	writers.add(fileItemWriter2());
	CompositeItemWriter itemWriter = new CompositeItemWriter();
	itemWriter.setDelegates(writers);
	return itemWriter;
}

StepExecution攔截器

Step執行的過程當中會產生各類各樣的事件,開發人員能夠利用各類Listener接口對StepItem進行監聽。一般在建立一個Step的時候添加攔截器:

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener()) //添加攔截器
				.build();
}

Spring Batch提供了多個接口以知足不一樣事件的監聽。

StepExecutionListener

StepExecutionListener能夠看作一個通用的Step攔截器,他的做用是在Step開始以前和結束以後進行攔截處理:

public interface StepExecutionListener extends StepListener {
    void beforeStep(StepExecution stepExecution); //Step執行以前
    ExitStatus afterStep(StepExecution stepExecution); //Step執行完畢以後
}

在結束的時候開發人員能夠本身定義返回的ExitStatus,用於配合流程控制(見後文)實現對整個Step執行過程的控制。

ChunkListener

ChunkListener是在數據事物發生的兩端被觸發。chunk的配置決定了處理多少項記錄才進行一次事物提交,ChunkListener的做用就是對一次事物開始以後或事物提交以後進行攔截:

public interface ChunkListener extends StepListener {
    void beforeChunk(ChunkContext context); //事物開始以後,ItemReader調用以前
    void afterChunk(ChunkContext context); //事物提交以後
    void afterChunkError(ChunkContext context); //事物回滾以後
}

若是沒有設定chunk也可使用ChunkListener,它會被TaskletStep調用(TaskletStep見後文)。

ItemReadListener

該接口用於對Reader相關的事件進行監控:

public interface ItemReadListener<T> extends StepListener {
    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);
}

beforeRead在每次Reader調用以前被調用,afterRead在每次Reader成功返回以後被調用,而onReadError會在出現異常以後被調用,能夠將其用於記錄異常日誌。

ItemProcessListener

ItemProcessListenerItemReadListener相似,是圍繞着ItemProcessor進行處理的:

public interface ItemProcessListener<T, S> extends StepListener {
    void beforeProcess(T item); //processor執行以前
    void afterProcess(T item, S result); //processor直線成功以後
    void onProcessError(T item, Exception e); //processor執行出現異常
}

ItemWriteListener

ItemWriteListener的功能和ItemReadListenerItemReadListener相似,可是須要注意的是它接收和處理的數據對象是一個ListList的長度與chunk配置相關。

public interface ItemWriteListener<S> extends StepListener {
    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);
}

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener都提供了錯誤攔截處理的機制,可是沒有處理跳過(skip)的數據記錄。所以框架提供了SkipListener來專門處理那麼被跳過的記錄:

public interface SkipListener<T,S> extends StepListener {
    void onSkipInRead(Throwable t); //Read期間致使跳過的異常
    void onSkipInProcess(T item, Throwable t); //Process期間致使跳過的異常
    void onSkipInWrite(S item, Throwable t); //Write期間致使跳過的異常
}

SkipListener的價值是能夠將那些未能成功處理的記錄在某個位置保存下來,而後交給其餘批處理進一步解決,或者人工來處理。Spring Batch保證如下2個特徵:

  1. 跳過的元素只會出現一次。
  2. SkipListener始終在事物提交以前被調用,這樣能夠保證監聽器使用的事物資源不會被業務事物影響。

TaskletStep

面向分片(Chunk-oriented processing )的過程並非Step的惟一執行方式。好比用數據庫的存儲過程來處理數據,這個時候使用標準的Reader、Processor、Writer會很奇怪,針對這些狀況框架提供了TaskletStep

TaskletStep是一個很是簡單的接口,僅有一個方法——executeTaskletStep會反覆的調用這個方法直到獲取一個RepeatStatus.FINISHED返回或者拋出一個異常。全部的Tasklet調用都會包裝在一個事物中。

註冊一個TaskletStep很是簡單,只要添加一個實現了Tasklet接口的類便可:

@Bean
public Step step1() {
    return this.stepBuilderFactory.get("step1")
    			.tasklet(myTasklet()) //注入Tasklet的實現
    			.build();
}

TaskletStep還支持適配器處理等,詳見官網說明

控制Step執行流程

順序執行

默認狀況下。Step與Step之間是順序執行的,以下圖:

Step順序執行

順序執行經過next方法來標記:

@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
				.start(stepA())
				.next(stepB()) //順序執行
				.next(stepC())
				.build();
}

條件執行

在順序執行的過程當中,在整個執行鏈條中有一個Step執行失敗則整個Job就會中止。可是經過條件執行,能夠指定各類狀況下的執行分支:

Step條件執行

爲了實現更加複雜的控制,能夠經過Step執行後的退出命名來定義條件分之。先看一個簡單的代碼:

@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
				.start(stepA()) //啓動時執行的step
				.on("*").to(stepB()) //默認跳轉到stepB
				.from(stepA()).on("FAILED").to(stepC()) //當返回的ExitStatus爲"FAILED"時,執行。
				.end()
				.build();
}

這裏使用*來表示默認處理,*是一個通配符表示處理任意字符串,對應的還可使用?表示匹配任意字符。在Spring Batch(1)——數據批處理概念一文中介紹了Step的退出都會有ExitStatus,命名都來源於它。下面是一個更加全面的代碼。

  1. 配置攔截器處理ExitCode:
public class SkipCheckingListener extends StepExecutionListenerSupport {
    public ExitStatus afterStep(StepExecution stepExecution) {
        String exitCode = stepExecution.getExitStatus().getExitCode();
        if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
              stepExecution.getSkipCount() > 0) { //當Skip的Item大於0時,則指定ExitStatus的內容
            return new ExitStatus("COMPLETED WITH SKIPS");
        }
        else {
            return null;
        }
    }
}

攔截器指示當有一個以上被跳過的記錄時,返回的ExitStatus爲"COMPLETED WITH SKIPS"。對應的控制流程:

@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
			.start(step1()).on("FAILED").end() //執行失敗直接退出
			.from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1()) //有跳過元素執行 errorPrint1()
			.from(step1()).on("*").to(step2()) //默認(成功)狀況下執行 Step2
			.end()
			.build();
}

Step的停機退出機制

Spring Batch爲Job提供了三種退出機制,這些機制爲批處理的執行提供了豐富的控制方法。在介紹退出機制以前須要回顧一下 數據批處理概念一文中關於StepExecution的內容。在StepExecution中有2個表示狀態的值,一個名爲status,另一個名爲exitStatus。前者也被稱爲BatchStatus

前面以及介紹了ExitStatus的使用,他能夠控制Step執行鏈條的條件執行過程。除此以外BatchStatus也會參與到過程的控制。

End退出

默認狀況下(沒有使用endfail方法結束),Job要順序執行直到退出,這個退出稱爲end。這個時候,BatchStatus=COMPLETEDExitStatus=COMPLETED,表示成功執行。

除了Step鏈式處理天然退出,也能夠顯示調用end來退出系統。看下面的例子:

@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
				.start(step1()) //啓動
				.next(step2()) //順序執行
				.on("FAILED").end()
				.from(step2()).on("*").to(step3()) //條件執行
				.end()
				.build();
}

上面的代碼,step1step2是順序執行,當step2exitStatus返回"FAILED"時則直接End退出。其餘狀況執行Step3

Fail退出

除了end還可使用fail退出,這個時候,BatchStatus=FAILEDExitStatus=EARLY TERMINATION,表示執行失敗。這個狀態與End最大的區別是Job會嘗試重啓執行新的JobExecution。看下面代碼的例子:

@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
			.start(step1()) //執行step1
			.next(step2()).on("FAILED").fail() //step2的ExitStatus=FAILED 執行fail
			.from(step2()).on("*").to(step3()) //不然執行step3
			.end()
			.build();
}

在指定的節點中斷

Spring Batch還支持在指定的節點退出,退出後下次重啓會從中斷的點繼續執行。中斷的做用是某些批處理到某個步驟後須要人工干預,當干預完以後又接着處理:

@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
	 		//若是step1的ExitStatus=COMPLETED則在step2中斷
			.start(step1()).on("COMPLETED").stopAndRestart(step2())
			 //不然直接退出批處理
			.end()
			.build();
}

程序化流程的分支

能夠直接進行編碼來控制Step之間的扭轉,Spring Batch提供了JobExecutionDecider接口來協助分支管理:

public class MyDecider implements JobExecutionDecider {
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        String status;
        if (someCondition()) {
            status = "FAILED";
        }
        else {
            status = "COMPLETED";
        }
        return new FlowExecutionStatus(status);
    }
}

接着將MyDecider做爲過濾器添加到配置過程當中:

@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
			.start(step1())
			.next(decider()).on("FAILED").to(step2())
			.from(decider()).on("COMPLETED").to(step3())
			.end()
			.build();
}

流程分裂

在線性處理過程當中,流程都是一個接着一個執行的。可是爲了知足某些特殊的須要,Spring Batch提供了執行的過程分裂並行Step的方法。參看下面的Job配置:

@Bean
public Job job() {
	Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
			.start(step1())
			.next(step2())
			.build();//並行流程1
	Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
			.start(step3())
			.build();//並行流程2

	return this.jobBuilderFactory.get("job")
				.start(flow1)
				.split(new SimpleAsyncTaskExecutor()) //建立一個異步執行任務
				.add(flow2)
				.next(step4()) //2個分支執行完畢以後再執行step4。
				.end()
				.build();
}

這裏表示flow1和flow2會並行執行,待2者執行成功後執行step4。

數據綁定

JobStep的任何位置,均可以獲取到統一配置的數據。好比使用標準的Spring Framework方式:

@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

當咱們經過配置文件(application.properties中 input.file.name=filepath)或者jvm參數(-Dinput.file.name=filepath)指定某些數據時,均可以經過這種方式獲取到對應的配置參數。

此外,也能夠從JobParameters從獲取到Job運行的上下文參數:

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

不管是JobExecution仍是StepExecution,其中的內容均可以經過這種方式去獲取參數,例如:

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

或者

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

Step Scope

注意看上面的代碼例子,都有一個@StepScope註解。這是爲了進行後期綁定進行的標識。由於在Spring的IoCs容器進行初始化的階段並無任何的*Execution在執行,進而也不存在任何*ExecutionContext,因此這個時候根本沒法注入標記的數據。因此須要使用註解顯式的告訴容器直到Step執行的階段才初始化這個@Bean

Job Scope

Job Scope的概念和 Step Scope相似,都是用於標識在到了某個執行時間段再添加和注入Bean。@JobScope用於告知框架知道JobInstance存在時候才初始化對應的@Bean

@JobScope
@Bean
// 初始化獲取 jobParameters中的參數
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}
@JobScope
@Bean
// 初始化獲取jobExecutionContext中的參數
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext中的參數['input.name']}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}
相關文章
相關標籤/搜索