Spring Batch：異步併發的 processor 與 writer
以普通方式配置一個 job，可參考這個 demo：http://my.oschina.net/xinxingegeya/blog/343190
job 的 reader 是通過遊標讀取數據的。commit-interval="2" 表示每讀取兩條數據就要進行一次 process，process 完成之後再進行 write。process 和 write 是同步進行的，也就是說
必須 process 完兩條數據之後才能進行 write，這二者不能異步進行。無疑，當 process 過程處理時間過長時，會拖慢整個批處理的效率。此外，process 過程是單線程（single thread）處理的：在一個線程中依次處理兩條數據，
比用兩個線程並行處理兩條數據要慢得多（當處理一條數據花費的時間比較多時），這樣會拖慢 process 過程的效率。
那麼如何提升整個批處理過程的效率？
1. 將 process 和 write 過程異步化；
2. 在 process 過程中使用多線程並發處理數據。
主要的代碼和配置：
spring-batch-async.xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Package scanning: registers the annotated components found under com.lyx.batch. -->
    <context:component-scan base-package="com.lyx.batch" />

    <!-- Step listener shared by every step that inherits from abstractStep. -->
    <bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

    <!-- Abstract parent step: steps declaring parent="abstractStep" inherit the exceptionHandler listener. -->
    <batch:step id="abstractStep" abstract="true">
        <batch:listeners>
            <batch:listener ref="exceptionHandler" />
        </batch:listeners>
    </batch:step>

    <!-- Abstract parent for JDBC cursor readers; children only need to supply sql, rowMapper, etc. -->
    <bean id="abstractCursorReader" abstract="true" class="org.springframework.batch.item.database.JdbcCursorItemReader">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!--
      Single chunk-oriented step. commit-interval="2": read 2 items, process them, write them, commit.
      The processor (asyncProcessor) returns a Future per item; the writer (asynWriter) resolves
      those futures before inserting, so processing of the items in one chunk can overlap.
    -->
    <batch:job id="addPeopleDescJob">
        <batch:step id="addDescStep" parent="abstractStep">
            <batch:tasklet>
                <batch:chunk reader="peopleAddDescReader" processor="asyncProcessor" writer="asynWriter" commit-interval="2" />
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <!--
      Step-scoped cursor reader over the people table. The two '?' placeholders are bound by
      preparedStatementSetter (presumably with the LIKE patterns; PeoplePreparedStatementSetter is
      not shown here - TODO confirm). fetchSize is the JDBC fetch-size hint for the cursor.
      NOTE: the SQL text below is runtime data; its line break is intentional and must be preserved.
    -->
    <bean id="peopleAddDescReader" parent="abstractCursorReader" scope="step">
        <property name="sql">
            <value><![CDATA[select first_name ,last_name from people where first_name like ? 
or last_name like ?]]></value>
        </property>
        <property name="rowMapper" ref="peopleRowMapper" />
        <property name="preparedStatementSetter" ref="preparedStatementSetter" />
        <property name="fetchSize" value="20" />
    </bean>

    <!-- Configure the asynchronous, concurrent processor and writer -->
    <bean id="asyncProcessor" class="com.lyx.batch.AsyncPeopleAddDescItemProcessor">
        <property name="taskExecutor" ref="asyncExecutor" />
    </bean>
    <!--
      SimpleAsyncTaskExecutor starts a new thread for every task and, with no concurrencyLimit set,
      does not cap the number of concurrent threads. NOTE(review): concurrency here appears bounded
      only because the writer waits for every Future of a chunk (at most commit-interval tasks in
      flight); revisit if commit-interval grows.
    -->
    <bean id="asyncExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
    <!--
      NOTE(review): AsyncPeopleAddDescItemWriter is also annotated @Component, so component-scan
      likely registers a second, unused instance of it besides this bean - confirm and drop one.
    -->
    <bean id="asynWriter" class="com.lyx.batch.AsyncPeopleAddDescItemWriter" />
    <bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
    <bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />

    <!--
      Tomcat JDBC pool data source; destroy-method="close" shuts the pool down when the context closes.
      NOTE(review): credentials are hard-coded here; move them to an external properties file.
    -->
    <bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource" destroy-method="close">
        <property name="poolProperties">
            <bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
                <property name="driverClassName" value="com.mysql.jdbc.Driver" />
                <property name="url" value="jdbc:mysql://localhost:3306/test" />
                <property name="username" value="root" />
                <property name="password" value="034039" />
            </bean>
        </property>
    </bean>

    <!--
      Spring Batch job repository: metadata tables prefixed BATCH_ in the same dataSource;
      REPEATABLE_READ is the isolation level used when creating new job executions.
    -->
    <batch:job-repository id="jobRepository" data-source="dataSource" transaction-manager="transactionManager" isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_" max-varchar-length="1000" />

    <!-- Spring transaction manager over the same dataSource (used by the repository and the chunk transactions). -->
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- Batch launcher; no taskExecutor is configured, so jobs are launched on the calling thread. -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>
</beans>
AsyncPeopleAddDescItemProcessor.java
package com.lyx.batch; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import org.springframework.batch.item.ItemProcessor; import org.springframework.core.task.SyncTaskExecutor; import org.springframework.core.task.TaskExecutor; public class AsyncPeopleAddDescItemProcessor implements ItemProcessor<People, Future<PeopleDESC>> { /** * SyncTaskExecutor會在當前線程執行完客戶提交給它的任務,即它是以同步方式完成任務的執行的。 * */ private TaskExecutor taskExecutor = new SyncTaskExecutor(); public Future<PeopleDESC> process(final People item) throws Exception { FutureTask<PeopleDESC> task = new FutureTask<PeopleDESC>( new Callable<PeopleDESC>() { public PeopleDESC call() throws Exception { Thread.sleep(2000); System.out.println("process people desc"); return new PeopleDESC(item.getLastName(), item.getFirstName(), Thread.currentThread() .getName()); } }); this.taskExecutor.execute(task); return task; } public TaskExecutor getTaskExecutor() { return this.taskExecutor; } public void setTaskExecutor(TaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } }
AsyncPeopleAddDescItemWriter.java
package com.lyx.batch; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Future; import javax.sql.DataSource; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; @Component public class AsyncPeopleAddDescItemWriter implements ItemWriter<Future<PeopleDESC>> { private JdbcTemplate jdbcTemplate; @Autowired public void setDataSource(DataSource dataSource) { this.jdbcTemplate = new JdbcTemplate(dataSource); } public void write(List<? extends Future<PeopleDESC>> items) throws Exception { LinkedList<Future<PeopleDESC>> linklist = new LinkedList<Future<PeopleDESC>>( items); Future<PeopleDESC> future; // the head of this linklist while ((future = linklist.poll()) != null) { if (future.isDone()) { if (!future.isCancelled()) { System.out.println("write people desc"); PeopleDESC peopleDESC = future.get(); this.jdbcTemplate .update("insert into ok_people (first_name, last_name, batch_desc) values (?, ?, ?)", peopleDESC.getFirstName(), peopleDESC.getLastName(), peopleDESC.getDesc()); } } else { linklist.addLast(future); } } } }
AppMain5.java
package com.lyx.batch; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * spring batch 的異步任務執行 reader processor writer之間的異步執行 程序運行時間: 108884ms * * @author Lenovo * */ public class AppMain5 { public static void main(String[] args) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException { long startTime = System.currentTimeMillis(); // 獲取開始時間 @SuppressWarnings("resource") ApplicationContext context = new ClassPathXmlApplicationContext( new String[] { "classpath:spring-batch-async.xml" }); JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); Job job = (Job) context.getBean("addPeopleDescJob"); JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher"); JobExecution result = launcher.run(job, jobParametersBuilder.toJobParameters()); ExitStatus es = result.getExitStatus(); if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) { System.out.println("任務正常完成"); } else { System.out.println("任務失敗,exitCode=" + es.getExitCode()); } long endTime = System.currentTimeMillis(); // 獲取結束時間 System.out.println("程序運行時間: " + (endTime - startTime) + "ms"); } }
以上就是主要的關於異步併發的代碼。
使用異步併發的效果是非常明顯的，前提條件是處理一條數據花費的時間比較長；如果處理時間不長，在極短的時間內就能處理完，那麼使用異步併發和不使用異步併發的差異不是很明顯。
下面是我不使用異步併發做測試的類：在 process 過程中 Thread.sleep() 兩秒鐘；同時，異步併發的 process 也是 Thread.sleep() 兩秒鐘。運行一下看看時間上的差異：
(處理的數據是100條,前者是沒有使用異步併發,後者使用異步併發)
* 不使用異步併發：程序運行時間: 213657ms；使用異步併發：程序運行時間: 108884ms（100 條 × 2 秒 ≈ 200 秒；使用異步併發時每個 chunk 的 2 條數據並行處理，50 個 chunk × 2 秒 ≈ 100 秒，正好約為一半）
AddPeopleDescProcessor.java
package com.lyx.batch; import org.springframework.batch.item.ItemProcessor; public class AddPeopleDescProcessor implements ItemProcessor<People, PeopleDESC> { public PeopleDESC process(People item) throws Exception { Thread.sleep(2000); System.out.println("process people desc"); return new PeopleDESC(item.getLastName(), item.getFirstName(), Thread .currentThread().getName()); } }
AddDescPeopleWriter.java
package com.lyx.batch; import java.util.List; import javax.sql.DataSource; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; @Component public class AddDescPeopleWriter implements ItemWriter<PeopleDESC> { private JdbcTemplate jdbcTemplate; @Autowired public void setDataSource(DataSource dataSource) { this.jdbcTemplate = new JdbcTemplate(dataSource); } public void write(List<? extends PeopleDESC> items) throws Exception { for (PeopleDESC peopleDESC : items) { System.out.println("write people desc"); this.jdbcTemplate .update("insert into ok_people (first_name, last_name, batch_desc) values (?, ?, ?)", peopleDESC.getFirstName(), peopleDESC.getLastName(), peopleDESC.getDesc()); } } }
=============END=============