SpringBatch是spring框架下的一個子模塊,用於處理批處理的批次框架。git
本文主要分析SpringBatch中的retry和skip機制的實現。github
先簡單說明下SpringBatch在SpringBoot中的使用。spring
若是要在springboot中使用batch的話,直接加入如下依賴便可:springboot
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
複製代碼
而後使用註解開啓Batch模塊:bash
...
@EnableBatchProcessing
public class Application { ... }
複製代碼
以後就能夠注入JobBuilderFactory和StepBuilderFactory:框架
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
複製代碼
有了這2個factory以後,就能夠build job。ide
SpringBatch中的相關基礎概念好比ItemReader、ItemWriter、Chunk等本文就不介紹了。spring-boot
咱們以FlatFileItemReader做爲reader,一個自定義Writer用於打印reader中讀取出來的數據。ui
這個定義的writer遇到good job這條數據的時候會報錯,具體邏輯以下:spa
@Override
public void write(List<? extends String> items) throws Exception {
System.out.println("handle start =====" + items);
for(String a : items) {
if(a.equals("good job")) {
throw new Exception("custom exception");
}
}
System.out.println("handle end.. -----" + items);
}
複製代碼
其中reader中讀取的文件中的數據以下:
hello world
hello coder
good job
cool
66666
複製代碼
咱們使用StepBuilderFactory構造Step,chunkSize設置爲2。而後在job1中使用並執行:
stepBuilderFactory.get("test-step").chunk(2).reader(reader).writer(writer).build();
複製代碼
執行job1後console打印以下:
handle start =====[hello world, hello coder]
handle end.. -----[hello world, hello coder]
handle start =====[good job, cool]
複製代碼
job1遇到了good job這條數據,writer拋出了異常,因爲沒有使用skip或者retry機制,致使整個流程中止。job1的處理流程底層在SimpleChunkProcessor這個類中完成,包括processor、writer的使用。
接下里咱們構造一個job2,job2使用skip機制(其中skipLimit必需要和skip(Class<? extends Throwable> type)一塊兒使用),skip機制能夠防止writer發生異常後不中止整個job,可是須要同時知足skip的限制次數和skip對應的Exception是發生異常的父類或自身關係條件纔不會中止整個job,這裏咱們使用Exception做爲異常和Integer.MAX_VALUE做爲skip的限制次數爲例:
stepBuilderFactory.get.get("test-step").chunk(2).reader(reader).writer(writer).faultTolerant().skipLimit(Integer.MAX_VALUE).skip(Exception.class).build();
複製代碼
執行job2 後console打印以下:
handle start =====[hello world, hello coder]
handle end.. -----[hello world, hello coder]
handle start =====[good job, cool]
handle start =====[good job]
handle start =====[cool]
handle end.. -----[cool]
handle start =====[66666]
handle end.. -----[66666]
複製代碼
咱們看到good job這條數據發生的異常被skip掉了,job完整的執行。
可是發現了另一個問題,那就是處理 [good job, cool] 這批數據的時候,發生了異常,可是接下來執行了 [good job] 和 [cool] 這兩批chunk爲1的批次。這是在ItemWriter中執行的,它也會在ItemWriteListener中執行屢次。
換句話說,若是使用了skip功能,那麼對於須要被skip的批次數據中會進行scan操做找出具體是哪1條數據的緣由,這裏的scan操做指的是一條一條數據的遍歷。
這個過程爲何叫scan呢? 在源碼中,FaultTolerantChunkProcessor類(處理帶有skip或者retry機制的處理器,跟SimpleChunkProcessor相似,只不過SimpleChunkProcessor處理簡單的Job)裏有個私有方法scan:
private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs,
ChunkMonitor chunkMonitor, boolean recovery) throws Exception {
...
Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
Chunk<O>.ChunkIterator outputIterator = outputs.iterator();
List<O> items = Collections.singletonList(outputIterator.next()); // 拿出須要寫的數據中的每一條數據
inputIterator.next();
try {
writeItems(items); // 寫每條數據
doAfterWrite(items);
contribution.incrementWriteCount(1);
inputIterator.remove();
outputIterator.remove();
}
catch (Exception e) { // 寫的時候若是發生了異常
doOnWriteError(e, items);
if (!shouldSkip(itemWriteSkipPolicy, e, -1) && !rollbackClassifier.classify(e)) {
inputIterator.remove();
outputIterator.remove();
}
else {
// 具體的skip策略
checkSkipPolicy(inputIterator, outputIterator, e, contribution, recovery);
}
if (rollbackClassifier.classify(e)) {
throw e;
}
}
chunkMonitor.incrementOffset();
if (outputs.isEmpty()) { // 批次裏的全部數據處理完畢以後 scanning 設置爲false
data.scanning(false);
inputs.setBusy(false);
chunkMonitor.resetOffset();
}
}
複製代碼
這個scan方法觸發的條件是UserData這個內部類裏的scanning被設置爲true,這裏被設置爲true是在處理批次數據出現異常後而且不能retry的狀況下才會被設置的。
try {
batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
rollbackClassifier));
}
catch (Exception e) {
RetryContext context = contextHolder.get();
if (!batchRetryTemplate.canRetry(context)) {
// 設置scanning爲true
data.scanning(true);
}
throw e;
}
複製代碼
這就是爲何skip機制在skip數據的時候會去scan批次中的每條數據,而後並找出須要被skip的數據的原理。
job3帶有retry功能,retry的功能在於出現某個異常而且這個異常能夠被retry所接受的話會進行retry,retry的次數能夠進行配置,咱們配置了3次retry:
stepBuilderFactory.get.get("test-step").chunk(2).reader(reader).writer(writer).faultTolerant().skipLimit(Integer.MAX_VALUE).skip(Exception.class).retryLimit(3).retry(Exception.class).build();
複製代碼
執行 job3後console打印以下:
handle start =====[hello world, hello coder]
handle end.. -----[hello world, hello coder]
handle start =====[good job, cool]
handle start =====[good job, cool]
handle start =====[good job, cool]
handle start =====[good job]
handle start =====[cool]
handle end.. -----[cool]
handle start =====[66666]
handle end.. -----[66666]
複製代碼
[good job, cool] 這批數據retry了3次,並且都失敗了。失敗以後進行了skip操做。
SpringBatch中的retry和skip都有對應的policy實現,默認的retry policy是SimpleRetryPolicy,能夠設置retry次數和接收的exception。好比可使用NeverRetryPolicy:
.retryPolicy(new NeverRetryPolicy())
複製代碼
使用NeverRetryPolicy以後,便再也不retry了,只會skip。SpringBatch內部的retry是使用Spring的retry模塊完成的。執行的時候能夠設置RetryCallback和RecoveryCallback。
SpringBatch中默認的skip policy是LimitCheckingItemSkipPolicy。