在Spring batch由上至下的結構中Job、Step都是屬於框架級別的的功能,大部分時候都是提供一些配置選項給開發人員使用,而Item中的Reader
、Processor
和Writer
是屬於業務級別的,它開放了一些業務切入的接口。 可是文件的讀寫過程當中有不少通用一致的功能Spring Batch爲這些相同的功能提供了一致性實現類。git
扁平結構文件(也稱爲矩陣結構文件,後文簡稱爲文件)是最多見的一種文件類型。他一般以一行表示一條記錄,字段數據之間用某種方式分割。與標準的格式數據(xml、json等)主要差異在於他沒有結構性描述方案(SXD、JSON-SCHEME),進而沒有結構性分割規範。所以在讀寫此類文件以前須要先設定好字段的分割方法。github
文件的字段數據分割方式一般有兩種:使用分隔符或固定字段長度。前者一般使用逗號(,
)之類的符號對字段數據進行劃分,後者的每一列字段數據長度是固定的。 框架爲文件的讀取提供了FieldSet
用於將文件結構中的信息映射到一個對象。FieldSet
的做用是將文件的數據與類的field
進行綁定(field是Java中常見的概念,不清楚的能夠了解Java反射)。spring
Spring Batch爲文件讀取提供了FlatFileItemReader
類,它爲文件中的數據的讀取和轉換提供了基本功能。在FlatFileItemReader
中有2個主要的功能接口,一是Resource
、二是LineMapper
。 Resource
用於外部文件獲取,詳情請查看Spring核心——資源管理部分的內容,下面是一個例子:數據庫
Resource resource = new FileSystemResource("resources/trades.csv");
在複雜的生產環境中,文件一般由中心化、或者流程式的基礎框架來管理(好比EAI)。所以文件每每須要使用FTP等方式從其餘位置獲取。如何遷移文件已經超出了Spring Batch框架的範圍,在Spring的體系中能夠參考Spring Integration
項目。json
下面是FlatFileItemReader
的屬性,每個屬性都提供了Setter方法。app
屬性名 | 參數類型 | 說明 |
---|---|---|
comments | String[] | 指定文件中的註釋前綴,用於過濾註釋內容行 |
encoding | String | 指定文件的編碼方式,默認爲Charset.defaultCharset() |
lineMapper | LineMapper | 利用LineMapper接口將一行字符串轉換爲對象 |
linesToSkip | int | 跳過文件開始位置的行數,用於跳過一些字段的描述行 |
recordSeparatorPolicy | RecordSeparatorPolicy | 用於判斷數據是否結束 |
resource | Resource | 指定外部資源文件位置 |
skippedLinesCallback | LineCallbackHandler | 當配置linesToSkip,每執行一次跳過都會被回調一次,會傳入跳過的行數據內容 |
每一個屬性都爲文件的解析提供了某方面的功能,下面是結構的說明。框架
這個接口的做用是將字符串轉換爲對象:curl
public interface LineMapper { T mapLine(String line, int lineNumber) throws Exception; }
接口的基本處理邏輯是聚合類(FlatFileItemReader
)傳遞一行字符串以及行號給LineMapper::mapLine
,方法處理後返回一個映射的對象。ide
這個接口的做用是將一行數據轉換爲一個FieldSet
結構。對於Spring Batch而言,扁平結構文件的到Java實體的映射都經過FieldSet
來控制,所以讀寫文件的過程須要完成字符串到FieldSet
的轉換:工具
public interface LineTokenizer { FieldSet tokenize(String line); }
這個接口的含義是:傳遞一行字符串數據,而後獲取一個FieldSet
。
框架爲LineTokenizer
提供三個實現類:
DelimitedLineTokenizer
:利用分隔符將數據轉換爲FieldSet
。最多見的分隔符是逗號,
,類提供了分隔符的配置和解析方法。
FixedLengthTokenizer
:根據字段的長度來解析出FieldSet
結構。必須爲記錄定義字段寬度。
PatternMatchingCompositeLineTokenizer
:使用一個匹配機制來動態決定使用哪一個LineTokenizer
。
該接口是將FieldSet
轉換爲對象:
public interface FieldSetMapper { T mapFieldSet(FieldSet fieldSet) throws BindException; }
FieldSetMapper
一般和LineTokenizer
聯合在一塊兒使用:String->FieldSet->Object。
DefaultLineMapper
是LineMapper
的實現,他實現了從文件到Java實體的映射:
public class DefaultLineMapper implements LineMapper<>, InitializingBean { private LineTokenizer tokenizer; private FieldSetMapper fieldSetMapper; public T mapLine(String line, int lineNumber) throws Exception { return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line)); } public void setLineTokenizer(LineTokenizer tokenizer) { this.tokenizer = tokenizer; } public void setFieldSetMapper(FieldSetMapper fieldSetMapper) { this.fieldSetMapper = fieldSetMapper; } }
在解析文件時數據是按行解析的:
LineTokenizer
將字符串解析爲FieldSet
結構。FieldSetMapper
繼續解析爲一個Java實體對象返回給調用者。DefaultLineMapper
是框架提供的默認實現類,看似很是簡單,可是利用組合模式能夠擴展出不少功能。
在轉換過程當中若是將FieldSet
的names
屬性與目標類的field
綁定在一塊兒,那麼能夠直接使用反射實現數據轉換,爲此框架提供了BeanWrapperFieldSetMapper
來實現。
DefaultLineMapper<WeatherEntity> lineMapper = new DefaultLineMapper<>(); //建立LineMapper DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); //建立LineTokenizer tokenizer.setNames(new String[] { "siteId", "month", "type", "value", "ext" }); //設置Field名稱 BeanWrapperFieldSetMapper<WeatherEntity> wrapperMapper = new BeanWrapperFieldSetMapper<>(); //建立FieldSetMapper wrapperMapper.setTargetType(WeatherEntity.class); //設置實體,實體的field名稱必須和tokenizer.names一致。 // 組合lineMapper lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(wrapperMapper);
上面提到了各類接口和實現,實際上都是圍繞着FlatFileItemReader
的屬性在介紹,雖然內容不少可是實際上就如下幾點:
首先要定位文件,Spring Batch提供了Resource
相關的定位方法。
其次是將文件中的行字符串數據轉換爲對象,LineMapper
的功能就是完成這個功能。
框架爲LineMapper
提供了DefaultLineMapper
做爲默認實現方法,在DefaultLineMapper
中須要組合使用LineTokenizer
和FieldSetMapper
。前者將字符串轉爲爲一個Field
,後者將Field
轉換爲目標對象。
LineTokenizer
有3個實現類可供使用、FieldSetMapper
有一個默認實現類BeanWrapperFieldSetMapper
。
可執行的源碼在下列地址的items子工程中:
運行以前須要配置數據庫連接,參看源碼庫中的README.md。
文件讀取的主要邏輯在org.chenkui.spring.batch.sample.items.FlatFileReader
類:
public class FlatFileReader { // FeildSet的字段名,設置字段名以後能夠直接使用名字做爲索引獲取數據。也可使用索引位置來獲取數據 public final static String[] Tokenizer = new String[] { "siteId", "month", "type", "value", "ext" }; private boolean userWrapper = false; @Bean //定義FieldSetMapper用於FieldSet->WeatherEntity public FieldSetMapper<WeatherEntity> fieldSetMapper() { return new FieldSetMapper<WeatherEntity>() { @Override public WeatherEntity mapFieldSet(FieldSet fieldSet) throws BindException { if (null == fieldSet) { return null; // fieldSet不存在則跳過該行處理 } else { WeatherEntity observe = new WeatherEntity(); observe.setSiteId(fieldSet.readRawString("siteId")); //Setter return observe; } } }; } @Bean // 配置 Reader public ItemReader<WeatherEntity> flatFileReader( @Qualifier("fieldSetMapper") FieldSetMapper<WeatherEntity> fieldSetMapper) { FlatFileItemReader<WeatherEntity> reader = new FlatFileItemReader<>(); reader.setResource(new FileSystemResource("src/main/resources/data.csv")); // 讀取資源文件 DefaultLineMapper<WeatherEntity> lineMapper = new DefaultLineMapper<>(); // 初始化 LineMapper實現類 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); // 建立LineTokenizer接口實現 tokenizer.setNames(Tokenizer); // 設定每一個字段的名稱,若是不設置須要使用索引獲取值 lineMapper.setLineTokenizer(tokenizer); // 設置tokenizer工具 if (userWrapper) { //使用 BeanWrapperFieldSetMapper 使用反射直接轉換 BeanWrapperFieldSetMapper<WeatherEntity> wrapperMapper = new BeanWrapperFieldSetMapper<>(); wrapperMapper.setTargetType(WeatherEntity.class); fieldSetMapper = wrapperMapper; } lineMapper.setFieldSetMapper(fieldSetMapper); reader.setLineMapper(lineMapper); reader.setLinesToSkip(1); // 跳過的初始行,用於過濾字段行 reader.open(new ExecutionContext()); return reader; } }
除了按照分隔符,有些文件能夠字段數據的佔位長度來提取數據。按照前面介紹的過程,實際上只要修改LineTokenizer接口便可,框架提供了FixedLengthTokenizer
類:
@Bean public FixedLengthTokenizer fixedLengthTokenizer() { FixedLengthTokenizer tokenizer = new FixedLengthTokenizer(); tokenizer.setNames("ISIN", "Quantity", "Price", "Customer"); //Range用於設定數據的長度。 tokenizer.setColumns(new Range(1-12), new Range(13-15), new Range(16-20), new Range(21-29)); return tokenizer; }
將數據寫入到文件與讀取的過程正好相反:將對象轉換爲字符串。
與LineMapper
相對應的是LineAggregator
,他的功能是將實體轉換爲字符串:
public interface LineAggregator<T> { public String aggregate(T item); }
框架爲LineAggregator
接口提供了一個很是簡單的實現類——PassThroughLineAggregator
,其惟一實現就是使用對象的toString
方法:
public class PassThroughLineAggregator<T> implements LineAggregator<T> { public String aggregate(T item) { return item.toString(); } }
LineAggregator
的另一個實現類是DelimitedLineAggregator
。與PassThroughLineAggregator
簡單直接使用toString
方法不一樣的是,DelimitedLineAggregator
須要一個轉換接口FieldExtractor
:
DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>(); lineAggregator.setDelimiter(","); lineAggregator.setFieldExtractor(fieldExtractor);
FieldExtractor
用於實體類到collection
結構的轉換。它能夠和LineTokenizer
進行類比,前者是將實體類轉換爲扁平結構的數據,後者是將String
轉換爲一個FieldSet
結構。
public interface FieldExtractor<T> { Object[] extract(T item); }
框架爲FieldExtractor
接口提供了一個基於反射的實現類BeanWrapperFieldExtractor
,其過程就是將實體對象轉換爲列表:
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>(); fieldExtractor.setNames(new String[] {"field1", "field2"});
setName
方法用於指定要轉換的field
列表。
文件讀取的邏輯很是簡單:文件存在打開文件並寫入數據,當文件不存在拋出異常。可是寫入文件明顯不能這麼簡單粗暴。新建一個JobInstance
時最直觀的操做是:存在同名文件就拋出異常,不存在則建立文件並寫入數據。可是這樣作顯然有很大的問題,當批處理過程當中出現問題須要restart
,此時並不會從頭開始處理全部的數據,而是要求文件存在並接着繼續寫入。爲了確保這個過程FlatFileItemWriter
默認會在新JobInstance
運行時刪除已有文件,而運行重啓時繼續在文件末尾寫入。FlatFileItemWriter
可使用shouldDeleteIfExists
、appendAllowed
、shouldDeleteIfEmpty
來有針對性的控制文件。
文件寫入主要代碼在org.chenkui.spring.batch.sample.items.FlatFileWriter
:
public class FlatFileWriter { private boolean useBuilder = true; @Bean public ItemWriter<MaxTemperatureEntiry> flatFileWriter() { BeanWrapperFieldExtractor<MaxTemperatureEntiry> fieldExtractor = new BeanWrapperFieldExtractor<>(); fieldExtractor.setNames(new String[] { "siteId", "date", "temperature" }); //設置映射field fieldExtractor.afterPropertiesSet(); //參數檢查 DelimitedLineAggregator<MaxTemperatureEntiry> lineAggregator = new DelimitedLineAggregator<>(); lineAggregator.setDelimiter(","); //設置輸出分隔符 lineAggregator.setFieldExtractor(fieldExtractor); //設置FieldExtractor處理器 FlatFileItemWriter<MaxTemperatureEntiry> fileWriter = new FlatFileItemWriter<>(); fileWriter.setLineAggregator(lineAggregator); fileWriter.setResource(new FileSystemResource("src/main/resources/out-data.csv")); //設置輸出文件位置 fileWriter.setName("outpufData"); if (useBuilder) {//使用builder方式建立 fileWriter = new FlatFileItemWriterBuilder<MaxTemperatureEntiry>().name("outpufData") .resource(new FileSystemResource("src/main/resources/out-data.csv")).lineAggregator(lineAggregator) .build(); } return fileWriter; } }
文件的寫入過程與讀取過程徹底對稱相反:先用FieldExtractor
將對象轉換爲一個collection
結構(列表),而後用lineAggregator
將collection
轉化爲帶分隔符的字符串。
代碼中的測試數據來自數據分析交流項目bi-process-example,是NOAA的2015年全球天氣監控數據。爲了便於源碼存儲進行了大量的刪減,原始數據有百萬條,若有須要使用下列方式下載:
curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2015.csv.gz #數據文件 curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt # 文件結構及類型說明
代碼實現了讀取文件、處理文件、寫入文件的整個過程。處理文件的過程是隻獲取監控的最高溫度信息(Type=TMAX
),其餘都過濾。
本案例的代碼使用org.chenkui.spring.batch.sample.flatfile.FlatFileItemApplication::main
方法運行,使用的是Command Runner的方式執行(運行方式的說明見Item概念及使用代碼的命令行方式運行、Java內嵌運行)。