SpringBatch中的retry和skip機制實現分析

SpringBatch是spring框架下的一個子模塊,用於處理批處理的批次框架。git

本文主要分析SpringBatch中的retry和skip機制的實現。github

先簡單說明下SpringBatch在SpringBoot中的使用。spring

若是要在springboot中使用batch的話,直接加入如下依賴便可:springboot

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
複製代碼

而後使用註解開啓Batch模塊:bash

...
@EnableBatchProcessing
public class Application { ... }
複製代碼

以後就能夠注入JobBuilderFactory和StepBuilderFactory:框架

@Autowired
private JobBuilderFactory jobs; 
@Autowired
private StepBuilderFactory steps;
複製代碼

有了這2個factory以後,就能夠build job。ide

SpringBatch中的相關基礎概念好比ItemReader、ItemWriter、Chunk等本文就不介紹了。spring-boot

咱們以FlatFileItemReader做爲reader,一個自定義Writer用於打印reader中讀取出來的數據。ui

這個定義的writer遇到good job這條數據的時候會報錯,具體邏輯以下:spa

@Override
public void write(List<? extends String> items) throws Exception {
    System.out.println("handle start =====" + items);
    for(String a : items) {
        if(a.equals("good job")) {
            throw new Exception("custom exception");
        }
    }
    System.out.println("handle end.. -----" + items);
}
複製代碼

其中reader中讀取的文件中的數據以下:

hello world
hello coder
good job
cool
66666
複製代碼

咱們使用StepBuilderFactory構造Step,chunkSize設置爲2。而後在job1中使用並執行:

stepBuilderFactory.get("test-step").chunk(2).reader(reader).writer(writer).build();
複製代碼

執行job1後console打印以下:

handle start =====[hello world, hello coder]
handle end.. -----[hello world, hello coder]
handle start =====[good job, cool]
複製代碼

job1遇到了good job這條數據,writer拋出了異常,因爲沒有使用skip或者retry機制,致使整個流程中止。job1的處理流程底層在SimpleChunkProcessor這個類中完成,包括processor、writer的使用。

接下里咱們構造一個job2,job2使用skip機制(其中skipLimit必需要和skip(Class<? extends Throwable> type)一塊兒使用),skip機制能夠防止writer發生異常後不中止整個job,可是須要同時知足skip的限制次數和skip對應的Exception是發生異常的父類或自身關係條件纔不會中止整個job,這裏咱們使用Exception做爲異常和Integer.MAX_VALUE做爲skip的限制次數爲例:

stepBuilderFactory.get.get("test-step").chunk(2).reader(reader).writer(writer).faultTolerant().skipLimit(Integer.MAX_VALUE).skip(Exception.class).build();
複製代碼

執行job2 後console打印以下:

handle start =====[hello world, hello coder]
handle end.. -----[hello world, hello coder]
handle start =====[good job, cool]
handle start =====[good job]
handle start =====[cool]
handle end.. -----[cool]
handle start =====[66666]
handle end.. -----[66666]
複製代碼

咱們看到good job這條數據發生的異常被skip掉了,job完整的執行。

可是發現了另一個問題,那就是處理 [good job, cool] 這批數據的時候,發生了異常,可是接下來執行了 [good job] 和 [cool] 這兩批chunk爲1的批次。這是在ItemWriter中執行的,它也會在ItemWriteListener中執行屢次。

換句話說,若是使用了skip功能,那麼對於須要被skip的批次數據中會進行scan操做找出具體是哪1條數據的緣由,這裏的scan操做指的是一條一條數據的遍歷。

這個過程爲何叫scan呢? 在源碼中,FaultTolerantChunkProcessor類(處理帶有skip或者retry機制的處理器,跟SimpleChunkProcessor相似,只不過SimpleChunkProcessor處理簡單的Job)裏有個私有方法scan:

private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs,
        ChunkMonitor chunkMonitor, boolean recovery) throws Exception {
 
    ...

    Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
    Chunk<O>.ChunkIterator outputIterator = outputs.iterator();
 
    List<O> items = Collections.singletonList(outputIterator.next()); // 拿出須要寫的數據中的每一條數據
    inputIterator.next();
    try {
        writeItems(items); // 寫每條數據
        doAfterWrite(items);
        contribution.incrementWriteCount(1);
        inputIterator.remove();
        outputIterator.remove();
    }
    catch (Exception e) { // 寫的時候若是發生了異常
        doOnWriteError(e, items);
        if (!shouldSkip(itemWriteSkipPolicy, e, -1) && !rollbackClassifier.classify(e)) {
            inputIterator.remove();
            outputIterator.remove();
        }
        else {
            // 具體的skip策略
            checkSkipPolicy(inputIterator, outputIterator, e, contribution, recovery);
        }
        if (rollbackClassifier.classify(e)) {
            throw e;
        }
    }
    chunkMonitor.incrementOffset();
    if (outputs.isEmpty()) { // 批次裏的全部數據處理完畢以後 scanning 設置爲false
        data.scanning(false);
        inputs.setBusy(false);
        chunkMonitor.resetOffset();
    }
}
複製代碼

這個scan方法觸發的條件是UserData這個內部類裏的scanning被設置爲true,這裏被設置爲true是在處理批次數據出現異常後而且不能retry的狀況下才會被設置的。

try {
    batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
            rollbackClassifier));
}
catch (Exception e) {
    RetryContext context = contextHolder.get();
     if (!batchRetryTemplate.canRetry(context)) {
         // 設置scanning爲true
        data.scanning(true);
    }
    throw e;
}
複製代碼

這就是爲何skip機制在skip數據的時候會去scan批次中的每條數據,而後並找出須要被skip的數據的原理。

job3帶有retry功能,retry的功能在於出現某個異常而且這個異常能夠被retry所接受的話會進行retry,retry的次數能夠進行配置,咱們配置了3次retry:

stepBuilderFactory.get.get("test-step").chunk(2).reader(reader).writer(writer).faultTolerant().skipLimit(Integer.MAX_VALUE).skip(Exception.class).retryLimit(3).retry(Exception.class).build();
複製代碼

執行 job3後console打印以下:

handle start =====[hello world, hello coder]
handle end.. -----[hello world, hello coder]
handle start =====[good job, cool]
handle start =====[good job, cool]
handle start =====[good job, cool]
handle start =====[good job]
handle start =====[cool]
handle end.. -----[cool]
handle start =====[66666]
handle end.. -----[66666]
複製代碼

[good job, cool] 這批數據retry了3次,並且都失敗了。失敗以後進行了skip操做。

SpringBatch中的retry和skip都有對應的policy實現,默認的retry policy是SimpleRetryPolicy,能夠設置retry次數和接收的exception。好比可使用NeverRetryPolicy:

.retryPolicy(new NeverRetryPolicy())
複製代碼

使用NeverRetryPolicy以後,便再也不retry了,只會skip。SpringBatch內部的retry是使用Spring的retry模塊完成的。執行的時候能夠設置RetryCallback和RecoveryCallback。

SpringBatch中默認的skip policy是LimitCheckingItemSkipPolicy。

相關文章
相關標籤/搜索