在 批處理概念 中介紹一個標準的批處理分爲 Job 和 Step。本文將結合代碼介紹在Step
中Reader
、Processor
、Writer
的實際使用。css
Reader
是指從各類各樣的外部輸入中獲取數據,框架爲獲取各類類型的文件已經預約義了常規的Reader
實現類。Reader
經過ItemReader
接口實現:git
public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }
read
方法的做用就是讀取一條數據,數據以泛型T的實體結構返回,當read返回null時表示全部數據讀取完畢。返回的數據能夠是任何結構,好比文件中的一行字符串,數據庫的一行數據,或者xml文件中的一系列元素,只要是一個Java對象便可。github
Writer
經過ItemWriter
接口實現:spring
public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }
Writer
是Reader
的反向操做,是將數據寫入到特定的數據源中。在Step控制一文已經介紹Writer
是根據chunk
屬性設定的值按列表進行操做的,因此傳入的是一個List
結構。chunk
用於表示批處理的事物分片,所以須要注意的是,在writer
方法中進行完整數據寫入事物操做。例如向數據庫寫入List
中的數據,在寫入完成以後再提交事物。數據庫
不管是讀仍是寫,有時會須要從多個不一樣的來源獲取文件,或者寫入到不一樣的數據源,或者是須要在讀和寫之間處理一些業務。可使用組合模式來實現這個目的:框架
public class CompositeItemWriter<T> implements ItemWriter<T> { ItemWriter<T> itemWriter; public CompositeItemWriter(ItemWriter<T> itemWriter) { this.itemWriter = itemWriter; } public void write(List<? extends T> items) throws Exception { //Add business logic here itemWriter.write(items); } public void setDelegate(ItemWriter<T> itemWriter){ this.itemWriter = itemWriter; } }
除了使用組合模式,直接使用Processor
是一種更優雅的方法。Processor
是Step
中的可選項,可是批處理大部分時候都須要對數據進行處理,所以框架提供了ItemProcessor
接口來知足Processor
過程:ide
public interface ItemProcessor<I, O> { O process(I item) throws Exception; }
Processor
的結構很是簡單也是否易於理解。傳入一個類型I,而後由Processor
處理成爲O。測試
在一個Step中可使用多個Processor
來按照順序處理業務,此時一樣可使用CompositeItem
模式來實現:優化
@Bean public CompositeItemProcessor compositeProcessor() { //建立 CompositeItemProcessor CompositeItemProcessor<Foo,Foobar> compositeProcessor = new CompositeItemProcessor<Foo,Foobar>(); List itemProcessors = new ArrayList(); //添加第一個 Processor itemProcessors.add(new FooTransformer()); //添加第二個 Processor itemProcessors.add(new BarTransformer()); //添加鏈表 compositeProcessor.setDelegates(itemProcessors); return processor; }
在Reader
讀取數據的過程當中,並非全部的數據均可以使用,此時Processor
還能夠用於過濾非必要的數據,同時不會影響Step
的處理過程。只要ItemProcesspr
的實現類在procss
方法中返回null
即表示改行數據被過濾掉了。ui
在Step控制一文中已經提到了ItemStream
。在數據批處理概念中提到過,Spring Batch的每一步都是無狀態的,進而Reader
和Writer
也是無狀態的,這種方式可以很好的隔離每行數據的處理,也能將容錯的範圍收窄到能夠空子的範圍。可是這並不意味着整個批處理的過程當中並不須要控制狀態。例如從數據庫持續讀入或寫入數據,每次Reader
和Writer
都單獨去申請數據源的連接、維護數據源的狀態(打開、關閉等)。所以框架提供了ItemStream
接口來完善這些操做:
public interface ItemStream { void open(ExecutionContext executionContext) throws ItemStreamException; void update(ExecutionContext executionContext) throws ItemStreamException; void close() throws ItemStreamException; }
在使用Spring Batch以前須要初始化他的元數據存儲(Meta-Data Schema),也就是要將須要用到的表導入到對應的數據庫中。固然,Spring Batch支持不使用任何持久化數據庫,僅僅將數據放到內存中,不設置DataSource
便可。
Spring Batch相關的工做須要使用序列SEQUENCE
:
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ; CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ; CREATE SEQUENCE BATCH_JOB_SEQ;
有些數據庫不支持SEQUENCE
,能夠經過表代理,好比在MySql(InnoDB數據庫)中:
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (ID BIGINT NOT NULL); INSERT INTO BATCH_STEP_EXECUTION_SEQ values(0); CREATE TABLE BATCH_JOB_EXECUTION_SEQ (ID BIGINT NOT NULL); INSERT INTO BATCH_JOB_EXECUTION_SEQ values(0); CREATE TABLE BATCH_JOB_SEQ (ID BIGINT NOT NULL); INSERT INTO BATCH_JOB_SEQ values(0);
某些表中都有Version
字段。由於Spring的更新策略是樂觀鎖,所以在進行數據更新以後都會對錶的Version
字段進行+1處理。在內存與數據庫交互的過程當中,會使用採用getVersion、increaseVersion(+1)、updateDataAndVersion的過程,若是在update
的時候發現Version不是預計的數值(+1),則會拋出OptimisticLockingFailureException
的異常。當同一個Job
在進羣中不一樣服務上執行時,須要注意這個問題。
BATCH_JOB_INSTANCE
用於記錄JobInstance,在數據批處理概念中介紹了他的工做方式,其結構爲:
CREATE TABLE BATCH_JOB_INSTANCE ( JOB_INSTANCE_ID BIGINT PRIMARY KEY , VERSION BIGINT, JOB_NAME VARCHAR(100) NOT NULL , JOB_KEY VARCHAR(2500) );
字段 | 說明 |
---|---|
JOB_INSTANCE_ID | 主鍵,主鍵與單個JobInstance 相關。當獲取到某個JobInstance 實例後,經過getId 方法能夠獲取到此數據 |
VERSION | |
JOB_NAME | Job的名稱,用於標記運行的Job,在建立Job時候指定 |
JOB_KEY | JobParameters的序列化數值。在數據批處理概念中介紹了一個JobInstance 至關於Job+JobParameters。他用於標記同一個Job 不一樣的實例 |
BATCH_JOB_EXECUTION_PARAMS
對應的是JobParameters
對象。其核心功能是存儲Key-Value結構的各類狀態數值。字段中IDENTIFYING=true
用於標記那些運行過程當中必須的數據(能夠理解是框架須要用到的數據),爲了存儲key-value結構該表一個列數據格式:
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS ( JOB_EXECUTION_ID BIGINT NOT NULL , TYPE_CD VARCHAR(6) NOT NULL , KEY_NAME VARCHAR(100) NOT NULL , STRING_VAL VARCHAR(250) , DATE_VAL DATETIME DEFAULT NULL , LONG_VAL BIGINT , DOUBLE_VAL DOUBLE PRECISION , IDENTIFYING CHAR(1) NOT NULL , constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) );
字段 | 說明 |
---|---|
JOB_EXECUTION_ID | 與BATCH_JOB_EXECUTION表關聯的外鍵,詳見數據批處理概念中Job、JobInstance、JobExecute的關係 |
TYPE_CD | 用於標記數據的對象類型,例如 string、date、long、double,非空 |
KEY_NAME | key的值 |
STRING_VAL | string類型的數值 |
DATE_VAL | date類型的數值 |
LONG_VAL | long類型的數值 |
DOUBLE_VAL | double類型的數值 |
IDENTIFYING | 標記這對key-valuse是否來自於JobInstace自身 |
關聯JobExecution
,每當運行一個Job
都會產生一個新的JobExecution
,對應的在表中都會新增一行數據。
CREATE TABLE BATCH_JOB_EXECUTION ( JOB_EXECUTION_ID BIGINT PRIMARY KEY , VERSION BIGINT, JOB_INSTANCE_ID BIGINT NOT NULL, CREATE_TIME TIMESTAMP NOT NULL, START_TIME TIMESTAMP DEFAULT NULL, END_TIME TIMESTAMP DEFAULT NULL, STATUS VARCHAR(10), EXIT_CODE VARCHAR(20), EXIT_MESSAGE VARCHAR(2500), LAST_UPDATED TIMESTAMP, JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL, constraint JOB_INSTANCE_EXECUTION_FK foreign key (JOB_INSTANCE_ID) references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID) ) ;
字段 | 說明 |
---|---|
JOB_EXECUTION_ID | JobExecution的主鍵,JobExecution::getId方法能夠獲取到該值 |
VERSION | |
JOB_INSTANCE_ID | 關聯到JobInstace的外鍵,詳見數據批處理概念中Job、JobInstance、JobExecute的關係 |
CREATE_TIME | 建立時間戳 |
START_TIME | 開始時間戳 |
END_TIME | 結束時間戳,不管成功或失敗都會更新這一項數據。若是某行數據該值爲空表示運行期間出現錯誤,而且框架沒法更新該值 |
STATUS | JobExecute的運行狀態:COMPLETED、STARTED或者其餘狀態。此數值對應Java中BatchStatus枚舉值 |
EXIT_CODE | JobExecute執行完畢以後的退出返回值 |
EXIT_MESSAGE | JobExecute退出的詳細內容,若是是異常退出可能會包括異常堆棧的內容 |
LAST_UPDATED | 最後一次更新的時間戳 |
該表對應的是StepExecution
,其結構和BATCH_JOB_EXECUTION
基本類似,只是對應的對象是Step
,增長了與之相對的一些字段數值:
CREATE TABLE BATCH_STEP_EXECUTION ( STEP_EXECUTION_ID BIGINT PRIMARY KEY , VERSION BIGINT NOT NULL, STEP_NAME VARCHAR(100) NOT NULL, JOB_EXECUTION_ID BIGINT NOT NULL, START_TIME TIMESTAMP NOT NULL , END_TIME TIMESTAMP DEFAULT NULL, STATUS VARCHAR(10), COMMIT_COUNT BIGINT , READ_COUNT BIGINT , FILTER_COUNT BIGINT , WRITE_COUNT BIGINT , READ_SKIP_COUNT BIGINT , WRITE_SKIP_COUNT BIGINT , PROCESS_SKIP_COUNT BIGINT , ROLLBACK_COUNT BIGINT , EXIT_CODE VARCHAR(20) , EXIT_MESSAGE VARCHAR(2500) , LAST_UPDATED TIMESTAMP, constraint JOB_EXECUTION_STEP_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) ) ;
未填入內容部分見BATCH_JOB_EXECUTION
說明。
字段 | 說明 |
---|---|
STEP_EXECUTION_ID | StepExecute對應的主鍵 |
VERSION | |
STEP_NAME | Step名稱 |
JOB_EXECUTION_ID | 關聯到BATCH_JOB_EXECUTION表的外鍵,標記該StepExecute所屬的JobExecute |
START_TIME | |
END_TIME | |
STATUS | |
COMMIT_COUNT | 執行過程當中,事物提交的次數,該值與數據的規模以及chunk的設置有關 |
READ_COUNT | 讀取數據的次數 |
FILTER_COUNT | Processor中過濾記錄的次數 |
WRITE_COUNT | 吸入數據的次數 |
READ_SKIP_COUNT | 讀數據的跳過次數 |
WRITE_SKIP_COUNT | 寫數據的跳過次數 |
PROCESS_SKIP_COUNT | Processor跳過的次數 |
ROLLBACK_COUNT | 回滾的次數 |
EXIT_CODE | |
EXIT_MESSAGE | |
LAST_UPDATED |
該表會記錄全部與Job
相關的ExecutionContext
信息。每一個ExecutionContext
都對應一個JobExecution
,在運行的過程當中它包含了全部Job
範疇的狀態數據,這些數據在執行失敗後對於後續處理有中重大意義。
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT ( JOB_EXECUTION_ID BIGINT PRIMARY KEY, SHORT_CONTEXT VARCHAR(2500) NOT NULL, SERIALIZED_CONTEXT CLOB, constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) ) ;
字段 | 說明 |
---|---|
JOB_EXECUTION_ID | 關聯到JobExecution的外鍵,創建JobExecution和ExecutionContext的關係。 |
SHORT_CONTEXT | 標記SERIALIZED_CONTEXT的版本號 |
SERIALIZED_CONTEXT | 序列化的ExecutionContext |
Step
中ExecutionContext
相關的數據表,結構與BATCH_JOB_EXECUTION_CONTEXT
徹底同樣。
上面的全部建表語句都沒有提供索引,可是並不表明索引沒有價值。當感受到SQL語句的執行有效率問題時候,能夠增長索引。
索引帶來的價值取決於SQL查詢的頻率以及關聯關係,下面是Spring Batch框架在運行過程當中會用到的一些查詢條件語句,用於參考優化索引:
表 | Where條件 | 執行頻率 |
---|---|---|
BATCH_JOB_INSTANCE | JOB_NAME = ? and JOB_KEY = ? | 每次Job啓動執時 |
BATCH_JOB_EXECUTION | JOB_INSTANCE_ID = ? | 每次Job重啓時 |
BATCH_EXECUTION_CONTEXT | EXECUTION_ID = ? and KEY_NAME = ? | 視chunk的大小而定 |
BATCH_STEP_EXECUTION | VERSION = ? | 視chunk的大小而定 |
BATCH_STEP_EXECUTION | STEP_NAME = ? and JOB_EXECUTION_ID = ? | 每個Step執行以前 |
下面是Spring Batch一些簡單的應用,源碼在下列地址的simple工程:
Spring Batch提供了2種執行方式:命令行方式或Java內嵌方式。命令行方式是直到須要執行批處理任務的時候才啓動程序,內嵌方式是結合Web工程或其餘外部化框架來使用。2者最大的差異就是是否直接向IoCs注入一個Job
實例。
兩種方式的基本配置都是同樣的,經過Reader
、Processor
、Writer
來組裝一個Step
。代碼中Item
並不涉及文件或數據庫的操做,只是簡單的模擬數據讀取、處理、寫入的過程。實體Record
和Msg
用於模擬數據轉換,基本配置以下:
public class BatchDefaultConfig { @Bean //配置Step public Step simpleStep(StepBuilderFactory builder, ItemReader<Record> reader, ItemProcessor<Record, Msg> processor, ItemWriter<Msg> writer) { return builder.get("SimpleStep").<Record, Msg>chunk(10).reader(reader).processor(processor).writer(writer) .build(); } @Bean //配置 Reader public ItemReader<Record> reader() { return new ItemReader<Record>() { private int count = 0; public Record read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { return ++this.count < 100 ? new Record().setId(this.count).setMsg("Read Number:" + this.count) : null; } }; } @Bean //配置 Processor public ItemProcessor<Record, Msg> processor() { return new ItemProcessor<Record, Msg>() { public Msg process(Record item) throws Exception { return new Msg("MSG GET INFO = " + item.getMsg()); } }; } @Bean //配置 Writer public ItemWriter<Msg> writer() { return new ItemWriter<Msg>() { private int batchCount = 0; public void write(List<? extends Msg> items) throws Exception { System.out.println("Batch Count : " + ++batchCount + ". Data:"); for (Msg msg : items) { System.out.println(msg.getInfo()); } } }; } }
有了基本配置以後,命令行運行的方式僅僅是向容器添加一個Job
:
@Configuration //導入依賴配置 @Import({ BatchDefaultConfig.class }) public class BatchCommondConfig { @Bean public Job simpleJob(Step step, JobBuilderFactory builder) { return builder.get("SimpleJob").start(step).build(); //向容器返回一個Job的Bean } }
而後啓動Spring Framework則會自動啓用Command Runner運行方式運行——先調用SpringApplication::callRunner
方法,而後使用JobLauncherCommandLineRunner::execute
運行:
public class CommondSample { public static void main(String[] args) throws DuplicateJobException { //模擬測試參數, 這些參數值在執行Java時從外部傳入的,好比-Dkey=value String[] argsExt = new String[2]; argsExt[0] = "BuilderParam1=Value1"; argsExt[1] = "BuilderParam2=Value2"; //運行Spring Framework SpringApplication.run(CommondSample.class, argsExt); } }
啓用以後觀察數據庫已經發生了變動。使用命令行須要經過 Java運行參數(-Dkey=value)傳遞JobParameters
的數據,上面的代碼模擬實現了相關的過程。
Java內嵌的方式主要是用於搭配外部工程化使用,好比使用Web框架或則統一調度平臺管之類的結構化框架來統一管理批處理任務。與命令行執行最大的區別就是不向容器注入Job
:
@Configuration //導入進出配置 @Import({BatchDefaultConfig.class}) public class BatchOperatoConfig { @Bean //返回JobFactory public JobFactory simpleJob(Step step, JobBuilderFactory builder) { SimpleJobFactory sampleJobFactory = new SimpleJobFactory(); sampleJobFactory.setJob(builder.get("SimpleJob").start(step).build()); return sampleJobFactory; } }
配置代碼向容器添加了一個JobFactory
的實現類,JobFactory
的兩個接口一個是獲取Job
一個是獲取Job
的名稱,SimpleJobFactory
實現了JobFactory
:
public class SimpleJobFactory implements JobFactory { private Job job; public void setJob(Job job) { this.job = job; } @Override public Job createJob() { return job; } @Override public String getJobName() { return job.getName(); } }
最後經過SimpleJobFactory
來啓動一個Job
:
@SpringBootApplication @EnableBatchProcessing @EnableScheduling public class OperatorSample { public static void main(String[] args) throws DuplicateJobException { new SuspendThread().run(); //掛起系統一直運行 ConfigurableApplicationContext ctx = SpringApplication.run(OperatorSample.class); Cron cron = ctx.getBean(Cron.class); cron.register(); //註冊JobFactory cron.runJobLaunch(); } } @Service class Cron { @Autowired JobLauncher jobLauncher; @Autowired private JobOperator jobOperator; @Autowired private JobRegistry jobRegistry; @Autowired private JobFactory jobFactory; //註冊JobFactory void register() throws DuplicateJobException { jobRegistry.register(jobFactory); } //使用JobLaunch執行 void runJobLaunch() { Map<String, JobParameter> map = new HashMap<>(); map.put("Builder", new JobParameter("1")); map.put("Timer", new JobParameter("2")); jobLauncher.run(jobFactory.createJob(), new JobParameters(map)); } @Scheduled(cron = "30 * * * * ? ") void task1() { System.out.println("1"); runOperator(); } //定時任務使用 JobOperator執行 private void runOperator() { jobOperator.start("SimpleJob", "Builder=1,Timer=2"); } }
這裏使用了2種執行方式:JobLauncher
和JobOperator
。JobLauncher
簡單明瞭的啓動一個批處理任務。而JobOperator
擴展了一些用於Job
管理的接口方法,觀察JobOperator
的源碼能夠發現它提供了獲取ExecuteContext
、檢查JobInstance
等功能,若是須要定製開發一個基於Web或者JMX管理批處理任務的系統,JobOperator
更合適。JobOperator
的第二個參數用於傳遞JobParameters
,等號兩端分別是key
和value
,逗號用於分割多行數據。
在Job配置與運行說起過一個JobInstance
至關於Job
+JobParameters
,所以雖然上面的代碼使用了兩種不一樣的運行方式,可是Job
和JobParameters
是同樣的。在運行被定時任務包裹的runOperator
方法時,會一直拋出JobInstanceAlreadyExistsException
異常,由於同一個實例不能運行2次。若是運行失敗可使用對應的restart
方法。
後續會介紹各類Reader
和Writer
的使用。