Spring batch是在Accenture(埃森哲)公司的批處理體系框架的基礎上,再由SpringSource團隊(原Interface21公司)大量參考和優化後所得的Java批處理產品。spring batch讓java大數據批處理的標準化變得更好更容易。
本技術文檔會以spring batch目前新穩定版本(2017年初的新穩定版本爲V3.0.7)爲基礎,重點介紹spring batch的高級特性,而且都是以最新的規範使用註解來配置。html
在使用Spring batch的時候,須要用到一些類和接口以及組件等,這裏給出簡單的介紹:java
名 稱 | 用 途 |
JobRepository | 用於註冊和存儲Job的容器 |
JobLauncher | 用於啓動Job |
Job | 實際要執行的做業,包含一個或多個step |
step | 步驟,批處理的步驟通常包含ItemReader, ItemProcessor, ItemWriter |
ItemReader | 從給定的數據源讀取item |
ItemProcessor | 在item寫入數據源以前進行數據整理 |
ItemWriter | 把Chunk中包含的item寫入數據源。 |
Chunk | 數據塊,給定數量的item集合,讓item進行屢次讀和處理,當知足必定數量的時候再一次寫入 |
TaskLet | 子任務表, step的一個事務過程,包含重複執行,同步/異步規則等。 |
1,Job的實例是Job的具體化,即做業,是由JobName + JobParameters來肯定惟一,若是JobName和JobParameters相同,則定義爲同一個Job實例。
2,相同的做業只能成功運行一次,若是須要再次運行,則須要改變JobParameters。
3,Job是由一個或者多個step組成,通常的,每一個step由一組ItemReader-ItemProcessor-ItemWriter組成。 將這些概念圖形化能夠獲得下面的幾張圖:mysql
Job 實例示意圖redis
------------------------------------------------------------------------------------------spring
Job與step的關係圖sql
-------------------------------------------------------------------------------------------數據庫
step內部結構關係圖架構
在Spring boot架構下的項目,引入Spring batch很是簡單,直接在pom.xml文件中,加入如下的依賴便可,參考配置以下:app
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency>
Spring boot會自動爲咱們初始化spring batch的數據庫和數據表。當咱們程序啓動的時候,spring batch的job會持久化到數據庫中。
若是須要修改spring boot爲咱們而設置的默認配置,能夠前往application.yaml文件中,加入下面的配置,參考配置以下:框架
spring: batch: table-prefix: #這裏是設置spring batch數據庫表的前綴 initializer: enabled: true #這裏是容許自動初始化spring batch的數據庫 job: enabled: false #這裏是設置不會自動先執行一次定義的job
新版本的Spring一直提倡「約定優於配置」的觀點,因此對於之前xml形式的配置,都大部分改成了註解,Spring batch的新版本也同樣,許多配置均可以經過註解進行。Spring batch用到的註解除了@Bean、@Service、@Component外,還有@StepScope、@BeforeJob、@AfterJob等註解。
如下是一個簡單的Spring Batch Job的配置參考代碼:
@Configuration @EnableBatchProcessing public class BatchConfig { @Autowired private GlobalStepValueMap globalStepValueMap; @Autowired private MessageService messageService; @Autowired private RedisService redisService; @Autowired private ApplicationService applicationService; @Bean public JobRepository jobRepository(DataSource dataSource,PlatformTransactionManager transactionManager) throws Exception{ JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType("MYSQL"); return jobRepositoryFactoryBean.getObject(); } @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{ SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); System.out.println(">>>>>>>>>>" + transactionManager.getClass()); return jobLauncher; } //------ ItemReader, ItemProcessor, ItemWriter ------ //讀數據 @Bean @StepScope public ListItemReader<MessageConfigBean> firstStepReader(@Value("#{jobParameters['request']}") String request) throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception { System.out.println("------1st step Reader--------"); ...... List<MessageConfigBean> listMsgCfgBean = new ArrayList<MessageConfigBean>(); listMsgCfgBean.add(newMsgConfigBean); ListItemReader reader = new ListItemReader(listMsgCfgBean); return reader; } @Bean @StepScope public ListItemReader<String> secondStepReader(@Value("#{jobParameters['request']}") String request) throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception { System.out.println("------2nd step Reader--------"); ...... ListItemReader reader = new ListItemReader(listAudiences); return reader; } //處理數據 @Bean @StepScope public MsgCfgToMsgModelItemProcessor firstStepProcessor(@Value("#{jobParameters['request']}") String request) throws JsonParseException, JsonMappingException, IOException { System.out.println("------1st step Processor--------"); ...... MsgCfgToMsgModelItemProcessor m2mProcessor = new MsgCfgToMsgModelItemProcessor(); m2mProcessor.setJobId(requestModel.getJob()); return m2mProcessor; } @Bean @StepScope public ItemProcessor secondStepProcessor(@Value("#{jobParameters['request']}") String request) throws JsonParseException, JsonMappingException, IOException { System.out.println("------2nd step Processor--------"); return new AliasesToFullMsgModelItemProcessor(); } //寫數據 @Bean @StepScope public ItemWriter<MessageBussinessBean> firstStepItemWriter(@Value("#{jobParameters['request']}") String request) throws JsonParseException, JsonMappingException, IOException { System.out.println("------1st step writer--------"); ...... MessageFullSetItemWriter writer = new MessageFullSetItemWriter(); writer.setRequestJobId(requestModel.getJob()); return writer; } @Bean @StepScope public ItemWriter<String> secondStepItemWriter(@Value("#{jobParameters['request']}") String request) throws JsonParseException, JsonMappingException, IOException { System.out.println("------2nd step writer--------"); ...... MQChannelModelItemWriter writer = new MQChannelModelItemWriter(); return writer; } //--------------- job & step ---------------- @Bean public Job messageCoreBatch(JobBuilderFactory jobs, @Qualifier("step1")Step firstStep, @Qualifier("step2")Step secondStep, JobExecutionListener listener) { return jobs.get("messageCoreBatch") .incrementer(new RunIdIncrementer()) .listener(listener) .start(firstStep).next(secondStep) .build(); } @Bean public Step step1(StepBuilderFactory stepBuilderFactory, @Qualifier("firstStepReader")ListItemReader<MessageConfigBean> reader, @Qualifier("firstStepItemWriter")ItemWriter<MessageBussinessBean> writer, @Qualifier("firstStepProcessor")ItemProcessor<MessageConfigBean, MessageBussinessBean> processor, StepListener stepListener) { return stepBuilderFactory.get("step1") .<MessageConfigBean, MessageBussinessBean> chunk(100) .reader(reader) .processor(processor) .writer(writer).listener(stepListener) .build(); } @Bean public Step step2(StepBuilderFactory stepBuilderFactory, @Qualifier("secondStepReader")ListItemReader<String> reader, @Qualifier("secondStepItemWriter")ItemWriter<String> writer, @Qualifier("secondStepProcessor")ItemProcessor<String, String> processor) { return stepBuilderFactory.get("step2") .<String, String> chunk(300) .reader(reader) .processor(processor) .writer(writer) .build(); } @Bean public PlatformTransactionManager transactionManager(DataSource dataSource) { return new DataSourceTransactionManager(dataSource); } @Bean public static JdbcTemplate jdbcTemplate(DataSource dataSource) { return new JdbcTemplate(dataSource); } }
代碼的具體意思會在後面的章節介紹。
經過查閱上面的代碼能夠看出,對於各個組件的定義,經過@Bean註解,Spring就能夠將其自動生成Spring batch的相關配置項,等待其餘程序的使用。其中@StepScope是說明該註解下的組件實行後綁定技術,即生成step的時候,才進行該註解下Bean的生成,這時候再進行參數的綁定,JobParameters也在這個時候才傳入。
Spring batch有以下幾個監聽器:
1)JobExecutionListener
2)StepExecutionListener
3)ChunkListener
4)ItemReadeListener
5)ItemProcessListener
6)ItemWriteListener
7)SkipListener
1~6中的每一種粒度的listener都有着對應於該粒度的before和after監聽方法。例如StepExecutionListener有beforeStep()和afterStep()監聽方法,分別用於監聽step啓動前和step運行後的那一時刻。對於4~6中,還額外有對應的onReadError(), onProcessError(), onWriteError()監聽方法。
而剩下的SkipListener則對應着有onSkipInRead(), onSkipInProcess(), onSkipInWrite()三種監聽方法。
因爲Spring batch的監聽器有許多種,但建立方法都十分類似,因此這裏只以StepExecutionListener爲例子,來創建Step粒度的監聽器。
建立本身的StepExecutionListener的方法,主要有實現StepExecutionListener接口,以及使用StepListener粒度的註解這兩種方法。因爲實現接口的方式,須要把全部的接口內的方法都須要實現一遍,不太靈活,因此使用註解的方法創建監聽器,會比較容易,即咱們想使用哪個監聽器的監聽方法,就在咱們的邏輯方法上面,加上該監聽器方法對應的註解便可。
下面是兩種建立方法的比較,參考代碼以下:
implements 接口方式:
public class NewStepListener implements StepExecutionListener { @Override public void beforeStep(StepExecution stepExecution) { // 寫入本身的beforeStep邏輯 } @Override public ExitStatus afterStep(StepExecution stepExecution) { // 寫入本身的afterStep邏輯 return null; } }
使用註解方式:
@Component public class NewStepListener { @BeforeStep public void testBeforeStep(){ // 寫入本身的beforeStep邏輯 } @AfterStep public void testBeforeRead(){ // 寫入本身的afterStep邏輯 } }
咱們甚至能夠只使用一個Class文件,把多種不一樣粒度的註解寫入,這樣就能夠一個class監聽器包含了多種監聽器的多個監聽方法。參考代碼:
public class NewStepListener{ @BeforeStep public void beforeStep() { // 寫入本身的beforeStep邏輯 } @BeforeRead public void afterStep() { // 寫入本身的BeforeRead邏輯 } @OnSkipInRead public void onSkipInRead(Throwable t) { // 寫入本身的SkipInRead邏輯 } @OnSkipInWrite public void onSkipInWrite(Object item, Throwable t) { // 寫入本身的SkipInWrite邏輯 } @BeforeWrite public void beforeWrite(List items) { // 寫入本身的BeforeWrite邏輯 } @AfterWrite public void afterWrite(List items) { // 寫入本身的AfterWrite邏輯 } @OnWriteError public void onWriteError(Exception exception, List items) { // 寫入本身的OnWriteError邏輯 } }
上面這個監聽器包含了多種粒度下的不一樣的監聽方法。
不一樣粒度的監聽器,須要放入不一樣位置。通常咱們在配置Spring batch的job和step的時候將監聽器放入。
例如job粒度的監聽器,是在spring batch的class配置文件BatchConfig中,配置job時放入,參考代碼以下:
jobs.get("messageCoreBatch") .incrementer(new RunIdIncrementer()) .listener(newJoblistener) .start(firstStep).next(secondStep) .build();
而對於step或者step之內的粒度的監聽器,在配置的時候,能夠放到Step中,參考代碼以下:
stepBuilderFactory.get("step1") .<MessageConfigBean, MessageBussinessBean> chunk(1) .reader(reader) .processor(processor) .writer(writer).listener(newStepListener) .build();
Spring batch的事務有以下的特色:
1)step之間事務獨立。
二、step劃分紅多個chunk執行,chunk事務彼此獨立,互不影響。
3)chunk定義,例若有chunk(N),即讀取N條數據做爲一個chunk,chunk開始開啓一個事務,正常結束提交。
4)事務提交條件:chunk執行正常,未拋RuntimeExecption。
5)默認狀況下,Reader、Processor、Writer拋出未捕獲RuntimeException,當前chunk事務回滾,step失敗,job失敗。
6)Spring batch 能夠設置retryLimit,即重試次數。若是重試達了指定次數,或者重試策略不知足時,step失敗,job失敗。
7)Spring batch 能夠設置skipLimit,即跳過次數。若是Spring batch 同時設置了retryLimit和skipLimit,則,當retryLimit次數達到後,則進行skip操做。若是重試次數達了指定次數,到或者重試策略不知足時,step失敗,job失敗。
這些概念能夠圖形化爲下面這幾張圖:
step中的事務示意圖:
監聽器組件的事務示意圖:
Spring的事務配置方法通常爲:
1)先配置好相關的事務管理器。
2)用註解 @EnableTransactionManagement 開啓事務支持。
3)在訪問數據庫的Service方法上添加註解 @Transactional並指定事務管理器。
而Spring batch的事務配置也與之相同,除此以外,還能夠在Job倉庫和JobLauncher配置中,直接指定好事務管理器,從而省略2~3的步驟。
咱們能夠在spring batch的class配置文件BatchConfig中,配置相關的事務管理器,參考代碼以下:
@Bean public PlatformTransactionManager transactionManager(DataSource dataSource) { return new DataSourceTransactionManager(dataSource); }
或者
@Bean public PlatformTransactionManager transactionManager(EntityManagerFactory entityMngFactory) { return new JpaTransactionManager(entityMngFactory); }
上面的代碼分別是兩種事務管理器,DataSourceTransactionManager 以及 JpaTransactionManager 。其中DataSourceTransactionManager針對的是JDBC資源的事務管理;JpaTransactionManager針對的是JPA資源的事務管理。若是咱們不進行配置,則Spring batch會查找相關配置,自動加入這兩個事務管理器中的其中一個。但其實除了這兩種事務管理器外,Spring還有其餘的幾種事務管理器,因此最好顯式配置。
仍是在spring batch的class配置文件BatchConfig中,咱們在下面的兩個方法中,分別將事務管理器加入到JobRepository和JobLauncher中。參考代碼以下:
@Bean public JobRepository jobRepository(DataSource dataSource,PlatformTransactionManager transactionManager) throws Exception{ JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType("MYSQL"); return jobRepositoryFactoryBean.getObject(); } @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{ SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); return jobLauncher; }
上一節中,咱們已經配置好Spring batch的事務,結合3.1節中咱們的介紹能夠知道,Step中的chunk,以及chunk中的reader,processor,writer都是開啓了事務的。也就是說咱們只要再在spring batch的class配置文件BatchConfig中,配置好相關的step以及step的內部組件,那麼這些step的組件就會受到事務管理器的管理。
從新打開BatchConfig文件,寫入相關的step組件,參考代碼以下:
@Configuration @EnableBatchProcessing public class BatchConfig { @Bean public JobRepository jobRepository(DataSource dataSource,PlatformTransactionManager transactionManager) throws Exception{ JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType("MYSQL"); return jobRepositoryFactoryBean.getObject(); } @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{ SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); System.out.println(">>>>>>>>>>" + transactionManager.getClass()); return jobLauncher; } ...... //讀數據 @Bean @StepScope public ListItemReader<String> stepForTranscationReader() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception { System.out.println("------tx step Reader--------"); List<String> indexVals = new ArrayList<String>(); indexVals.add("001"); indexVals.add("002"); indexVals.add("003"); indexVals.add("004"); indexVals.add("005"); indexVals.add("006"); indexVals.add("007"); indexVals.add("008008008008"); indexVals.add("009"); indexVals.add("010"); indexVals.add("011"); indexVals.add("012"); ListItemReader reader = new ListItemReader(indexVals); return reader; } ....... //處理數據 @Bean @StepScope public ItemProcessor stepForTranscationProcessor() throws JsonParseException, JsonMappingException, IOException { System.out.println("------tx step Processor--------"); return new StringToStringDoNotingProcessor(); } ...... //寫數據 @Bean @StepScope public ItemWriter<String> stepForTranscationWriter(JdbcTemplate jdbcTemplate) throws JsonParseException, JsonMappingException, IOException { System.out.println("------tx step writer--------"); TestTableWriter writer = new TestTableWriter(); writer.setJdbcTemplate(jdbcTemplate); return writer; } //---------------job & step---------------- ...... @Bean public Job testBatchTranscation(JobBuilderFactory jobs, @Qualifier("step1")Step firstStep, @Qualifier("step2")Step secondStep, @Qualifier("stepForTranscation")Step stepForTranscation, JobExecutionListener listener) { return jobs.get("testBatchTranscation") .incrementer(new RunIdIncrementer()) .listener(listener) .start(stepForTranscation) .build(); } @Bean public Step stepForTranscation(StepBuilderFactory stepBuilderFactory, @Qualifier("stepForTranscationReader")ListItemReader<String> reader, @Qualifier("stepForTranscationProcessor")ItemProcessor<String, String> processor, @Qualifier("stepForTranscationWriter")ItemWriter<String> writer) { return stepBuilderFactory.get("stepForTranscation") .<String, String> chunk(3) .reader(reader) .processor(processor) .writer(writer) .build(); } ...... //@Bean //public PlatformTransactionManager transactionManager(DataSource dataSource) { //return new DataSourceTransactionManager(dataSource); //} @Bean public PlatformTransactionManager transactionManager(EntityManagerFactory entityMngFactory) { return new JpaTransactionManager(entityMngFactory); } // end::jobstep[] @Bean public static JdbcTemplate jdbcTemplate(DataSource dataSource) { return new JdbcTemplate(dataSource); } }
上面的這段代碼中,其中有兩個被引用的類的參考代碼:
StringToStringDoNotingProcessor類:
public class StringToStringDoNotingProcessor implements ItemProcessor<String, String> { @Override public String process(String item) throws Exception { // TODO Auto-generated method stub return item; } }
這個是一個模擬Processor的代碼。通常的,Processor是對reader中的數據item進行處理,例如進行校驗,格式轉換或者運算等等,而後再把處理好的新數據item給到writer。但咱們例子中爲了化簡了這一過程,直接不作任何事。
因此也將其起名爲StringToStringDoNotingProcessor。
另外一個TestTableWriter類:
public class TestTableWriter implements ItemWriter<String> { private JdbcTemplate jdbcTemplate; public JdbcTemplate getJdbcTemplate() { return jdbcTemplate; } public void setJdbcTemplate(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @Override public void write(List<? extends String> indexVals) throws Exception { System.out.println("-------tx step writer--write()-------"); for(String tmpIndex : indexVals){ String key = "key_" + tmpIndex; String value = "value_" + tmpIndex; jdbcTemplate.update("insert into test_tbl values('" + key + "','" + value + "')"); } } }
這裏咱們是一個咱們本身實現的wrtier,其中有一個write方法,進行對數據庫的寫處理。通常的咱們設置了chunk的數值後,例如本例中咱們設置了3,Spring batch會按這樣的機制處理:每當累計有3條數據item到達writer後,會進行一次write()方法的調用,即寫一次數據庫。(若最後一次不足3條數據的時候,會進行最後一次寫的操做把剩餘數據item寫入)
★其餘代碼講解:
@Bean public Step stepForTranscation(StepBuilderFactory stepBuilderFactory, @Qualifier("stepForTranscationReader")ListItemReader<String> reader, @Qualifier("stepForTranscationProcessor")ItemProcessor<String, String> processor, @Qualifier("stepForTranscationWriter")ItemWriter<String> writer) { return stepBuilderFactory.get("stepForTranscation") .<String, String> chunk(3) .reader(reader) .processor(processor) .writer(writer) .build(); }
這個代碼是Spring batch配置代碼中的一小部分,是定義一個step的過程。其中指定了選用的reader,processor,writer組件,以及chunk的大小。這裏的chunk設置爲3,即每3個數據item爲一個chunk。
這裏的step的例子,是以批處理讀取數據,而後寫入到一個test_tbl的表的過程。
其中,stepForTranscationReader中,咱們設置了總共讀取12條數據item,而step中設置的chunk條數是3,因此,實質上這12條數據item是分了4個chunk,每一個chunk讀和處理3條,而後將3條數據item一次寫入數據庫,這裏的每一個chunk都是一個獨立的事務,若是在該step過程當中出錯了,則單獨對當前出錯的chunk進行回滾操做。
test_tbl的建表參考代碼以下:
CREATE TABLE `test_tbl` ( `key` varchar(10) DEFAULT NULL, `value` varchar(15) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
即該表只有兩個字段,分別爲key和value。並且key和value的大小分別爲10和15個字符。
回到stepForTranscationReader中,咱們設置的12條數據item中,第8條數據是超過了數據庫表的限制的:
List<String> indexVals = new ArrayList<String>(); indexVals.add("001"); indexVals.add("002"); indexVals.add("003"); indexVals.add("004"); indexVals.add("005"); indexVals.add("006"); indexVals.add("007"); indexVals.add("008008008008"); indexVals.add("009"); indexVals.add("010"); indexVals.add("011"); indexVals.add("012");
這就是說,當咱們使用Spring batch處理到第8條數據的時候,會報數據庫異常。那咱們運行一下程序,看看Spring batch的事務機制是如何處理的。
啓動Spring boot以及Spring batch, 參考代碼以下:
@SpringBootApplication(scanBasePackages={"com.ljp.spring.batchtest"}) @EnableConfigurationProperties @EnableTransactionManagement public class Starter { public static void main(String[] args) throws JobExecutionAlreadyRunningException, org.springframework.batch.core.repository.JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException, SchedulerException, BeansException, JsonProcessingException, ParseException, InterruptedException{ ApplicationContext context = SpringApplication.run(Starter.class, args); JobLauncher jobLauncher = (JobLauncher)context.getBean("jobLauncher"); SimpleJob testBatchJob = (SimpleJob) context.getBean("testBatchTranscation"); JobExecution execution = null; try { execution = jobLauncher.run(testBatchJob, new JobParametersBuilder().toJobParameters()); } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (org.springframework.batch.core.repository.JobRestartException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
結果以下:
能夠看出一共有6條數據寫入了test_tbl表。
java後臺的運行console有以下信息:
2017-05-08 17:50:21.973 INFO 324136 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=testBatchTranscation]] launched with the following parameters: [{}]
2017-05-08 17:50:22.108 INFO 324136 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [stepForTranscation]
------tx step Reader--------
------tx step Processor--------
------tx step writer--------
-------tx step writer--write()-------
-------tx step writer--write()-------
-------tx step writer--write()-------
2017-05-08 17:50:22.591 INFO 324136 --- [ main] o.s.b.f.xml.XmlBeanDefinitionReader : Loading XML bean definitions from class path resource [org/springframework/jdbc/support/sql-error-codes.xml]
2017-05-08 17:50:23.177 INFO 324136 --- [ main] o.s.jdbc.support.SQLErrorCodesFactory : SQLErrorCodes loaded: [DB2, Derby, H2, HSQL, Informix, MS-SQL, MySQL, Oracle, PostgreSQL, Sybase, Hana]
2017-05-08 17:50:23.286 ERROR 324136 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step stepForTranscation in job testBatchTranscation
org.springframework.dao.DataIntegrityViolationException: StatementCallback; SQL [insert into test_tbl values('key_008008008008','value_008008008008')]; Data truncation: Data too long for column 'key' at row 1;
nested exception is com.mysql.jdbc.MysqlDataTruncation: Data truncation: Data too long for column 'key' at row 1
at org.springframework.jdbc.support.SQLStateSQLExceptionTranslator.doTranslate(SQLStateSQLExceptionTranslator.java:102) ~[spring-jdbc-4.3.4.RELEASE.jar:4.3.4.RELEASE]
......
......
2017-05-08 17:50:23.695 INFO 324136 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=testBatchTranscation]] completed with the following parameters: [{}] and the following status: [FAILED]
咱們對結果的解讀是:
數據庫一共寫入了6條記錄,分別是Spring batch的step中的前兩個chunk所爲(每3個數據item爲一個chunk)。而後處處理第3個chunk的時候,007,008008008008,009這3個數據item中,第2個數據item超出了數據庫的限制長度,因此Java後臺console顯示會報出:「Data too long for column 'key' at row 1; 」的提示。因爲每個chunk咱們都設置了事務,因此,這個chunk中,哪怕007數據是能夠寫入數據庫的,但因爲008008008008這條數據報錯,因此致使整一個chunk進行回滾,而007數據也進行了回滾。另外因爲每個chunk的事務獨立,因此第3個chunk回滾的事件不會影響到前兩個chunk,因此001~006的6條數據item都能成功寫入數據庫。
此外,咱們還能夠去到數據庫,查找Spring batch持久化的表,進行進一步的瞭解:
這裏的數據庫表,告訴咱們,的確是發生了回滾。Spring batch總共commit了2次事務,分別有6條數據寫入,對應了2個chunk。而總共讀取了9條數據,即第3個chunk,但就在這時有數據錯誤,進行了回滾操做,整個step狀態爲FAILED。
Spring batch的容錯機制是一種與事務機制相結合的機制,它主要包括有3種操做:
1)restart
2)retry
3)skip
其中,restart是針對job來使用,retry和skip是針對step以及其內部組件來使用。
restart是重啓job的一個操做。通常的,只有job是失敗的狀況下,才能restart。前面也說了,相同的做業只能成功運行一次,若是須要再次運行,則須要改變JobParameters。
retry是對job的某一step而言,處理一條數據item的時候發現有異常,則重試一次該數據item的step的操做。
skip是對job的某一個step而言,處理一條數據item的時候發現有異常,則跳過該數據item的step的操做。
咱們來更改一下以前step的配置,參考代碼以下:
@Bean public Step stepForTranscation(StepBuilderFactory stepBuilderFactory, @Qualifier("stepForTranscationReader")ListItemReader<String> reader, @Qualifier("stepForTranscationProcessor")ItemProcessor<String, String> processor, @Qualifier("stepForTranscationWriter")ItemWriter<String> writer) { return stepBuilderFactory.get("stepForTranscation") .<String, String> chunk(3) .reader(reader) .processor(processor) .writer(writer).faultTolerant().retryLimit(3).retry(DataIntegrityViolationException.class).skipLimit(1).skip(DataIntegrityViolationException.class).startLimit(3) .build(); }
這個新的step配置中,咱們比以前多了一些配置項,以下:
.faultTolerant() .retryLimit(3) .retry(DataIntegrityViolationException.class) .skipLimit(1) .skip(DataIntegrityViolationException.class) .startLimit(3)
這裏就是retry,skip,restart的配置。
這裏設置了容許重試的次數爲3次,容許跳過的數據最多爲1條,若是job失敗了,運行重跑次數最多爲3次。
咱們從新運行程序,能夠獲得新的結果:
此次咱們看到了,12條數據中總共有11條數據進入到數據庫,而過長的008008008008數據,則由於設置了skip,因此容錯機制容許它不進入數據庫,此次的Spring batch最終沒有由於回滾而中斷。
咱們查閱一下Spring batch的持久化數據表:
咱們能夠看出,的確是有一條數據被跳過了,但由於是咱們容許它跳過的,因此整個job順利完成,即COMPLETED。
1,網文《全面解析spring batch大數據批處理框架》: http://mt.sohu.com/20161116/n473372684.shtml 2,網文《Spring batch的事務處理》: http://blog.csdn.net/karott/article/details/44154501 3,Spring batch 官網信息 : http://projects.spring.io/spring-batch/