Spring Batch(4)——Item概念及使用代碼

批處理概念 中介紹一個標準的批處理分爲 JobStep。本文將結合代碼介紹在StepReaderProcessorWriter的實際使用。css

Reader

Reader是指從各類各樣的外部輸入中獲取數據,框架爲獲取各類類型的文件已經預約義了常規的Reader實現類。Reader經過ItemReader接口實現:git

public interface ItemReader<T> {
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

read方法的做用就是讀取一條數據,數據以泛型T的實體結構返回,當read返回null時表示全部數據讀取完畢。返回的數據能夠是任何結構,好比文件中的一行字符串,數據庫的一行數據,或者xml文件中的一系列元素,只要是一個Java對象便可。github

Writer

Writer經過ItemWriter接口實現:spring

public interface ItemWriter<T> {
    void write(List<? extends T> items) throws Exception;
}

WriterReader的反向操做,是將數據寫入到特定的數據源中。在Step控制一文已經介紹Writer是根據chunk屬性設定的值按列表進行操做的,因此傳入的是一個List結構。chunk用於表示批處理的事物分片,所以須要注意的是,在writer方法中進行完整數據寫入事物操做。例如向數據庫寫入List中的數據,在寫入完成以後再提交事物。數據庫

讀寫的組合模式

不管是讀仍是寫,有時會須要從多個不一樣的來源獲取文件,或者寫入到不一樣的數據源,或者是須要在讀和寫之間處理一些業務。可使用組合模式來實現這個目的:框架

public class CompositeItemWriter<T> implements ItemWriter<T> {
    ItemWriter<T> itemWriter;
    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(List<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

Processor

除了使用組合模式,直接使用Processor是一種更優雅的方法。ProcessorStep中的可選項,可是批處理大部分時候都須要對數據進行處理,所以框架提供了ItemProcessor接口來知足Processor過程:ide

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

Processor的結構很是簡單也是否易於理解。傳入一個類型I,而後由Processor處理成爲O。測試

Processor鏈

在一個Step中可使用多個Processor來按照順序處理業務,此時一樣可使用CompositeItem模式來實現:優化

@Bean
public CompositeItemProcessor compositeProcessor() {
    //建立 CompositeItemProcessor
    CompositeItemProcessor<Foo,Foobar> compositeProcessor = new CompositeItemProcessor<Foo,Foobar>();
    List itemProcessors = new ArrayList();
    //添加第一個 Processor
    itemProcessors.add(new FooTransformer());
    //添加第二個 Processor
    itemProcessors.add(new BarTransformer());
    //添加鏈表
    compositeProcessor.setDelegates(itemProcessors);
    return processor;
}

過濾記錄

Reader讀取數據的過程當中,並非全部的數據均可以使用,此時Processor還能夠用於過濾非必要的數據,同時不會影響Step的處理過程。只要ItemProcesspr的實現類在procss方法中返回null即表示改行數據被過濾掉了。ui

ItemStream

Step控制一文中已經提到了ItemStream。在數據批處理概念中提到過,Spring Batch的每一步都是無狀態的,進而ReaderWriter也是無狀態的,這種方式可以很好的隔離每行數據的處理,也能將容錯的範圍收窄到能夠空子的範圍。可是這並不意味着整個批處理的過程當中並不須要控制狀態。例如從數據庫持續讀入或寫入數據,每次ReaderWriter都單獨去申請數據源的連接、維護數據源的狀態(打開、關閉等)。所以框架提供了ItemStream接口來完善這些操做:

public interface ItemStream {
    void open(ExecutionContext executionContext) throws ItemStreamException;
    void update(ExecutionContext executionContext) throws ItemStreamException;
    void close() throws ItemStreamException;
}

持久化數據

在使用Spring Batch以前須要初始化他的元數據存儲(Meta-Data Schema),也就是要將須要用到的表導入到對應的數據庫中。固然,Spring Batch支持不使用任何持久化數據庫,僅僅將數據放到內存中,不設置DataSource便可。

初始化序列

Spring Batch相關的工做須要使用序列SEQUENCE

CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_SEQ;

有些數據庫不支持SEQUENCE,能夠經過表代理,好比在MySql(InnoDB數據庫)中:

CREATE TABLE BATCH_STEP_EXECUTION_SEQ (ID BIGINT NOT NULL);
INSERT INTO BATCH_STEP_EXECUTION_SEQ values(0);
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (ID BIGINT NOT NULL);
INSERT INTO BATCH_JOB_EXECUTION_SEQ values(0);
CREATE TABLE BATCH_JOB_SEQ (ID BIGINT NOT NULL);
INSERT INTO BATCH_JOB_SEQ values(0);

關於Version字段

某些表中都有Version字段。由於Spring的更新策略是樂觀鎖,所以在進行數據更新以後都會對錶的Version字段進行+1處理。在內存與數據庫交互的過程當中,會使用採用getVersionincreaseVersion(+1)updateDataAndVersion的過程,若是在update的時候發現Version不是預計的數值(+1),則會拋出OptimisticLockingFailureException的異常。當同一個Job在進羣中不一樣服務上執行時,須要注意這個問題。

BATCH_JOB_INSTANCE

BATCH_JOB_INSTANCE用於記錄JobInstance,在數據批處理概念中介紹了他的工做方式,其結構爲:

CREATE TABLE BATCH_JOB_INSTANCE  (
  JOB_INSTANCE_ID BIGINT  PRIMARY KEY ,
  VERSION BIGINT,
  JOB_NAME VARCHAR(100) NOT NULL ,
  JOB_KEY VARCHAR(2500)
);
字段 說明
JOB_INSTANCE_ID 主鍵,主鍵與單個JobInstance相關。當獲取到某個JobInstance實例後,經過getId方法能夠獲取到此數據
VERSION
JOB_NAME Job的名稱,用於標記運行的Job,在建立Job時候指定
JOB_KEY JobParameters的序列化數值。在數據批處理概念中介紹了一個JobInstance至關於Job+JobParameters。他用於標記同一個Job不一樣的實例

BATCH_JOB_EXECUTION_PARAMS

BATCH_JOB_EXECUTION_PARAMS對應的是JobParameters對象。其核心功能是存儲Key-Value結構的各類狀態數值。字段中IDENTIFYING=true用於標記那些運行過程當中必須的數據(能夠理解是框架須要用到的數據),爲了存儲key-value結構該表一個列數據格式:

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
	JOB_EXECUTION_ID BIGINT NOT NULL ,
	TYPE_CD VARCHAR(6) NOT NULL ,
	KEY_NAME VARCHAR(100) NOT NULL ,
	STRING_VAL VARCHAR(250) ,
	DATE_VAL DATETIME DEFAULT NULL ,
	LONG_VAL BIGINT ,
	DOUBLE_VAL DOUBLE PRECISION ,
	IDENTIFYING CHAR(1) NOT NULL ,
	constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
);
字段 說明
JOB_EXECUTION_ID BATCH_JOB_EXECUTION表關聯的外鍵,詳見數據批處理概念JobJobInstanceJobExecute的關係
TYPE_CD 用於標記數據的對象類型,例如 stringdatelongdouble,非空
KEY_NAME key的值
STRING_VAL string類型的數值
DATE_VAL date類型的數值
LONG_VAL long類型的數值
DOUBLE_VAL double類型的數值
IDENTIFYING 標記這對key-valuse是否來自於JobInstace自身

BATCH_JOB_EXECUTION

關聯JobExecution,每當運行一個Job都會產生一個新的JobExecution,對應的在表中都會新增一行數據。

CREATE TABLE BATCH_JOB_EXECUTION  (
  JOB_EXECUTION_ID BIGINT  PRIMARY KEY ,
  VERSION BIGINT,
  JOB_INSTANCE_ID BIGINT NOT NULL,
  CREATE_TIME TIMESTAMP NOT NULL,
  START_TIME TIMESTAMP DEFAULT NULL,
  END_TIME TIMESTAMP DEFAULT NULL,
  STATUS VARCHAR(10),
  EXIT_CODE VARCHAR(20),
  EXIT_MESSAGE VARCHAR(2500),
  LAST_UPDATED TIMESTAMP,
  JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
  constraint JOB_INSTANCE_EXECUTION_FK foreign key (JOB_INSTANCE_ID)
  references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;
字段 說明
JOB_EXECUTION_ID JobExecution的主鍵,JobExecution::getId方法能夠獲取到該值
VERSION
JOB_INSTANCE_ID 關聯到JobInstace的外鍵,詳見數據批處理概念JobJobInstanceJobExecute的關係
CREATE_TIME 建立時間戳
START_TIME 開始時間戳
END_TIME 結束時間戳,不管成功或失敗都會更新這一項數據。若是某行數據該值爲空表示運行期間出現錯誤,而且框架沒法更新該值
STATUS JobExecute的運行狀態:COMPLETEDSTARTED或者其餘狀態。此數值對應Java中BatchStatus枚舉值
EXIT_CODE JobExecute執行完畢以後的退出返回值
EXIT_MESSAGE JobExecute退出的詳細內容,若是是異常退出可能會包括異常堆棧的內容
LAST_UPDATED 最後一次更新的時間戳

BATCH_STEP_EXECUTION

該表對應的是StepExecution,其結構和BATCH_JOB_EXECUTION基本類似,只是對應的對象是Step,增長了與之相對的一些字段數值:

CREATE TABLE BATCH_STEP_EXECUTION  (
  STEP_EXECUTION_ID BIGINT  PRIMARY KEY ,
  VERSION BIGINT NOT NULL,
  STEP_NAME VARCHAR(100) NOT NULL,
  JOB_EXECUTION_ID BIGINT NOT NULL,
  START_TIME TIMESTAMP NOT NULL ,
  END_TIME TIMESTAMP DEFAULT NULL,
  STATUS VARCHAR(10),
  COMMIT_COUNT BIGINT ,
  READ_COUNT BIGINT ,
  FILTER_COUNT BIGINT ,
  WRITE_COUNT BIGINT ,
  READ_SKIP_COUNT BIGINT ,
  WRITE_SKIP_COUNT BIGINT ,
  PROCESS_SKIP_COUNT BIGINT ,
  ROLLBACK_COUNT BIGINT ,
  EXIT_CODE VARCHAR(20) ,
  EXIT_MESSAGE VARCHAR(2500) ,
  LAST_UPDATED TIMESTAMP,
  constraint JOB_EXECUTION_STEP_FK foreign key (JOB_EXECUTION_ID)
  references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

未填入內容部分見BATCH_JOB_EXECUTION說明。

字段 說明
STEP_EXECUTION_ID StepExecute對應的主鍵
VERSION
STEP_NAME Step名稱
JOB_EXECUTION_ID 關聯到BATCH_JOB_EXECUTION表的外鍵,標記該StepExecute所屬的JobExecute
START_TIME
END_TIME
STATUS
COMMIT_COUNT 執行過程當中,事物提交的次數,該值與數據的規模以及chunk的設置有關
READ_COUNT 讀取數據的次數
FILTER_COUNT Processor中過濾記錄的次數
WRITE_COUNT 吸入數據的次數
READ_SKIP_COUNT 讀數據的跳過次數
WRITE_SKIP_COUNT 寫數據的跳過次數
PROCESS_SKIP_COUNT Processor跳過的次數
ROLLBACK_COUNT 回滾的次數
EXIT_CODE
EXIT_MESSAGE
LAST_UPDATED

BATCH_JOB_EXECUTION_CONTEXT

該表會記錄全部與Job相關的ExecutionContext信息。每一個ExecutionContext都對應一個JobExecution,在運行的過程當中它包含了全部Job範疇的狀態數據,這些數據在執行失敗後對於後續處理有中重大意義。

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
  JOB_EXECUTION_ID BIGINT PRIMARY KEY,
  SHORT_CONTEXT VARCHAR(2500) NOT NULL,
  SERIALIZED_CONTEXT CLOB,
  constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
  references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
字段 說明
JOB_EXECUTION_ID 關聯到JobExecution的外鍵,創建JobExecutionExecutionContext的關係。
SHORT_CONTEXT 標記SERIALIZED_CONTEXT的版本號
SERIALIZED_CONTEXT 序列化的ExecutionContext

BATCH_STEP_EXECUTION_CONTEXT

StepExecutionContext相關的數據表,結構與BATCH_JOB_EXECUTION_CONTEXT徹底同樣。

表索引建議

上面的全部建表語句都沒有提供索引,可是並不表明索引沒有價值。當感受到SQL語句的執行有效率問題時候,能夠增長索引。

索引帶來的價值取決於SQL查詢的頻率以及關聯關係,下面是Spring Batch框架在運行過程當中會用到的一些查詢條件語句,用於參考優化索引:

Where條件 執行頻率
BATCH_JOB_INSTANCE JOB_NAME = ? and JOB_KEY = ? 每次Job啓動執時
BATCH_JOB_EXECUTION JOB_INSTANCE_ID = ? 每次Job重啓時
BATCH_EXECUTION_CONTEXT EXECUTION_ID = ? and KEY_NAME = ? chunk的大小而定
BATCH_STEP_EXECUTION VERSION = ? chunk的大小而定
BATCH_STEP_EXECUTION STEP_NAME = ? and JOB_EXECUTION_ID = ? 每個Step執行以前

使用案例

下面是Spring Batch一些簡單的應用,源碼在下列地址的simple工程:

Spring Batch提供了2種執行方式:命令行方式或Java內嵌方式。命令行方式是直到須要執行批處理任務的時候才啓動程序,內嵌方式是結合Web工程或其餘外部化框架來使用。2者最大的差異就是是否直接向IoCs注入一個Job實例。

通用基本配置

兩種方式的基本配置都是同樣的,經過ReaderProcessorWriter來組裝一個Step。代碼中Item並不涉及文件或數據庫的操做,只是簡單的模擬數據讀取、處理、寫入的過程。實體RecordMsg用於模擬數據轉換,基本配置以下:

public class BatchDefaultConfig {
	@Bean
	//配置Step
	public Step simpleStep(StepBuilderFactory builder, ItemReader<Record> reader, ItemProcessor<Record, Msg> processor,
			ItemWriter<Msg> writer) {
		return builder.get("SimpleStep").<Record, Msg>chunk(10).reader(reader).processor(processor).writer(writer)
				.build();
	}

	@Bean
	//配置 Reader
	public ItemReader<Record> reader() {
		return new ItemReader<Record>() {
			private int count = 0;
			public Record read()
					throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
				return ++this.count < 100 ? new Record().setId(this.count).setMsg("Read Number:" + this.count) : null;
			}
		};
	}

	@Bean
	//配置 Processor
	public ItemProcessor<Record, Msg> processor() {
		return new ItemProcessor<Record, Msg>() {
			public Msg process(Record item) throws Exception {
				return new Msg("MSG GET INFO = " + item.getMsg());
			}
		};
	}

	@Bean
	//配置 Writer
	public ItemWriter<Msg> writer() {
		return new ItemWriter<Msg>() {
			private int batchCount = 0;
			public void write(List<? extends Msg> items) throws Exception {
				System.out.println("Batch Count : " + ++batchCount + ". Data:");
				for (Msg msg : items) {
					System.out.println(msg.getInfo());
				}
			}
		};
	}
}

命令行方式運行

有了基本配置以後,命令行運行的方式僅僅是向容器添加一個Job

@Configuration
//導入依賴配置
@Import({ BatchDefaultConfig.class })
public class BatchCommondConfig {
	@Bean
	public Job simpleJob(Step step, JobBuilderFactory builder) {
		return builder.get("SimpleJob").start(step).build(); //向容器返回一個Job的Bean
	}
}

而後啓動Spring Framework則會自動啓用Command Runner運行方式運行——先調用SpringApplication::callRunner方法,而後使用JobLauncherCommandLineRunner::execute運行:

public class CommondSample {
	public static void main(String[] args) throws DuplicateJobException {
		//模擬測試參數, 這些參數值在執行Java時從外部傳入的,好比-Dkey=value
		String[] argsExt = new String[2];
		argsExt[0] = "BuilderParam1=Value1";
		argsExt[1] = "BuilderParam2=Value2";
		//運行Spring Framework
		SpringApplication.run(CommondSample.class, argsExt);
	}
}

啓用以後觀察數據庫已經發生了變動。使用命令行須要經過 Java運行參數(-Dkey=value)傳遞JobParameters的數據,上面的代碼模擬實現了相關的過程。

Java內嵌運行

Java內嵌的方式主要是用於搭配外部工程化使用,好比使用Web框架或則統一調度平臺管之類的結構化框架來統一管理批處理任務。與命令行執行最大的區別就是不向容器注入Job

@Configuration
//導入進出配置 
@Import({BatchDefaultConfig.class})
public class BatchOperatoConfig {
	@Bean
	//返回JobFactory
	public JobFactory simpleJob(Step step, JobBuilderFactory builder) {
		SimpleJobFactory sampleJobFactory = new SimpleJobFactory();
		sampleJobFactory.setJob(builder.get("SimpleJob").start(step).build());
		return sampleJobFactory;
	}
}

配置代碼向容器添加了一個JobFactory的實現類,JobFactory的兩個接口一個是獲取Job一個是獲取Job的名稱,SimpleJobFactory實現了JobFactory

public class SimpleJobFactory implements JobFactory {
	private Job job;
	public void setJob(Job job) {
		this.job = job;
	}
	@Override
	public Job createJob() {
		return job;
	}
	@Override
	public String getJobName() {
		return job.getName();
	}
}

最後經過SimpleJobFactory來啓動一個Job

@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
public class OperatorSample {
	public static void main(String[] args) throws DuplicateJobException {
		new SuspendThread().run(); //掛起系統一直運行
		ConfigurableApplicationContext ctx = SpringApplication.run(OperatorSample.class);
		Cron cron = ctx.getBean(Cron.class);
		cron.register(); //註冊JobFactory
		cron.runJobLaunch();
	}
}

@Service
class Cron {
	@Autowired
	JobLauncher jobLauncher;

	@Autowired
	private JobOperator jobOperator;

	@Autowired
	private JobRegistry jobRegistry;

	@Autowired
	private JobFactory jobFactory;

	//註冊JobFactory
	void register() throws DuplicateJobException {
		jobRegistry.register(jobFactory);
	}

	//使用JobLaunch執行
	void runJobLaunch() {
		Map<String, JobParameter> map = new HashMap<>();
		map.put("Builder", new JobParameter("1"));
		map.put("Timer", new JobParameter("2"));
		jobLauncher.run(jobFactory.createJob(), new JobParameters(map));
	}

	@Scheduled(cron = "30 * * * * ? ")
	void task1() {
		System.out.println("1");
		runOperator();
	}

	//定時任務使用 JobOperator執行
	private void runOperator() {
		jobOperator.start("SimpleJob", "Builder=1,Timer=2");
	}
}

這裏使用了2種執行方式:JobLauncherJobOperatorJobLauncher簡單明瞭的啓動一個批處理任務。而JobOperator擴展了一些用於Job管理的接口方法,觀察JobOperator的源碼能夠發現它提供了獲取ExecuteContext、檢查JobInstance等功能,若是須要定製開發一個基於Web或者JMX管理批處理任務的系統,JobOperator更合適。JobOperator的第二個參數用於傳遞JobParameters,等號兩端分別是keyvalue,逗號用於分割多行數據。

Job配置與運行說起過一個JobInstance至關於Job+JobParameters,所以雖然上面的代碼使用了兩種不一樣的運行方式,可是JobJobParameters是同樣的。在運行被定時任務包裹的runOperator方法時,會一直拋出JobInstanceAlreadyExistsException異常,由於同一個實例不能運行2次。若是運行失敗可使用對應的restart方法。

後續會介紹各類ReaderWriter的使用。

相關文章
相關標籤/搜索