Spring Batch — Parallel Steps:使用並行的Step
Spring 官方文檔:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html#scalabilityParallelSteps
As long as the application logic that needs to be parallelized can be split into distinct responsibilities, and assigned to individual steps, then it can be parallelized in a single process. Parallel Step execution is easy to configure and use, for example, to execute steps (step1,step2) in parallel with step3, you could configure a flow like this:
<!-- Reference example: step1 -> step2 run in one flow while step3 runs in a second flow.
     Because split1 declares next="step4", step4 runs only after every flow in the split has completed. -->
<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<!-- The flows only run in parallel with an asynchronous TaskExecutor (the default is SyncTaskExecutor). -->
<beans:bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
The configurable "task-executor" attribute is used to specify which TaskExecutor implementation should be used to execute the individual flows. The default is SyncTaskExecutor, but an asynchronous TaskExecutor is required to run the steps in parallel. Note that the job will ensure that every flow in the split completes before aggregating the exit statuses and transitioning.
多個step之間的並行化,可以提升批處理的效率。什麼情況下可以應用step之間的並行化,那就要根據具體的業務需求來定。
那我們假設有這樣一種場景:有一類數據,分別存在於文件和數據庫中,數據的內容一樣,只是形式不一樣。那麼我們可以定義並行的step,分別處理來自文件的數據和來自數據庫的數據,然後各自使用同樣的processor進行處理,最後寫入數據庫。
下面我們就剛才講的那種場景實現並行的step,如下:
spring-batch-split.xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Scan the com.lyx.batch package for annotated components. -->
    <context:component-scan base-package="com.lyx.batch" />

    <bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

    <!-- Abstract step template that registers exceptionHandler as a step listener.
         NOTE(review): neither parallel step below declares parent="abstractStep",
         so this listener is not attached to them; confirm that is intended. -->
    <batch:step id="abstractStep" abstract="true">
        <batch:listeners>
            <batch:listener ref="exceptionHandler" />
        </batch:listeners>
    </batch:step>

    <!-- Template for JDBC cursor readers; child beans inherit the dataSource property. -->
    <bean id="abstractCursorReader" abstract="true"
          class="org.springframework.batch.item.database.JdbcCursorItemReader">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- Asynchronous executor used by split1 so its flows run on separate threads. -->
    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

    <!-- Job whose two steps run in parallel.
         NOTE(review): addDescProcessor and addDescPeopleWriter are singleton beans shared by
         both concurrently running steps, so they must be thread-safe; verify their implementations. -->
    <batch:job id="addPeopleDescJob">
        <batch:split id="split1" task-executor="taskExecutor">
            <batch:flow>
                <batch:step id="parallel_step_1">
                    <batch:tasklet>
                        <batch:chunk reader="peopleAddDescReader_db"
                                     processor="addDescProcessor"
                                     writer="addDescPeopleWriter"
                                     commit-interval="10" />
                    </batch:tasklet>
                </batch:step>
            </batch:flow>
            <batch:flow>
                <batch:step id="parallel_step_2">
                    <batch:tasklet>
                        <batch:chunk reader="peopleAddDescReader_file"
                                     processor="addDescProcessor"
                                     writer="addDescPeopleWriter"
                                     commit-interval="10" />
                    </batch:tasklet>
                </batch:step>
            </batch:flow>
        </batch:split>
    </batch:job>

    <!-- Reader that loads people from the database (step-scoped). -->
    <bean id="peopleAddDescReader_db" parent="abstractCursorReader" scope="step">
        <property name="sql">
            <value><![CDATA[select first_name ,last_name from people where first_name like ? 
or last_name like ?]]></value>
        </property>
        <property name="rowMapper" ref="peopleRowMapper" />
        <property name="preparedStatementSetter" ref="preparedStatementSetter" />
        <property name="fetchSize" value="20" />
    </bean>

    <bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
    <bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
    <bean id="addDescProcessor" class="com.lyx.batch.AddPeopleDescProcessor" />
    <bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

    <!-- Reader that loads people from a comma-delimited classpath file: begin -->
    <bean id="lineTokenizer"
          class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
        <property name="delimiter" value="," />
        <property name="names">
            <list>
                <value>firstName</value>
                <value>lastName</value>
            </list>
        </property>
    </bean>

    <bean id="fieldSetMapper"
          class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
        <property name="prototypeBeanName" value="people" />
    </bean>

    <bean id="people" class="com.lyx.batch.People" scope="prototype" />

    <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="lineTokenizer" ref="lineTokenizer" />
        <property name="fieldSetMapper" ref="fieldSetMapper" />
    </bean>

    <bean id="resource" class="org.springframework.core.io.ClassPathResource">
        <constructor-arg index="0" type="java.lang.String" value="sample-data.csv" />
    </bean>

    <bean id="peopleAddDescReader_file"
          class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="resource" ref="resource" />
        <property name="encoding" value="utf-8" />
        <property name="lineMapper" ref="lineMapper" />
    </bean>
    <!-- Reader that loads people from a file: end -->

    <!-- Tomcat JDBC pool data source; Spring calls close() on it when the context is closed. -->
    <bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource" destroy-method="close">
        <property name="poolProperties">
            <bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
                <property name="driverClassName" value="com.mysql.jdbc.Driver" />
                <property name="url" value="jdbc:mysql://localhost:3306/test" />
                <!-- NOTE(review): credentials are hard-coded here; consider externalizing them. -->
                <property name="username" value="root" />
                <property name="password" value="034039" />
            </bean>
        </property>
    </bean>

    <!-- Spring Batch job repository, stored in the same database with the BATCH_ table prefix. -->
    <batch:job-repository id="jobRepository"
                          data-source="dataSource"
                          transaction-manager="transactionManager"
                          isolation-level-for-create="REPEATABLE_READ"
                          table-prefix="BATCH_"
                          max-varchar-length="1000" />

    <!-- Spring transaction manager over the same data source. -->
    <bean id="transactionManager"
          class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- Job launcher backed by the job repository above. -->
    <bean id="jobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>
</beans>
以上就是step並行化的主要配置。
AppMain7.java
package com.lyx.batch; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * 測試並行的job * * @author Lenovo * */ public class AppMain7 { public static void main(String[] args) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException { long startTime = System.currentTimeMillis(); // 獲取開始時間 @SuppressWarnings("resource") ApplicationContext context = new ClassPathXmlApplicationContext( new String[] { "classpath:spring-batch-split.xml" }); JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); Job job = (Job) context.getBean("addPeopleDescJob"); JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher"); JobExecution result = launcher.run(job, jobParametersBuilder.toJobParameters()); ExitStatus es = result.getExitStatus(); if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) { System.out.println("任務正常完成"); } else { System.out.println("任務失敗,exitCode=" + es.getExitCode()); } long endTime = System.currentTimeMillis(); // 獲取結束時間 System.out.println("程序運行時間: " + (endTime - startTime) + "ms"); } }
==================END==================