根據不一樣的支付渠道選擇不一樣的step進行處理,主要經過實現JobExecutionDecider接口,返回不一樣的FlowExecutionStatus來決定step分支。java
其大體實現:mysql
一、maven依賴:redis
<dependencies> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-core</artifactId> <version>3.0.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-samples</artifactId> <version>1.1.4.RELEASE</version> <classifier>sources</classifier> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.5</version> </dependency> <dependency> <groupId>commons-dbcp</groupId> <artifactId>commons-dbcp</artifactId> <version>1.4</version> </dependency> </dependencies>
二、Reader實現spring
/** * Created by heyinbo on 2016/8/17. * 抽象數據讀取 */ public abstract class PayBillItemReader<T> implements ItemReader<T>, InitializingBean { /** * 當前數據所處位置 */ private volatile int current = 0; /** * 數據集 */ protected volatile List<T> result; private Object lock = new Object(); public T doReader() { synchronized (lock) { if (null == result) { doDownLoad(); } } int next = current++; if (null != result && next < result.size()) { return result.get(next); } return null; } public abstract void doDownLoad(); @Override public void afterPropertiesSet() throws Exception { } }
/** * Created by heyinbo on 2016/8/17. * 數據讀取的具體實現 * */ public class WXPayBillItemReader extends PayBillItemReader<PayBillItem> { private String id; /** * 下載日期 */ private Date loadDate; /** * 支付渠道 */ private String payWay; @Override public void doDownLoad() { //TODO 下載對帳單 } @Override public PayBillItem read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { return doReader(); } public Date getLoadDate() { return loadDate; } public void setLoadDate(Date loadDate) { this.loadDate = loadDate; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getPayWay() { return payWay; } public void setPayWay(String payWay) { this.payWay = payWay; } }
二、writer實現sql
public class PayBillItemWriter implements ItemWriter<PayBillItem> { @Override public void write(List items) throws Exception { //TODO write database //測試事務 // throw new DemoException("database rollback"); } }
三、JobExecutionDecider實現apache
/** * Created by heyinbo on 2016/8/17. * 根據JobParameters傳入的值決策流程執行分支 */ public class PayBillJobExecutionDecider implements JobExecutionDecider { @Override public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { //根據傳入的參數決定step,具體參考配置文件 String payWay = jobExecution.getJobParameters().getString("payWay"); return new FlowExecutionStatus(payWay); } }
五、測試運行app
public class DeciderJobLaunch { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("spring-batch-decider.xml"); JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher"); Job job = (Job) context.getBean("deciderJob"); try { /* 運行Job */ JobExecution result = launcher.run(job, new JobParametersBuilder() .addString("id", "10010") .addString("payWay", "weixin") .addDate("loadDate", new Date()) .toJobParameters()); /* 處理結束,控制檯打印處理結果 */ System.out.println(result.getExitStatus().toString()); } catch (Exception e) { e.printStackTrace(); } } }
六、batch相關配置less
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository"/> </bean> <!-- 用於測試,job的相關狀態都保存在內存中 --> <!--<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> </bean> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>--> <!-- 默認會建立幾張系統表用於保存job的執行狀態 --> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"> <property name="dataSource" ref="dataSource" /> <property name="transactionManager" ref="transactionManager"/> <property name="databaseType" value="mysql" /> </bean> <!-- 數據源 --> <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> <property name="url" value="jdbc:mysql://192.168.10.1:3306/pay?characterEncoding=UTF8"/> <property name="username" value="root"/> <property name="password" value="root"/> </bean> <!-- 事務管理 --> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource" /> </bean>
<bean:import resource="spring-application.xml"/> <job id="deciderJob"> <step id="first" next="decision"> <tasklet ref="firstTasklet" /> </step> <decision id="decision" decider="decider"> <next on="weixin" to="wx_pay_step" /> <next on="*" to="end" /> </decision> <step id="wx_pay_step"> <!-- 該step中的transactionManager事務管理 --> <tasklet transaction-manager="transactionManager"> <!-- commit-interval=1000:沒1000條記錄提交一次到writer中 --> <!-- retry-limit=2:重試次數,遇到retryable-exception-classes對應的異常執行 --> <chunk reader="reader" processor="processer" writer="writer" commit-interval="1000" retry-limit="2"> <retryable-exception-classes> <include class="com.test.batch.simple.DemoException" /> </retryable-exception-classes> </chunk> </tasklet> </step> <step id="end"> <tasklet ref="endTasklet" /> </step> </job> <bean:bean id="decider" class="com.test.batch.simple.decider.PayBillJobExecutionDecider" /> <!-- 設置scope=step:保障jobParameter中值得傳遞 --> <bean:bean id="reader" class="com.test.batch.simple.decider.WXPayBillItemReader" scope="step"> <bean:property name="id" value="#{jobParameters['id']}" /> <bean:property name="loadDate" value="#{jobParameters['loadDate']}" /> <bean:property name="payWay" value="#{jobParameters['payWay']}" /> </bean:bean> <bean:bean id="processer" class="com.test.batch.simple.decider.WXPayBillItemProcesser" /> <bean:bean id="writer" class="com.test.batch.simple.decider.PayBillItemWriter" /> <bean:bean id="firstTasklet" class="com.test.batch.simple.tasklet.FirstTasklet"> <bean:property name="message" value="hello spring batch" /> </bean:bean> <bean:bean id="endTasklet" class="com.test.batch.simple.tasklet.EndTasklet"> <bean:property name="message" value="bye spring batch" /> </bean:bean>
以上就是整個job的處理流程,其中包括異常處理以及事務管理。maven
備註:優化對帳方式,將數據初始化到redis服務中,經過redis的sdiff命令進行比對篩選差集。目前採用redis是基於其擴展方便,利於後期針對不一樣渠道進行擴展。ide