在大型企業中,因爲業務複雜、數據量大、數據格式不一樣、數據交互格式繁雜,並不是全部的操做都能經過交互界面進行處理。而有一些操做須要按期讀取大批量的數據,而後進行一系列的後續處理。這樣的過程就是「批處理」。web
批處理應用一般有如下特色:spring
- 數據量大,從數萬到數百萬甚至上億不等;
- 整個過程所有自動化,並預留必定接口進行自定義配置;
- 這樣的應用一般是週期性運行,好比按日、周、月運行;
- 對數據處理的準確性要求高,而且須要容錯機制、回滾機制、完善的日誌監控等。
什麼是Spring batch
Spring batch是一個輕量級的全面的批處理框架,它專爲大型企業而設計,幫助開發健壯的批處理應用。Spring batch爲處理大批量數據提供了不少必要的可重用的功能,好比日誌追蹤、事務管理、job執行統計、重啓job和資源管理等。同時它也提供了優化和分片技術用於實現高性能的批處理任務。sql
它的核心功能包括:數據庫
- 事務管理
- 基於塊的處理過程
- 聲明式的輸入/輸出操做
- 啓動、終止、重啓任務
- 重試/跳過任務
- 基於Web的管理員接口
筆者所在的部門屬於國外某大型金融公司的CRM部門,在平常工做中咱們常常須要開發一些批處理應用,對Spring Batch有着豐富的使用經驗。近段時間筆者特地總結了這些經驗。安全
使用Spring Batch 3.0以及Spring Boot
在使用Spring Batch時推薦使用最新的Spring Batch 3.0版本。相比Spring Batch2.2,它作了如下方面的提高:多線程
- 支持JSR-352標準
- 支持Spring4以及Java8
- 加強了Spring Batch Integration的功能
- 支持JobScope
- 支持SQLite
支持Spring4和Java8是一個重大的提高。這樣就可使用Spring4引入的Spring boot組件,從而開發效率方面有了一個質的飛躍。引入Spring-batch框架只須要在build.gradle中加入一行代碼便可:app
1
|
compile("org.springframework.boot:spring-boot-starter-batch") |
而加強Spring Batch Integration的功能後,咱們就能夠很方便的和Spring家族的其餘組件集成,還能夠以多種方式來調用job,也支持遠程分區操做以及遠程塊處理。框架
而支持JobScope後咱們能夠隨時爲對象注入當前Job實例的上下文信息。只要咱們制定Bean的scope爲job scope,那麼就能夠隨時使用jobParameters和jobExecutionContext等信息。運維
1 2 3 4 5 6 7 |
<bean id="..." class="..." scope="job"> <property name="name" value="#{jobParameters[input]}" /> </bean> <bean id="..." class="..." scope="job"> <property name="name" value="#{jobExecutionContext['input.name']}.txt" /> </bean> |
使用Java Config而不是xml的配置方式
以前咱們在配置job和step的時候都習慣用xml的配置方式,可是隨着時間的推移發現問題頗多。async
- xml文件數急劇膨脹,配置塊長且複雜,可讀性不好;
- xml文件缺乏語法檢查,有些低級錯誤只有在運行集成測試的時候才能發現;
- 在xml文件中進行代碼跳轉時IDE的支持力度不夠;
咱們漸漸發現使用純Java類的配置方式更靈活,它是類型安全的,並且IDE的支持更好。在構建job或step時採用的流式語法相比xml更加簡潔易懂。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@Bean public Step step(){ return stepBuilders.get("step") .<Partner,Partner>chunk(1) .reader(reader()) .processor(processor()) .writer(writer()) .listener(logProcessListener()) .faultTolerant() .skipLimit(10) .skip(UnknownGenderException.class) .listener(logSkipListener()) .build(); } |
在這個例子中能夠很清楚的看到該step的配置,好比reader/processor/writer組件,以及配置了哪些listener等。
本地集成測試中使用內存數據庫
Spring batch在運行時須要數據庫支持,由於它須要在數據庫中創建一套schema來存儲job和step運行的統計信息。而在本地集成測試中咱們能夠藉助Spring batch提供的內存Repository來存儲Spring batch的任務執行信息,這樣即避免了在本地配置一個數據庫,又能夠加快job的執行。
1 2 3 4 |
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager"/> </bean> |
咱們在build.gradle中加入對hsqldb的依賴:
1
|
runtime(‘org.hsqldb:hsqldb:2.3.2’) |
而後在測試類中添加對DataSource的配置。
1 2 3 4 5 6 7 |
@EnableAutoConfiguration @EnableBatchProcessing @DataJpaTest @Import({DataSourceAutoConfiguration.class, BatchAutoConfiguration.class}) public class TestConfiguration { } |
而且在applicaton.properties配置中添加初始化Database的配置:
1
|
spring.batch.initializer.enable=true |
合理的使用Chunk機制
Spring batch在配置Step時採用的是基於Chunk的機制。即每次讀取一條數據,再處理一條數據,累積到必定數量後再一次性交給writer進行寫入操做。這樣能夠最大化的優化寫入效率,整個事務也是基於Chunk來進行。
當咱們在須要將數據寫入到文件、數據庫中之類的操做時能夠適當設置Chunk的值以知足寫入效率最大化。但有些場景下咱們的寫入操做實際上是調用一個web service或者將消息發送到某個消息隊列中,那麼這些場景下咱們就須要設置Chunk的值爲1,這樣既能夠及時的處理寫入,也不會因爲整個Chunk中發生異常後,在重試時出現重複調用服務或者重複發送消息的狀況。
使用Listener來監視job執行狀況並及時作相應的處理
Spring batch提供了大量的Listener來對job的各個執行環節進行全面的監控。
在job層面Spring batch提供了JobExecutionListener接口,其支持在Job開始或結束時進行一些額外處理。在step層面Spring batch提供了StepExecutionListener,ChunkListener,ItemReadListener,ItemProcessListener,ItemWriteListener,SkipListener等接口,同時對Retry和Skip操做也提供了RetryListener及SkipListener。
一般咱們會爲每一個job都實現一個JobExecutionListener,在afterJob操做中咱們輸出job的執行信息,包括執行時間、job參數、退出代碼、執行的step以及每一個step的詳細信息。這樣不管是開發、測試仍是運維人員對整個job的執行狀況瞭如指掌。
若是某個step會發生skip的操做,咱們也會爲其實現一個SkipListener,並在其中記錄skip的數據條目,用於下一步的處理。
實現Listener有兩種方式,一種是繼承自相應的接口,好比繼承JobExecutionListener接口,另外一種是使用annoation(註解)的方式。通過實踐咱們認爲使用註解的方式更好一些,由於使用接口你須要實現接口的全部方法,而使用註解則只須要對相應的方法添加annoation便可。
下面的這個類採用了繼承接口的方式,咱們看到其實咱們只用到了第一個方法,第二個和第三個都沒有用到。可是咱們必須提供一個空的實現。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public class CustomSkipListener implements SkipListener<String, String> { @Override public void onSkipInRead(Throwable t) { // business logic } @Override public void onSkipInWrite(String item, Throwable t) { // no need } @Override public void onSkipInProcess(String item, Throwable t) { // no need } } |
而使用annoation的方式能夠簡寫爲:
1 2 3 4 5 6 7 |
public class CustomSkipListener { @OnSkipInRead public void onSkipInRead(Throwable t) { // business logic } } |
使用Retry和Skip加強批處理工做的健壯性
在處理百萬級的數據過程過程當中不免會出現異常。若是一旦出現異常而致使整個批處理工做終止的話那麼會致使後續的數據沒法被處理。Spring Batch內置了Retry(重試)和Skip(跳過)機制幫助咱們輕鬆處理各類異常。適合Retry的異常的特色是這些異常可能會隨着時間推移而消失,好比數據庫目前有鎖沒法寫入、web服務當前不可用、web服務滿載等。因此對這些異常咱們能夠配置Retry機制。而有些異常則不該該配置Retry,好比解析文件出現異常等,由於這些異常即便Retry也會始終失敗。
即便Retry屢次仍然失敗也無需讓整個step失敗,能夠對指定的異常設置Skip選項從而保證後續的數據可以被繼續處理。咱們也能夠配置SkipLimit選項保證當Skip的數據條目達到必定數量後及時終止整個Job。
有時候咱們須要在每次Retry中間隔作一些操做,好比延長Retry時間,恢復操做現場等,Spring Batch提供了BackOffPolicy來達到目的。下面是一個配置了Retry機制、Skip機制以及BackOffPolicy的step示例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
@Bean public Step step(){ return stepBuilders.get("step") .<Partner,Partner>chunk(1) .reader(reader()) .processor(processor()) .writer(writer()) .listener(logProcessListener()) .faultTolerant() .skipLimit(10) .skip(UnknownGenderException.class) .retryLimit(5) .retry(ServiceUnavailableException.class) .backOffPolicy(backoffPolicy) .listener(logSkipListener()) .build(); } |
使用自定義的Decider來實現Job flow
在Job執行過程當中不必定都是順序執行的,咱們常常須要根據某個job的輸出數據或執行結果來決定下一步的走向。之前咱們會把一些判斷放置在下游step中進行,這樣可能會致使有些step實際運行了,但其實並無作任何事情。好比一個step執行過程當中會將失敗的數據條目記錄到一個報告中,而下一個step會判斷有沒有生成報告,若是生成了報告則將該報告發送給指定聯繫人,若是沒有則不作任何事情。這種狀況下能夠經過Decider機制來實現Job的執行流程。在Spring batch 3.0中Decider已經從Step中獨立出來,和Step處於同一級別。
1 2 3 4 5 6 7 8 9 10 |
public class ReportDecider implements JobExecutionDecider { @Override public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { if (report.isExist()) { return new FlowExecutionStatus(「SEND"); } return new FlowExecutionStatus(「SKIP"); } } |
而在job配置中能夠這樣來使用Decider。這樣整個Job的執行流程會更加清晰易懂。
1 2 3 4 5 6 7 8 |
public Job job() { return new JobBuilder("petstore") .start(orderProcess()) .next(reportDecider) .on("SEND").to(sendReportStep) .on("SKIP").end().build() .build() } |
採用多種機制加速Job的執行
批處理工做處理的數據量大,而執行窗口通常又要求比較小。因此必需要經過多種方式來加速Job的執行。通常咱們有四種方式來實現:
- 在單個step中多線程執行任務
- 並行執行不一樣的Step
- 並行執行同一個Step
- 遠程執行Chunk任務
在單個step多線程執行任務能夠藉助於taskExecutor來實現。這種狀況適合於reader、writer是線程安全的而且是無狀態的場景。咱們還能夠設置線程數量。
1 2 3 4 5 6 |
public Step step() { return stepBuilders.get("step") .tasklet(tasklet) .throttleLimit(20) .build(); } |
上述示例中的tasklet須要實現TaskExecutor,Spring Batch提供了一個簡單的多線程TaskExecutor供咱們使用:SimpleAsyncTaskExecutor。
並行執行不一樣的Step在Spring batch中很容易實現,如下是一個示例:
1 2 3 4 5 6 7 |
public Job job() { return stepBuilders.get("parallelSteps") .start(step1) .split(asyncTaskExecutor).add(flow1, flow2) .next(step3) .build(); } |
在這個示例中咱們先執行step1,而後並行執行flow1和flow2,最後再執行step3。
Spring batch提供了PartitionStep來實現對同一個step在多個進程中實現並行處理。經過PartitonStep再配合PartitionHandler能夠將一個step擴展到多個Slave上實現並行運行。
遠程執行Chunk任務則是將某個Step的processer操做分割到多個進程中,多個進程經過一些中間件進行通信(好比採用消息的方式)。這種方式適合於Processer是瓶頸而Reader和Writer不是瓶頸的場景。
結語
Spring Batch對批處理場景進行了合理的抽象,封裝了大量的實用功能,使用它來開發批處理應用能夠達到事半功倍的效果。在使用的過程當中咱們仍須要堅持總結一些最佳實踐,從而可以交付高質量的可維護的批處理應用,知足企業級應用的苛刻要求。