在 Spring Batch 中進行數據及參數傳遞的方法。
本文是 Spring Batch 系列文章的第9篇,有興趣的可見文章:java
前面文章以實例的方式對 Spring Batch 進行批處理進行詳細說明,相信你們對字符串、文件,關係型數據庫及 NoSQL 數據庫的讀取,處理,寫入流程已比較熟悉。有小夥伴就問,針對這個任務流程,期間有多個步驟,從任務( Job )啓動,到做業步( Step )的執行,其中又包含讀組件、處理組件、寫組件,那麼,針對這個流程,若中間須要傳遞自定義的數據,該如何處理?本文將對 Spring Batch 進行數據傳遞的方法進行描述,依然會使用代碼實例的方式進行講解。包括如下幾個內容:mysql
本示例源碼已放至github:https://github.com/mianshenglee/spring-batch-example/tree/master/spring-batch-param
,請結合示例代碼進行閱讀。git
本示例仍是使用原來示例功能,從源數據庫讀取用戶數據,處理數據,而後寫入到目標數據庫。其中會在任務啓動時傳遞參數,並在做業步中傳遞參數。以前已經介紹過如何使用 beetlsql 進行多數據源配置(便捷的數據讀寫-spring batch(5)結合beetlSql進行數據讀寫),實現數據批處理。還有不少朋友使用 Mybatis 或 Mybatis-plus 進行數據庫讀寫,所以,有必要提一下 Spring Batch 如何結合 Mybatis 或 Mybatis-plus 配置多數據源操做。本示例以 Mybatis-plus 爲例。github
示例工程中的sql
目錄有相應的數據庫腳本,其中源數據庫mytest.sql
腳本建立一個test_user
表,並有相應的測試數據。目標數據庫 my_test1.sql
與 mytest.sql
表結構一致,spring-batch-mysql.sql
是 Spring Batch 自己提供的數據庫腳本。spring
<!--mybatis-plus--> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.4.0</version> </dependency>
本示例會涉及三個數據庫,分別是 Spring Batch 自己數據庫,須要批處理的源數據庫,批處理的目標數據庫。所以須要處理多個數據庫,利用多套源策略,能夠很簡單就完成多套數據源的處理。簡單來講主要分爲如下幾個步驟:sql
mapper
包,entity
包,mapper.xml
文件包 SqlSessionFactory
mapper
關於多數據源多套源策略的詳細配置過程,能夠參考個人另外一篇文章《搞定SpringBoot多數據源(1):多套源策略》數據庫
關於 Spring Batch 的讀數據( ItemReader )、處理數據( ItemProcessor )、寫數據( ItemWriter )的配置流程,能夠參考前面系列文章,本文再也不詳細描述。咱們須要記住的是,當一個做業( Job )啓動,Spring Batch 是經過做業名稱( Job name)及 做業參數( JobParameters )做爲惟一標識來區分不一樣的做業。一個 Job 下能夠有多個做業步( Step ),每一個 Step 中就是有具體的操做邏輯(讀、處理、寫)。在 Job 和 Step 下的各個操做步驟間,如何傳遞,,這裏就須要理解 ExecutionContext 的概念。mybatis
在 Job 的運行及 Step 的運行過程當中,Spring Batch 提供 ExecutionContext 進行運行數據持久化,利用它,能夠根據業務進行數據共享,如用來重啓的靜態數據與狀態數據。以下圖:app
Execution Context 本質上來說就是一個 Map<String,Object>
,它是Spring Batch 框架提供的持久化與控制的 key/value 對,可讓開發者在 Step 運行或Job 運行過程當中保存須要進行持久化的狀態,它能夠。分爲兩類,一類是Job 運行的上下文(對應數據表:BATCH_JOB_EXECUTION_CONTEXT),另外一類是Step Execution的上下文(對應數據表BATCH_STEP_EXECUTION_CONTEXT)。兩類上下文關係:一個 Job 運行對應一個 Job Execution 的上下文(如上圖中藍色部分的 ExecutionContext ),每一個 Step 運行對應一個 Step Execution 上下文(如上圖中粉色部分的 ExecutionContext );同一個 Job 中的 Step Execution 共用 Job Execution 的上下文。也就是說,它們的做用範圍有區別。所以,若是同一個 Job 的不一樣 Step 間須要共享數據時,能夠經過 Job Execution 的上下文共享數據。根據 ExecutionContext 的共享數據特性,則能夠實如今不一樣步驟間傳遞數據。框架
一個 Job 啓動後,會生成一個 JobExecution ,用於存放和記錄 Job 運行的信息,一樣,在 Step 啓動後,也會有對應的 StepExecution 。如前面所說,在 JobExecution 和 StepExecution 中都會有一個 ExecutionContext ,用於存儲上下文。所以,數據傳遞的思路就是肯定數據使用範圍,而後經過 ExecutionContext 傳入數據,而後就能夠在對應的範圍內共享數據。如當前示例,須要 Job 範圍內共享數據,在讀組件( ItemReader )和寫組件( ItemWriter )中傳遞讀與寫數據的數量( size ),在 Job 結束時,輸出讀及寫的數據量。實際上 Spring Batch 會自動計算讀寫數量,本示例僅爲了顯示數據共享功能。
那麼,如何獲取對應的 Execution ?,Spring Batch 提供了 JobExecutionListener 和 StepExecutionListener 監聽器接口,經過實現監聽器接口,分別能夠在開啓做業前( beforeJob )和 完成做業後( afterJob )afterJob ),開啓做業步前( beforeStep)及 完成做業步後( afterStep )獲取對應的 Execution ,而後進行操做。
在自定義的 UserItemReader 和 UserItemWriter 中,實現 StepExecutionListener 接口,其中使用 StepExecution 做爲成員,從 beforeStep 中獲取。以下:
public class UserItemWriter implements ItemWriter<TargetUser>, StepExecutionListener { private StepExecution stepExecution; //...略 @Override public void beforeStep(StepExecution stepExecution) { this.stepExecution = stepExecution; } }
讀組件( UserItemReader )也使用一樣的方式。而在做業結束後,獲取參數,則能夠繼承 JobExecutionListenerSupport ,實現本身感興趣的方法,也從參數中獲取 JobExecution,而後獲取參數進行處理。
public class ParamJobEndListener extends JobExecutionListenerSupport { @Override public void afterJob(JobExecution jobExecution) {} }
因爲咱們須要在 Job 範圍內傳遞參數,獲取到 StepExecution 後,能夠得到相應的 JobExecution ,進而獲取 Job 對應的 executionContext,這樣,就能夠在 Job 範圍內共享參數數據了。以下是在讀組件中進行配置
ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext(); executionContext.put(SyncConstants.PASS_PARAM_READ_NUM, items.size());
一樣在寫組件中,獲取到 ExecutionContext 後,能夠對參數進行處理。本示例中,是經過對 ItemReader 傳遞的處理數目參數進行累加處理,獲得結果。
@Override public void write(List<? extends TargetUser> items) { ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext(); Object currentWriteNum = executionContext.get(SyncConstants.PASS_PARAM_WRITE_NUM); if (Objects.nonNull(currentWriteNum)) { log.info("currentWriteNum:{}", currentWriteNum); executionContext.put(SyncConstants.PASS_PARAM_WRITE_NUM, items.size()+(Integer)currentWriteNum); } else { executionContext.put(SyncConstants.PASS_PARAM_WRITE_NUM, items.size()); }
最後在做業結束後,在實現 JobExecutionListenerSupport 的接口中,afterJob 函數中,對參數進行輸出。
public class ParamJobEndListener extends JobExecutionListenerSupport { @Override public void afterJob(JobExecution jobExecution) { ExecutionContext executionContext = jobExecution.getExecutionContext(); Integer writeNum = (Integer)executionContext.get(SyncConstants.PASS_PARAM_WRITE_NUM); log.info(LogConstants.LOG_TAG + "writeNum:{}",writeNum); } }
前面說到在 Job 及 Step 範圍內,使用 ExecutionContext 進行數據共享,但,若是須要在 Job 啓動前設置參數,而且每次啓動輸入的參數是動態變化的(好比增量同步時,日期是基於上一次同步的時間或者ID),也就是說,每次運行,須要根據參數新建一個操做步驟(如 ItemReader、ItemWriter等),咱們知道,因爲在 Spring IOC 中加載的Bean,默認都是單例模式的,所以,須要每次運行新建,運行完銷燬,新建是在運行時進行的。這就須要用到StepScope 及後期綁定技術。
在以前的示例中,已出現過 StepScope,它的做用是提供了操做步驟的做用範圍,某個 Spring Bean 使用註解StepScope,則表示此 Bean 在做業步( Step )開始的時候初始化,在 Step 結束的時候銷燬,也就是說 Bean的做用範圍是在 Step 這個生命週期中。而 Spring Batch 經過屬性後期綁定技術,在運行期獲取屬性值,並使用 SPEL 的表達式進行屬性綁定。而在 StepScope 中,Spring Batch 框架提供 JobParameters,JobExecutionContext,StepExecutionContext,固然也可使用 Spring 容器中的 Bean ,如 JobExecution ,StepExecution。
一個 Job 是由 Job name 及 JobParameters 做爲惟一標識的,也就是說只有 job name 和 JobParameters 不一致時,Spring Batch 纔會啓動一個新的 Job,一致的話就看成是同一個 Job ,若 此 Job 未執行過,則執行;若已執行過且是 FAILED 狀態,則嘗試從新運行此 Job ,若已執行過且是 COMPLETED 狀態,則會報錯。
本示例中,Job 啓動時輸入時間參數,在 ItemReader 中使用 StepScope 註解,而後把時間參數綁定到 ItemReader 中,同時綁定 StepExecution ,以便於在 ItemReader 對時間參數及 StepExecution 進行操做。
在使用 JobLauncher 啓動 Job 時,是須要輸入 jobParameters 做爲參數的。所以能夠建立此對象,並設置參數。
JobParameters jobParameters = new JobParametersBuilder() .addLong("time",timMillis) .toJobParameters();
在配置 Step 時,須要建立ItemReader 的 Bean,爲了使用動態參數,在 ItemReader 中設置 Map 存放參數,並設置 StepExecution 爲成員,以便於後面使用 ExecutionContext。
public class UserItemReader implements ItemReader<User> { protected Map<String, Object> params; private StepExecution stepExecution; public void setStepExecution(StepExecution stepExecution) { this.stepExecution = stepExecution; } }
使用 StepScope 進行配置:
@Bean @StepScope public ItemReader paramItemReader(@Value("#{stepExecution}") StepExecution stepExecution, @Value("#{jobParameters['time']}") Long timeParam) { UserItemReader userItemReader = new UserItemReader(); //設置參數 Map<String, Object> params = CollUtil.newHashMap(); Date datetime = new Date(timeParam); params.put(SyncConstants.PASS_PARAM_DATETIME, datetime); userItemReader.setParams(params); userItemReader.setStepExecution(stepExecution); return userItemReader; }
注意:此時 ItemReader 不可再使用實現 StepExecutionListener 的方式來對 stepExecution 賦值,因爲 ItemReader 是動態綁定的,StepExecutionListener 將再也不起做用,所以須要在後期綁定中來綁定 stepExecution Bean 的方式來賦值。
ItemReader 獲取到 StepExecution 後便可獲取 ExecutionContext,而後能夠像前面說的使用 ExecutionContext 方式進行數據傳遞。以下:
ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext(); //readNum參數 executionContext.put(SyncConstants.PASS_PARAM_READ_NUM, items.size()); //datetime參數 executionContext.put(SyncConstants.PASS_PARAM_DATETIME,params.get(SyncConstants.PASS_PARAM_DATETIME));
在 Job 和 Step 不一樣的數據範圍中,可以使用 ExecutionContext 共享數據。本文以傳遞處理數量爲例,使用 Mybatis-plus,基於 ExecutionContext ,結合 StepScope及後期綁定技術,實如今 Job 啓動傳入參數,而後在 ItemReader、ItemProcessor、ItemWriter 及 Job 完成後的數據共享及傳遞。若是你在使用 Spring Batch 過程當中須要進行數據共享與傳遞,請試試這種方式吧。
若是文章內容對你有幫助,歡迎轉發分享~
個人公衆號(搜索Mason技術記錄
),獲取更多技術記錄: