數據共享-spring batch(9)上下文處理

在 Spring Batch 中進行數據及參數傳遞的方法。

1 引言

本文是 Spring Batch 系列文章的第9篇,有興趣的可見文章:java

前面文章以實例的方式對 Spring Batch 進行批處理進行詳細說明,相信你們對字符串、文件,關係型數據庫及 NoSQL 數據庫的讀取,處理,寫入流程已比較熟悉。有小夥伴就問,針對這個任務流程,期間有多個步驟,從任務( Job )啓動,到做業步( Step )的執行,其中又包含讀組件、處理組件、寫組件,那麼,針對這個流程,若中間須要傳遞自定義的數據,該如何處理?本文將對 Spring Batch 進行數據傳遞的方法進行描述,依然會使用代碼實例的方式進行講解。包括如下幾個內容:mysql

  • 基於 Mybatis-plus 集成多數據源的數據庫訪問
  • 使用 ExecutionContext 共享數據
  • StepScope 動態綁定參數傳遞

2 開發環境

  • JDK環境: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 開發IDE: IDEA
  • 構建工具Maven: 3.3.9
  • 日誌組件logback:1.2.3
  • lombok:1.18.6
  • MySQL: 5.6.26
  • Mybatis-plus: 3.4.0

本示例源碼已放至githubhttps://github.com/mianshenglee/spring-batch-example/tree/master/spring-batch-param,請結合示例代碼進行閱讀。git

3 基於 Mybatis-plus 集成多數據源的數據庫訪問

本示例仍是使用原來示例功能,從源數據庫讀取用戶數據,處理數據,而後寫入到目標數據庫。其中會在任務啓動時傳遞參數,並在做業步中傳遞參數。以前已經介紹過如何使用 beetlsql 進行多數據源配置(便捷的數據讀寫-spring batch(5)結合beetlSql進行數據讀寫),實現數據批處理。還有不少朋友使用 Mybatis 或 Mybatis-plus 進行數據庫讀寫,所以,有必要提一下 Spring Batch 如何結合 Mybatis 或 Mybatis-plus 配置多數據源操做。本示例以 Mybatis-plus 爲例。github

示例工程中的sql目錄有相應的數據庫腳本,其中源數據庫mytest.sql腳本建立一個test_user表,並有相應的測試數據。目標數據庫 my_test1.sqlmytest.sql表結構一致,spring-batch-mysql.sql是 Spring Batch 自己提供的數據庫腳本。spring

3.1 pom 文件中引入 Mybatis-plus

<!--mybatis-plus-->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.4.0</version>
</dependency>

3.2 配置及使用多數據源

本示例會涉及三個數據庫,分別是 Spring Batch 自己數據庫,須要批處理的源數據庫,批處理的目標數據庫。所以須要處理多個數據庫,利用多套源策略,能夠很簡單就完成多套數據源的處理。簡單來講主要分爲如下幾個步驟:sql

  • 配置多數據源鏈接信息
  • 根據不一樣數據源劃分mapper 包,entity包,mapper.xml文件包
  • 根據不一樣數據源配置獨立的 SqlSessionFactory
  • 根據不一樣的應用場景,使用不一樣的 mapper

關於多數據源多套源策略的詳細配置過程,能夠參考個人另外一篇文章《搞定SpringBoot多數據源(1):多套源策略數據庫

4 ExecutionContext 傳遞參數

關於 Spring Batch 的讀數據( ItemReader )、處理數據( ItemProcessor )、寫數據( ItemWriter )的配置流程,能夠參考前面系列文章,本文再也不詳細描述。咱們須要記住的是,當一個做業( Job )啓動,Spring Batch 是經過做業名稱( Job name)及 做業參數( JobParameters )做爲惟一標識來區分不一樣的做業。一個 Job 下能夠有多個做業步( Step ),每一個 Step 中就是有具體的操做邏輯(讀、處理、寫)。在 Job 和 Step 下的各個操做步驟間,如何傳遞,,這裏就須要理解 ExecutionContext 的概念。mybatis

4.1 ExecutionContext 概念

在 Job 的運行及 Step 的運行過程當中,Spring Batch 提供 ExecutionContext 進行運行數據持久化,利用它,能夠根據業務進行數據共享,如用來重啓的靜態數據與狀態數據。以下圖:app

Spring Batch 參數傳遞

Execution Context 本質上來說就是一個 Map<String,Object> ,它是Spring Batch 框架提供的持久化與控制的 key/value 對,可讓開發者在 Step 運行或Job 運行過程當中保存須要進行持久化的狀態,它能夠。分爲兩類,一類是Job 運行的上下文(對應數據表:BATCH_JOB_EXECUTION_CONTEXT),另外一類是Step Execution的上下文(對應數據表BATCH_STEP_EXECUTION_CONTEXT)。兩類上下文關係:一個 Job 運行對應一個 Job Execution 的上下文(如上圖中藍色部分的 ExecutionContext ),每一個 Step 運行對應一個 Step Execution 上下文(如上圖中粉色部分的 ExecutionContext );同一個 Job 中的 Step Execution 共用 Job Execution 的上下文。也就是說,它們的做用範圍有區別。所以,若是同一個 Job 的不一樣 Step 間須要共享數據時,能夠經過 Job Execution 的上下文共享數據。根據 ExecutionContext 的共享數據特性,則能夠實如今不一樣步驟間傳遞數據。框架

4.2 ExecutionContext 傳遞數據

一個 Job 啓動後,會生成一個 JobExecution ,用於存放和記錄 Job 運行的信息,一樣,在 Step 啓動後,也會有對應的 StepExecution 。如前面所說,在 JobExecution 和 StepExecution 中都會有一個 ExecutionContext ,用於存儲上下文。所以,數據傳遞的思路就是肯定數據使用範圍,而後經過 ExecutionContext 傳入數據,而後就能夠在對應的範圍內共享數據。如當前示例,須要 Job 範圍內共享數據,在讀組件( ItemReader )和寫組件( ItemWriter )中傳遞讀與寫數據的數量( size ),在 Job 結束時,輸出讀及寫的數據量。實際上 Spring Batch 會自動計算讀寫數量,本示例僅爲了顯示數據共享功能。

共享數據

那麼,如何獲取對應的 Execution ?,Spring Batch 提供了 JobExecutionListener 和 StepExecutionListener 監聽器接口,經過實現監聽器接口,分別能夠在開啓做業前( beforeJob )和 完成做業後( afterJob )afterJob ),開啓做業步前( beforeStep)及 完成做業步後( afterStep )獲取對應的 Execution ,而後進行操做。

4.2.1 實現監聽器接口

在自定義的 UserItemReader 和 UserItemWriter 中,實現 StepExecutionListener 接口,其中使用 StepExecution 做爲成員,從 beforeStep 中獲取。以下:

public class UserItemWriter implements ItemWriter<TargetUser>, StepExecutionListener {
    private StepExecution stepExecution;
    //...略
    @Override
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

讀組件( UserItemReader )也使用一樣的方式。而在做業結束後,獲取參數,則能夠繼承 JobExecutionListenerSupport ,實現本身感興趣的方法,也從參數中獲取 JobExecution,而後獲取參數進行處理。

public class ParamJobEndListener extends JobExecutionListenerSupport {
    @Override
    public void afterJob(JobExecution jobExecution) {}
}

4.2.2 設置用於傳遞的數據

因爲咱們須要在 Job 範圍內傳遞參數,獲取到 StepExecution 後,能夠得到相應的 JobExecution ,進而獲取 Job 對應的 executionContext,這樣,就能夠在 Job 範圍內共享參數數據了。以下是在讀組件中進行配置

ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext();
            executionContext.put(SyncConstants.PASS_PARAM_READ_NUM, items.size());

一樣在寫組件中,獲取到 ExecutionContext 後,能夠對參數進行處理。本示例中,是經過對 ItemReader 傳遞的處理數目參數進行累加處理,獲得結果。

@Override
public void write(List<? extends TargetUser> items) {
    ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext();
    Object currentWriteNum = executionContext.get(SyncConstants.PASS_PARAM_WRITE_NUM);
    if (Objects.nonNull(currentWriteNum)) {
        log.info("currentWriteNum:{}", currentWriteNum);
        executionContext.put(SyncConstants.PASS_PARAM_WRITE_NUM, items.size()+(Integer)currentWriteNum);
    } else {
        executionContext.put(SyncConstants.PASS_PARAM_WRITE_NUM, items.size());
    }

最後在做業結束後,在實現 JobExecutionListenerSupport 的接口中,afterJob 函數中,對參數進行輸出。

public class ParamJobEndListener extends JobExecutionListenerSupport {
    @Override
    public void afterJob(JobExecution jobExecution) {
        ExecutionContext executionContext = jobExecution.getExecutionContext();
        Integer writeNum = (Integer)executionContext.get(SyncConstants.PASS_PARAM_WRITE_NUM);
        log.info(LogConstants.LOG_TAG + "writeNum:{}",writeNum);
    }
}

5 StepScope 動態綁定參數傳遞

5.1 StepScope及後期綁定

前面說到在 Job 及 Step 範圍內,使用 ExecutionContext 進行數據共享,但,若是須要在 Job 啓動前設置參數,而且每次啓動輸入的參數是動態變化的(好比增量同步時,日期是基於上一次同步的時間或者ID),也就是說,每次運行,須要根據參數新建一個操做步驟(如 ItemReader、ItemWriter等),咱們知道,因爲在 Spring IOC 中加載的Bean,默認都是單例模式的,所以,須要每次運行新建,運行完銷燬,新建是在運行時進行的。這就須要用到StepScope 及後期綁定技術。

在以前的示例中,已出現過 StepScope,它的做用是提供了操做步驟的做用範圍,某個 Spring Bean 使用註解StepScope,則表示此 Bean 在做業步( Step )開始的時候初始化,在 Step 結束的時候銷燬,也就是說 Bean的做用範圍是在 Step 這個生命週期中。而 Spring Batch 經過屬性後期綁定技術,在運行期獲取屬性值,並使用 SPEL 的表達式進行屬性綁定。而在 StepScope 中,Spring Batch 框架提供 JobParameters,JobExecutionContext,StepExecutionContext,固然也可使用 Spring 容器中的 Bean ,如 JobExecution ,StepExecution。

5.2 做業參數傳遞及動態獲取 StepExecution

一個 Job 是由 Job name 及 JobParameters 做爲惟一標識的,也就是說只有 job name 和 JobParameters 不一致時,Spring Batch 纔會啓動一個新的 Job,一致的話就看成是同一個 Job ,若 此 Job 未執行過,則執行;若已執行過且是 FAILED 狀態,則嘗試從新運行此 Job ,若已執行過且是 COMPLETED 狀態,則會報錯。

本示例中,Job 啓動時輸入時間參數,在 ItemReader 中使用 StepScope 註解,而後把時間參數綁定到 ItemReader 中,同時綁定 StepExecution ,以便於在 ItemReader 對時間參數及 StepExecution 進行操做。

5.2.1 設置時間參數

在使用 JobLauncher 啓動 Job 時,是須要輸入 jobParameters 做爲參數的。所以能夠建立此對象,並設置參數。

JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time",timMillis)
                .toJobParameters();

5.2.2 動態綁定參數

在配置 Step 時,須要建立ItemReader 的 Bean,爲了使用動態參數,在 ItemReader 中設置 Map 存放參數,並設置 StepExecution 爲成員,以便於後面使用 ExecutionContext。

public class UserItemReader implements ItemReader<User> {
    protected Map<String, Object> params;
    private StepExecution stepExecution;

    public void setStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

使用 StepScope 進行配置:

@Bean
@StepScope
public ItemReader paramItemReader(@Value("#{stepExecution}") StepExecution stepExecution,
                                  @Value("#{jobParameters['time']}") Long timeParam) {
    UserItemReader userItemReader = new UserItemReader();
    //設置參數
    Map<String, Object> params = CollUtil.newHashMap();
    Date datetime = new Date(timeParam);
    params.put(SyncConstants.PASS_PARAM_DATETIME, datetime);
    userItemReader.setParams(params);
    userItemReader.setStepExecution(stepExecution);

    return userItemReader;
}

注意:此時 ItemReader 不可再使用實現 StepExecutionListener 的方式來對 stepExecution 賦值,因爲 ItemReader 是動態綁定的,StepExecutionListener 將再也不起做用,所以須要在後期綁定中來綁定 stepExecution Bean 的方式來賦值。

5.2.3 設置及傳遞參數

ItemReader 獲取到 StepExecution 後便可獲取 ExecutionContext,而後能夠像前面說的使用 ExecutionContext 方式進行數據傳遞。以下:

ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext();
//readNum參數
executionContext.put(SyncConstants.PASS_PARAM_READ_NUM, items.size());
//datetime參數
executionContext.put(SyncConstants.PASS_PARAM_DATETIME,params.get(SyncConstants.PASS_PARAM_DATETIME));

6.總結

在 Job 和 Step 不一樣的數據範圍中,可以使用 ExecutionContext 共享數據。本文以傳遞處理數量爲例,使用 Mybatis-plus,基於 ExecutionContext ,結合 StepScope及後期綁定技術,實如今 Job 啓動傳入參數,而後在 ItemReader、ItemProcessor、ItemWriter 及 Job 完成後的數據共享及傳遞。若是你在使用 Spring Batch 過程當中須要進行數據共享與傳遞,請試試這種方式吧。

往期文章

若是文章內容對你有幫助,歡迎轉發分享~

個人公衆號(搜索Mason技術記錄),獲取更多技術記錄:

Mason技術記錄

相關文章
相關標籤/搜索