Spring Batch(1)——數據批處理概念

批處理的核心場景

  • 從某個位置讀取大量的記錄,位置能夠是數據庫、文件或者外部推送隊列(MQ)。
  • 根據業務須要實時處理讀取的數據。
  • 將處理後的數據寫入某個位置,能夠是數據庫、文件或者推送到隊列。

Spring Batch能解決的批處理場景

Spring Batch爲批處理提供了一個輕量化的解決方案,它根據批處理的須要迭代處理各類記錄,提供事物功能。可是Spring Batch僅僅適用於"脫機"場景,在處理的過程當中不能和外部進行任何交互,也不容許有任何輸入。java

Spring Batch的目標

  • 開發人員僅關注業務邏輯,底層框架的交互交由Spring Batch去處理。
  • 可以清晰分離業務與框架,框架已經限定了批處理的業務切入點,業務開發只需關注這些切入點(Read、Process、Write)。
  • 提供開箱即用的通用接口。
  • 快速輕鬆的融入Spring 框架,基於Spring Framework可以快速擴展各類功能。
  • 全部現有核心服務都應易於更換或擴展,而不會對基礎架構層產生任何影響。

Spring Batch結構

Spring Batch結構

如上圖,一般狀況下一個獨立的JVM程序就是僅僅用於處理批處理,而不要和其餘功能重疊。 在最後一層基礎設置(Infrastructure)部分主要分爲3個部分。JobLauncherJob以及Step。每個Step又細分爲ItemReaderItemProcessorItemWirte。使用Spring Batch主要就是知道每個基礎設置負責的內容,而後在對應的設施中實現對應的業務。數據庫

Spring Batch 批處理原則與建議

當咱們構建一個批處理的過程時,必須注意如下原則:設計模式

  1. 一般狀況下,批處理的過程對系統和架構的設計要夠要求比較高,所以儘量的使用通用架構來處理批量數據處理,下降問題發生的可能性。Spring Batch是一個是一個輕量級的框架,適用於處理一些靈活並無到海量的數據。緩存

  2. 批處理應該儘量的簡單,儘可能避免在單個批處理中去執行過於複雜的任務。咱們能夠將任務分紅多個批處理或者多個步驟去實現。服務器

  3. 保證數據處理和物理數據緊密相連。籠統的說就是咱們在處理數據的過程當中有不少步驟,在某些步驟執行完時應該就寫入數據,而不是等全部都處理完。網絡

  4. 儘量減小系統資源的使用、尤爲是耗費大量資源的IO以及跨服務器引用,儘可能分配好數據處理的批次。數據結構

  5. 按期分析系統的IO使用狀況、SQL語句的執行狀況等,儘量的減小沒必要要的IO操做。優化的原則有:架構

    1. 儘可能在一次事物中對同一數據進行讀取或寫緩存。
    2. 一次事物中,儘量在開始就讀取全部須要使用的數據。
    3. 優化索引,觀察SQL的執行狀況,儘可能使用主鍵索引,儘可能避免全表掃描或過多的索引掃描。
    4. SQL中的where儘量經過主鍵查詢。
  6. 不要在批處理中對相同的數據執行2次相同的操做。併發

  7. 對於批處理程序而言應該在批處理啓動以前就分配足夠的內存,以避免處理的過程當中去從新申請新的內存頁。框架

  8. 對數據的完整性應該從最差的角度來考慮,每一步的處理都應該創建完備的數據校驗。

  9. 對於數據的總量咱們應該有一個和數據記錄在數據結構的某個字段 上。

  10. 全部的批處理系統都須要進行壓力測試。

  11. 若是整個批處理的過程是基於文件系統,在處理的過程當中請切記完成文件的備份以及文件內容的校驗。

批處理的通用策略

和軟件開發的設計模式同樣,批處理也有各類各樣的現成模式可供參考。當一個開發(設計)人員開始執行批處理任務時,應該將業務邏輯拆分爲一下的步驟或者板塊分批執行:

  1. 數據轉換:某個(某些)批處理的外部數據可能來自不一樣的外部系統或者外部提供者,這些數據的結構千差萬別。在統一進行批量數據處理以前須要對這些數據進行轉換,合併爲一個統一的結構。所以在數據開始真正的執行業務處理以前,先要使用其餘的方法或者一些批處理任務將這些數據轉換爲統一的格式。

  2. 數據校驗:批處理是對大量數據進行處理,而且數據的來源千差萬別,因此批處理的輸入數據須要對數據的完整性性進行校驗(好比校驗字段數據是否缺失)。另外批處理輸出的數據也須要進行合適的校驗(例如處理了100條數據,校驗100條數據是否校驗成功)

  3. 提取數據:批處理的工做是逐條從數據庫或目標文件讀取記錄(records),提取時能夠經過一些規則從數據源中進行數據篩選。

  4. 數據實時更新處理:根據業務要求,對實時數據進行處理。某些時候一行數據記錄的處理須要綁定在一個事物之下。

  5. 輸出記錄到標準的文檔格式:數據處理完成以後須要根據格式寫入到對應的外部數據系統中。

以上五個步驟是一個標準的數據批處理過程,Spring batch框架爲業務實現提供了以上幾個功能入口。

數據額外處理

某些狀況須要實現對數據進行額外處理,在進入批處理以前經過其餘方式將數據進行處理。主要內容有:

  1. 排序:因爲批處理是以獨立的行數據(record)進行處理的,在處理的時候並不知道記錄先後關係。所以若是須要對總體數據進行排序,最好事先使用其餘方式完成。

  2. 分割:數據拆分也建議使用獨立的任務來完成。理由相似排序,由於批處理的過程都是以行記錄爲基本處理單位的,沒法再對分割以後的數據進行擴展處理。

  3. 合併:理由如上。

常規數據源

批處理的數據源一般包括:

  1. 數據庫驅動連接(連接到數據庫)對數據進行逐條提取。
  2. 文件驅動連接,對文件數據進行提取
  3. 消息驅動連接,從MQ、kafka等消息系統提取數據。

典型的處理過程

  1. 在業務中止的窗口期進行批數據處理,例如銀行對帳、清結算都是在12點日切到黎明之間。簡稱爲離線處理。

  2. 在線或併發批處理,可是須要對實際業務或用戶的響應進行考量。

  3. 並行處理多種不一樣的批處理做業。

  4. 分區處理:將相同的數據分爲不一樣的區塊,而後按照相同的步驟分爲許多獨立的批處理任務對不一樣的區塊進行處理。

  5. 以上處理過程進行組合。

在執行2,3點批處理時須要注意事物隔離等級。

Spring Batch批處理的核心概念

下圖是批處理的核心流程圖。 批處理流程

(圖片來源於網絡)

Spring Batch一樣按照批處理的標準實現了各個層級的組件。而且在框架級別保證數據的完整性和事物性。

如圖所示,在一個標準的批處理任務中涵蓋的核心概念有JobLauncherJobStep,一個Job能夠涵蓋多個Step,一個Job對應一個啓動的JobLauncher。一個Step中分爲ItemReaderItemProcessorItemWriter,根據字面意思它們分別對應數據提取、數據處理和數據寫入。此外JobLauncherJobStep會產生元數據(Metadata),它們會被存儲到JobRepository中。

Job

簡單的說Job是封裝一個批處理過程的實體,與其餘的Spring項目相似,Job能夠經過XML或Java類配置,稱爲「Job Configuration」。以下圖Job是單個批處理的最頂層。

爲了便於理解,能夠簡單的將Job理解爲是每一步(Step)實例的容器。他結合了多個Step,爲它們提供統一的服務同時也爲Step提供個性化的服務,好比步驟重啓。一般狀況下Job的配置包含如下內容

  • Job的名稱
  • 定義和排序Step執行實例。
  • 標記每一個Step是否能夠重啓。

Spring Batch爲Job接口提供了默認的實現——SimpleJob,其中實現了一些標準的批處理方法。下面的代碼展現瞭如可注入一個Job

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob") //get中命名了Job的名稱
                     .start(playerLoad())  //playerLoad、gameLoad、playerSummarization都是Step
                     .next(gameLoad())
                     .next(playerSummarization())
                     .end()
                     .build();
}

JobInstance

JobInstance是指批處理做業運行的實例。例如一個批處理必須在天天執行一次,系統在2019年5月1日執行了一次咱們稱之爲2019-05-01的實例,相似的還會有2019-05-0二、2019-05-03實例。一般狀況下,一個JobInstance對應一個JobParameters,對應多個JobExecution。(JobParametersJobExecution見後文)。同一個JobInstance具備相同的上下文(ExecutionContext內容見後文)。

JobParameters

前面討論了JobInstanceJob的區別,可是具體的區別內容都是經過JobParameters體現的。一個JobParameters對象中包含了一系列Job運行相關的參數,這些參數能夠用於參考或者用於實際的業務使用。對應的關係以下圖:

JobParameters

當咱們執行2個不一樣的JobInstanceJobParameters中的屬性都會有差別。能夠簡單的認爲一個JobInstance的標識就是Job+JobParameters

JobExecution

JobExecution能夠理解爲單次運行Job的容器。一次JobInstance執行的結果多是成功、也多是失敗。可是對於Spring Batch框架而言,只有返回運行成功纔會視爲完成一次批處理。例如2019-05-01執行了一次JobInstance,可是執行的過程失敗,所以第二次還會有一個「相同的」的JobInstance被執行。

Job用於定義批處理如何執行,JobInstance純粹的就是一個處理對象,把全部的運行內容和信息組織在一塊兒,主要是爲了當面臨問題時定義正確的重啓參數。而JobExecution是運行時的「容器」,記錄動態運行時的各類屬性和上線文。他包括的信息有:

屬性 說明
status 狀態類名爲BatchStatus,它指示了執行的狀態。在執行的過程當中狀態爲BatchStatus#STARTED,失敗:BatchStatus#FAILED,完成:BatchStatus#COMPLETED
startTime java.util.Date對象,標記批處理任務啓動的系統時間,批處理任務未啓動數據爲空
endTime java.util.Date對象,結束時間不管是否成功都包含該數據,如未處理完爲空
exitStatus ExitStatus類,記錄運行結果。
createTime java.util.Date,JobExecution的建立時間,某些使用execution已經建立可是並未開始運行。
lastUpdate java.util.Date,最後一次更新時間
executionContext 批處理任務執行的全部用戶數據
failureExceptions 記錄在執行Job時的異常,對於排查問題很是有用

以上這些內容Spring Batch都會經過JobRepository進行持久化(這些信息官方文成稱之爲MetaData),所以在對應的數據源中能夠看到下列信息:

BATCH_JOB_INSTANCE:

JOB_INST_ID JOB_NAME
1 EndOfDayJob

BATCH_JOB_EXECUTION_PARAMS:

JOB_EXECUTION_ID TYPE_CD KEY_NAME DATE_VAL IDENTIFYING
1 DATE schedule.Date 2019-01-01 TRUE

BATCH_JOB_EXECUTION:

JOB_EXEC_ID JOB_INST_ID START_TIME END_TIME STATUS
1 1 2019-01-01 21:00 2017-01-01 21:30 FAILED

當某個Job批處理任務失敗以後會在對應的數據庫表中路對應的狀態。假設1月1號執行的任務失敗,技術團隊花費了大量的時間解決這個問題到了次日21才繼續執行這個任務。

BATCH_JOB_INSTANCE:

JOB_INST_ID JOB_NAME
1 EndOfDayJob
2 EndOfDayJob

BATCH_JOB_EXECUTION_PARAMS:

JOB_EXECUTION_ID TYPE_CD KEY_NAME DATE_VAL IDENTIFYING
1 DATE schedule.Date 2019-01-01 TRUE
2 DATE schedule.Date 2019-01-01 TRUE
3 DATE schedule.Date 2019-01-02 TRUE

BATCH_JOB_EXECUTION:

JOB_EXEC_ID JOB_INST_ID START_TIME END_TIME STATUS
1 1 2019-01-01 21:00 2017-01-01 21:30 FAILED
2 1 2019-01-02 21:00 2017-01-02 21:30 COMPLETED
3 2 2019-01-02 21:31 2017-01-02 22:29 COMPLETED

從數據上看好似JobInstance是一個接一個順序執行的,可是對於Spring Batch並無進行任何控制。不一樣的JobInstance頗有多是同時在運行(相同的JobInstance同時運行會拋出JobExecutionAlreadyRunningException異常)。

Step

Step是批處理重複運行的最小單元,它按照順序定義了一次執行的必要過程。所以每一個Job能夠視做由一個或多個多個Step組成。一個Step包含了全部全部進行批處理的必要信息,這些信息的內容是由開發人員決定的並無統一的標準。一個Step能夠很簡單,也能夠很複雜。他能夠是複雜業務的組合,也有可能僅僅用於遷移數據。與JobExecution的概念相似,Step也有特定的StepExecution,關係結構以下:

Step

StepExecution

StepExecution表示單次執行Step的容器,每次Step執行時都會有一個新的StepExecution被建立。與JobExecution不一樣的是,當某個Step執行失敗後默認並不會從新執行。StepExecution包含如下屬性:

屬性 說明
status 狀態類名爲BatchStatus,它指示了執行的狀態。在執行的過程當中狀態爲BatchStatus#STARTED,失敗:BatchStatus#FAILED,完成:BatchStatus#COMPLETED
startTime java.util.Date對象,標記StepExecution啓動的系統時間,未啓動數據爲空
endTime java.util.Date對象,結束時間,不管是否成功都包含該數據,如未處理完爲空
exitStatus ExitStatus類,記錄運行結果。
createTime java.util.Date,JobExecution的建立時間,某些使用execution已經建立可是並未開始運行。
lastUpdate java.util.Date,最後一次更新時間
executionContext 批處理任務執行的全部用戶數據
readCount 成功讀取數據的次數
wirteCount 成功寫入數據的次數
commitCount 成功提交數據的次數
rollbackCount 迴歸數據的次數,有業務代碼觸發
readSkipCount 當讀數據發生錯誤時跳過處理的次數
processSkipCount 當處理過程發生錯誤,跳過處理的次數
filterCount 被過濾規則攔截未處理的次數
writeSkipCount 寫數據失敗,跳過處理的次數

ExecutionContext

前文已經屢次提到ExecutionContext。能夠簡單的認爲ExecutionContext提供了一個Key/Value機制,在StepExecutionJobExecution對象的任何位置均可以獲取到ExecutionContext中的任何數據。最有價值的做用是記錄數據的執行位置,以便發生重啓時候從對應的位置繼續執行:

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition())

好比在任務中有一個名爲「loadData」的Step,他的做用是從文件中讀取數據寫入到數據庫,當第一次執行失敗後,數據庫中有以下數據:

BATCH_JOB_INSTANCE:

JOB_INST_ID JOB_NAME
1 EndOfDayJob

BATCH_JOB_EXECUTION_PARAMS:

JOB_INST_ID TYPE_CD KEY_NAME DATE_VAL
1 DATE schedule.Date 2019-01-01

BATCH_JOB_EXECUTION:

JOB_EXEC_ID JOB_INST_ID START_TIME END_TIME STATUS
1 1 2017-01-01 21:00 2017-01-01 21:30 FAILED

BATCH_STEP_EXECUTION:

STEP_EXEC_ID JOB_EXEC_ID STEP_NAME START_TIME END_TIME STATUS
1 1 loadData 2017-01-01 21:00 2017-01-01 21:30 FAILED

BATCH_STEP_EXECUTION_CONTEXT: |STEP_EXEC_ID|SHORT_CONTEXT| |---|---| |1|{piece.count=40321}|

在上面的例子中,Step運行30分鐘處理了40321個「pieces」,咱們姑且認爲「pieces」表示行間的行數(實際就是每一個Step完成循環處理的個數)。這個值會在每一個commit以前被更新記錄在ExecutionContext中(更新須要用到StepListener後文會詳細說明)。當咱們再次重啓這個Job時並記錄在BATCH_STEP_EXECUTION_CONTEXT中的數據會加載到ExecutionContext中,這樣當咱們繼續執行批處理任務時能夠從上一次中斷的位置繼續處理。例以下面的代碼在ItemReader中檢查上次執行的結果,並從中斷的位置繼續執行:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

ExecutionContext是根據JobInstance進行管理的,所以只要是相同的實例都會具有相同的ExecutionContext(不管是否中止)。此外經過如下方法均可以得到一個ExecutionContext

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();

可是這2個ExecutionContext並不相同,前者是在一個Step中每次Commit數據之間共享,後者是在StepStep之間共享。

JobRepository

JobRepository是全部前面介紹的對象實例的持久化機制。他爲JobLauncherJobStep的實現提供了CRUD操做。當一個Job第一次被啓動時,一個JobExecution會從數據源中獲取到,同時在執行的過程當中StepExecutionJobExecution的實現都會記錄到數據源中。使用@EnableBatchProcessing註解後JobRepository會進行自動化配置。

JobLauncher

JobLauncherJob的啓動運行提供了一個邊界的入口,在啓動Job的同時還能夠定製JobParameters

public interface JobLauncher {
	public JobExecution run(Job job, JobParameters jobParameters)
				throws JobExecutionAlreadyRunningException, JobRestartException,
					   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
相關文章
相關標籤/搜索