springbatch每小時讀取實時追加數據的每日報表入庫

需求:每小時定時讀取文件,文件名稱按日期遞增,同一天的數據會持續追加到同一個文件中,因此每次批處理讀取的時候需要設置起始行。

 

以下是目前我們項目中所採用的方法,不知道還有沒有更好的方法,希望大家分享。

 

一、建立項目、配置數據源就不講了,我的是springboot項目,其中(1)爲springbatch項目的文件目錄,(2)爲需要處理的文件,(3)爲yml中的springbatch配置。其中第一個屬性配置web應用啓動的時候不執行批處理(默認爲true,即啓動時執行),項目中是通過定時任務來執行批處理,所以設置爲啓動時不執行;第二個屬性設置springbatch是否每次執行都建立其數據庫表,第一次執行需要設置爲true,初始化完數據庫表之後設爲false,否則會報錯。注意:引入的springbatch依賴中不要有內存數據庫的依賴。


 
 二、job配置文件

package com.st.batch.job;

import com.st.batch.listener.JobCompletionNotificationListener;
import com.st.batch.step.MemberInfoStepConf;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class JobConf {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public Step memberInfoStep;
    @Bean
    public Job MFJob(JobCompletionNotificationListener listener) {

        return jobBuilderFactory.get("MFJob")
                .incrementer(new RunIdIncrementer())
                //.listener(listener)
                .flow(memberInfoStep)
                .end()
                .build();
    }

}

 

 

 三、step配置文件

 

package com.st.batch.step;

import com.st.batch.entity.MemberInfo;
import com.st.batch.listener.JobCompletionNotificationListener;
import com.st.batch.listener.StepCompletionNotificationListener;
import com.st.batch.mapper.MemberInfoLineMapper;
import com.st.service.StartCountService;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.JsonLineMapper;
import org.springframework.batch.item.file.separator.JsonRecordSeparatorPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

import javax.sql.DataSource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;

/**
 * Step configuration for importing the daily member-info file into the database.
 * <p>
 * The step reads {@code IF_MemberInfo-<yyyy-MM-dd>.txt} from the classpath as JSON records,
 * maps every record to a {@link MemberInfo} and inserts it into the {@code memberInfo} table.
 * Because the same daily file keeps growing, the reader skips the number of lines reported
 * by {@link StartCountService} for this step and date before it starts reading.
 */
@Configuration
@EnableBatchProcessing
public class MemberInfoStepConf {

    /** Name of the step; also used as the lookup key for the start-line query. */
    private static final String STEP_NAME = "memberInfoStep";

    /**
     * Offset subtracted from "now" before the report date is derived (40 minutes).
     * NOTE(review): presumably this lets the run shortly after midnight still pick up the
     * previous day's file - confirm against the producer's schedule.
     */
    private static final long REPORT_DATE_LAG_MILLIS = 40 * 60 * 1000L;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public DataSource dataSource;

    @Autowired
    StepCompletionNotificationListener steplistener;

    @Autowired
    StartCountService startCountService;


    /**
     * Defines the chunk-oriented import step (chunk size 10).
     * {@code allowStartIfComplete(true)} lets the step run again even though an earlier
     * execution already completed.
     *
     * @return the configured member-info step
     */
    @Bean
    public Step memberInfoStep() {
        return stepBuilderFactory.get(STEP_NAME)
                .allowStartIfComplete(true)
                .listener(steplistener)
                .<MemberInfo, MemberInfo> chunk(10)
                .reader(memberInfoReader())
                .writer(writer())
                .build();
    }

    /**
     * Creates the step-scoped reader for the current report file.
     * <p>
     * Being {@code @StepScope}, the reader is built per step execution, so both the file
     * name and the number of lines to skip are evaluated again on every run.
     *
     * @return a reader positioned after the lines that {@link StartCountService} reports
     *         for this step and report date (0 when the service returns {@code null})
     */
    @Bean
    @StepScope
    public FlatFileItemReader<MemberInfo> memberInfoReader() {
        // Resolve the report date exactly once so that the file name and the start-line
        // query can never disagree when the two evaluations straddle a date change.
        String reportDate = new SimpleDateFormat("yyyy-MM-dd")
                .format(new Date(System.currentTimeMillis() - REPORT_DATE_LAG_MILLIS));

        FlatFileItemReader<MemberInfo> reader = new FlatFileItemReader<MemberInfo>();

        // Plain classpath lookup; ClassPathResource takes a bare path and does not
        // interpret a "classpath:" prefix.
        reader.setResource(new ClassPathResource("IF_MemberInfo-" + reportDate + ".txt"));

        // Look up how many lines have to be skipped. The map stays raw because the
        // parameter type declared by StartCountService#getCount is not visible here.
        @SuppressWarnings({"rawtypes", "unchecked"})
        HashMap parm = new HashMap();
        parm.put("step", STEP_NAME);
        parm.put("date", reportDate);

        // Query once: avoids a second round trip and guarantees that the value that was
        // null-checked is the value that is used.
        Integer linesAlreadyRead = startCountService.getCount(parm);
        reader.setLinesToSkip(linesAlreadyRead == null ? 0 : linesAlreadyRead);

        reader.setRecordSeparatorPolicy(new JsonRecordSeparatorPolicy());
        reader.setLineMapper(new MemberInfoLineMapper(new JsonLineMapper()));
        return reader;
    }

    /**
     * Creates the JDBC writer that inserts each {@link MemberInfo} into {@code memberInfo}.
     * Named SQL parameters are resolved from the bean properties of the item.
     *
     * @return the configured batch writer
     */
    @Bean
    public JdbcBatchItemWriter<MemberInfo> writer() {
        JdbcBatchItemWriter<MemberInfo> writer = new JdbcBatchItemWriter<MemberInfo>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<MemberInfo>());
        writer.setSql("INSERT INTO memberInfo (" +
                "BrandCode,IFMemberId,MemberCode,MemName,Gender,MobilePhone,Email,IdentityCard,BirthDay,CounterCodeBelong,JoinDate,JoinTime,TotalPoint,MemberLevelCode,DataSource) " +
                "VALUES (:brandCode,:iFMemberId,:memberCode,:memName,:gender,:mobilePhone,:email,:identityCard,:birthDay,:counterCodeBelong,:joinDate,:joinTime,:totalPoint,:memberLevelCode,:dataSource)");
        writer.setDataSource(dataSource);
        return writer;
    }
}

 其中reader.setLinesToSkip()方法用於設置起始行數,該值是通過查詢下圖中的表得到的;圖中batch前綴的表由yml中第二項配置建立,保存的都是batch執行中的狀態信息,

 

 中的READ_COUNT來實現的。注意:reader上必須使用@StepScope註解,這樣每次執行step時才會重新建立reader並重新計算起始行。

四、LineMapper

 

package com.st.batch.mapper;

import com.st.batch.entity.MemberInfo;
import com.st.util.MapUtils;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.JsonLineMapper;

/**
 * {@link LineMapper} that converts one JSON record of the member-info file into a
 * {@link MemberInfo}.
 * <p>
 * The JSON parsing itself is delegated to a {@link JsonLineMapper}; the parsed key/value
 * structure is then handed to {@code MapUtils.toObject}, the utility that turns each
 * record into the domain object.
 * <p>
 * Created by admin on 2016/12/29.
 */
public class MemberInfoLineMapper implements LineMapper<MemberInfo> {

    /** Mapper that performs the raw JSON parsing of a line. */
    private JsonLineMapper delegate;

    /**
     * @param delegate the JSON line mapper used to parse each line
     */
    public MemberInfoLineMapper(JsonLineMapper delegate) {
        this.delegate = delegate;
    }

    /**
     * Parses the given line with the delegate and converts the result to a {@link MemberInfo}.
     *
     * @param line       the raw record text
     * @param lineNumber the number of the line, passed through to the delegate
     * @return the domain object produced by {@code MapUtils.toObject}
     * @throws Exception if the delegate or the conversion fails
     */
    @Override
    public MemberInfo mapLine(String line, int lineNumber) throws Exception {
        final java.util.Map<String, Object> parsedFields = this.delegate.mapLine(line, lineNumber);
        final MemberInfo member = MapUtils.toObject(MemberInfo.class, parsedFields, true);
        return member;
    }

    /**
     * @return the JSON line mapper currently in use
     */
    public JsonLineMapper getDelegate() {
        return this.delegate;
    }

    /**
     * @param delegate the JSON line mapper to use from now on
     */
    public void setDelegate(JsonLineMapper delegate) {
        this.delegate = delegate;
    }
}

 五、定時調用job

 

package com.st.scheduled;


import com.st.batch.listener.JobCompletionNotificationListener;
import com.st.service.StartCountService;
import com.st.util.SpringContextUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Scheduled trigger that launches the {@code MFJob} batch job.
 * <p>
 * Created by sxm on 2016/10/14.
 */
@Component
@Configurable
@EnableScheduling
public class ScheduledTask {
    // Logger is bound to this class so that log entries are attributed to the scheduler
    // rather than to an unrelated listener class.
    private static final Logger log = LoggerFactory.getLogger(ScheduledTask.class);

    @Autowired
    StartCountService startCountService;

    /**
     * Launches {@code MFJob} with the current timestamp as the {@code date} job parameter.
     * <p>
     * Any exception raised while launching is logged and not propagated.
     * <p>
     * NOTE(review): the cron expression below fires at second 0 of every minute, while the
     * accompanying article describes an hourly schedule - presumably a demo setting; confirm
     * before production use.
     */
    @Scheduled(cron = "0 0/1 * * * ?")
    public void reportCurrentTime() {
        JobLauncher launcher = SpringContextUtil.getBean(JobLauncher.class);
        Job importUserJob = SpringContextUtil.getBean("MFJob");
        JobParameters jobParameters = new JobParametersBuilder()
                .addDate("date", new Date()).toJobParameters();
        try {
            launcher.run(importUserJob, jobParameters);
            log.info("批處理任務執行完成,date:{}", new Date());
        } catch (Exception e) {
            // Route the failure through the logger (with stack trace) instead of stderr.
            log.error("Failed to launch batch job MFJob", e);
        }
    }
}

 IF_MemberInfo-2016-12-30.txt      json格式

 

{"BrandCode":"MF","IFMemberId":"1267266","MemberCode":"13489568093","MemName":"\u5927\u5927\u5927\u6a58\u5b50458","Gender":"0","MobilePhone":"13489568093","Email":" ","IdentityCard":"","BirthYear":"","BirthDay":"    ","CounterCodeBelong":"","BaCodeBelong":" ","JoinDate":"2016-12-28","JoinTime":"          ","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"TB"}
{"BrandCode":"MF","IFMemberId":"1267265","MemberCode":"13840017311","MemName":"\u6768\u96e8\u6615","Gender":"2","MobilePhone":"13840017311","Email":" ","IdentityCard":"","BirthYear":"1999","BirthDay":"0806","CounterCodeBelong":"mf0sy003","BaCodeBelong":"14129994","JoinDate":"2016-12-28","JoinTime":"12:36:30  ","TotalPoint":"802","MemberLevelCode":"WMLC002","DataSource":"POS3"}
{"BrandCode":"MF","IFMemberId":"1267264","MemberCode":"18210648271","MemName":"","Gender":"","MobilePhone":"18210648271","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267263","MemberCode":"18753740991","MemName":"","Gender":"","MobilePhone":"18753740991","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267262","MemberCode":"13918726271","MemName":"","Gender":"","MobilePhone":"13918726271","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267261","MemberCode":"15533079902","MemName":"","Gender":"","MobilePhone":"15533079902","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267260","MemberCode":"18213506880","MemName":"\u9a6c\u5c0f\u59d0","Gender":"2","MobilePhone":"18213506880","Email":" ","IdentityCard":"","BirthYear":"1990","BirthDay":"0625","CounterCodeBelong":"MF0KM003","BaCodeBelong":"16108991","JoinDate":"2016-12-28","JoinTime":"12:14:23  ","TotalPoint":"804","MemberLevelCode":"WMLC002","DataSource":"MPOS"}
{"BrandCode":"MF","IFMemberId":"1267259","MemberCode":"15295502603","MemName":"","Gender":"","MobilePhone":"15295502603","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1265714","MemberCode":"18539039009","MemName":"\u6881\u4fca\u971e","Gender":"2","MobilePhone":"18539039009","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"MF0TMALL","BaCodeBelong":"99999998","JoinDate":"2016-12-26","JoinTime":"","TotalPoint":"0","MemberLevelCode":"WMLC003","DataSource":"bycsv"}
{"BrandCode":"MF","IFMemberId":"1262436","MemberCode":"13751786171","MemName":"\u674e\u6631\u84d3","Gender":"2","MobilePhone":"13751786171","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"MF0TMALL","BaCodeBelong":"99999998","JoinDate":"2016-12-23","JoinTime":"","TotalPoint":"0","MemberLevelCode":"WMLC003","DataSource":"bycsv"}
{"BrandCode":"MF","IFMemberId":"1262436","MemberCode":"13751786171","MemName":"\u674e\u6631\u84d3","Gender":"2","MobilePhone":"13751786171","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"MF0TMALL","BaCodeBelong":"99999998","JoinDate":"2016-12-23","JoinTime":"","TotalPoint":"0","MemberLevelCode":"WMLC003","DataSource":"bycsv"}

 這只是項目起始的一個Demo,之後準備定義統一的Reader,並將writer中以模板方式插入數據庫的做法改成常用的mybatis方式。