SpringBatch是解決企業數據邏輯較簡單,重複性高,大數據量而設計的.從他提供的各類Reader就能看出來.起碼我是這樣理解的.最適合作的如:數據清洗,數據分析後轉移,或者定時的和其餘系統交互的地方等. java
在上一篇文章中,我使用了 JdbcPagingItemReader讀取HSQLDB數據庫的數據. spring
<bean id="sysAppStoreMapper" class="net.dbatch.mapper.SysAppStoreMapper" /> <bean id="dbReader" class="org.springframework.batch.item.database.JdbcPagingItemReader"> <property name="dataSource" ref="dataSource"/> <property name="rowMapper" ref="sysAppStoreMapper"/> <property name="queryProvider" ref="appQueryProvider"/> </bean> <bean id="appQueryProvider" class="org.springframework.batch.item.database.support.HsqlPagingQueryProvider"> <property name="selectClause" value="a.APP_ID, a.PARENT_ID, a.APP_DESC, a.APP_URL, a.FOLDER, a.SEQ"/> <property name="fromClause" value="sys_appstore a"/> <property name="sortKey" value="SEQ"/> </bean>
事實上SpringBatch提供了不少的Reader,自定義的Reader只要是繼承自org.springframework.batch.item.ItemReader接口的均可以.可是好多都不用你麻煩了,SpringBatch都替你作好了.2.1.8API中基本經常使用的和數據庫[Hibernate/Ibatis/JDBC],文件系統,JMS消息等Reader現成的實現.如圖: sql
對於喜歡SpringJDBC的用戶[我就很是不喜歡Hibernate ]可使用JdbcPagingItemReader shell
,而後指定一個queryProvider ,queryProvider 是針對各類數據庫的一個分頁的實現,經常使用的數據庫 queryProvider也有現成的.如圖: 數據庫
好了.若是上面你實在找不到你可使用的數據庫對應的實現,而你又瞭解你的數據庫SQL,你可使用JdbcCursorItemReader.這個Reader容許你本身set SQL. app
如我上面實現的例子,用JdbcCursorItemReader改寫也很是簡單: ide
<bean id="dbReader" class="org.springframework.batch.item.database.JdbcCursorItemReader"> <property name="dataSource" ref="dataSource" /> <property name="sql" value="select a.APP_ID, a.PARENT_ID, a.APP_DESC, a.APP_URL, a.FOLDER from sys_appstore a order by a.SEQ" /> <property name="rowMapper" ref="sysAppStoreMapper" /> </bean>
他仍然能夠工做的很好,並且還簡單了. 測試
若是個人數據來源不是從數據庫,從文件的怎麼辦? 大數據
看到剛纔的Reader實現裏有個FlatFileItemReader沒?他就是讀取文件[文本文件]的. ui
假如我要分析這樣結構的log日誌信息
User1,20 User2,21 User3,22 User4,23 User5,24 User6,25 User7,26 User8,27 User9,28 User10,29
他都是一些結構化的文本文件,我能夠很容易的實現.如Spring代碼:
<bean id="delimitedLineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" /> <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <property name="lineTokenizer" ref="delimitedLineTokenizer" /> <property name="fieldSetMapper"> <bean class="net.dbatch.sample.UserMapper" /> </property> </bean> <bean id="messageReader" class="org.springframework.batch.item.file.FlatFileItemReader"> <property name="lineMapper" ref="lineMapper" /> <property name="resource" value="classpath:/users.txt" /> </bean>
再寫上一個對應的Bean
public class UserMapper implements FieldSetMapper<User> { public User mapFieldSet(FieldSet fs) throws BindException { User u = new User(); u.setName(fs.readString(0)); u.setAge(fs.readInt(1)); return u; } }
Processor:
public class MessagesItemProcessor implements ItemProcessor<User, Message> { public Message process(User user) throws Exception { if(!StringUtils.hasText(user.getName())){ throw new RuntimeException("The user name is required!"); } Message m = new Message();//Message是user一個簡單的包裝 m.setUser(user); m.setContent("Hello " + user.getName() + ",please pay promptly at end of this month."); return m; } }
Writer:
public class MessagesItemWriter implements ItemWriter<Message> { public void write(List<? extends Message> messages) throws Exception { System.out.println("write results"); for (Message m : messages) { System.out.println(m.getContent()); //只作輸出 } } }
測試代碼:
public static void main(String[] args) { ClassPathXmlApplicationContext c = new ClassPathXmlApplicationContext("localfile_job.xml"); SimpleJobLauncher launcher = new SimpleJobLauncher(); launcher.setJobRepository((JobRepository) c.getBean("jobRepository")); launcher.setTaskExecutor(new SyncTaskExecutor()); try { JobExecution je = launcher.run((Job) c.getBean("messageJob"), new JobParametersBuilder().toJobParameters()); System.out.println(je); System.out.println(je.getJobInstance()); System.out.println(je.getStepExecutions()); } catch (Exception e) { e.printStackTrace(); } }
輸出:
10-20 15:28:32 INFO [job.SimpleStepHandler] - <Executing step: [messageStep]> write results Hello User1,please pay promptly at end of this month. Hello User2,please pay promptly at end of this month. Hello User3,please pay promptly at end of this month. Hello User4,please pay promptly at end of this month. Hello User5,please pay promptly at end of this month. write results Hello User6,please pay promptly at end of this month. Hello User7,please pay promptly at end of this month. Hello User8,please pay promptly at end of this month. Hello User9,please pay promptly at end of this month. Hello User10,please pay promptly at end of this month. 10-20 15:28:32 INFO [support.SimpleJobLauncher] - <Job: [FlowJob: [name=messageJob]] completed with the following parameters: [{run.month=2011-10}] and the following status: [COMPLETED]> JobExecution: id=0, version=2, startTime=Sat Oct 20 15:28:32 CST 2012, endTime=Sat Oct 20 15:28:32 CST 2012, lastUpdated=Sat Oct 20 15:28:32 CST 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob]] JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob] [StepExecution: id=1, version=5, name=messageStep, status=COMPLETED, exitStatus=COMPLETED, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=3, rollbackCount=0, exitDescription=]
從日誌裏,咱們能夠清楚的看到.他是每行的讀取並送入Processor中處理,完成5次讀取進行一次性的寫入.tasklet的屬性 commit-interval能夠調節此值.
所有的Spring配置:
<batch:job id="messageJob" restartable="true"> <batch:step id="messageStep"> <batch:tasklet> <batch:chunk reader="messageReader" processor="messageProcessor" writer="messageWriter" commit-interval="5" chunk-completion-policy="" retry-limit="2"> <batch:retryable-exception-classes> <batch:include class="java.lang.RuntimeException" /> </batch:retryable-exception-classes> </batch:chunk> </batch:tasklet> </batch:step> </batch:job>