快速瞭解組件-spring batch(2)之helloworld

tags: springbatchjava


1.引言

前面《數據批處理神器-Spring Batch(1)簡介及使用場景》已經介紹了Spring Batch是一個輕量級,完善的批處理框架,它使用起來簡單,方便,比較適合有點編程基礎(特別是使用Spring及SpringBoot框架)的開發人員,針對業務編程,只須要關心具體的業務實現便可,把流程以及流程的控制交給Spring Batch就好。常言道"talk is cheap, show me the code",下面咱們就經過一個簡單的hello world,進入Spring Batch的世界,經過這個示例,能夠快速瞭解開發批處理的流程和Spring Batch開發用到的組件,爲後續的操做打下基礎。git

2.開發環境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 開發IDE: IDEA
  • 構建工具Maven: 3.3.9
  • 日誌組件logback:1.2.3
  • lombok:1.18.6

3.helloworld開發

3.1 helloworld說明

本helloworld實現一個很是簡單的功能,就是從數據組中讀取字符串,把字符串轉爲大寫,而後輸出到控制檯。如圖:github

字符串讀寫

整個過程就是一個批重任務(Job),它只有一個步驟(Job Step),步驟裏分爲三個階段,讀數據(ItemReader)、處理數據(ItemProcessor)、寫數據(ItemWriter)。spring

3.2 開發流程

開發的主要代碼以下:數據庫

主要代碼

整體來講就是,經過ReaderProcessorWriter完成任務,結束後經過Listener進行監聽,整個任務經過配置(BatchConfig)進行配置。編程

3.2.1 建立Spring Boot工程

直接使用Idea生成或在使用Spring Initializr生成便可,此處不詳細說明。也能夠直接使用個人代碼示例。當前使用的Spring Boot版本是2.1.4.RELEASE數組

3.2.2 添加相關依賴

  • Spring Batch依賴 在使用spring-boot-starter-parent的狀況下,直接添加如下依賴便可:
<!-- 批處理框架-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
複製代碼

引用後,會引用兩個jar包,一個是spring-batch-infrastructure,一個是spring-batch-core,版本是4.1.2.RELEASE。分別對應的是基礎框架層和核心層。bash

  • 添加內存數據庫H2 Spring Batch是須要數據庫來存儲任務的基本信息以及運行狀態的,本例中不須要操做數據庫邏輯,直接使用內存數據庫H2便可。添加如下依賴:
<dependency>
	<groupId>com.h2database</groupId>
	<artifactId>h2</artifactId>
</dependency>
複製代碼
  • 添加測試及工具類依賴 爲了簡化開發,使用lombok進行處理。使用Spring Boot進行單元測試,添加依賴以下:
<!-- 工具包:lombok -->
<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
	<version>1.18.6</version>
</dependency>
<!-- 測試框架 -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-test</artifactId>
	<scope>test</scope>
</dependency>
複製代碼

3.2.3 開發讀數據組件ItemReader

添加完依賴後,就能夠進入業務邏輯編程了。按Spring Batch的批處理流程,讀數據ItemReader是第一步,當前示例中,咱們的任務是從數組中讀取數據。ItemReader是一個接口,開發人員直接實現此接口便可。此接口定義了核心方法read(),負責從給定的資源中讀取可用的數據。具體實現以下:框架

@Slf4j
public class StringReader implements ItemReader<String> {
    private String[] messages = {"aaa1","aaa2","aaa3","aaa4"};
    private int count = 0;
    @Override
    public String read() throws UnexpectedInputException, ParseException, NonTransientResourceException {
        if(count < messages.length){
            String message = messages[count++];
            log.debug(LogConstants.LOG_TAG + "read data:"+message);
            return message;
        }else{
            log.debug(LogConstants.LOG_TAG + "read data end.");
            count = 0;
        }
        return null;
    }
}
複製代碼

說明:ide

  • (1)StringReader實現ItemReader接口;
  • (2)messages是數據源;
  • (3)count表示讀取數據的下標,每讀一次,下標自增,讀取完後返回null表示結束。同時把count置爲0,以方便下次讀取。
  • (4)日誌輸出使用的是logback,結合lombok的@Slf4j註解,直接可以使用log進行輸出,簡化操做。

3.2.4 開發處理數據組件ItemProcessor

讀取數據後,返回的數據會流到ItemProcessor進行處理。一樣,ItemProcessor是一個接口,要實現本身的處理邏輯,實現此接口便可。固然,若是沒有ItemProcessor,讀到的數據直接就到ItemWriter流程也是能夠的。此處,Spring Batch有一個Chunk的概念,用於屢次讀,直到chunk指定的數量後,再統一給到processor和writer,以提升效率。本示例對於ItemProcessor的實現很簡單,即把字符串轉爲大寫。以下:

@Slf4j
public class ConvertProcessor implements ItemProcessor<String,String> {
    @Autowired
    private ConsoleService consoleService;
    @Override
    public String process(String data) {
        String dataProcessed = consoleService.convert2UpperCase(data);
        log.debug(LogConstants.LOG_TAG + data +" process data --> " + dataProcessed);
        return dataProcessed;
    }
}
複製代碼

說明:

  • 實現ItemProcessor接口,它有兩個泛型,分別是I和O,I是讀階段獲取的數據,O是提交給寫階段的數據。
  • 使用ConsoleService服務,對數據進行大寫轉換,裏面的實現直接使用字符串的toUpperCase()方法

3.2.5 開發寫數據組件ItemWriter

數據處理完後,會統一交給寫組件(ItemWriter)進行寫入。ItemWriter也是一個接口,核心方法是write方法,參數是數組。要實現本身的邏輯,實現此接口便可。本示例中,直接把數據輸出到日誌中便可。以下:

@Slf4j
public class ConsoleWriter implements ItemWriter<String> {
    @Override
    public void write(List<? extends String> list) {
        for (String msg :list) {
            log.debug(LogConstants.LOG_TAG + "write data: "+msg);
        }
    }
}
複製代碼

3.2.6 開發任務完成後的監聽器JobExecutionListener

數據寫入到目標後,任務即結束,但有時候咱們還須要在任務結束時去作一些其它工做,如清理數據,更新時間等,則須要在任務完成後進行邏輯處理。Spring Batch對於任務或步驟開始和結束都會提供監聽,以便於開發人員實現監聽邏輯。如經過繼承JobExecutionListenerSupport,能夠實現beforeJobafterJob的監聽,以實現開始任務前和結束任務後的處理。當前示例中,僅輸出任務完成的日誌。以下:

@Slf4j
public class ConsoleJobEndListener extends JobExecutionListenerSupport {
    @Override
    public void afterJob(JobExecution jobExecution) {
        if(jobExecution.getStatus() == BatchStatus.COMPLETED){
            log.debug("console batch job complete!");
        }
    }
}
複製代碼

3.2.7 配置完整任務

通過上面的讀、處理、寫、任務完成後監聽的操做,如今須要把它們組裝在一塊兒,造成一個完成的任務,使用Spring Boot,簡單的使用幾個配置便可完成任務的組裝。任務及其相關組件的關係以下:

組件關係

建立配置文件ConsoleBatchConfig.java,具體代碼以下:

@Configuration
@EnableBatchProcessing
public class ConsoleBatchConfig {
    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job consoleJob(Step consoleStep,JobExecutionListener consoleListener){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return jobBuilderFactory.get(funcName).listener(consoleListener).flow(consoleStep)
                .end().build();
    }

    @Bean
    public Step consoleStep(ItemReader stringReader,ItemProcessor convertProcessor ,ItemWriter consoleWriter, CommonStepListener commonStepListener){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return stepBuilderFactory.get(funcName).listener(commonStepListener)
                .<String,String>chunk(3).reader(stringReader).processor(convertProcessor)
                .writer(consoleWriter).build();
    }

    @Bean
    public ItemReader stringReader(){return new StringReader();}

    @Bean
    public ItemWriter consoleWriter(){return new ConsoleWriter();}

    @Bean
    public ItemProcessor convertProcessor(){return new ConvertProcessor();}

    @Bean
    public JobExecutionListener consoleListener(){return new ConsoleJobEndListener();}
}
複製代碼

說明:

  • 添加註解@Configuration及@EnableBatchProcessing,標識爲配置及啓用Spring Batch的配置(能夠直接使用JobBuilderFactoryStepBuilderFactory分別用於建立Job和Step)。
  • 建立ItemReaderItemWriterItemProcessorListener對應的Bean,以供Step及Job的注入。
  • 使用stepBuilderFactory建立做業Step,其中chunk進行面向塊的處理,即屢次讀取後再寫入,提升效率。當前配置是3個爲一個chunk。
  • 使用jobBuilderFactory添加step,建立任務。
  • 注意step和Job都須要有對應的名稱(get方法肯定),此處直接使用方法名做爲Job和Step的名稱。

3.2.8 測試批處理

通過上面的步驟,已經完成Job的開發,測試則可以使用兩種方式,一個是編寫Controller,以接口調用的方式運行job,一種編寫單元測試。

  • Job的運行 經過JobLauncherrun方法來運行任務,run方法參數分別是JobjobParameters,即已配置的Job及job運行的參數。每一個任務的區分是經過任務名(jobName)和任務參數(jobParameters)做爲區別的,即若是jobNamejobParameters相同,Spring Batch會認爲是同一任務,若任務已運行成功,同一任務不會再運行。所以,通常來講,不一樣的任務,咱們的jobParameters能夠直接以時間做爲參數,以便於區別。生成jobParameters。代碼以下:
JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time",System.currentTimeMillis())
                .toJobParameters();
複製代碼
  • 編寫單元測試 編寫ConsoleJobTest,加載job,運行測試,以下所示:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,ConsoleBatchConfig.class})
@Slf4j
public class ConsoleJobTest {
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job consoleJob;

    public void testConsoleJob2() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        //構建參數
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time",System.currentTimeMillis())
                .toJobParameters();
        //執行任務
        JobExecution run = jobLauncher.run(consoleJob, jobParameters);
        ExitStatus exitStatus = run.getExitStatus();
        log.debug(exitStatus.toString());
    }
}
複製代碼

說明:引入SpringBootTest註解時,須要把Spring Batch任務也引入進來。

  • 執行結果輸出 執行結果以下圖所示:

    執行結果

    從輸出可知,因爲設置的chunk是3,讀取3個數據後,就統一給ItemProcessor進行大寫轉換處理,而後統一交給ItemWriter進行寫入。執行完成後,Job的exitCode表示任務執行的狀態,若是正常則爲COMPLETED,失敗則爲FAILED

4.總結

通過以上的操做步驟,便可完成批處理操做。關於任務的狀態,流程的步驟(讀、處理、寫)均交給Spring Batch來完成,開發人員所作的工做是根據本身的業務邏輯編寫具體的讀數據、處理數據和寫數據便可。但願經過本文,你們能夠對Spring Batch的組件有清晰的瞭解。

相關文章
相關標籤/搜索