A Practical Minimal Example of Integrating Spring Batch with Spring Boot

Spring Batch is a lightweight framework for parallel (batch) processing of large volumes of data.

Its purpose is similar to Hadoop's, except that Hadoop relies on a heavyweight distributed environment (for processing huge data sets), whereas Spring Batch is a lightweight application framework (for small to medium data sets).

This article builds the simplest runnable Spring Batch example to illustrate the framework's basic role.

If you want to study it in depth, read the reference documentation at https://docs.spring.io/spring-batch/4.0.x/reference/html/index.html ; if your English is shaky, do what I do and right-click the page to translate it.

The technology stack is simple: Spring Boot + Spring Batch + JPA. The complete demo project is at https://github.com/EalenXie/springboot-batch

1. Create a new project, springboot-batch, with the basic pom.xml dependencies:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>name.ealen</groupId>
    <artifactId>springboot-batch</artifactId>
    <version>1.0</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
</project>

2. You need the Spring Batch metadata tables in your database, so run the following script, which comes from the official metadata schema (Spring Batch also ships it inside its jar as org/springframework/batch/core/schema-mysql.sql; with Spring Boot you can instead set spring.batch.initialize-schema=always to have the tables created at startup).

-- do not edit this file
-- BATCH_JOB_INSTANCE: holds all information about a JobInstance.
--   JOB_INSTANCE_ID is assigned from BATCH_JOB_SEQ.
--   JOB_NAME must match the job name in the Spring configuration.
--   JOB_KEY is an MD5 hash of the job parameters; because of this column, running the same job
--   a second time after a successful first run throws JobInstanceAlreadyCompleteException.
CREATE TABLE BATCH_JOB_INSTANCE (
    JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY,
    VERSION BIGINT,
    JOB_NAME VARCHAR(100) NOT NULL,
    JOB_KEY VARCHAR(32) NOT NULL,
    constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION: holds all information about a JobExecution.
CREATE TABLE BATCH_JOB_EXECUTION (
    JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    VERSION BIGINT,
    JOB_INSTANCE_ID BIGINT NOT NULL,
    CREATE_TIME DATETIME NOT NULL,
    START_TIME DATETIME DEFAULT NULL,
    END_TIME DATETIME DEFAULT NULL,
    STATUS VARCHAR(10),
    EXIT_CODE VARCHAR(2500),
    EXIT_MESSAGE VARCHAR(2500),
    LAST_UPDATED DATETIME,
    JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
    constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
        references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION_PARAMS: holds all information about the JobParameters of an execution.
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
    JOB_EXECUTION_ID BIGINT NOT NULL,
    TYPE_CD VARCHAR(6) NOT NULL,
    KEY_NAME VARCHAR(100) NOT NULL,
    STRING_VAL VARCHAR(250),
    DATE_VAL DATETIME DEFAULT NULL,
    LONG_VAL BIGINT,
    DOUBLE_VAL DOUBLE PRECISION,
    IDENTIFYING CHAR(1) NOT NULL,
    constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
        references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_STEP_EXECUTION: holds all information about a StepExecution.
CREATE TABLE BATCH_STEP_EXECUTION (
    STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    VERSION BIGINT NOT NULL,
    STEP_NAME VARCHAR(100) NOT NULL,
    JOB_EXECUTION_ID BIGINT NOT NULL,
    START_TIME DATETIME NOT NULL,
    END_TIME DATETIME DEFAULT NULL,
    STATUS VARCHAR(10),
    COMMIT_COUNT BIGINT,
    READ_COUNT BIGINT,
    FILTER_COUNT BIGINT,
    WRITE_COUNT BIGINT,
    READ_SKIP_COUNT BIGINT,
    WRITE_SKIP_COUNT BIGINT,
    PROCESS_SKIP_COUNT BIGINT,
    ROLLBACK_COUNT BIGINT,
    EXIT_CODE VARCHAR(2500),
    EXIT_MESSAGE VARCHAR(2500),
    LAST_UPDATED DATETIME,
    constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
        references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_STEP_EXECUTION_CONTEXT: holds the ExecutionContext associated with a Step.
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
    STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT,
    constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
        references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION_CONTEXT: holds the ExecutionContext associated with a Job.
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
    JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT,
    constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
        references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY)
select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists(select * from BATCH_STEP_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY)
select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists(select * from BATCH_JOB_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY)
select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists(select * from BATCH_JOB_SEQ);

3. The entity class for the test data: Access.java

package name.ealen.model;

import javax.persistence.*;

/**
 * Created by EalenXie on 2018/9/10 16:17.
 */
@Entity
@Table
public class Access {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Integer id;
    private String username;
    private String shopName;
    private String categoryName;
    private String brandName;
    private String shopId;
    private String omit;
    private String updateTime;
    private boolean deleteStatus;
    private String createTime;
    private String description;

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getShopName() { return shopName; }
    public void setShopName(String shopName) { this.shopName = shopName; }
    public String getCategoryName() { return categoryName; }
    public void setCategoryName(String categoryName) { this.categoryName = categoryName; }
    public String getBrandName() { return brandName; }
    public void setBrandName(String brandName) { this.brandName = brandName; }
    public String getShopId() { return shopId; }
    public void setShopId(String shopId) { this.shopId = shopId; }
    public String getOmit() { return omit; }
    public void setOmit(String omit) { this.omit = omit; }
    public String getUpdateTime() { return updateTime; }
    public void setUpdateTime(String updateTime) { this.updateTime = updateTime; }
    public boolean isDeleteStatus() { return deleteStatus; }
    public void setDeleteStatus(boolean deleteStatus) { this.deleteStatus = deleteStatus; }
    public String getCreateTime() { return createTime; }
    public void setCreateTime(String createTime) { this.createTime = createTime; }
    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }

    @Override
    public String toString() {
        return "Access{" +
                "id=" + id +
                ", username='" + username + '\'' +
                ", shopName='" + shopName + '\'' +
                ", categoryName='" + categoryName + '\'' +
                ", brandName='" + brandName + '\'' +
                ", shopId='" + shopId + '\'' +
                ", omit='" + omit + '\'' +
                ", updateTime='" + updateTime + '\'' +
                ", deleteStatus=" + deleteStatus +
                ", createTime='" + createTime + '\'' +
                ", description='" + description + '\'' +
                '}';
    }
}

4. Before configuring the simplest possible Job, prepare some basic supporting pieces, for example a listener for the Job.

  Configure a TaskExecutor in ExecutorConfiguration.java:

package name.ealen.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * TaskExecutor configuration.
 */
@Configuration
public class ExecutorConfiguration {

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(50);
        threadPoolTaskExecutor.setMaxPoolSize(200);
        threadPoolTaskExecutor.setQueueCapacity(1000);
        threadPoolTaskExecutor.setThreadNamePrefix("Data-Job");
        return threadPoolTaskExecutor;
    }
}

  Prepare a simple listener for the Job by implementing JobExecutionListener:

package name.ealen.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Created by EalenXie on 2018/9/10 15:09.
 * A simple Job listener.
 */
@Component
public class JobListener implements JobExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(JobListener.class);

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    private long startTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        log.info("job before " + jobExecution.getJobParameters());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("JOB STATUS : {}", jobExecution.getStatus());
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("JOB FINISHED");
            threadPoolTaskExecutor.destroy();
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            log.info("JOB FAILED");
        }
        log.info("Job Cost Time : {}ms", (System.currentTimeMillis() - startTime));
    }
}

5. Configure a most basic Job. A Job usually consists of one or more Steps (much like a workflow), and a Step usually has three parts: an ItemReader that reads the data, an ItemProcessor that processes it, and an ItemWriter that writes it.

package name.ealen.batch;

import name.ealen.listener.JobListener;
import name.ealen.model.Access;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import javax.persistence.EntityManagerFactory;

/**
 * Created by EalenXie on 2018/9/10 14:50.
 * The @EnableBatchProcessing annotation provides the base configuration for building batch jobs.
 */
@Configuration
@EnableBatchProcessing
public class DataBatchConfiguration {

    private static final Logger log = LoggerFactory.getLogger(DataBatchConfiguration.class);

    @Resource
    private JobBuilderFactory jobBuilderFactory;    // used to build the Job
    @Resource
    private StepBuilderFactory stepBuilderFactory;  // used to build the Steps
    @Resource
    private EntityManagerFactory emf;               // injected to access the data via JPA
    @Resource
    private JobListener jobListener;                // the simple Job listener

    /**
     * A simple, basic Job usually consists of one or more Steps.
     */
    @Bean
    public Job dataHandleJob() {
        return jobBuilderFactory.get("dataHandleJob").
                incrementer(new RunIdIncrementer()).
                start(handleDataStep()).    // start() is the first Step the Job executes
                // next(xxxStep()).
                // next(xxxStep()).
                // ...
                listener(jobListener).      // attach the simple JobListener
                build();
    }

    /**
     * A simple, basic Step has three main parts:
     * ItemReader    : reads the data
     * ItemProcessor : processes the data
     * ItemWriter    : writes the data
     */
    @Bean
    public Step handleDataStep() {
        return stepBuilderFactory.get("getData").
                <Access, Access>chunk(100).       // <input, output>; a chunk is, loosely speaking, like a SQL commit: process 100 items (processor), then write them once (writer)
                faultTolerant().retryLimit(3).retry(Exception.class).skipLimit(100).skip(Exception.class). // retry an item up to 3 times on an exception and skip up to 100 failing items; once the limits are exceeded the Job stops and is marked FAILED
                reader(getDataReader()).          // the ItemReader
                processor(getDataProcessor()).    // the ItemProcessor
                writer(getDataWriter()).          // the ItemWriter
                build();
    }

    @Bean
    public ItemReader<? extends Access> getDataReader() {
        // Read the data; JPA, JDBC, JMS, etc. could all be used here.
        JpaPagingItemReader<Access> reader = new JpaPagingItemReader<>(); // JPA is used here, with a simple native SQL query
        String sqlQuery = "SELECT * FROM access";
        try {
            JpaNativeQueryProvider<Access> queryProvider = new JpaNativeQueryProvider<>();
            queryProvider.setSqlQuery(sqlQuery);
            queryProvider.setEntityClass(Access.class);
            queryProvider.afterPropertiesSet();
            reader.setEntityManagerFactory(emf);
            reader.setPageSize(3);
            reader.setQueryProvider(queryProvider);
            reader.afterPropertiesSet();
            // All ItemReader and ItemWriter implementations store their current state in the
            // ExecutionContext before it is committed; if you do not want that, call setSaveState(false).
            reader.setSaveState(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return reader;
    }

    @Bean
    public ItemProcessor<Access, Access> getDataProcessor() {
        return new ItemProcessor<Access, Access>() {
            @Override
            public Access process(Access access) throws Exception {
                log.info("processor data : " + access.toString()); // simulated processing; here "processing" is just logging the item
                return access;
            }
        };
        // as a lambda this could be written as:
        // return access -> {
        //     log.info("processor data : " + access.toString());
        //     return access;
        // };
    }

    @Bean
    public ItemWriter<Access> getDataWriter() {
        return list -> {
            for (Access access : list) {
                log.info("write data : " + access); // simulated write; the real write logic would go here
            }
        };
    }
}
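
A side note on the ThreadPoolTaskExecutor from step 4: in this demo it is only touched by the JobListener (which destroys it once the job completes), and the step above runs on a single thread. If you wanted the chunks processed concurrently, Spring Batch lets you hand a TaskExecutor to the step builder. The following is only a rough sketch of such a variant, assuming the executor is injected into DataBatchConfiguration alongside the other fields; the handleDataStepConcurrently bean is illustrative and not part of the demo.

// Additional members for DataBatchConfiguration; also import
// org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor. Illustrative only.
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;

@Bean
public Step handleDataStepConcurrently() {
    return stepBuilderFactory.get("getDataConcurrently").
            <Access, Access>chunk(100).
            reader(getDataReader()).
            processor(getDataProcessor()).
            writer(getDataWriter()).
            taskExecutor(threadPoolTaskExecutor).  // each chunk is executed on a pool thread
            throttleLimit(10).                     // at most 10 chunks in flight at once
            build();
}

Note that a multi-threaded step cannot reliably restore the reader's state on restart, so in that case setSaveState(false) is usually recommended on the reader.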

6. With the basic Job configured, load some sample data into the access table (the demo data, access.sql, is in the GitHub repo) and write a Spring Boot launcher class to test it.

  Note: declare each of the Job's components with the @Bean annotation, so that their activity is properly recorded in the metadata tables:

package name.ealen;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Created by EalenXie on 2018/9/10 14:41.
 */
@SpringBootApplication
public class SpringBatchApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchApplication.class, args);
    }
}
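
By default spring-boot-starter-batch launches every configured Job once at application startup, so running this class is enough to trigger dataHandleJob. If you would rather disable the automatic launch (spring.batch.job.enabled=false) and start the job yourself, a minimal sketch of a manual launch could look like the following; the ManualJobRunner class and the launchTime parameter are illustrative and not part of the demo.

package name.ealen;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Illustrative only: launches dataHandleJob by hand once the application has started.
 */
@Component
public class ManualJobRunner implements CommandLineRunner {

    @Resource
    private JobLauncher jobLauncher;
    @Resource
    private Job dataHandleJob;

    @Override
    public void run(String... args) throws Exception {
        // A unique parameter value makes every launch a new JobInstance,
        // avoiding JobInstanceAlreadyCompleteException on repeated runs.
        jobLauncher.run(dataHandleJob,
                new JobParametersBuilder()
                        .addLong("launchTime", System.currentTimeMillis())
                        .toJobParameters());
    }
}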

7. Run the application and you can see the basic data handling at work; both the processing and the writing here are simulated, so the effect shows up as "processor data : ..." and "write data : ..." entries in the log.

8. Verify how the Job executed by checking the metadata tables (for example BATCH_JOB_EXECUTION and BATCH_STEP_EXECUTION).
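
Besides querying the tables directly, the same information can be read in code through Spring Batch's JobExplorer, a bean provided by @EnableBatchProcessing. Below is a minimal sketch; the JobVerifier component is illustrative and not part of the demo.

package name.ealen;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Illustrative only: reads recent dataHandleJob runs from the batch metadata.
 */
@Component
public class JobVerifier {

    private static final Logger log = LoggerFactory.getLogger(JobVerifier.class);

    @Resource
    private JobExplorer jobExplorer;

    /** Logs the status of the ten most recent dataHandleJob instances. */
    public void logRecentRuns() {
        for (JobInstance instance : jobExplorer.getJobInstances("dataHandleJob", 0, 10)) {
            for (JobExecution execution : jobExplorer.getJobExecutions(instance)) {
                log.info("instance {} -> status {}, exit code {}",
                        instance.getInstanceId(), execution.getStatus(), execution.getExitStatus().getExitCode());
            }
        }
    }
}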

  

One more note: I previously wrote a post on integrating Spring Boot with Quartz, and you can probably see where this is going. A Spring Batch job is a natural unit of work, and Quartz works perfectly well as the scheduler that drives it; combined, the two work very nicely.
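
As a rough sketch of that combination (assuming spring-boot-starter-quartz is added to the pom; the class name below is illustrative), a Quartz job can simply delegate to the Spring Batch JobLauncher:

package name.ealen.schedule;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.quartz.QuartzJobBean;

import javax.annotation.Resource;

/**
 * Illustrative only: a Quartz job that launches dataHandleJob on its schedule.
 */
public class DataHandleQuartzJob extends QuartzJobBean {

    @Resource
    private JobLauncher jobLauncher;
    @Resource
    private Job dataHandleJob;

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        try {
            // A fresh timestamp parameter so each scheduled run is a new JobInstance.
            jobLauncher.run(dataHandleJob,
                    new JobParametersBuilder()
                            .addLong("scheduledTime", System.currentTimeMillis())
                            .toJobParameters());
        } catch (Exception e) {
            throw new JobExecutionException(e);
        }
    }
}

A JobDetail and a Trigger (for example a cron trigger) would still need to be registered for this class, as in the earlier Spring Boot + Quartz post; that wiring is omitted here.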

Thank you all for your feedback and support.
