Spring Batch爲批處理提供了一個輕量化的解決方案,它根據批處理的須要迭代處理各類記錄,提供事物功能。可是Spring Batch僅僅適用於"脫機"場景,在處理的過程當中不能和外部進行任何交互,也不容許有任何輸入。java
如上圖,一般狀況下一個獨立的JVM程序就是僅僅用於處理批處理,而不要和其餘功能重疊。 在最後一層基礎設置(Infrastructure)部分主要分爲3個部分。JobLauncher
、Job
以及Step
。每個Step
又細分爲ItemReader
、ItemProcessor
、ItemWirte
。使用Spring Batch主要就是知道每個基礎設置負責的內容,而後在對應的設施中實現對應的業務。數據庫
當咱們構建一個批處理的過程時,必須注意如下原則:設計模式
一般狀況下,批處理的過程對系統和架構的設計要夠要求比較高,所以儘量的使用通用架構來處理批量數據處理,下降問題發生的可能性。Spring Batch是一個是一個輕量級的框架,適用於處理一些靈活並無到海量的數據。緩存
批處理應該儘量的簡單,儘可能避免在單個批處理中去執行過於複雜的任務。咱們能夠將任務分紅多個批處理或者多個步驟去實現。服務器
保證數據處理和物理數據緊密相連。籠統的說就是咱們在處理數據的過程當中有不少步驟,在某些步驟執行完時應該就寫入數據,而不是等全部都處理完。網絡
儘量減小系統資源的使用、尤爲是耗費大量資源的IO以及跨服務器引用,儘可能分配好數據處理的批次。數據結構
按期分析系統的IO使用狀況、SQL語句的執行狀況等,儘量的減小沒必要要的IO操做。優化的原則有:架構
不要在批處理中對相同的數據執行2次相同的操做。併發
對於批處理程序而言應該在批處理啓動以前就分配足夠的內存,以避免處理的過程當中去從新申請新的內存頁。框架
對數據的完整性應該從最差的角度來考慮,每一步的處理都應該創建完備的數據校驗。
對於數據的總量咱們應該有一個和數據記錄在數據結構的某個字段 上。
全部的批處理系統都須要進行壓力測試。
若是整個批處理的過程是基於文件系統,在處理的過程當中請切記完成文件的備份以及文件內容的校驗。
和軟件開發的設計模式同樣,批處理也有各類各樣的現成模式可供參考。當一個開發(設計)人員開始執行批處理任務時,應該將業務邏輯拆分爲一下的步驟或者板塊分批執行:
數據轉換:某個(某些)批處理的外部數據可能來自不一樣的外部系統或者外部提供者,這些數據的結構千差萬別。在統一進行批量數據處理以前須要對這些數據進行轉換,合併爲一個統一的結構。所以在數據開始真正的執行業務處理以前,先要使用其餘的方法或者一些批處理任務將這些數據轉換爲統一的格式。
數據校驗:批處理是對大量數據進行處理,而且數據的來源千差萬別,因此批處理的輸入數據須要對數據的完整性性進行校驗(好比校驗字段數據是否缺失)。另外批處理輸出的數據也須要進行合適的校驗(例如處理了100條數據,校驗100條數據是否校驗成功)
提取數據:批處理的工做是逐條從數據庫或目標文件讀取記錄(records),提取時能夠經過一些規則從數據源中進行數據篩選。
數據實時更新處理:根據業務要求,對實時數據進行處理。某些時候一行數據記錄的處理須要綁定在一個事物之下。
輸出記錄到標準的文檔格式:數據處理完成以後須要根據格式寫入到對應的外部數據系統中。
以上五個步驟是一個標準的數據批處理過程,Spring batch框架爲業務實現提供了以上幾個功能入口。
某些狀況須要實現對數據進行額外處理,在進入批處理以前經過其餘方式將數據進行處理。主要內容有:
排序:因爲批處理是以獨立的行數據(record)進行處理的,在處理的時候並不知道記錄先後關係。所以若是須要對總體數據進行排序,最好事先使用其餘方式完成。
分割:數據拆分也建議使用獨立的任務來完成。理由相似排序,由於批處理的過程都是以行記錄爲基本處理單位的,沒法再對分割以後的數據進行擴展處理。
合併:理由如上。
批處理的數據源一般包括:
在業務中止的窗口期進行批數據處理,例如銀行對帳、清結算都是在12點日切到黎明之間。簡稱爲離線處理。
在線或併發批處理,可是須要對實際業務或用戶的響應進行考量。
並行處理多種不一樣的批處理做業。
分區處理:將相同的數據分爲不一樣的區塊,而後按照相同的步驟分爲許多獨立的批處理任務對不一樣的區塊進行處理。
以上處理過程進行組合。
在執行2,3點批處理時須要注意事物隔離等級。
下圖是批處理的核心流程圖。
(圖片來源於網絡)
Spring Batch一樣按照批處理的標準實現了各個層級的組件。而且在框架級別保證數據的完整性和事物性。
如圖所示,在一個標準的批處理任務中涵蓋的核心概念有JobLauncher
、Job
、Step
,一個Job
能夠涵蓋多個Step
,一個Job
對應一個啓動的JobLauncher
。一個Step
中分爲ItemReader
、ItemProcessor
、ItemWriter
,根據字面意思它們分別對應數據提取、數據處理和數據寫入。此外JobLauncher
、Job
、Step
會產生元數據(Metadata),它們會被存儲到JobRepository
中。
簡單的說Job
是封裝一個批處理過程的實體,與其餘的Spring項目相似,Job
能夠經過XML或Java類配置,稱爲「Job Configuration」。以下圖Job
是單個批處理的最頂層。
爲了便於理解,能夠簡單的將Job
理解爲是每一步(Step
)實例的容器。他結合了多個Step
,爲它們提供統一的服務同時也爲Step
提供個性化的服務,好比步驟重啓。一般狀況下Job的配置包含如下內容
:
Step
執行實例。Step
是否能夠重啓。Spring Batch爲Job接口提供了默認的實現——SimpleJob
,其中實現了一些標準的批處理方法。下面的代碼展現瞭如可注入一個Job
。
@Bean public Job footballJob() { return this.jobBuilderFactory.get("footballJob") //get中命名了Job的名稱 .start(playerLoad()) //playerLoad、gameLoad、playerSummarization都是Step .next(gameLoad()) .next(playerSummarization()) .end() .build(); }
JobInstance
是指批處理做業運行的實例。例如一個批處理必須在天天執行一次,系統在2019年5月1日執行了一次咱們稱之爲2019-05-01的實例,相似的還會有2019-05-0二、2019-05-03實例。一般狀況下,一個JobInstance
對應一個JobParameters
,對應多個JobExecution
。(JobParameters
、JobExecution
見後文)。同一個JobInstance
具備相同的上下文(ExecutionContext
內容見後文)。
前面討論了JobInstance
與Job
的區別,可是具體的區別內容都是經過JobParameters
體現的。一個JobParameters
對象中包含了一系列Job運行相關的參數,這些參數能夠用於參考或者用於實際的業務使用。對應的關係以下圖:
當咱們執行2個不一樣的JobInstance
時JobParameters
中的屬性都會有差別。能夠簡單的認爲一個JobInstance
的標識就是Job
+JobParameters
。
JobExecution
能夠理解爲單次運行Job
的容器。一次JobInstance
執行的結果多是成功、也多是失敗。可是對於Spring Batch框架而言,只有返回運行成功纔會視爲完成一次批處理。例如2019-05-01執行了一次JobInstance
,可是執行的過程失敗,所以第二次還會有一個「相同的」的JobInstance
被執行。
Job
用於定義批處理如何執行,JobInstance
純粹的就是一個處理對象,把全部的運行內容和信息組織在一塊兒,主要是爲了當面臨問題時定義正確的重啓參數。而JobExecution
是運行時的「容器」,記錄動態運行時的各類屬性和上線文。他包括的信息有:
屬性 | 說明 |
---|---|
status | 狀態類名爲BatchStatus ,它指示了執行的狀態。在執行的過程當中狀態爲BatchStatus#STARTED ,失敗:BatchStatus#FAILED ,完成:BatchStatus#COMPLETED |
startTime | java.util.Date 對象,標記批處理任務啓動的系統時間,批處理任務未啓動數據爲空 |
endTime | java.util.Date 對象,結束時間不管是否成功都包含該數據,如未處理完爲空 |
exitStatus | ExitStatus 類,記錄運行結果。 |
createTime | java.util.Date ,JobExecution 的建立時間,某些使用execution已經建立可是並未開始運行。 |
lastUpdate | java.util.Date ,最後一次更新時間 |
executionContext | 批處理任務執行的全部用戶數據 |
failureExceptions | 記錄在執行Job時的異常,對於排查問題很是有用 |
以上這些內容Spring Batch都會經過JobRepository
進行持久化(這些信息官方文成稱之爲MetaData),所以在對應的數據源中能夠看到下列信息:
BATCH_JOB_INSTANCE:
JOB_INST_ID | JOB_NAME |
---|---|
1 | EndOfDayJob |
BATCH_JOB_EXECUTION_PARAMS:
JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | DATE_VAL | IDENTIFYING |
---|---|---|---|---|
1 | DATE | schedule.Date | 2019-01-01 | TRUE |
BATCH_JOB_EXECUTION:
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
---|---|---|---|---|
1 | 1 | 2019-01-01 21:00 | 2017-01-01 21:30 | FAILED |
當某個Job
批處理任務失敗以後會在對應的數據庫表中路對應的狀態。假設1月1號執行的任務失敗,技術團隊花費了大量的時間解決這個問題到了次日21才繼續執行這個任務。
BATCH_JOB_INSTANCE:
JOB_INST_ID | JOB_NAME |
---|---|
1 | EndOfDayJob |
2 | EndOfDayJob |
BATCH_JOB_EXECUTION_PARAMS:
JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | DATE_VAL | IDENTIFYING |
---|---|---|---|---|
1 | DATE | schedule.Date | 2019-01-01 | TRUE |
2 | DATE | schedule.Date | 2019-01-01 | TRUE |
3 | DATE | schedule.Date | 2019-01-02 | TRUE |
BATCH_JOB_EXECUTION:
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
---|---|---|---|---|
1 | 1 | 2019-01-01 21:00 | 2017-01-01 21:30 | FAILED |
2 | 1 | 2019-01-02 21:00 | 2017-01-02 21:30 | COMPLETED |
3 | 2 | 2019-01-02 21:31 | 2017-01-02 22:29 | COMPLETED |
從數據上看好似JobInstance
是一個接一個順序執行的,可是對於Spring Batch並無進行任何控制。不一樣的JobInstance
頗有多是同時在運行(相同的JobInstance
同時運行會拋出JobExecutionAlreadyRunningException
異常)。
Step
是批處理重複運行的最小單元,它按照順序定義了一次執行的必要過程。所以每一個Job
能夠視做由一個或多個多個Step
組成。一個Step
包含了全部全部進行批處理的必要信息,這些信息的內容是由開發人員決定的並無統一的標準。一個Step
能夠很簡單,也能夠很複雜。他能夠是複雜業務的組合,也有可能僅僅用於遷移數據。與JobExecution
的概念相似,Step
也有特定的StepExecution
,關係結構以下:
StepExecution
表示單次執行Step的容器,每次Step
執行時都會有一個新的StepExecution
被建立。與JobExecution
不一樣的是,當某個Step
執行失敗後默認並不會從新執行。StepExecution
包含如下屬性:
屬性 | 說明 |
---|---|
status | 狀態類名爲BatchStatus ,它指示了執行的狀態。在執行的過程當中狀態爲BatchStatus#STARTED ,失敗:BatchStatus#FAILED ,完成:BatchStatus#COMPLETED |
startTime | java.util.Date 對象,標記StepExecution 啓動的系統時間,未啓動數據爲空 |
endTime | java.util.Date 對象,結束時間,不管是否成功都包含該數據,如未處理完爲空 |
exitStatus | ExitStatus 類,記錄運行結果。 |
createTime | java.util.Date ,JobExecution 的建立時間,某些使用execution已經建立可是並未開始運行。 |
lastUpdate | java.util.Date ,最後一次更新時間 |
executionContext | 批處理任務執行的全部用戶數據 |
readCount | 成功讀取數據的次數 |
wirteCount | 成功寫入數據的次數 |
commitCount | 成功提交數據的次數 |
rollbackCount | 迴歸數據的次數,有業務代碼觸發 |
readSkipCount | 當讀數據發生錯誤時跳過處理的次數 |
processSkipCount | 當處理過程發生錯誤,跳過處理的次數 |
filterCount | 被過濾規則攔截未處理的次數 |
writeSkipCount | 寫數據失敗,跳過處理的次數 |
前文已經屢次提到ExecutionContext
。能夠簡單的認爲ExecutionContext
提供了一個Key/Value機制,在StepExecution
和JobExecution
對象的任何位置均可以獲取到ExecutionContext
中的任何數據。最有價值的做用是記錄數據的執行位置,以便發生重啓時候從對應的位置繼續執行:
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition())
好比在任務中有一個名爲「loadData」的Step
,他的做用是從文件中讀取數據寫入到數據庫,當第一次執行失敗後,數據庫中有以下數據:
BATCH_JOB_INSTANCE:
JOB_INST_ID | JOB_NAME |
---|---|
1 | EndOfDayJob |
BATCH_JOB_EXECUTION_PARAMS:
JOB_INST_ID | TYPE_CD | KEY_NAME | DATE_VAL |
---|---|---|---|
1 | DATE | schedule.Date | 2019-01-01 |
BATCH_JOB_EXECUTION:
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
---|---|---|---|---|
1 | 1 | 2017-01-01 21:00 | 2017-01-01 21:30 | FAILED |
BATCH_STEP_EXECUTION:
STEP_EXEC_ID | JOB_EXEC_ID | STEP_NAME | START_TIME | END_TIME | STATUS |
---|---|---|---|---|---|
1 | 1 | loadData | 2017-01-01 21:00 | 2017-01-01 21:30 | FAILED |
BATCH_STEP_EXECUTION_CONTEXT: |STEP_EXEC_ID|SHORT_CONTEXT| |---|---| |1|{piece.count=40321}|
在上面的例子中,Step
運行30分鐘處理了40321個「pieces」,咱們姑且認爲「pieces」表示行間的行數(實際就是每一個Step完成循環處理的個數)。這個值會在每一個commit
以前被更新記錄在ExecutionContext
中(更新須要用到StepListener
後文會詳細說明)。當咱們再次重啓這個Job
時並記錄在BATCH_STEP_EXECUTION_CONTEXT中的數據會加載到ExecutionContext
中,這樣當咱們繼續執行批處理任務時能夠從上一次中斷的位置繼續處理。例以下面的代碼在ItemReader
中檢查上次執行的結果,並從中斷的位置繼續執行:
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) { log.debug("Initializing for restart. Restart data is: " + executionContext); long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT)); LineReader reader = getReader(); Object record = ""; while (reader.getPosition() < lineCount && record != null) { record = readLine(); } }
ExecutionContext
是根據JobInstance
進行管理的,所以只要是相同的實例都會具有相同的ExecutionContext(不管是否中止)。此外經過如下方法均可以得到一個ExecutionContext
:
ExecutionContext ecStep = stepExecution.getExecutionContext(); ExecutionContext ecJob = jobExecution.getExecutionContext();
可是這2個ExecutionContext
並不相同,前者是在一個Step
中每次Commit
數據之間共享,後者是在Step
與Step
之間共享。
JobRepository
是全部前面介紹的對象實例的持久化機制。他爲JobLauncher
、Job
、Step
的實現提供了CRUD操做。當一個Job
第一次被啓動時,一個JobExecution
會從數據源中獲取到,同時在執行的過程當中StepExecution
、JobExecution
的實現都會記錄到數據源中。使用@EnableBatchProcessing
註解後JobRepository
會進行自動化配置。
JobLauncher
爲Job
的啓動運行提供了一個邊界的入口,在啓動Job
的同時還能夠定製JobParameters
:
public interface JobLauncher { public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException; }