Spring Batch_JOB重啓機制

Spring Batch_JOB重啓機制html

在這一篇文章 對於restart作了試驗,http://my.oschina.net/xinxingegeya/blog/344817在這片文章裏,咱們只是當job成功時,重啓了job,對於job失敗後,重啓job有什麼效果,我沒有演示,下面咱們就來演示一下當job失敗退出後,再重啓job有什麼效果。java

先作一個 致使job失敗的情景,以下的processor :mysql

ThrowExceptionProcessor.javaspring

package com.lyx.batch;

import org.springframework.batch.item.ItemProcessor;

public class ThrowExceptionProcessor implements
		ItemProcessor<People, PeopleDESC> {

	public PeopleDESC process(People item) throws Exception {
		System.out.println("process people desc");
		if ("lyx".equals(item.getFirstName())) {
			throw new InvalidDataException("invalid data");
		}
		return new PeopleDESC(item.getLastName(), item.getFirstName(), Thread
				.currentThread().getName());
	}
}


當判斷某條數據符合失敗條件後,拋出異常 ,致使job失敗。sql

下面是整個配置文件:spring-batch-failure-restart.xml數據庫

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- 包的掃描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- add people desc job begin -->
	<batch:job id="addPeopleDescJob" restartable="true">
		<batch:step id="addDescStep" parent="abstractStep">
			<batch:tasklet allow-start-if-complete="true"
				start-limit="3">
				<batch:chunk reader="peopleAddDescReader" processor="throwExceptionProcessor"
					writer="addDescPeopleWriter" commit-interval="2" />
			</batch:tasklet>
		</batch:step>
	</batch:job>
	<!-- add people desc job end -->

	<bean id="peopleAddDescReader" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name ,last_name from people where 
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
	<bean id="throwExceptionProcessor" class="com.lyx.batch.ThrowExceptionProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!--tomcat jdbc pool數據源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事務管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>


運行任務:apache

AppMain13.javatomcat

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 測試當任務失敗時,重啓任務
 * 
 * @author Lenovo
 *
 */
public class AppMain13 {
	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 獲取開始時間

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-failure-restart.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任務正常完成");
		} else {
			System.out.println("任務失敗,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 獲取結束時間
		System.out.println("程序運行時間: " + (endTime - startTime) + "ms");
	}
}


第一次運行的結果:

嚴重: Encountered an error executing step addDescStep in job addPeopleDescJobapp

com.lyx.batch.InvalidDataException: invalid dataless

at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11)

十一月 19, 2014 4:56:16 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [FAILED]

任務失敗,exitCode=FAILED

程序運行時間: 7028ms

如上,顯示job失敗,那麼失敗的job 在spring batch 的meta table裏存儲了什麼信息:

mysql> select * from batch_step_execution \G
*************************** 1. row ***************************
 STEP_EXECUTION_ID: 1
           VERSION: 52
         STEP_NAME: addDescStep
  JOB_EXECUTION_ID: 1
        START_TIME: 2014-11-19 16:56:11
          END_TIME: 2014-11-19 16:56:16
            STATUS: FAILED
      COMMIT_COUNT: 50
        READ_COUNT: 102
      FILTER_COUNT: 0
       WRITE_COUNT: 100
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1
         EXIT_CODE: FAILED
      EXIT_MESSAGE: com.lyx.batch.InvalidDataException: invalid data
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11)
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:1)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:126)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:293)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:192)
        at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
        at o
      LAST_UPDATED: 2014-11-19 16:56:16
1 row in set (0.00 sec)

mysql>

這就是step 的運行信息,請注意幾個關鍵的數據,就是

      COMMIT_COUNT: 50
        READ_COUNT: 102
      FILTER_COUNT: 0
       WRITE_COUNT: 100
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1

這些字段表示什麼意思,請看這裏:http://docs.spring.io/spring-batch/trunk/reference/html/metaDataSchema.html#metaDataBatchStepExecution

好了,咱們先作這些工做,先不去修正會致使異常拋出的數據,咱們第二次運行這個job


第二次運行的結果:

信息: Executing step: [addDescStep]

process people desc

十一月 19, 2014 5:03:30 下午 org.springframework.batch.core.step.AbstractStep execute

嚴重: Encountered an error executing step addDescStep in job addPeopleDescJob

com.lyx.batch.InvalidDataException: invalid data


任務失敗,exitCode=FAILED

程序運行時間: 3233ms

十一月 19, 2014 5:03:30 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [FAILED]


如上所示,任務失敗,再來看一下失敗的任務在 spring batch meta table 裏存儲了什麼信息:

mysql> select * from batch_step_execution where step_execution_id = 2 \G
*************************** 1. row ***************************
 STEP_EXECUTION_ID: 2
           VERSION: 2
         STEP_NAME: addDescStep
  JOB_EXECUTION_ID: 2
        START_TIME: 2014-11-19 17:03:30
          END_TIME: 2014-11-19 17:03:30
            STATUS: FAILED
      COMMIT_COUNT: 0
        READ_COUNT: 2
      FILTER_COUNT: 0
       WRITE_COUNT: 0
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1
         EXIT_CODE: FAILED
      EXIT_MESSAGE: com.lyx.batch.InvalidDataException: invalid data
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11)
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:1)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:126)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:293)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:192)
        at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
        at o
      LAST_UPDATED: 2014-11-19 17:03:30
1 row in set (0.00 sec)

mysql>


要注意這些數據:

      COMMIT_COUNT: 0
        READ_COUNT: 2
      FILTER_COUNT: 0
       WRITE_COUNT: 0
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1

看到了沒,這裏說明了讀出來的數據條數爲 2 , 除了回滾的次數爲 1 外,其餘爲 0 ;


第三次運行的結果:

在第三次運行前,咱們把數據庫裏的數據修正,再運行

update people set first_name = 'hello',last_name = 'DOE' where first_name = 'lyx';

修正完成,那麼第三次運行

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

任務正常完成

程序運行時間: 3960ms


好的,運行成功了,最重要的就是此時 spring batch在meta table 裏存的數據,再來看一下:

mysql> select * from batch_step_execution where step_execution_id = 3 \G
*************************** 1. row ***************************
 STEP_EXECUTION_ID: 3
           VERSION: 14
         STEP_NAME: addDescStep
  JOB_EXECUTION_ID: 3
        START_TIME: 2014-11-19 17:11:40
          END_TIME: 2014-11-19 17:11:42
            STATUS: COMPLETED
      COMMIT_COUNT: 12
        READ_COUNT: 23
      FILTER_COUNT: 0
       WRITE_COUNT: 23
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 0
         EXIT_CODE: COMPLETED
      EXIT_MESSAGE:
      LAST_UPDATED: 2014-11-19 17:11:42
1 row in set (0.00 sec)

mysql>

經過對這三次運行結果的分析,咱們能夠知道spring batch 對失敗的job進行restart ,不是從頭開始處理數據,而是從出錯的事務邊界內第一條記錄重複執行的,這樣便確保了數據完整性。

當 job 運行成功後(運行成功後也沒有必要進行restart),若是 restart 一個job,spring batch就會從第一條記錄開始讀數據,處理數據,致使數據被重複處理。

 

batch_step_execution 表字段含義

  • STEP_EXECUTION_ID: Primary key that uniquely identifies this execution. The value of this column should be obtainable by calling the getId method of the StepExecution object.

  • VERSION: See above section.

  • STEP_NAME: The name of the step to which this execution belongs.

  • JOB_EXECUTION_ID: Foreign key from the BATCH_JOB_EXECUTION table indicating the JobExecution to which this StepExecution belongs. There may be only one StepExecution for a given JobExecution for a given Step name.

  • START_TIME: Timestamp representing the time the execution was started.

  • END_TIME: Timestamp representing the time the execution was finished, regardless of success or failure. An empty value in this column even though the job is not currently running indicates that there has been some type of error and the framework was unable to perform a last save before failing.

  • STATUS: Character string representing the status of the execution. This may be COMPLETED, STARTED, etc. The object representation of this column is the BatchStatus enumeration.

  • COMMIT_COUNT: The number of times in which the step has committed a transaction during this execution.

  • READ_COUNT: The number of items read during this execution.

  • FILTER_COUNT: The number of items filtered out of this execution.

  • WRITE_COUNT: The number of items written and committed during this execution.

  • READ_SKIP_COUNT: The number of items skipped on read during this execution.

  • WRITE_SKIP_COUNT: The number of items skipped on write during this execution.

  • PROCESS_SKIP_COUNT: The number of items skipped during processing during this execution.

  • ROLLBACK_COUNT: The number of rollbacks during this execution. Note that this count includes each time rollback occurs, including rollbacks for retry and those in the skip recovery procedure.

  • EXIT_CODE: Character string representing the exit code of the execution. In the case of a command line job, this may be converted into a number.

  • EXIT_MESSAGE: Character string representing a more detailed description of how the job exited. In the case of failure, this might include as much of the stack trace as is possible.

  • LAST_UPDATED: Timestamp representing the last time this execution was persisted.

=======================END=======================

相關文章
相關標籤/搜索