Spring Batch JobExecutionDecider

    根據不一樣的支付渠道選擇不一樣的step進行處理,主要經過實現JobExecutionDecider接口,返回不一樣的FlowExecutionStatus來決定step分支。java

    其大體實現:mysql

    一、maven依賴:redis

<dependencies>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>3.0.6.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-samples</artifactId>
        <version>1.1.4.RELEASE</version>
        <classifier>sources</classifier>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.5</version>
    </dependency>
    <dependency>
        <groupId>commons-dbcp</groupId>
        <artifactId>commons-dbcp</artifactId>
        <version>1.4</version>
    </dependency>
</dependencies>

    二、Reader實現spring

/**
 * Created by heyinbo on 2016/8/17.
 * 抽象數據讀取
 */
public abstract class PayBillItemReader<T> implements ItemReader<T>, InitializingBean {
    /**
     * 當前數據所處位置
     */
    private volatile int current = 0;
    /**
     * 數據集
     */
    protected volatile List<T> result;

    private Object lock = new Object();

    public T doReader() {
        synchronized (lock) {
            if (null == result) {
                doDownLoad();
            }
        }
        int next = current++;
        if (null != result && next < result.size()) {
            return result.get(next);
        }
        return null;
    }

    public abstract void doDownLoad();

    @Override
    public void afterPropertiesSet() throws Exception {
    }
}
/**
 * Created by heyinbo on 2016/8/17.
 * 數據讀取的具體實現
 *
 */
public class WXPayBillItemReader extends PayBillItemReader<PayBillItem> {

    private String id;
    /**
     * 下載日期
     */
    private Date loadDate;
    /**
     * 支付渠道
     */
    private String payWay;

    @Override
    public void doDownLoad() {
        //TODO 下載對帳單
    }

    @Override
    public PayBillItem read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return doReader();
    }

    public Date getLoadDate() {
        return loadDate;
    }

    public void setLoadDate(Date loadDate) {
        this.loadDate = loadDate;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPayWay() {
        return payWay;
    }

    public void setPayWay(String payWay) {
        this.payWay = payWay;
    }
}

    二、writer實現sql

public class PayBillItemWriter implements ItemWriter<PayBillItem> {

    @Override
    public void write(List items) throws Exception {
        //TODO write database

        //測試事務
//        throw new DemoException("database rollback");
    }
}

    三、JobExecutionDecider實現apache

/**
 * Created by heyinbo on 2016/8/17.
 * 根據JobParameters傳入的值決策流程執行分支
 */
public class PayBillJobExecutionDecider implements JobExecutionDecider {

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
     //根據傳入的參數決定step,具體參考配置文件
        String payWay = jobExecution.getJobParameters().getString("payWay");
        return new FlowExecutionStatus(payWay);
    }
}  

    五、測試運行app

public class DeciderJobLaunch {
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-batch-decider.xml");
        JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("deciderJob");
        try {
            /* 運行Job */
            JobExecution result = launcher.run(job, new JobParametersBuilder()
                    .addString("id", "10010")
                    .addString("payWay", "weixin")
                    .addDate("loadDate", new Date())
                    .toJobParameters());
            /* 處理結束,控制檯打印處理結果 */
            System.out.println(result.getExitStatus().toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}  

    六、batch相關配置less

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
   <property name="jobRepository" ref="jobRepository"/>
</bean>

<!-- 用於測試,job的相關狀態都保存在內存中 -->
<!--<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
</bean>

<bean id="transactionManager"
     class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>-->

<!-- 默認會建立幾張系統表用於保存job的執行狀態 -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
   <property name="dataSource" ref="dataSource" />
   <property name="transactionManager" ref="transactionManager"/>
   <property name="databaseType" value="mysql" />
</bean>
<!-- 數據源 -->
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"
     destroy-method="close">
   <property name="driverClassName" value="com.mysql.jdbc.Driver" />
   <property name="url" value="jdbc:mysql://192.168.10.1:3306/pay?characterEncoding=UTF8"/>
   <property name="username" value="root"/>
   <property name="password" value="root"/>
</bean>

<!-- 事務管理 -->
<bean id="transactionManager"
     class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
   <property name="dataSource" ref="dataSource" />
</bean>
<bean:import resource="spring-application.xml"/>

<job id="deciderJob">
   <step id="first" next="decision">
      <tasklet ref="firstTasklet" />
   </step>
   <decision id="decision" decider="decider">
      <next on="weixin" to="wx_pay_step" />
      <next on="*" to="end" />
   </decision>
   <step id="wx_pay_step">
      <!-- 該step中的transactionManager事務管理 -->
      <tasklet transaction-manager="transactionManager">
         <!-- commit-interval=1000:沒1000條記錄提交一次到writer中 -->
         <!-- retry-limit=2:重試次數,遇到retryable-exception-classes對應的異常執行 -->
         <chunk reader="reader" processor="processer" writer="writer" commit-interval="1000" retry-limit="2">
            <retryable-exception-classes>
               <include class="com.test.batch.simple.DemoException" />
            </retryable-exception-classes>
         </chunk>
      </tasklet>
   </step>
   <step id="end">
      <tasklet ref="endTasklet" />
   </step>
</job>

<bean:bean id="decider" class="com.test.batch.simple.decider.PayBillJobExecutionDecider" />

<!-- 設置scope=step:保障jobParameter中值得傳遞 -->
<bean:bean id="reader" class="com.test.batch.simple.decider.WXPayBillItemReader" scope="step">
   <bean:property name="id" value="#{jobParameters['id']}" />
   <bean:property name="loadDate" value="#{jobParameters['loadDate']}" />
   <bean:property name="payWay" value="#{jobParameters['payWay']}" />
</bean:bean>

<bean:bean id="processer" class="com.test.batch.simple.decider.WXPayBillItemProcesser" />

<bean:bean id="writer" class="com.test.batch.simple.decider.PayBillItemWriter" />

<bean:bean id="firstTasklet" class="com.test.batch.simple.tasklet.FirstTasklet">
   <bean:property name="message" value="hello spring batch" />
</bean:bean>

<bean:bean id="endTasklet" class="com.test.batch.simple.tasklet.EndTasklet">
   <bean:property name="message" value="bye spring batch" />
</bean:bean>

    以上就是整個job的處理流程,其中包括異常處理以及事務管理。maven

備註:優化對帳方式,將數據初始化到redis服務中,經過redis的sdiff命令進行比對篩選差集。目前採用redis是基於其擴展方便,利於後期針對不一樣渠道進行擴展。ide

相關文章
相關標籤/搜索