Spring Batch(5)——文件讀寫

Spring batch由上至下的結構JobStep都是屬於框架級別的的功能,大部分時候都是提供一些配置選項給開發人員使用,而Item中的ReaderProcessorWriter是屬於業務級別的,它開放了一些業務切入的接口。 可是文件的讀寫過程當中有不少通用一致的功能Spring Batch爲這些相同的功能提供了一致性實現類。git

扁平結構文件

扁平結構文件(也稱爲矩陣結構文件,後文簡稱爲文件)是最多見的一種文件類型。他一般以一行表示一條記錄,字段數據之間用某種方式分割。與標準的格式數據(xml、json等)主要差異在於他沒有結構性描述方案(SXD、JSON-SCHEME),進而沒有結構性分割規範。所以在讀寫此類文件以前須要先設定好字段的分割方法。github

文件的字段數據分割方式一般有兩種:使用分隔符固定字段長度。前者一般使用逗號()之類的符號對字段數據進行劃分,後者的每一列字段數據長度是固定的。 框架爲文件的讀取提供了FieldSet用於將文件結構中的信息映射到一個對象。FieldSet的做用是將文件的數據與類的field進行綁定(field是Java中常見的概念,不清楚的能夠了解Java反射)。spring

數據讀取

Spring Batch爲文件讀取提供了FlatFileItemReader類,它爲文件中的數據的讀取和轉換提供了基本功能。在FlatFileItemReader中有2個主要的功能接口,一是Resource、二是LineMapperResource用於外部文件獲取,詳情請查看Spring核心——資源管理部分的內容,下面是一個例子:數據庫

Resource resource = new FileSystemResource("resources/trades.csv");

在複雜的生產環境中,文件一般由中心化、或者流程式的基礎框架來管理(好比EAI)。所以文件每每須要使用FTP等方式從其餘位置獲取。如何遷移文件已經超出了Spring Batch框架的範圍,在Spring的體系中能夠參考Spring Integration項目。json

下面是FlatFileItemReader的屬性,每個屬性都提供了Setter方法。app

屬性名 參數類型 說明
comments String[] 指定文件中的註釋前綴,用於過濾註釋內容行
encoding String 指定文件的編碼方式,默認爲Charset.defaultCharset()
lineMapper LineMapper 利用LineMapper接口將一行字符串轉換爲對象
linesToSkip int 跳過文件開始位置的行數,用於跳過一些字段的描述行
recordSeparatorPolicy RecordSeparatorPolicy 用於判斷數據是否結束
resource Resource 指定外部資源文件位置
skippedLinesCallback LineCallbackHandler 當配置linesToSkip,每執行一次跳過都會被回調一次,會傳入跳過的行數據內容

每一個屬性都爲文件的解析提供了某方面的功能,下面是結構的說明。框架

LineMapper

這個接口的做用是將字符串轉換爲對象:curl

public interface LineMapper { T mapLine(String line, int lineNumber) throws Exception; }

接口的基本處理邏輯是聚合類(FlatFileItemReader)傳遞一行字符串以及行號給LineMapper::mapLine,方法處理後返回一個映射的對象。ide

LineTokenizer

這個接口的做用是將一行數據轉換爲一個FieldSet結構。對於Spring Batch而言,扁平結構文件的到Java實體的映射都經過FieldSet來控制,所以讀寫文件的過程須要完成字符串到FieldSet的轉換:工具

public interface LineTokenizer { FieldSet tokenize(String line); }

這個接口的含義是:傳遞一行字符串數據,而後獲取一個FieldSet

框架爲LineTokenizer提供三個實現類:

  • DelimitedLineTokenizer:利用分隔符將數據轉換爲FieldSet。最多見的分隔符是逗號,,類提供了分隔符的配置和解析方法。

  • FixedLengthTokenizer:根據字段的長度來解析出FieldSet結構。必須爲記錄定義字段寬度。

  • PatternMatchingCompositeLineTokenizer:使用一個匹配機制來動態決定使用哪一個LineTokenizer

FieldSetMapper

該接口是將FieldSet轉換爲對象:

public interface FieldSetMapper { T mapFieldSet(FieldSet fieldSet) throws BindException; }

FieldSetMapper一般和LineTokenizer聯合在一塊兒使用:String->FieldSet->Object

DefaultLineMapper

DefaultLineMapperLineMapper的實現,他實現了從文件到Java實體的映射:

public class DefaultLineMapper implements LineMapper<>, InitializingBean {
	private LineTokenizer tokenizer;
	private FieldSetMapper fieldSetMapper;
	public T mapLine(String line, int lineNumber) throws Exception {
		return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
	}
	public void setLineTokenizer(LineTokenizer tokenizer) {
		this.tokenizer = tokenizer;
	}
	public void setFieldSetMapper(FieldSetMapper fieldSetMapper) {
		this.fieldSetMapper = fieldSetMapper;
	}
}

在解析文件時數據是按行解析的:

  1. 傳入一行字符串。
  2. LineTokenizer將字符串解析爲FieldSet結構。
  3. FieldSetMapper繼續解析爲一個Java實體對象返回給調用者。

DefaultLineMapper是框架提供的默認實現類,看似很是簡單,可是利用組合模式能夠擴展出不少功能。

數據自動映射

在轉換過程當中若是將FieldSetnames屬性與目標類的field綁定在一塊兒,那麼能夠直接使用反射實現數據轉換,爲此框架提供了BeanWrapperFieldSetMapper來實現。

DefaultLineMapper<WeatherEntity> lineMapper = new DefaultLineMapper<>(); //建立LineMapper

DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); //建立LineTokenizer
tokenizer.setNames(new String[] { "siteId", "month", "type", "value", "ext" }); //設置Field名稱

BeanWrapperFieldSetMapper<WeatherEntity> wrapperMapper 
	= new BeanWrapperFieldSetMapper<>(); //建立FieldSetMapper
wrapperMapper.setTargetType(WeatherEntity.class); //設置實體,實體的field名稱必須和tokenizer.names一致。

// 組合lineMapper
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(wrapperMapper);

文件讀取總結

上面提到了各類接口和實現,實際上都是圍繞着FlatFileItemReader的屬性在介紹,雖然內容不少可是實際上就如下幾點:

  • 首先要定位文件,Spring Batch提供了Resource相關的定位方法。

  • 其次是將文件中的行字符串數據轉換爲對象,LineMapper的功能就是完成這個功能。

  • 框架爲LineMapper提供了DefaultLineMapper做爲默認實現方法,在DefaultLineMapper中須要組合使用LineTokenizerFieldSetMapper。前者將字符串轉爲爲一個Field,後者將Field轉換爲目標對象。

  • LineTokenizer有3個實現類可供使用、FieldSetMapper有一個默認實現類BeanWrapperFieldSetMapper

文件讀取可執行源碼

可執行的源碼在下列地址的items子工程中:

運行以前須要配置數據庫連接,參看源碼庫中的README.md。

文件讀取的主要邏輯在org.chenkui.spring.batch.sample.items.FlatFileReader類:

public class FlatFileReader {
    // FeildSet的字段名,設置字段名以後能夠直接使用名字做爲索引獲取數據。也可使用索引位置來獲取數據
    public final static String[] Tokenizer = new String[] { "siteId", "month", "type", "value", "ext" };
    private boolean userWrapper = false;

    @Bean
    //定義FieldSetMapper用於FieldSet->WeatherEntity
    public FieldSetMapper<WeatherEntity> fieldSetMapper() {
        return new FieldSetMapper<WeatherEntity>() {
            @Override
            public WeatherEntity mapFieldSet(FieldSet fieldSet) throws BindException {
                if (null == fieldSet) {
                    return null; // fieldSet不存在則跳過該行處理
                } else {
                    WeatherEntity observe = new WeatherEntity();
                    observe.setSiteId(fieldSet.readRawString("siteId"));
                    //Setter
                    return observe;
                }
            }
        };
    }

    @Bean
    // 配置 Reader
    public ItemReader<WeatherEntity> flatFileReader(
                           @Qualifier("fieldSetMapper") FieldSetMapper<WeatherEntity> fieldSetMapper) {
        FlatFileItemReader<WeatherEntity> reader = new FlatFileItemReader<>();
        reader.setResource(new FileSystemResource("src/main/resources/data.csv")); // 讀取資源文件
        DefaultLineMapper<WeatherEntity> lineMapper = new DefaultLineMapper<>(); // 初始化 LineMapper實現類
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); // 建立LineTokenizer接口實現

        tokenizer.setNames(Tokenizer); // 設定每一個字段的名稱,若是不設置須要使用索引獲取值
        lineMapper.setLineTokenizer(tokenizer); // 設置tokenizer工具

        if (userWrapper) { //使用 BeanWrapperFieldSetMapper 使用反射直接轉換
            BeanWrapperFieldSetMapper<WeatherEntity> wrapperMapper = new BeanWrapperFieldSetMapper<>();
            wrapperMapper.setTargetType(WeatherEntity.class);
            fieldSetMapper = wrapperMapper;
        }

        lineMapper.setFieldSetMapper(fieldSetMapper);
        reader.setLineMapper(lineMapper);
        reader.setLinesToSkip(1); // 跳過的初始行,用於過濾字段行
        reader.open(new ExecutionContext());
        return reader;
    }
}

按字段長度格讀取文件

除了按照分隔符,有些文件能夠字段數據的佔位長度來提取數據。按照前面介紹的過程,實際上只要修改LineTokenizer接口便可,框架提供了FixedLengthTokenizer類:

@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
    FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();

    tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
    //Range用於設定數據的長度。
    tokenizer.setColumns(new Range(1-12),
                        new Range(13-15),
                        new Range(16-20),
                        new Range(21-29));
	return tokenizer;
}

寫入扁平結構文件

將數據寫入到文件與讀取的過程正好相反:將對象轉換爲字符串。

LineAggregator

LineMapper相對應的是LineAggregator,他的功能是將實體轉換爲字符串:

public interface LineAggregator<T> {
    public String aggregate(T item);
}

PassThroughLineAggregator

框架爲LineAggregator接口提供了一個很是簡單的實現類——PassThroughLineAggregator,其惟一實現就是使用對象的toString方法:

public class PassThroughLineAggregator<T> implements LineAggregator<T> {
    public String aggregate(T item) {
        return item.toString();
    }
}

DelimitedLineAggregator

LineAggregator的另一個實現類是DelimitedLineAggregator。與PassThroughLineAggregator簡單直接使用toString方法不一樣的是,DelimitedLineAggregator須要一個轉換接口FieldExtractor

DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);

FieldExtractor

FieldExtractor用於實體類到collection結構的轉換。它能夠和LineTokenizer進行類比,前者是將實體類轉換爲扁平結構的數據,後者是將String轉換爲一個FieldSet結構。

public interface FieldExtractor<T> {
    Object[] extract(T item);
}

框架爲FieldExtractor接口提供了一個基於反射的實現類BeanWrapperFieldExtractor,其過程就是將實體對象轉換爲列表:

BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"field1", "field2"});

setName方法用於指定要轉換的field列表。

輸出文件處理

文件讀取的邏輯很是簡單:文件存在打開文件並寫入數據,當文件不存在拋出異常。可是寫入文件明顯不能這麼簡單粗暴。新建一個JobInstance時最直觀的操做是:存在同名文件就拋出異常,不存在則建立文件並寫入數據。可是這樣作顯然有很大的問題,當批處理過程當中出現問題須要restart,此時並不會從頭開始處理全部的數據,而是要求文件存在並接着繼續寫入。爲了確保這個過程FlatFileItemWriter默認會在新JobInstance運行時刪除已有文件,而運行重啓時繼續在文件末尾寫入。FlatFileItemWriter可使用shouldDeleteIfExistsappendAllowedshouldDeleteIfEmpty來有針對性的控制文件。

文件寫入可執源碼

文件寫入主要代碼在org.chenkui.spring.batch.sample.items.FlatFileWriter

public class FlatFileWriter {

    private boolean useBuilder = true;

    @Bean
    public ItemWriter<MaxTemperatureEntiry> flatFileWriter() {
        BeanWrapperFieldExtractor<MaxTemperatureEntiry> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[] { "siteId", "date", "temperature" }); //設置映射field
        fieldExtractor.afterPropertiesSet(); //參數檢查

        DelimitedLineAggregator<MaxTemperatureEntiry> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(","); //設置輸出分隔符
        lineAggregator.setFieldExtractor(fieldExtractor); //設置FieldExtractor處理器

        FlatFileItemWriter<MaxTemperatureEntiry> fileWriter = new FlatFileItemWriter<>();
        fileWriter.setLineAggregator(lineAggregator);
        fileWriter.setResource(new FileSystemResource("src/main/resources/out-data.csv")); //設置輸出文件位置
        fileWriter.setName("outpufData");

        if (useBuilder) {//使用builder方式建立
            fileWriter = new FlatFileItemWriterBuilder<MaxTemperatureEntiry>().name("outpufData")
                .resource(new FileSystemResource("src/main/resources/out-data.csv")).lineAggregator(lineAggregator)
                .build();
        }
        return fileWriter;
    }
}

文件的寫入過程與讀取過程徹底對稱相反:先用FieldExtractor將對象轉換爲一個collection結構(列表),而後用lineAggregatorcollection轉化爲帶分隔符的字符串。

代碼說明

  • 代碼中的測試數據來自數據分析交流項目bi-process-example,是NOAA的2015年全球天氣監控數據。爲了便於源碼存儲進行了大量的刪減,原始數據有百萬條,若有須要使用下列方式下載:

    curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2015.csv.gz #數據文件
    	curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt # 文件結構及類型說明
  • 代碼實現了讀取文件、處理文件、寫入文件的整個過程。處理文件的過程是隻獲取監控的最高溫度信息(Type=TMAX),其餘都過濾。

  • 本案例的代碼使用org.chenkui.spring.batch.sample.flatfile.FlatFileItemApplication::main方法運行,使用的是Command Runner的方式執行(運行方式的說明見Item概念及使用代碼命令行方式運行Java內嵌運行)。

相關文章
相關標籤/搜索