什麼是 Spring Batch
介紹
Spring Batch 做爲 Spring 的子項目,是一款基於 Spring 的企業批處理框架。經過它能夠構建出健壯的企業批處理應用。Spring Batch 不只提供了統一的讀寫接口、豐富的任務處理方式、靈活的事務管理及併發處理,同時還支持日誌、監控、任務重啓與跳過等特性,大大簡化了批處理應用開發,將開發人員從複雜的任務配置管理過程當中解放出來,使他們能夠更多地去關注核心的業務處理過程。java
Spring Batch把批處理簡化爲Job和Job step兩部分,在Job step中,把數據處理分爲讀數據(Reader)、處理數據(Processor)、寫數據(Writer)三個步驟,異常處理機制分爲跳過、重試、重啓三種,做業方式分爲多線程、並行、遠程、分區四種。開發者在開發過程當中,大部分工做是根據業務要求編寫Reader、Processor和Writer便可,提升了批處理開發的效率。同時Spring Batch自己也提供了不少默認的Reader和Writer,開箱即用。spring
官網詳細介紹:https://spring.io/projects/spring-batch數據庫
架構組件分類
-
Application(應用層):包含開發者應用Spring-batch編寫的全部批處理做業和自定義代碼;設計模式
-
Batch Core(核心層):包含加載和控制批處理做業所必需的核心類,它包含了Job,Step,JobLauncher的實現;緩存
-
Infrastructure(基礎架構層):基礎架構層包含了Reader(ItemReader),Writer(ItemWriter),Services能夠被應用層和核心層使用;服務器
優點
-
豐富的開箱即用組件 開箱即用組件包括各類資源的讀、寫。讀/寫:支持文本文件讀/寫、XML文件讀/寫、數據庫讀/寫、JMS隊列讀/寫等。還提供做業倉庫,做業調度器等基礎設施,大大簡化開發複雜度。數據結構
-
面向chunk處理 支持屢次讀、一次寫、避免屢次對資源的寫入,大幅提高批處理效率。多線程
-
事務管理能力 默認採用Spring提供的聲明式事務管理模型,面向Chunk的操做支持事務管理,同時支持爲每一個tasklet操做設置細粒度的事務配置:隔離級別、傳播行爲、超時設置等。架構
-
元數據管理 自動記錄Job和Step的執行狀況、包括成功、失敗、失敗的異常信息、執行次數、重試次數、跳過次數、執行時間等,方便後期的維護和查看。併發
-
易監控的批處理應用 提供了靈活的監控模式,包括直接查看數據庫、經過Spring Batch提供的API查看、JMX控制檯查看等。其中還說到Spring Batch Admin,不過這個項目已不維護,改成用Spring Cloud Data Flow了。
-
豐富的流程定義 支持順序任務、條件分支任務、基於這兩種任務能夠組織複雜的任務流程。
-
健壯的批處理應用 支持做業的跳過、重試、重啓能力、避免因錯誤致使批處理做業的異常中斷。
-
易擴展的批處理應用 擴展機制包括多線程執行一個Step(Multithreaded step)、多線程並行執行多個Step(Parallelizing step)、遠程執行做業(Remote chunking)、分區執行(partitioning step)。
-
複用企業現有IT資產 提供多種Adapter能力,使得企業現有的服務能夠方便集成到批處理應用中。避免從新開發、達到複用企業遺留的服務資產。
使用場景
-
按期提交批處理任務
-
並行批處理
-
企業消息驅動處理
-
大規模並行批處理
-
失敗後手動或定時重啓
-
按順序處理依賴的任務(可擴展爲工做流驅動的批處理)
-
部分處理:跳過記錄(例如,回滾時)
-
批處理事務
原則與建議
當咱們構建一個批處理的過程時,必須注意如下原則:
一、一般狀況下,批處理的過程對系統和架構的設計要夠要求比較高,所以儘量的使用通用架構來處理批量數據處理,下降問題發生的可能性。Spring Batch是一個是一個輕量級的框架,適用於處理一些靈活並無到海量的數據。
二、批處理應該儘量的簡單,儘可能避免在單個批處理中去執行過於複雜的任務。咱們能夠將任務分紅多個批處理或者多個步驟去實現。
三、保證數據處理和物理數據緊密相連。籠統的說就是咱們在處理數據的過程當中有不少步驟,在某些步驟執行完時應該就寫入數據,而不是等全部都處理完。
四、儘量減小系統資源的使用、尤爲是耗費大量資源的IO以及跨服務器引用,儘可能分配好數據處理的批次。
五、按期分析系統的IO使用狀況、SQL語句的執行狀況等,儘量的減小沒必要要的IO操做。優化的原則有:
- 儘可能在一次事物中對同一數據進行讀取或寫緩存。
- 一次事物中,儘量在開始就讀取全部須要使用的數據。
- 優化索引,觀察SQL的執行狀況,儘可能使用主鍵索引,儘可能避免全表掃描或過多的索引掃描。
- SQL中的where儘量經過主鍵查詢。
六、不要在批處理中對相同的數據執行2次相同的操做。
七、對於批處理程序而言應該在批處理啓動以前就分配足夠的內存,以避免處理的過程當中去從新申請新的內存頁。
八、對數據的完整性應該從最差的角度來考慮,每一步的處理都應該創建完備的數據校驗。
九、對於數據的總量咱們應該有一個和數據記錄在數據結構的某個字段 上。
十、全部的批處理系統都須要進行壓力測試。
十一、若是整個批處理的過程是基於文件系統,在處理的過程當中請切記完成文件的備份以及文件內容的校驗。
通用策略
和軟件開發的設計模式同樣,批處理也有各類各樣的現成模式可供參考。當一個開發(設計)人員開始執行批處理任務時,應該將業務邏輯拆分爲一下的步驟或者板塊分批執行:
-
數據轉換:某個(某些)批處理的外部數據可能來自不一樣的外部系統或者外部提供者,這些數據的結構千差萬別。在統一進行批量數據處理以前須要對這些數據進行轉換,合併爲一個統一的結構。所以在數據開始真正的執行業務處理以前,先要使用其餘的方法或者一些批處理任務將這些數據轉換爲統一的格式。
-
數據校驗:批處理是對大量數據進行處理,而且數據的來源千差萬別,因此批處理的輸入數據須要對數據的完整性性進行校驗(好比校驗字段數據是否缺失)。另外批處理輸出的數據也須要進行合適的校驗(例如處理了100條數據,校驗100條數據是否校驗成功)
-
提取數據:批處理的工做是逐條從數據庫或目標文件讀取記錄(records),提取時能夠經過一些規則從數據源中進行數據篩選。
-
數據實時更新處理:根據業務要求,對實時數據進行處理。某些時候一行數據記錄的處理須要綁定在一個事物之下。
-
輸出記錄到標準的文檔格式:數據處理完成以後須要根據格式寫入到對應的外部數據系統中。
以上五個步驟是一個標準的數據批處理過程,Spring batch框架爲業務實現提供了以上幾個功能入口。
數據額外處理
某些狀況須要實現對數據進行額外處理,在進入批處理以前經過其餘方式將數據進行處理。主要內容有:
排序:因爲批處理是以獨立的行數據(record)進行處理的,在處理的時候並不知道記錄先後關係。所以若是須要對總體數據進行排序,最好事先使用其餘方式完成。
分割:數據拆分也建議使用獨立的任務來完成。理由相似排序,由於批處理的過程都是以行記錄爲基本處理單位的,沒法再對分割以後的數據進行擴展處理。
合併:理由如上。
Spring Batch核心概念
Spring Batch在基礎架構層,把任務抽象爲Job和Step,一個Job由多個Step來完成,step就是每一個job要執行的單個步驟。
一、Job:是一個接口,接口中定義了一個做業是怎麼樣執行的
二、JobInstance:是job的一次執行,一個JobInstance可重複執行,若是上一次執行失敗下次執行的時候還會從新執行上次失敗的job,每一次執行就是一個JobExceution
三、JobParameters:做爲參數能夠用來啓動Job,而且能夠用來標識不一樣的Job,運行時提供給JobInstance,jonExceution根據狀態和參數決定下一次是否繼續執行
四、JobExceution:每一次嘗試執行一個Job的時候,咱們就能夠將其稱爲一個JobExceution,這個執行的結果能夠爲成功,也能夠爲失敗,例如一個JobInstance執行失敗了,下一次執行他傳入的參數是上次執行的時間,他將會繼續執行,這樣始終執行的是一個JobInstance,而產生了兩個JobExceution
五、Step:主要分爲兩塊
(1)Tasklet:是一個任務單元,它是屬於能夠重複利用的東西。接口其中包含了一個惟一的方法execute();
(2)Chunk-based:chunk就是數據塊,你須要定義多大的數據量是一個chunk。Chunk裏面就是不斷循環的一個流程,讀數據,處理數據,而後寫數據。Spring Batch會不斷的循環這個流程,直到批處理數據完成。
- ·itemReader:數據輸入input:對於一個Step而言,每次讀取一個條目;
- ·itemProcessor:數據處理processing
- ·ItemWriter:數據輸出output:對於一個Step而言,每次根據設定輸出批量一個條目;
六、StepExecution:一個Step的每一次嘗試執行,都會建立一個StepExection,在一個Step實際開始執行的時候建立
七、ExecutionContext:執行上下文,表明的是一個key-value鍵值對的集合,能夠被Spring框架進行在持久化管理,可以是開發人員存儲持久化狀態,每個JobExecution以及每個StepExecution的執行都會對應一個執行上下文(ExecutionContext);對於StepExecution在每一次提交點時就會保存一下執行上下文,而對於Job是在每個StepExecution執行之間進行保存,例如,咱們從Step1換到Step2是就會保存;
八、JobLauncher:接口,用於啓動和加載Job,根據傳入的參數進行啓動,返回Job一次執行的狀況
九、JobRepository:Job及Job的運行結果和狀態、Step的運行結果和狀態,都會保存在JobRepository中。
概念說明可見下表:
領域對象 | 描述 |
---|---|
JobRepository | 做業倉庫,保存Job、Step執行過程當中的狀態及結果 |
JobLauncher | 做業執行器,是執行Job的入口 |
Job | 一個批處理任務,由一個或多個Step組成 |
Step | 一個任務的具體的執行邏輯單位 |
Item | 一條數據記錄 |
ItemReader | 從數據源讀數據 |
ItemProcessor | 對數據進行處理,如數據清洗、轉換、過濾、校驗等 |
ItemWriter | 寫入數據到指定目標 |
Chunk | 給定數量的Item集合,如讀取到chunk數量後,才進行寫操做 |
Tasklet | Step中具體執行邏輯,可重複執行 |
Spring Batch數據表
batch_job_instance:這張表能看到每次運行的job名字。
batch_job_execution:這張表能看到每次運行job的開始時間,結束時間,狀態,以及失敗後的錯誤消息是什麼。
batch_step_execution:這張表你能看到更多關於step的詳細信息。好比step的開始時間,結束時間,提交次數,讀寫次數,狀態,以及失敗後的錯誤信息等。 圖片描述
Job
簡單的說Job是封裝一個批處理過程的實體,與其餘的Spring項目相似,Job能夠經過XML或Java類配置,稱爲「Job Configuration」。以下圖Job是單個批處理的最頂層。
爲了便於理解,能夠簡單的將Job理解爲是每一步(Step)實例的容器。他結合了多個Step,爲它們提供統一的服務同時也爲Step提供個性化的服務,好比步驟重啓。一般狀況下Job的配置包含如下內容:
- Job的名稱
- 定義和排序Step執行實例。
- 標記每一個Step是否能夠重啓。
Spring Batch爲Job接口提供了默認的實現——SimpleJob,其中實現了一些標準的批處理方法。下面的代碼展現瞭如可注入一個Job。
@Bean public Job testJob() { return this.jobBuilderFactory.get("testJob") //get中命名了Job的名稱 .start(stepOne()) .next(stepTwo()) .next(stepThree()) .end() .build(); }
JobInstance
JobInstance是指批處理做業運行的實例。
例如一個批處理必須在天天執行一次,系統在2019年5月1日執行了一次咱們稱之爲2019-05-01的實例,相似的還會有2019-05-0二、2019-05-03實例。
一般狀況下,一個JobInstance對應一個JobParameters,對應多個JobExecution。(JobParameters、JobExecution見後文)。同一個JobInstance具備相同的上下文(ExecutionContext內容見後文)。
JobParameters
前面討論了JobInstance與Job的區別,可是具體的區別內容都是經過JobParameters體現的。一個JobParameters對象中包含了一系列Job運行相關的參數,這些參數能夠用於參考或者用於實際的業務使用。對應的關係以下圖:
當咱們執行2個不一樣的JobInstance時JobParameters中的屬性都會有差別。能夠簡單的認爲一個JobInstance的標識就是Job+JobParameters。
JobExecution
JobExecution能夠理解爲單次運行Job的容器。一次JobInstance執行的結果多是成功、也多是失敗。可是對於Spring Batch框架而言,只有返回運行成功纔會視爲完成一次批處理。
例如2019-05-01執行了一次JobInstance,可是執行的過程失敗,所以第二次還會有一個「相同的」的JobInstance被執行。
Job用於定義批處理如何執行,JobInstance純粹的就是一個處理對象,把全部的運行內容和信息組織在一塊兒,主要是爲了當面臨問題時定義正確的重啓參數。而JobExecution是運行時的「容器」,記錄動態運行時的各類屬性和上線文。
他包括的信息有:
屬性 | 說明 |
---|---|
status | 狀態類名爲BatchStatus,它指示了執行的狀態。在執行的過程當中狀態爲BatchStatus#STARTED,失敗:BatchStatus#FAILED,完成:BatchStatus#COMPLETED |
startTime | java.util.Date對象,標記批處理任務啓動的系統時間,批處理任務未啓動數據爲空 |
endTime | java.util.Date對象,結束時間不管是否成功都包含該數據,如未處理完爲空 |
exitStatus | ExitStatus類,記錄運行結果。 |
createTime | java.util.Date,JobExecution的建立時間,某些使用execution已經建立可是並未開始運行。 |
lastUpdate | java.util.Date,最後一次更新時間 |
executionContext | 批處理任務執行的全部用戶數據 |
failureExceptions | 記錄在執行Job時的異常,對於排查問題很是有用 |
以上這些內容Spring Batch都會經過JobRepository進行持久化(這些信息官方文成稱之爲MetaData),所以在對應的數據源中能夠看到下列信息:
BATCH_JOB_INSTANCE:
JOB_INST_ID | JOB_NAME |
---|---|
1 | EndOfDayJob |
BATCH_JOB_EXECUTION_PARAMS:
JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | DATE_VAL | IDENTIFYING |
---|---|---|---|---|
1 | DATE | schedule.Date | 2019-01-01 | TRUE |
BATCH_JOB_EXECUTION:
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
---|---|---|---|---|
1 | 1 | 2019-01-01 21:00 | 2017-01-01 21:30 | FAILED |
當某個Job批處理任務失敗以後會在對應的數據庫表中路對應的狀態。假設1月1號執行的任務失敗,技術團隊花費了大量的時間解決這個問題到了次日才繼續執行這個任務。
BATCH_JOB_INSTANCE:
JOB_INST_ID | JOB_NAME |
---|---|
1 | EndOfDayJob |
2 | EndOfDayJob |
BATCH_JOB_EXECUTION_PARAMS:
JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | DATE_VAL | IDENTIFYING |
---|---|---|---|---|
1 | DATE | schedule.Date | 2019-01-01 | TRUE |
2 | DATE | schedule.Date | 2019-01-01 | TRUE |
3 | DATE | schedule.Date | 2019-01-02 | TRUE |
BATCH_JOB_EXECUTION:
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
---|---|---|---|---|
1 | 1 | 2019-01-01 21:00 | 2017-01-01 21:30 | FAILED |
2 | 1 | 2019-01-02 21:00 | 2017-01-02 21:30 | COMPLETED |
3 | 2 | 2019-01-02 21:31 | 2017-01-02 22:29 | COMPLETED |
從數據上看好似JobInstance是一個接一個順序執行的,可是對於Spring Batch並無進行任何控制。不一樣的JobInstance頗有多是同時在運行(相同的JobInstance同時運行會拋出JobExecutionAlreadyRunningException異常)。
Step
Step是批處理重複運行的最小單元,它按照順序定義了一次執行的必要過程。
所以每一個Job能夠視做由一個或多個多個Step組成。一個Step包含了全部全部進行批處理的必要信息,這些信息的內容是由開發人員決定的並無統一的標準。一個Step能夠很簡單,也能夠很複雜。他能夠是複雜業務的組合,也有可能僅僅用於遷移數據。
與JobExecution的概念相似,Step也有特定的StepExecution,關係結構以下:
StepExecution
StepExecution表示單次執行Step的容器,每次Step執行時都會有一個新的StepExecution被建立。與JobExecution不一樣的是,當某個Step執行失敗後默認並不會從新執行。StepExecution包含如下屬性:
屬性 | 說明 |
---|---|
status | 狀態類名爲BatchStatus,它指示了執行的狀態。在執行的過程當中狀態爲BatchStatus#STARTED,失敗:BatchStatus#FAILED,完成:BatchStatus#COMPLETED |
startTime | java.util.Date對象,標記StepExecution啓動的系統時間,未啓動數據爲空 |
endTime | java.util.Date對象,結束時間,不管是否成功都包含該數據,如未處理完爲空 |
exitStatus | ExitStatus類,記錄運行結果。 |
createTime | java.util.Date,JobExecution的建立時間,某些使用execution已經建立可是並未開始運行。 |
lastUpdate | java.util.Date,最後一次更新時間 |
executionContext | 批處理任務執行的全部用戶數據 |
readCount | 成功讀取數據的次數 |
wirteCount | 成功寫入數據的次數 |
commitCount | 成功提交數據的次數 |
rollbackCount | 迴歸數據的次數,有業務代碼觸發 |
readSkipCount | 當讀數據發生錯誤時跳過處理的次數 |
processSkipCount | 當處理過程發生錯誤,跳過處理的次數 |
filterCount | 被過濾規則攔截未處理的次數 |
writeSkipCount | 寫數據失敗,跳過處理的次數 |
ExecutionContext
前文已經屢次提到ExecutionContext。能夠簡單的認爲ExecutionContext提供了一個Key/Value機制,在StepExecution和JobExecution對象的任何位置均可以獲取到ExecutionContext中的任何數據。最有價值的做用是記錄數據的執行位置,以便發生重啓時候從對應的位置繼續執行:
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition())
好比在任務中有一個名爲「loadData」的Step,他的做用是從文件中讀取數據寫入到數據庫,當第一次執行失敗後,數據庫中有以下數據:
BATCH_JOB_INSTANCE:
JOB_INST_ID | JOB_NAME |
---|---|
1 | EndOfDayJob |
BATCH_JOB_EXECUTION_PARAMS:
JOB_INST_ID | TYPE_CD | KEY_NAME | DATE_VAL |
---|---|---|---|
1 | DATE | schedule.Date | 2019-01-01 |
BATCH_JOB_EXECUTION:
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
---|---|---|---|---|
1 | 1 | 2017-01-01 21:00 | 2017-01-01 21:30 | FAILED |
BATCH_STEP_EXECUTION:
STEP_EXEC_ID | JOB_EXEC_ID | STEP_NAME | START_TIME | END_TIME | STATUS |
---|---|---|---|---|---|
1 | 1 | loadData | 2017-01-01 21:00 | 2017-01-01 21:30 | FAILED |
BATCH_STEP_EXECUTION_CONTEXT:
STEP_EXEC_ID | SHORT_CONTEXT |
---|---|
1 | {piece.count=40321} |
在上面的例子中,Step運行30分鐘處理了40321個「pieces」,咱們姑且認爲「pieces」表示行間的行數(實際就是每一個Step完成循環處理的個數)。
這個值會在每一個commit以前被更新記錄在ExecutionContext中(更新須要用到StepListener後文會詳細說明)。
當咱們再次重啓這個Job時並記錄在BATCH_STEP_EXECUTION_CONTEXT中的數據會加載到ExecutionContext中,這樣當咱們繼續執行批處理任務時能夠從上一次中斷的位置繼續處理。
例以下面的代碼在ItemReader中檢查上次執行的結果,並從中斷的位置繼續執行:
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) { log.debug("Initializing for restart. Restart data is: " + executionContext); long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT)); LineReader reader = getReader(); Object record = ""; while (reader.getPosition() < lineCount && record != null) { record = readLine(); } }
ExecutionContext是根據JobInstance進行管理的,所以只要是相同的實例都會具有相同的ExecutionContext(不管是否中止)。此外經過如下方法均可以得到一個ExecutionContext:
ExecutionContext ecStep = stepExecution.getExecutionContext(); ExecutionContext ecJob = jobExecution.getExecutionContext();
可是這2個ExecutionContext並不相同,前者是在一個Step中每次Commit數據之間共享,後者是在Step與Step之間共享。
JobRepository
JobRepository是全部前面介紹的對象實例的持久化機制。他爲JobLauncher、Job、Step的實現提供了CRUD操做。當一個Job第一次被啓動時,一個JobExecution會從數據源中獲取到,同時在執行的過程當中StepExecution、JobExecution的實現都會記錄到數據源中。使用@EnableBatchProcessing註解後JobRepository會進行自動化配置。
JobLauncher
JobLauncher爲Job的啓動運行提供了一個邊界的入口,在啓動Job的同時還能夠定製JobParameters:
public interface JobLauncher { public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException; }
參考: