Spring Batch學習記錄(一)

背景:以前看了公司的數據導入平臺,導入用戶訂單記錄,使用的是SpringBoot 自帶的Scheduled 任務調度功能,而後經過發郵件通知開發人員任務調度是否成功。總體上來講,功能是支持現有業務,可是開發人員不可以清晰的瞭解任務運行的情況且任務調度也不夠靈活。原本想本身開發一套,想到本身可能考慮的不夠周全,致使開發進度慢,每當有新改動會傷筋動骨,因此,我從網上找到了Spring Batch,首先看下SpringBatch的官網介紹:html

Spring Batch是一個輕量級,全面的批處理框架,旨在開發對企業系統平常運營相當重要的強大批處理應用程序。Spring Batch構建了人們指望的Spring Framework特性(生產力,基於POJO的開發方法和通常易用性),同時使開發人員能夠在必要時輕鬆訪問和利用更高級的企業服務。Spring Batch不是一個調度框架。商業和開源領域都有許多優秀的企業調度程序(例如Quartz,Tivoli,Control-M等)。它旨在與調度程序一塊兒使用,而不是替換調度程序。java

Spring Batch提供了可重複使用的功能,這些功能對於處理大量記錄相當重要,包括記錄/跟蹤,事務管理,做業處理統計,做業重啓,跳過和資源管理。它還提供更高級的技術服務和功能,經過優化和分區技術實現極高容量和高性能的批處理做業。Spring Batch可用於兩種簡單的用例(例如將文件讀入數據庫或運行存儲過程)以及複雜的大量用例(例如在數據庫之間移動大量數據,轉換它等等)上)。大批量批處理做業能夠高度可擴展的方式利用該框架來處理大量信息。mysql

Spring Batch更多偏向於Job的配置,調度的話暫時不急於細看,以後會去專門研究一下Quartz等企業調度程序。git

ATP:咱們先不細緻的去研究Spring Batch具體實現,先從一個小Demo開始入手spring

工具軟件:sql

IDEA:2018.2 數據庫

Java: 1.8.0_171數組

Gradle: 4.10.2app

Demo地址:https://gitee.com/leonchen21/SpringBatchDemo/tree/SpringBatchDemo_01框架

 

1、建立一個SpringBoot Gradle項目

SpringBootVersion:2.1.6

添加開發支持,如圖所示

imageimage

image

首先進行Spring Batch的配置,建立一個配置類,以下

package person.leon.batch.springbatchdemo.config;

import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

/**
 * 自定義Batch基礎配置
 * 
 * @author leon
 * @since 2019/6/28 15:41
 */
@Configuration
public class BatchConfigurerConfig extends DefaultBatchConfigurer {

    @Autowired
    private DataSource dataSource;

    @Override
    public JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        //做業異步執行 針對http請求調用
//        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();


        return jobLauncher;
    }

    @Override
    public JobRepository createJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setTransactionManager(getTransactionManager());
        factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
        factory.setTablePrefix("BATCH_");
        factory.setMaxVarCharLength(1000);
        return factory.getObject();
    }

    @Override
    public JobExplorer createJobExplorer() throws Exception {
        JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
        jobExplorerFactoryBean.setDataSource(dataSource);
        jobExplorerFactoryBean.afterPropertiesSet();
        return jobExplorerFactoryBean.getObject();
    }

}

而後在application.propertities文件中添加如下配置

# SpringBatch 自動執行建表語句
spring.batch.initialize-schema=ALWAYS

SpringBatch框架自身也須要一些表支持,因此這裏須要咱們進行配置,讓框架自行建立,這裏還須要再加一些默認數據庫的配置。

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/batchdemo?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&allowMultiQueries=true&useSSL=false&autoReconnect=true&failOverReadOnly=false&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=123456

2、第一個Job配置

ATP:Job配置能夠用xml文件形式配置,本文暫只用JavaConfig配置

現在,咱們有一個任務就是將一個csv文件的數據導入進入mysql數據庫中。

csv文件位置以下圖所示

image

mysql數據庫暫時就用上面默認配置的數據庫,正常應該會分庫處理。

一、建立ImportProvinceJobConfig類

import org.springframework.context.annotation.Configuration;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
 
 
@Configuration
@EnableBatchProcessing
@Slf4j
public class ImportProvinceJobConfig {
    @Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
public StepBuilderFactory stepBuilderFactory;
}

@Configuration 聲明該類是配置類

@EnableBatchProcessing 自動幫你補全一些重要的有關batch工做時的屬性依賴,若是不聲明,則jobBuilderFactory和stepBuilderFactory會報錯

@Slf4j Lombok的日誌註解,至關於向ImportProvinceJobConfig 類中添加該屬性private static final Logger log = LoggerFactory.getLogger(ImportProvinceJobConfig.class);

JobBuilderFactory 用於建立Job實例

StepBuilderFactory 用於建立Step實例

二、讀取

從csv文件中讀數據,代碼以下

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.core.io.ClassPathResource;
 
 
    /**
* 從文件讀入數據
*/
@Bean
public FlatFileItemReader<Province> reader() {
return new FlatFileItemReaderBuilder<Province>()
//定義讀取實例的名稱
.name("provinceItemReader")
//在嚴格模式下,ExecutionContext若是輸入資源不存在,
//則reader會拋出異常。不然,它會記錄問題並繼續。
//默認爲true
.strict(true)
//源文件每行數據分隔符類型 分隔符爲「,」
// .lineTokenizer(new DelimitedLineTokenizer())
//定義解析字符
.encoding(StandardCharsets.UTF_8.name())
//定義資源(讀取數據來源)
//Spring資源文檔 https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#resources
.resource(new ClassPathResource("provinces.csv"))
//文件頂部忽略的行數
.linesToSkip(0)
//linesToSkip設置爲2,則調用此handler接口兩次
//.skippedLinesCallback(handler)
.delimited()
//定義字段對應
.names(provinceSet)
//轉換爲對象
.fieldSetMapper(new ProvinceFieldSetMapper())
.build();
}

ATP:Flat File是一種包含沒有相對關係結構的記錄的文件(作用逗號分隔數值(CSV)的文件)

FlatFileItemReader 該類提供了讀取和解析Flat File的基本功能,該類核心 在於ResourceLineMapper

其中Resource: Spring Core Resource 相關文檔 LineMapper:給定當前行和與之關聯的行號,映射器應返回結果域對象

本代碼中LineMapper 具體實現 由DelimitedLineTokenizer(默認) 和 ProvinceFieldSetMapper實現

DelimitedLineTokenizer 功能是將csv文件中每行文件以逗號隔開生成String數組

ProvinceFieldSetMapper 是由咱們自行定義,代碼以下

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
import person.leon.batch.springbatchdemo.entity.Province;

/**
* 從文件讀取 轉換爲對象
*
* @author: Leon
* @date: 2019/6/29 17:24
*/
public class ProvinceFieldSetMapper implements FieldSetMapper<Province> {

@Override
public Province mapFieldSet(FieldSet fieldSet) throws BindException {
Province province = new Province();
province.setId(fieldSet.readLong("id"));
province.setCreateTime(fieldSet.readDate("createTime", "yyyy-MM-dd HH:mm:ss"));
province.setUpdateTime(fieldSet.readDate("updateTime", "yyyy-MM-dd HH:mm:ss"));
province.setProvinceId(fieldSet.readString("provinceId"));
province.setProvinceName(fieldSet.readString("provinceName"));
province.setDisplay(fieldSet.readBoolean("display"));
province.setApkChannel(fieldSet.readString("apkChannel"));
province.setBsChannel(fieldSet.readString("bsChannel"));
return province;
}
}
 
 
 
 


import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import java.util.Date;

/**
* 省份信息
*
* @author leon
* @date 2019/4/25 11:09
*/
@Data
public class Province {

private Long id;
private Date createTime;
private Date updateTime;
private String provinceId;
private String provinceName;
private boolean display;
private String apkChannel;
private String bsChannel;

}

該類實現接口FieldSetMapper的mapFieldSet方法,其中FieldSet爲csv文件中讀取出來的字符串數組

該類的主要做用在於將讀取出來的字符創轉換爲對象。

三、轉換

將讀取數據生成的對象進行轉換,由於沒有特別的要求,本次直接返回原對象

@Bean
public ProvinceItemProcessor processor() {
return new ProvinceItemProcessor();
}

/**
* 轉換
*/
private class ProvinceItemProcessor implements ItemProcessor<Province, Province> {
@Override
public Province process(Province item) throws Exception {
return item;
}
}

四、寫入

/**
* 插入語句
*/
private String insertSql = "INSERT INTO province " +
" (id, create_time, update_time, province_id, province_name, display, apk_channel, bs_channel) " +
" VALUES (" +
":id, :createTime, :updateTime, :provinceId, :provinceName, :display, :apkChannel, :bsChannel)";

 

/**
* 寫入數據庫
*
* @param dataSource
* @return
*/
@Bean
public JdbcBatchItemWriter<Province> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Province>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql(insertSql)
.dataSource(dataSource)
.build();
}

JdbcBatchItemWriter 使用批處理功能 NamedParameterJdbcTemplate來執行全部提供的項目的批處理語句

五、構建Job和Step

@Bean
@Qualifier("importProvinceJob")
public Job importProvinceJob(
JobAroundListener listener,
@Qualifier("importProvinceStep") Step step1) {
return jobBuilderFactory.get("importProvinceJob")
// 配置Job不支持再次啓動(此時從新啓動會拋出JobRestartException異常)
// 默認支持從新啓動
//.preventRestart()
// job參數聲明驗證器
.validator(new DefaultJobParametersValidator())
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}

@Bean
@Qualifier("importProvinceStep")
public Step step1(JdbcBatchItemWriter<Province> writer) {
return stepBuilderFactory.get("importProvinceStep")
.<Province, Province>chunk(10)
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}
配置Job實例Bean和Step實例Bean,Job跟Step是一對多的關係。

2、運行Job

兩種運行方式:

1)啓動SpringBoot項目,會自動執行Job;

2)經過http請求調用Job。

    爲了讓項目啓動時不執行Job,在application.properties中添加配置

spring.batch.job.enabled=false

    在瀏覽地址欄輸入localhost:8080/run

驗證執行結果,鏈接上mysql數據庫,查詢表province,查看有沒有數據,其次查看SpringBatch相關表中有沒有數據。

相關文章
相關標籤/搜索