tags: springbatchhtml
上一篇文章《快速瞭解組件-spring batch(2)之helloworld》對Spring Batch
進行了入門級的開發,也對基本的組件有了必定的瞭解。但實際開發過程當中,更多的是涉及文件及數據庫的操做,以定時後臺運行的方式,實現批處理操做。典型操做是從文本數據(csv/txt
等文件)中讀取數據,而後寫入到數據庫存儲。以下圖所示:java
若須要開發此過程,能夠按照上一篇文章所寫的,自定義ItemReader
和ItemWriter
來實現,可是Spring Batch
其實已經提供現成的文件讀取和數據庫寫入的組件,開發人員能夠直接使用,提升開發效率。本文將會對文件讀取和數據庫寫入進行實戰介紹。mysql
在使用Spring Batch
內置的讀寫組件時,首先咱們先弄清楚有哪些組件能夠用,按讀、寫、處理,見下面說明。Spring Batch
已提供了比較全面的支持。git
ItemReader | 說明 |
---|---|
ListItemReader | 讀取List類型數據,只能讀一次 |
ItemReaderAdapter | ItemReader適配器,能夠複用現有的讀操做 |
FlatFileItemReader | 讀Flat類型文件 |
StaxEventItemReader | 讀XML類型文件 |
JdbcCursorItemReader | 基於JDBC遊標方式讀數據庫 |
HibernateCursorItemReader | 基於Hibernate遊標方式讀數據庫 |
StoredProcedureItemReader | 基於存儲過程讀數據庫 |
JpaPagingItemReader | 基於Jpa方式分頁讀數據庫 |
JdbcPagingItemReader | 基於JDBC方式分頁讀數據庫 |
HibernatePagingItemReader | 基於Hibernate方式分頁讀取數據庫 |
JmsItemReader | 讀取JMS隊列 |
IteratorItemReader | 迭代方式的讀組件 |
MultiResourceItemReader | 多文件讀組件 |
MongoItemReader | 基於分佈式文件存儲的數據庫 MongoDB讀組件 |
Neo4jItemReader | 面向網絡的數據庫Neo4j的讀組件 |
ResourcesItemReader | 基於批量資源的讀組件,每次讀取返回資源對象 AmqpItemReader讀取AMQP隊列組件 |
RepositoryItemReader | 基於 Spring Data的讀組件 |
ItemWriter | 說明 |
---|---|
FlatFileItemWriter | 寫Flat類型文件 |
MultiResourceItemWriter | 多文件寫組件 |
StaxEventItemWriter | 寫XML類型文件 |
AmqpItemWriter | 寫AMQP類型消息 |
ClassifierCompositeItemWriter | 根據 Classifier路由不一樣的Item到特定的ItemWriter處理 |
HiberateItemWriter | 基於Hibernate方式寫數據庫 |
ItemWriterAdapter | ItemWriter適配器,能夠複用現有的寫服務 |
JdbcBatchItemWriter | 基於JDBC方式寫數據庫 |
JmsItemWriter | 寫JMS隊列 JpaItemWriter基於Jpa方式寫數據庫 |
GemfireItemWriter | 基於分佈式數據庫Gemfire的寫組件 |
SpELMappingGemfireItemWriter | 基於Spring表達式語言寫分佈式數據庫Gemfire的寫組件 |
MimeMessageItemWriter | 發送郵件的寫組件 |
MongoItemWriter | 基於分佈式文件存儲的數據庫MongoDB寫組件 |
Neo4jItemWriter | 面向網絡的數據庫Neo4j的讀組件 |
PropertyExtractingDelegatingItemWriter | 屬性抽取代理寫組件:經過調用給定的 Spring Bean方法執行寫入,參數由Item中指定的屬性字段獲取做爲參數 |
RepositoryItemWriter基於 | Spring Data的寫組件 |
SimpleMailMessageItemWriter | 發送郵件的寫組件 |
CompositeItemWriter | 條目寫的組合模式,支持組裝多個ItemWriter |
ItemProcessor | 說明 |
---|---|
CompositeItemProcessor | 組合處理器,能夠封裝多個業務處理服務 |
ItemProcessorAdapter | ItemProcessor適配器,能夠複用現有的業務處理服務 |
PassThroughItemProcessor | 不作任何業務處理,直接返回讀到的數據 |
ValidatingItemProcessor | 數據校驗處理器,支持對數據的校驗,若是校驗不經過能夠進行過濾掉或者經過skip的方式跳過對記錄的處理 |
根據當前示例,從csv
文件中讀數據,寫入到mysql
數據庫,只須要使用FlatFileItemReader
和JdbcBatchItemWriter
便可。下面對開發流程做簡要說明。示例工程能夠在這裏獲取,裏面有文件resources/user-data.csv
及相應的目標數據庫腳本mytest.sql
。github
Spring Batch
的運行須要數據庫的支持,以保存任務的運行狀態及結果。所以須要先建立數據庫。在mysql
中建立名爲my_spring_batch
的數據庫。並在此數據庫中執行 Spring Batch
的數據庫腳本,腳本位置在spring-batch-core-4.1.2.RELEASE.jar
的jar包中的\org\springframework\batch\core\schema-mysql.sql
(也能夠在示例工程的sql
文件夾中獲取)。執行完成後,數據庫表以下圖所示:spring
數據庫共9張表,以seq
結尾的是用於生成主鍵的。其它6張表,以batch_job
開頭的是存儲任務的相關信息,batch_step
開頭的存儲步驟相關信息。sql
job
與job instance
與job execution
關係 任務job
是咱們說的邏輯概念,即完整的一個批處理工做,它的實例就是job instance
,此任務信息是存儲在batch_job_instance
表中。有點相似java中的類和類實例的概念,是一對多的關係。對於每個job instance
,執行的時候會生成記錄存儲在batch_job_execution
中,表示每個job
執行的實際狀況。注意,這裏job instance
和job execution
也是一對多的關係,即同一個實例有可能會執行屢次(如上一次執行失敗了,後面從新再執行)。數據庫
batch_job_execution_context
及batch_job_execution_params
存儲任務執行時須要用到的上下文(以json
格式存儲)及運行時使用的參數。json
batch_step_execution
及batch_step_execution_context
存儲任務執行過程當中的做業步驟及運行時上下文。bash
本示例只涉及一個test_user
表。建立mytest
數據庫庫,執行mytest.sql
腳本便可。
通常來講,咱們會把Spring Batch
的數據存儲在獨立的數據庫中,而實際的應用使用的則是目標數據庫,所以須要配置多數據源訪問。基於上一篇文章的工程進行開發。
<!-- 數據庫相關依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
複製代碼
Spring Boot
對多數據源的支持比較友好,配置也很簡單,先在配置文件中添加數據庫配置,而後在java配置文件中添加相應的註解便可。以下:
application.properties
配置內容# spring batch db
spring.datasource.jdbc-url=jdbc:mysql://localhost:3310/my_spring_batch?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.datasource.username=root
spring.datasource.password=111111
# target db
spring.target-datasource.jdbc-url=jdbc:mysql://localhost:3310/mytest?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.target-datasource.username=root
spring.target-datasource.password=111111
複製代碼
DataSourceConfig
配置內容 新建DataSourceConfig.java
文件,配置多數據源,以下:@Configuration
public class DataSourceConfig {
@Bean("datasource")
@ConfigurationProperties(prefix="spring.datasource")
@Primary
public DataSource batchDatasource() {
return DataSourceBuilder.create().build();
}
@Bean("targetDatasource")
@ConfigurationProperties(prefix="spring.target-datasource")
public DataSource targetDatasource() {
return DataSourceBuilder.create().build();
}
}
複製代碼
這樣,後面就能夠直接使用datasource
及targetDatasource
兩個Bean進行數據庫訪問。
本實例中,讀取csv
文件,轉爲User
實體,而後存儲到數據庫,所以須要先把User
這個實體做一個定義。使用了lombok
和jpa
的註解,以下:
@Entity
@Data
@Table(name="test_user")
public class User{
@Id
@GeneratedValue
/**
* id
*/
private Long id;
/**
* 姓名
*/
private String name;
/**
* 手機號
*/
private String phone;
...略
複製代碼
使用內置的FlatFileItemReader
便可。以下:
@Bean
public ItemReader file2DbItemReader(){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return new FlatFileItemReaderBuilder<User>()
.name(funcName)
.resource(new ClassPathResource("user-data.csv"))
// .linesToSkip(1)
.delimited()
.names(new String[]{"id","name","phone","title","email","gender","date_of_birth","sys_create_time","sys_create_user","sys_update_time","sys_update_user"})
.fieldSetMapper(new UserFieldSetMapper())
.build();
}
複製代碼
說明:
FlatFileItemReaderBuilder
用於建立FlatFileItemReader
,設置相應的行爲,包括使用它來設置讀取文件的位置(resource
),文件分隔符(默認是','
),是否跳過前面幾行(linesToSkip
),標識每一列對應的列名稱(可與數據庫的字段名一致)。設置文件字段與數據庫實體字段的對應關係。FieldSetMapper
來實現,其中FieldSet
表明每一行文本數據,返回值即爲實體對象。以下所示:public class UserFieldSetMapper implements FieldSetMapper<User> {
@Override
public User mapFieldSet(FieldSet fieldSet) throws BindException {
String patternYmd = "yyyy-MM-dd";
String patternYmdHms = "yyyy-MM-dd HH:mm:ss";
User user = new User();
user.setId(fieldSet.readLong("id"));
user.setName(fieldSet.readString("name"));
user.setPhone(fieldSet.readString("phone"));
user.setTitle(fieldSet.readString("title"));
user.setEmail(fieldSet.readString("email"));
user.setGender(fieldSet.readString("gender"));
//此字段有可能爲null
String dataOfBirthStr = fieldSet.readString("date_of_birth");
if(SyncConstants.STR_CSV_NULL.equals(dataOfBirthStr)){
user.setDateOfBirth(null);
}else{
DateTime dateTime = DateUtil.parse(dataOfBirthStr, patternYmd);
user.setDateOfBirth(dateTime.toJdkDate());
}
user.setSysCreateTime(fieldSet.readDate("sys_create_time",patternYmdHms));
user.setSysCreateUser(fieldSet.readString("sys_create_user"));
user.setSysUpdateTime(fieldSet.readDate("sys_update_time",patternYmdHms));
user.setSysUpdateUser(fieldSet.readString("sys_update_user"));
return user;
}
}
複製代碼
因爲csv
文本文件中的數據null
值數據標識符爲\N
,所以能夠在處理組件中進行處理,把標識符\N
設置爲null
值。以下所示:
@Slf4j
public class File2DbItemProcessor implements ItemProcessor<User,User> {
@Override
public User process(User user) throws Exception {
user.setPhone(checkStr(user.getPhone()));
user.setTitle(checkStr(user.getTitle()));
user.setEmail(checkStr(user.getEmail()));
user.setGender(checkStr(user.getGender()));
log.info(LogConstants.LOG_TAG + "item process: " +user.getName());
return user;
}
public String checkStr(String dataToCheck){
if(SyncConstants.STR_CSV_NULL.equals(dataToCheck)){
return null;
}
return dataToCheck;
}
}
複製代碼
數據庫寫入組件使用JdbcBatchItemWriter
便可,以下:
@Bean
public ItemWriter file2DbWriter(@Qualifier("targetDatasource") DataSource datasource){
return new JdbcBatchItemWriterBuilder<User>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user) " +
"VALUES (:id,:name,:phone,:title,:email,:gender,:dateOfBirth,:sysCreateTime,:sysCreateUser,:sysUpdateTime,:sysUpdateUser)")
.dataSource(datasource)
.build();
}
複製代碼
說明:
JdbcBatchItemWriterBuilder
進行JdbcBatchItemWriter
的建立,設置插入數據庫的sql語句,同時指定數據源便可。@Qualifier("targetDatasource") DataSource datasource
用於指定數據源BeanPropertyItemSqlParameterSourceProvider
能夠直接把讀取的數據實體的屬性數據做爲參數填充到sql
語句中,從而實現數據插入操做。通過上面的操做,可使用一個java配置,把讀、寫、處理組裝成完整的step
和job
,以下所示(詳細可見示例工程文件):
File2DbBatchConfig.java
@Bean
public Job file2DbJob(Step file2DbStep,JobExecutionListener file2DbListener){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return jobBuilderFactory.get(funcName)
.listener(file2DbListener)
.flow(file2DbStep)
.end().build();
}
@Bean
public Step file2DbStep(ItemReader file2DbItemReader , ItemProcessor file2DbProcessor ,ItemWriter file2DbWriter){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return stepBuilderFactory.get(funcName)
.<User,User>chunk(10)
.reader(file2DbItemReader)
.processor(file2DbProcessor)
.writer(file2DbWriter)
.build();
}
複製代碼
參考上一文章的ConsoleJobTest
,編寫File2DbJobTest
文件。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,File2DbBatchConfig.class})
@Slf4j
public class File2DbJobTest {
@Autowired
private JobLauncherService jobLauncherService;
@Autowired
private Job file2DbJob;
@Test
public void testFile2DbJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
//構建任務運行參數
JobParameters jobParameters = JobUtil.makeJobParameters();
//執行並顯示結果
Map<String, Object> stringObjectMap = jobLauncherService.startJob(file2DbJob, jobParameters);
Assert.assertEquals(ExitStatus.COMPLETED,stringObjectMap.get(SyncConstants.STR_RETURN_EXITSTATUS));
}
}
複製代碼
執行後結果輸出以下(exitCode=COMPLETED
):
本文先對Spring Batch
的開箱即用的ItemReader
,ItemWriter
、ItemProcessor
做了一個簡要的概覽,而後以讀csv
文件,處理null值,再插入到數據庫的處理邏輯爲案例,介紹了Spring Batch
的數據庫腳本,FlatFileItemReader
及JdbcBatchItemWriter
的使用。但願對你們更深刻的瞭解Spring Batch
有幫助,並能用到實踐中。
劉相《Spring Batch 批處理框架》:書中對Spring Batch進行了詳細的描述,本文章主要參考此書。
《Spring Batch - Reference Documentation》:書中對Spring Batch進行了詳細的描述,本文章主要參考此書。