本文將從0到1講解一個Spring Batch是如何搭建並運行起來的。
本教程將講解從一個文本文件讀取數據,而後寫入MySQL。java
Spring Batch 做爲 Spring 的子項目,是一款基於 Spring 的企業批處理框架。經過它能夠構建出健壯的企業批處理應用。Spring Batch 不只提供了統一的讀寫接口、豐富的任務處理方式、靈活的事務管理及併發處理,同時還支持日誌、監控、任務重啓與跳過等特性,大大簡化了批處理應用開發,將開發人員從複雜的任務配置管理過程當中解放出來,使他們能夠更多地去關注核心的業務處理過程。mysql
更多的介紹能夠參考官網:https://spring.io/projects/sp...spring
我是用的Intellij Idea,用gradle構建。sql
可使用Spring Initializr 來建立Spring boot應用。地址:https://start.spring.io/數據庫
首先選擇Gradle Project,而後選擇Java。填上你的Group和Artifact名字。json
最後再搜索你須要用的包,好比Batch是必定要的。另外,因爲我寫的Batch項目是使用JPA向MySQL插入數據,因此也添加了JPA和MySQL。其餘能夠根據本身須要添加。併發
點擊Generate Project,一個項目就建立好了。app
Build.gralde文件大概就長這個樣子:框架
buildscript { ext { springBootVersion = '2.0.4.RELEASE' } repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") } } apply plugin: 'java' apply plugin: 'idea' apply plugin: 'org.springframework.boot' apply plugin: 'io.spring.dependency-management' group = 'com.demo' version = '0.0.1-SNAPSHOT' sourceCompatibility = 1.8 repositories { mavenCentral() } dependencies { compile('org.springframework.boot:spring-boot-starter-batch') compile('org.springframework.boot:spring-boot-starter-jdbc') compile("org.springframework.boot:spring-boot-starter-data-jpa") compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-joda', version: '2.9.4' compile group: 'org.jadira.usertype', name: 'usertype.core', version: '6.0.1.GA' compile group: 'mysql', name: 'mysql-connector-java', version: '6.0.6', testCompile('org.springframework.boot:spring-boot-starter-test') testCompile('org.springframework.batch:spring-batch-test') }
網上有不少Spring Batch結構和原理的講解,我就不詳細闡述了,我這裏只講一下Spring Batch的一個基本層級結構。maven
首先,Spring Batch運行的基本單位是一個Job,一個Job就作一件批處理的事情。
一個Job包含不少Step,step就是每一個job要執行的單個步驟。
以下圖所示,Step裏面,會有Tasklet,Tasklet是一個任務單元,它是屬於能夠重複利用的東西。
而後是Chunk,chunk就是數據塊,你須要定義多大的數據量是一個chunk。
Chunk裏面就是不斷循環的一個流程,讀數據,處理數據,而後寫數據。Spring Batch會不斷的循環這個流程,直到批處理數據完成。
首先,咱們須要一個全局的Configuration來配置全部的Job和一些全局配置。
代碼以下:
@Configuration @EnableAutoConfiguration @EnableBatchProcessing(modular = true) public class SpringBatchConfiguration { @Bean public ApplicationContextFactory firstJobContext() { return new GenericApplicationContextFactory(FirstJobConfiguration.class); } @Bean public ApplicationContextFactory secondJobContext() { return new GenericApplicationContextFactory(SecondJobConfiguration.class); } }
@EnableBatchProcessing是打開Batch。若是要實現多Job的狀況,須要把EnableBatchProcessing註解的modular設置爲true,讓每一個Job使用本身的ApplicationConext。
好比上面代碼的就建立了兩個Job。
本博客的例子是遷移數據,數據源是一個文本文件,數據量是上百萬條,一行就是一條數據。而後咱們經過Spring Batch幫咱們把文本文件的數據所有遷移到MySQL數據庫對應的表裏面。
假設咱們遷移的數據是Message,那麼咱們就須要提早建立一個叫Message的和數據庫映射的數據類。
@Entity @Table(name = "message") public class Message { @Id @Column(name = "object_id", nullable = false) private String objectId; @Column(name = "content") private String content; @Column(name = "last_modified_time") private LocalDateTime lastModifiedTime; @Column(name = "created_time") private LocalDateTime createdTime; }
首先咱們須要一個關於這個Job的Configuration,它將在SpringBatchConfigration裏面被加載。
@Configuration @EnableAutoConfiguration @EnableBatchProcessing(modular = true) public class SpringBatchConfiguration { @Bean public ApplicationContextFactory messageMigrationJobContext() { return new GenericApplicationContextFactory(MessageMigrationJobConfiguration.class); } }
下面的關於構建Job的代碼都將寫在這個MessageMigrationJobConfiguration裏面。
public class MessageMigrationJobConfiguration { }
咱們先定義一個Job的Bean。
@Autowired private JobBuilderFactory jobBuilderFactory; @Bean public Job messageMigrationJob(@Qualifier("messageMigrationStep") Step messageMigrationStep) { return jobBuilderFactory.get("messageMigrationJob") .start(messageMigrationStep) .build(); }
jobBuilderFactory是注入進來的,get裏面的就是job的名字。
這個job只有一個step。
接下來就是建立Step。
@Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Step messageMigrationStep(@Qualifier("jsonMessageReader") FlatFileItemReader<Message> jsonMessageReader, @Qualifier("messageItemWriter") JpaItemWriter<Message> messageItemWriter, @Qualifier("errorWriter") Writer errorWriter) { return stepBuilderFactory.get("messageMigrationStep") .<Message, Message>chunk(CHUNK_SIZE) .reader(jsonMessageReader).faultTolerant().skip(JsonParseException.class).skipLimit(SKIP_LIMIT) .listener(new MessageItemReadListener(errorWriter)) .writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT) .listener(new MessageWriteListener()) .build(); }
stepBuilderFactory是注入進來的,而後get裏面是Step的名字。
咱們的Step中能夠構建不少東西,好比reader,processer,writer,listener等等。
下面咱們就逐個來看看step裏面的這些東西是如何使用的。
Spring batch在配置Step時採用的是基於Chunk的機制,即每次讀取一條數據,再處理一條數據,累積到必定數量後再一次性交給writer進行寫入操做。這樣能夠最大化的優化寫入效率,整個事務也是基於Chunk來進行。
好比咱們定義chunk size是50,那就意味着,spring batch處理了50條數據後,再統一貫數據庫寫入。
這裏有個很重要的點,chunk前面須要定義數據輸入類型和輸出類型,因爲咱們輸入是Message,輸出也是Message,因此兩個都直接寫Message了。
若是不定義這個類型,會報錯。
.<Message, Message>chunk(CHUNK_SIZE)
Reader顧名思義就是從數據源讀取數據。
Spring Batch給咱們提供了不少好用實用的reader,基本能知足咱們全部需求。好比FlatFileItemReader,JdbcCursorItemReader,JpaPagingItemReader等。也能夠本身實現Reader。
本例子裏面,數據源是文本文件,因此咱們就使用FlatFileItemReader。FlatFileItemReader是從文件裏面一行一行的讀取數據。
首先須要設置文件路徑,也就是設置resource。
由於咱們須要把一行文本映射爲Message類,因此咱們須要本身設置並實現LineMapper。
@Bean public FlatFileItemReader<Message> jsonMessageReader() { FlatFileItemReader<Message> reader = new FlatFileItemReader<>(); reader.setResource(new FileSystemResource(new File(MESSAGE_FILE))); reader.setLineMapper(new MessageLineMapper()); return reader; }
LineMapper的輸入就是獲取一行文本,和行號,而後轉換成Message。
在本例子裏面,一行文本就是一個json對象,因此咱們使用JsonParser來轉換成Message。
public class MessageLineMapper implements LineMapper<Message> { private MappingJsonFactory factory = new MappingJsonFactory(); @Override public Message mapLine(String line, int lineNumber) throws Exception { JsonParser parser = factory.createParser(line); Map<String, Object> map = (Map) parser.readValueAs(Map.class); Message message = new Message(); ... // 轉換邏輯 return message; } }
因爲本例子裏面,數據是一行文本,經過reader變成Message的類,而後writer直接把Message寫入MySQL。因此咱們的例子裏面就不須要Processor,關於如何寫Processor其實和reader/writer是同樣的道理。
從它的接口能夠看出,須要定義輸入和輸出的類型,把輸入I經過某些邏輯處理以後,返回輸出O。
public interface ItemProcessor<I, O> { O process(I item) throws Exception; }
Writer顧名思義就是把數據寫入到目標數據源裏面。
Spring Batch一樣給咱們提供不少好用實用的writer。好比JpaItemWriter,FlatFileItemWriter,HibernateItemWriter,JdbcBatchItemWriter等。一樣也能夠自定義。
本例子裏面,使用的是JpaItemWriter,能夠直接把Message對象寫到數據庫裏面。可是須要設置一個EntityManagerFactory,能夠注入進來。
@Autowired private EntityManagerFactory entityManager; @Bean public JpaItemWriter<Message> messageItemWriter() { JpaItemWriter<Message> writer = new JpaItemWriter<>(); writer.setEntityManagerFactory(entityManager); return writer; }
另外,你須要配置數據庫的鏈接等東西。因爲我使用的spring,因此直接在Application.properties裏面配置以下:
spring.datasource.url=jdbc:mysql://database spring.datasource.username=username spring.datasource.password=password spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect spring.jpa.show-sql=true spring.jpa.properties.jadira.usertype.autoRegisterUserTypes=true spring.jackson.serialization.write-dates-as-timestamps=false spring.batch.initialize-schema=ALWAYS spring.jpa.hibernate.ddl-auto=update
spring.datasource相關的設置都是在配置數據庫的鏈接。
spring.batch.initialize-schema=always表示讓spring batch在數據庫裏面建立默認的數據表。
spring.jpa.show-sql=true表示在控制檯輸出hibernate讀寫數據庫時候的SQL。
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect是在指定MySQL的方言。
Spring Batch一樣實現了很是完善全面的listener,listener很好理解,就是用來監聽每一個步驟的結果。好比能夠有監聽step的,有監聽job的,有監聽reader的,有監聽writer的。沒有你找不到的listener,只有你想不到的listener。
在本例子裏面,我只關心,read的時候有沒有出錯,和write的時候有沒有出錯,因此,我只實現了ReadListener和WriteListener。
在read出錯的時候,把錯誤結果寫入一個單獨的error列表文件中。
public class MessageItemReadListener implements ItemReadListener<Message> { private Writer errorWriter; public MessageItemReadListener(Writer errorWriter) { this.errorWriter = errorWriter; } @Override public void beforeRead() { } @Override public void afterRead(Message item) { } @Override public void onReadError(Exception ex) { errorWriter.write(format("%s%n", ex.getMessage())); } }
在write出錯的時候,也作一樣的事情,把出錯的緣由寫入單獨的日誌中。
public class MessageWriteListener implements ItemWriteListener<Message> { @Autowired private Writer errorWriter; @Override public void beforeWrite(List<? extends Message> items) { } @Override public void afterWrite(List<? extends Message> items) { } @Override public void onWriteError(Exception exception, List<? extends Message> items) { errorWriter.write(format("%s%n", exception.getMessage())); for (Message message : items) { errorWriter.write(format("Failed writing message id: %s", message.getObjectId())); } } }
前面有說chuck機制,因此write的listener傳入參數是一個List,由於它是累積到必定的數量才一塊兒寫入。
Spring Batch提供了skip的機制,也就是說,若是出錯了,能夠跳過。若是你不設置skip,那麼一條數據出錯了,整個job都會掛掉。
設置skip的時候必定要設置什麼Exception才須要跳過,而且跳過多少條數據。若是失敗的數據超過你設置的skip limit,那麼job就會失敗。
你能夠分別給reader和writer等設置skip機制。
writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)
這個和Skip是同樣的原理,就是失敗以後能夠重試,你一樣須要設置重試的次數。
一樣能夠分別給reader,writer等設置retry機制。
若是同時設置了retry和skip,會先重試全部次數,而後再開始skip。好比retry是10次,skip是20,會先重試10次以後,再開始算第一次skip。
全部東西都準備好之後,就是如何運行了。
運行就是在main方法裏面用JobLauncher去運行你制定的job。
下面是我寫的main方法,main方法的第一個參數是job的名字,這樣咱們就能夠經過不一樣的job名字跑不一樣的job了。
首先咱們經過運行起來的Spring application獲得jobRegistry,而後經過job的名字找到對應的job。
接着,咱們就能夠用jobLauncher去運行這個job了,運行的時候會傳一些參數,好比你job裏面須要的文件路徑或者文件日期等,就能夠經過這個jobParameters傳進去。若是沒有參數,能夠默認傳當前時間進去。
public static void main(String[] args) { String jobName = args[0]; try { ConfigurableApplicationContext context = SpringApplication.run(ZuociBatchApplication.class, args); JobRegistry jobRegistry = context.getBean(JobRegistry.class); Job job = jobRegistry.getJob(jobName); JobLauncher jobLauncher = context.getBean(JobLauncher.class); JobExecution jobExecution = jobLauncher.run(job, createJobParams()); if (!jobExecution.getExitStatus().equals(ExitStatus.COMPLETED)) { throw new RuntimeException(format("%s Job execution failed.", jobName)); } } catch (Exception e) { throw new RuntimeException(format("%s Job execution failed.", jobName)); } } private static JobParameters createJobParams() { return new JobParametersBuilder().addDate("date", new Date()).toJobParameters(); }
最後,把jar包編譯出來,在命令行執行下面的命令,就能夠運行你的Spring Batch了。
java -jar YOUR_BATCH_NAME.jar YOUR_JOB_NAME
調試主要依靠控制檯輸出的log,能夠在application.properties裏面設置log輸出的級別,好比你但願輸出INFO信息仍是DEBUG信息。
基本上,經過查看log都能定位到問題。
logging.path=build/logs logging.file=${logging.path}/batch.log logging.level.com.easystudio=INFO logging.level.root=INFO log4j.logger.org.springframework.jdbc=INFO log4j.logger.org.springframework.batch=INFO logging.level.org.hibernate.SQL=INFO
若是你的batch最終會寫入數據庫,那麼Spring Batch會默認在你的數據庫裏面建立一些batch相關的表,來記錄全部job/step運行的狀態和結果。
大部分表你都不須要關心,你只須要關心幾張表。
batch_job_instance:這張表能看到每次運行的job名字。
batch_job_execution:這張表能看到每次運行job的開始時間,結束時間,狀態,以及失敗後的錯誤消息是什麼。
batch_step_execution:這張表你能看到更多關於step的詳細信息。好比step的開始時間,結束時間,提交次數,讀寫次數,狀態,以及失敗後的錯誤信息等。
Spring Batch爲咱們提供了很是實用的功能,對批處理場景進行了完善的抽象,它不只能實現小數據的遷移,也能應對大企業的大數據實踐應用。它讓咱們開發批處理應用能夠事半功倍。
最後一個tips,搭建Spring Batch的過程當中,會遇到各類各樣的問題。只要善用Google,都能找到答案。