Spring Batch_JOB執行流程分析

Spring Batch_JOB執行流程分析java

基於如下job的配置,spring

<batch:job id="addPeopleDescJob">
    <batch:step id="addDescStep" parent="abstractStep">
        <batch:tasklet>
            <batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
                writer="addDescPeopleWriter" commit-interval="2">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

debug代碼ide

JobExecution result = launcher.run(job,
		jobParametersBuilder.toJobParameters());

這是啓動job的方法,以下是方法的具體實現:post

SimpleJobLauncher.java run方法的具體實現(刪除了部分代碼)ui

@Override
public JobExecution run(final Job job, final JobParameters jobParameters)
		throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
		JobParametersInvalidException {
	final JobExecution jobExecution;
	jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
	try {
		taskExecutor.execute(new Runnable() {
			@Override
			public void run() {
				try {
					job.execute(jobExecution);
				}
				catch (Throwable t) {
					rethrow(t);
				}
			}
			private void rethrow(Throwable t) {
			}
		});
	}
	return jobExecution;
}

看taskExecutor.execute 方法的主要邏輯,就是把 job execute 封裝進一個抽象的任務內,經過taskExecutor 執行 ,taskExecutor 是一個什麼類型——org.springframework.core.task.SyncTaskExecutor,一個同步的任務執行類。這樣就能夠明確的知道每一個chunk的處理都是在一個單線程內,循環往復的處理每一個commit-interval。this

taskExecutor.execute(new Runnable() {
	@Override
	public void run() {	
		job.execute(jobExecution);	
	}
});

 

繼續走代碼,那接下來執行的方法是job.execute(jobExecution);那先來看job 是什麼類型的對象,spa

那麼就是他了,FlowJob: [name=addPeopleDescJob]。進入job.execute ,線程

他就直接跳進了 org.springframework.batch.core.job.AbstractJob.execute(JobExecution execution)方法,這是FlowJob的父類,FlowJob 顯然沒有覆寫該方法。debug

AbstractJob.execute(JobExecution execution)code

@Override
public final void execute(JobExecution execution) {
	doExecute(execution);
}

這是方法的主要執行邏輯,那麼這個doExecute方法就是FlowJob實現父類的抽象方法,完成 job的執行的任務。

FlowJob.doExecute()

/**
 * @see AbstractJob#doExecute(JobExecution)
 */
@Override
protected void doExecute(final JobExecution execution) throws JobExecutionException {
   try {
      JobFlowExecutor executor = new JobFlowExecutor(getJobRepository(),
            new SimpleStepHandler(getJobRepository()), execution);
      executor.updateJobExecutionStatus(flow.start(executor).getStatus());
   }
   catch (FlowExecutionException e) {
      if (e.getCause() instanceof JobExecutionException) {
         throw (JobExecutionException) e.getCause();
      }
      throw new JobExecutionException("Flow execution ended unexpectedly", e);
   }
}

>>>>>>>>>>>>>>>>>>>>>>>>>>>

繼續debug

job是任務的任務的抽象表示,完成的具體任務還要在step中,那麼接下來就是step的執行了,step是如何執行的?

咱們的代碼最終調到這org.springframework.batch.core.step.AbstractStep.execute (StepExecution stepExecution) ,step的抽象類。這個方法的主要邏輯以下:

@Override
public final void execute(StepExecution stepExecution)
		throws JobInterruptedException, UnexpectedJobExecutionException {
	doExecute(stepExecution);
}

這個step又是什麼類型的對象——TaskletStep: [name=addDescStep],其父類爲AbstractStep。

繼續看TaskletStep.doExecute(stepExecution); 

@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
	stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());
	stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());

	stream.update(stepExecution.getExecutionContext());
	getJobRepository().updateExecutionContext(stepExecution);

	// Shared semaphore per step execution, so other step executions can run
	// in parallel without needing the lock
	final Semaphore semaphore = createSemaphore();

	stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {

		@Override
		public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
				throws Exception {

			StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

			// Before starting a new transaction, check for
			// interruption.
			interruptionPolicy.checkInterrupted(stepExecution);

			RepeatStatus result;
			try {
				result = new TransactionTemplate(transactionManager, transactionAttribute)
				.execute(new ChunkTransactionCallback(chunkContext, semaphore));
			}
			catch (UncheckedTransactionException e) {
				// Allow checked exceptions to be thrown inside callback
				throw (Exception) e.getCause();
			}

			chunkListener.afterChunk(chunkContext);

			// Check for interruption after transaction as well, so that
			// the interrupted exception is correctly propagated up to
			// caller
			interruptionPolicy.checkInterrupted(stepExecution);

			return result;
		}

	});

}

上面就是TaskletStep.doExecute的完整代碼,其中主要的邏輯是:

stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
	@Override
	public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
			throws Exception {

		RepeatStatus result;
		try {
			result = new TransactionTemplate(transactionManager, transactionAttribute)
			.execute(new ChunkTransactionCallback(chunkContext, semaphore));
		}

		return result;
	}

});

stepOperations.iterate() 方法傳入StepContextRepeatCallback的一個匿名對象。

繼續debug,轉了一圈又回到這個回調方法上StepContextRepeatCallback.doInChunkContext(),

result = new TransactionTemplate(transactionManager, transactionAttribute)
	.execute(new ChunkTransactionCallback(chunkContext, semaphore));

new TransactionTemplate().execute()方法忽略過去,在這個方法裏面最終還要調用ChunkTransactionCallback.doInTransaction() 的回調方法

@Override
public RepeatStatus doInTransaction(TransactionStatus status) {
	RepeatStatus result = RepeatStatus.CONTINUABLE;
	result = tasklet.execute(contribution, chunkContext);
	return result;
}

這個tasklet 的類型是 :org.springframework.batch.core.step.item.ChunkOrientedTasklet ,定義step的執行策略。ChunkOrientedTasklet.execute() 方法:

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

	@SuppressWarnings("unchecked")
	Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
	if (inputs == null) {
		inputs = chunkProvider.provide(contribution);
		if (buffering) {
			chunkContext.setAttribute(INPUTS_KEY, inputs);
		}
	}

	chunkProcessor.process(contribution, inputs);
	chunkProvider.postProcess(contribution, inputs);

	// Allow a message coming back from the processor to say that we
	// are not done yet
	if (inputs.isBusy()) {
		logger.debug("Inputs still busy");
		return RepeatStatus.CONTINUABLE;
	}

	chunkContext.removeAttribute(INPUTS_KEY);
	chunkContext.setComplete();

	logger.debug("Inputs not busy, ended: " + inputs.isEnd());
	return RepeatStatus.continueIf(!inputs.isEnd());

}

Chunk<I> inputs,就是要讀入的數據,他是怎麼來的 :

inputs = chunkProvider.provide(contribution);

chunkProvider 是 org.springframework.batch.core.step.item.SimpleChunkProvider 的類型,provide方法:

@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {

	final Chunk<I> inputs = new Chunk<I>();
	repeatOperations.iterate(new RepeatCallback() {

		@Override
		public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
			I item = null;
			try {
				item = read(contribution, inputs);
			}
			catch (SkipOverflowException e) {
				// read() tells us about an excess of skips by throwing an
				// exception
				return RepeatStatus.FINISHED;
			}
			if (item == null) {
				inputs.setEnd();
				return RepeatStatus.FINISHED;
			}
			inputs.add(item);
			contribution.incrementReadCount();
			return RepeatStatus.CONTINUABLE;
		}

	});

	return inputs;

}

在 provide 方法內,逐條讀取一塊數據(經過jdbc遊標來讀取一條或經過分頁來讀取多條)item = read(contribution, inputs);而後放入inputs 數據塊chunk inputs 。數據讀完了(此commit內,根據commit-interval的配置,讀取配置的次數或方法返回null表示數據讀取完成)接下來就要處理inputs 數據了,到chunkProcessor.process (contribution, inputs) :

@Override
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {

	// Allow temporary state to be stored in the user data field
	initializeUserData(inputs);

	// If there is no input we don't have to do anything more
	if (isComplete(inputs)) {
		return;
	}

	// Make the transformation, calling remove() on the inputs iterator if
	// any items are filtered. Might throw exception and cause rollback.
	Chunk<O> outputs = transform(contribution, inputs);

	// Adjust the filter count based on available data
	contribution.incrementFilterCount(getFilterCount(inputs, outputs));

	// Adjust the outputs if necessary for housekeeping purposes, and then
	// write them out...
	write(contribution, inputs, getAdjustedOutputs(inputs, outputs));

}

在chunkProcessor.process 方法內定義了 兩個步驟,一個是transform,也就是process讀入的數據集合,一個是write。代碼跳到transform這:

protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
	Chunk<O> outputs = new Chunk<O>();
	for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
		final I item = iterator.next();
		O output;
		try {
			output = doProcess(item);
		}
		catch (Exception e) {
			/*
			 * For a simple chunk processor (no fault tolerance) we are done
			 * here, so prevent any more processing of these inputs.
			 */
			inputs.clear();
			throw e;
		}
		if (output != null) {
			outputs.add(output);
		}
		else {
			iterator.remove();
		}
	}
	return outputs;
}

對於 inputs 集合 ,遍歷處理後獲得 outputs 集合。

output = doProcess(item);

上面這行代碼就是要調用自定義的process 方法進行處理。而後看write 方法是如何寫入數據的,

protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
	try {
		doWrite(outputs.getItems());
	}
	catch (Exception e) {
		/*
		 * For a simple chunk processor (no fault tolerance) we are done
		 * here, so prevent any more processing of these inputs.
		 */
		inputs.clear();
		throw e;
	}
	contribution.incrementWriteCount(outputs.size());
}

在write 方法內調用了 doWrite() 方法 ,而後調用自定義的 write方法把數據寫入。

上面就是 在一次 commit-interval 內的主要過程和主要的邏輯代碼。那麼 spring batch 是如何重複commit-interval 的呢 (經過RepeatTemplate)?之後再詳細說來。

 

總結:reader,processor和writer過程當中數據的處理邏輯

在course reader中,經過遊標逐條的讀取數據,造成一個inputs chunk,至關於一個集合,而後逐條的處理inputs chunk,進行item process,造成一個outpus chunk,由outputs chunk獲得一個集合outputs.getItems(),直接處理這個集合,也就是write方法的參數是一個集合類型。

================END================

相關文章
相關標籤/搜索