tags: springbatchjava
前面《數據批處理神器-Spring Batch(1)簡介及使用場景》已經介紹了Spring Batch
是一個輕量級,完善的批處理框架,它使用起來簡單,方便,比較適合有點編程基礎(特別是使用Spring及SpringBoot框架)的開發人員,針對業務編程,只須要關心具體的業務實現便可,把流程以及流程的控制交給Spring Batch
就好。常言道"talk is cheap, show me the code
",下面咱們就經過一個簡單的hello world
,進入Spring Batch
的世界,經過這個示例,能夠快速瞭解開發批處理的流程和Spring Batch
開發用到的組件,爲後續的操做打下基礎。git
本helloworld實現一個很是簡單的功能,就是從數據組中讀取字符串,把字符串轉爲大寫,而後輸出到控制檯。如圖:github
整個過程就是一個批重任務(Job
),它只有一個步驟(Job Step
),步驟裏分爲三個階段,讀數據(ItemReader)、處理數據(ItemProcessor)、寫數據(ItemWriter)。spring
開發的主要代碼以下:數據庫
整體來講就是,經過Reader
,Processor
、Writer
完成任務,結束後經過Listener
進行監聽,整個任務經過配置(BatchConfig
)進行配置。編程
Spring Boot
工程直接使用Idea生成或在使用Spring Initializr
生成便可,此處不詳細說明。也能夠直接使用個人代碼示例。當前使用的Spring Boot
版本是2.1.4.RELEASE
數組
Spring Batch
依賴 在使用spring-boot-starter-parent
的狀況下,直接添加如下依賴便可:<!-- 批處理框架-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
複製代碼
引用後,會引用兩個jar包,一個是spring-batch-infrastructure
,一個是spring-batch-core
,版本是4.1.2.RELEASE
。分別對應的是基礎框架層和核心層。bash
Spring Batch
是須要數據庫來存儲任務的基本信息以及運行狀態的,本例中不須要操做數據庫邏輯,直接使用內存數據庫H2便可。添加如下依賴:<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
複製代碼
lombok
進行處理。使用Spring Boot
進行單元測試,添加依賴以下:<!-- 工具包:lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
</dependency>
<!-- 測試框架 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
複製代碼
添加完依賴後,就能夠進入業務邏輯編程了。按Spring Batch
的批處理流程,讀數據ItemReader
是第一步,當前示例中,咱們的任務是從數組中讀取數據。ItemReader
是一個接口,開發人員直接實現此接口便可。此接口定義了核心方法read()
,負責從給定的資源中讀取可用的數據。具體實現以下:框架
@Slf4j
public class StringReader implements ItemReader<String> {
private String[] messages = {"aaa1","aaa2","aaa3","aaa4"};
private int count = 0;
@Override
public String read() throws UnexpectedInputException, ParseException, NonTransientResourceException {
if(count < messages.length){
String message = messages[count++];
log.debug(LogConstants.LOG_TAG + "read data:"+message);
return message;
}else{
log.debug(LogConstants.LOG_TAG + "read data end.");
count = 0;
}
return null;
}
}
複製代碼
說明:ide
@Slf4j
註解,直接可以使用log進行輸出,簡化操做。讀取數據後,返回的數據會流到ItemProcessor
進行處理。一樣,ItemProcessor
是一個接口,要實現本身的處理邏輯,實現此接口便可。固然,若是沒有ItemProcessor
,讀到的數據直接就到ItemWriter
流程也是能夠的。此處,Spring Batch
有一個Chunk
的概念,用於屢次讀,直到chunk指定的數量後,再統一給到processor和writer,以提升效率。本示例對於ItemProcessor
的實現很簡單,即把字符串轉爲大寫。以下:
@Slf4j
public class ConvertProcessor implements ItemProcessor<String,String> {
@Autowired
private ConsoleService consoleService;
@Override
public String process(String data) {
String dataProcessed = consoleService.convert2UpperCase(data);
log.debug(LogConstants.LOG_TAG + data +" process data --> " + dataProcessed);
return dataProcessed;
}
}
複製代碼
說明:
toUpperCase()
方法數據處理完後,會統一交給寫組件(ItemWriter
)進行寫入。ItemWriter
也是一個接口,核心方法是write
方法,參數是數組。要實現本身的邏輯,實現此接口便可。本示例中,直接把數據輸出到日誌中便可。以下:
@Slf4j
public class ConsoleWriter implements ItemWriter<String> {
@Override
public void write(List<? extends String> list) {
for (String msg :list) {
log.debug(LogConstants.LOG_TAG + "write data: "+msg);
}
}
}
複製代碼
數據寫入到目標後,任務即結束,但有時候咱們還須要在任務結束時去作一些其它工做,如清理數據,更新時間等,則須要在任務完成後進行邏輯處理。Spring Batch
對於任務或步驟開始和結束都會提供監聽,以便於開發人員實現監聽邏輯。如經過繼承JobExecutionListenerSupport
,能夠實現beforeJob
和afterJob
的監聽,以實現開始任務前和結束任務後的處理。當前示例中,僅輸出任務完成的日誌。以下:
@Slf4j
public class ConsoleJobEndListener extends JobExecutionListenerSupport {
@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getStatus() == BatchStatus.COMPLETED){
log.debug("console batch job complete!");
}
}
}
複製代碼
通過上面的讀、處理、寫、任務完成後監聽的操做,如今須要把它們組裝在一塊兒,造成一個完成的任務,使用Spring Boot
,簡單的使用幾個配置便可完成任務的組裝。任務及其相關組件的關係以下:
建立配置文件ConsoleBatchConfig.java
,具體代碼以下:
@Configuration
@EnableBatchProcessing
public class ConsoleBatchConfig {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public Job consoleJob(Step consoleStep,JobExecutionListener consoleListener){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return jobBuilderFactory.get(funcName).listener(consoleListener).flow(consoleStep)
.end().build();
}
@Bean
public Step consoleStep(ItemReader stringReader,ItemProcessor convertProcessor ,ItemWriter consoleWriter, CommonStepListener commonStepListener){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return stepBuilderFactory.get(funcName).listener(commonStepListener)
.<String,String>chunk(3).reader(stringReader).processor(convertProcessor)
.writer(consoleWriter).build();
}
@Bean
public ItemReader stringReader(){return new StringReader();}
@Bean
public ItemWriter consoleWriter(){return new ConsoleWriter();}
@Bean
public ItemProcessor convertProcessor(){return new ConvertProcessor();}
@Bean
public JobExecutionListener consoleListener(){return new ConsoleJobEndListener();}
}
複製代碼
說明:
@Configuration及
和@EnableBatchProcessing
,標識爲配置及啓用Spring Batch
的配置(能夠直接使用JobBuilderFactory
及StepBuilderFactory
分別用於建立Job和Step)。ItemReader
、ItemWriter
、ItemProcessor
、Listener
對應的Bean
,以供Step及Job的注入。stepBuilderFactory
建立做業Step,其中chunk進行面向塊的處理,即屢次讀取後再寫入,提升效率。當前配置是3個爲一個chunk。jobBuilderFactory
添加step,建立任務。get
方法肯定),此處直接使用方法名做爲Job和Step的名稱。通過上面的步驟,已經完成Job的開發,測試則可以使用兩種方式,一個是編寫Controller
,以接口調用的方式運行job,一種編寫單元測試。
JobLauncher
的run
方法來運行任務,run
方法參數分別是Job
和jobParameters
,即已配置的Job及job運行的參數。每一個任務的區分是經過任務名(jobName
)和任務參數(jobParameters
)做爲區別的,即若是jobName
和jobParameters
相同,Spring Batch
會認爲是同一任務,若任務已運行成功,同一任務不會再運行。所以,通常來講,不一樣的任務,咱們的jobParameters
能夠直接以時間做爲參數,以便於區別。生成jobParameters
。代碼以下:JobParameters jobParameters = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.toJobParameters();
複製代碼
ConsoleJobTest
,加載job,運行測試,以下所示:@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,ConsoleBatchConfig.class})
@Slf4j
public class ConsoleJobTest {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job consoleJob;
public void testConsoleJob2() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
//構建參數
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.toJobParameters();
//執行任務
JobExecution run = jobLauncher.run(consoleJob, jobParameters);
ExitStatus exitStatus = run.getExitStatus();
log.debug(exitStatus.toString());
}
}
複製代碼
說明:引入SpringBootTest
註解時,須要把Spring Batch
任務也引入進來。
執行結果輸出 執行結果以下圖所示:
從輸出可知,因爲設置的chunk
是3,讀取3個數據後,就統一給ItemProcessor
進行大寫轉換處理,而後統一交給ItemWriter
進行寫入。執行完成後,Job的exitCode表示任務執行的狀態,若是正常則爲COMPLETED
,失敗則爲FAILED
。
通過以上的操做步驟,便可完成批處理操做。關於任務的狀態,流程的步驟(讀、處理、寫)均交給Spring Batch
來完成,開發人員所作的工做是根據本身的業務邏輯編寫具體的讀數據、處理數據和寫數據便可。但願經過本文,你們能夠對Spring Batch
的組件有清晰的瞭解。