Spring Batch Job Execution Flow Analysis
Based on the following job configuration:
<batch:job id="addPeopleDescJob">
    <batch:step id="addDescStep" parent="abstractStep">
        <batch:tasklet>
            <batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
                         writer="addDescPeopleWriter" commit-interval="2">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
Debug the following code:
JobExecution result = launcher.run(job, jobParametersBuilder.toJobParameters());
This is the method that launches the job. Its implementation is as follows:
SimpleJobLauncher.java: implementation of the run method (some code removed)
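@Override
public JobExecution run(final Job job, final JobParameters jobParameters)
        throws JobExecutionAlreadyRunningException, JobRestartException,
        JobInstanceAlreadyCompleteException, JobParametersInvalidException {
    final JobExecution jobExecution;
    jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
    try {
        taskExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    job.execute(jobExecution);
                } catch (Throwable t) {
                    rethrow(t);
                }
            }

            // Propagate unchecked throwables as-is, wrap anything else
            private void rethrow(Throwable t) {
                if (t instanceof RuntimeException) {
                    throw (RuntimeException) t;
                } else if (t instanceof Error) {
                    throw (Error) t;
                }
                throw new IllegalStateException(t);
            }
        });
    } catch (TaskRejectedException e) {
        // The executor refused the task: mark the execution FAILED and persist it
        jobExecution.upgradeStatus(BatchStatus.FAILED);
        if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
            jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
        }
        jobRepository.update(jobExecution);
    }
    return jobExecution;
}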
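Look at the main logic of taskExecutor.execute. It wraps job.execute in an abstract task (a Runnable) and hands that task to taskExecutor to run. What type is taskExecutor? It is org.springframework.core.task.SyncTaskExecutor, a synchronous task executor. SimpleJobLauncher falls back to it when no taskExecutor is configured. SyncTaskExecutor runs the task in the calling thread and returns only after the task finishes, so the whole job runs in the caller's thread. This step also configures no task-executor of its own, so every chunk is processed in that single thread, which loops through one commit-interval after another.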
taskExecutor.execute(new Runnable() {
    @Override
    public void run() {
        job.execute(jobExecution);
    }
});
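The plain-Java sketch below shows what "synchronous" means here. InlineExecutor is a hypothetical stand-in that mimics SyncTaskExecutor's behavior: it runs the task in the caller's thread before execute returns. It is not the Spring class. The demo records whether the task ran on the calling thread and the order in which things happened.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical stand-in for org.springframework.core.task.SyncTaskExecutor:
// execute() runs the task directly in the caller's thread and returns afterwards.
class InlineExecutor implements Executor {
    @Override
    public void execute(Runnable task) {
        task.run();
    }
}

class SyncExecutorDemo {
    // Records whether the task ran on the calling thread, and the order of events.
    static List<String> runDemo() {
        List<String> events = new ArrayList<>();
        Thread caller = Thread.currentThread();
        new InlineExecutor().execute(() ->
                events.add("job.execute ran, sameThread=" + (Thread.currentThread() == caller)));
        // Reached only after the task has finished, because execution is synchronous
        events.add("execute() returned");
        return events;
    }

    public static void main(String[] args) {
        runDemo().forEach(System.out::println);
    }
}
```

Because execute returns only after the task has run, the launcher's run method likewise returns only after the whole job has finished.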
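Continuing through the code, the next call is job.execute(jobExecution). First, what type of object is job?
The debugger shows it is FlowJob: [name=addPeopleDescJob]. Stepping into job.execute, execution jumps straight into org.springframework.batch.core.job.AbstractJob.execute(JobExecution execution). AbstractJob is FlowJob's parent class. FlowJob does not override this method, and cannot, because the method is declared final.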
AbstractJob.execute(JobExecution execution)
@Override
public final void execute(JobExecution execution) {
    doExecute(execution);
}
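Stripped down to its core, this method simply delegates to doExecute. That is an abstract method, and FlowJob implements it to carry out the actual work of the job.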
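The AbstractJob/FlowJob split is the template method pattern. The parent fixes the skeleton in a final execute and leaves a single abstract hook. The minimal sketch below uses hypothetical classes, MiniAbstractJob and MiniFlowJob. Its "before"/"after" entries stand in for the bookkeeping (listeners, status updates) that the real execute performs around doExecute and that was removed from the excerpt above. The same shape appears again later with AbstractStep.execute and TaskletStep.doExecute.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical classes modelling the AbstractJob / FlowJob split (template method pattern).
abstract class MiniAbstractJob {
    final List<String> log = new ArrayList<>();

    // final: subclasses cannot change the overall skeleton of execute()
    public final void execute(String execution) {
        log.add("before " + execution);  // stands in for listeners / status bookkeeping
        doExecute(execution);            // the one step subclasses must supply
        log.add("after " + execution);
    }

    protected abstract void doExecute(String execution);
}

class MiniFlowJob extends MiniAbstractJob {
    @Override
    protected void doExecute(String execution) {
        log.add("flow runs steps for " + execution);
    }
}
```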
FlowJob.doExecute()
/**
 * @see AbstractJob#doExecute(JobExecution)
 */
@Override
protected void doExecute(final JobExecution execution) throws JobExecutionException {
    try {
        JobFlowExecutor executor = new JobFlowExecutor(getJobRepository(),
                new SimpleStepHandler(getJobRepository()), execution);
        executor.updateJobExecutionStatus(flow.start(executor).getStatus());
    } catch (FlowExecutionException e) {
        if (e.getCause() instanceof JobExecutionException) {
            throw (JobExecutionException) e.getCause();
        }
        throw new JobExecutionException("Flow execution ended unexpectedly", e);
    }
}
>>>>>>>>>>>>>>>>>>>>>>>>>>>
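Continue debugging.
A job is only the abstract representation of a task. The concrete work is done in its steps, so the next thing to examine is step execution. How is a step executed?
Our code ends up in org.springframework.batch.core.step.AbstractStep.execute(StepExecution stepExecution), the abstract step class. Its main logic is as follows: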
@Override
public final void execute(StepExecution stepExecution)
        throws JobInterruptedException, UnexpectedJobExecutionException {
    doExecute(stepExecution);
}
What type of object is this step? It is TaskletStep: [name=addDescStep], whose parent class is AbstractStep.
Next, look at TaskletStep.doExecute(stepExecution):
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
    stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());
    stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());

    stream.update(stepExecution.getExecutionContext());
    getJobRepository().updateExecutionContext(stepExecution);

    // Shared semaphore per step execution, so other step executions can run
    // in parallel without needing the lock
    final Semaphore semaphore = createSemaphore();

    stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {

        @Override
        public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
                throws Exception {

            StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

            // Before starting a new transaction, check for
            // interruption.
            interruptionPolicy.checkInterrupted(stepExecution);

            RepeatStatus result;
            try {
                result = new TransactionTemplate(transactionManager, transactionAttribute)
                        .execute(new ChunkTransactionCallback(chunkContext, semaphore));
            } catch (UncheckedTransactionException e) {
                // Allow checked exceptions to be thrown inside callback
                throw (Exception) e.getCause();
            }

            chunkListener.afterChunk(chunkContext);

            // Check for interruption after transaction as well, so that
            // the interrupted exception is correctly propagated up to
            // caller
            interruptionPolicy.checkInterrupted(stepExecution);

            return result;
        }

    });
}
That is the complete code of TaskletStep.doExecute. Its core logic is:
stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
    @Override
    public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
            throws Exception {
        RepeatStatus result;
        try {
            // One chunk is processed inside one transaction
            result = new TransactionTemplate(transactionManager, transactionAttribute)
                    .execute(new ChunkTransactionCallback(chunkContext, semaphore));
        } catch (UncheckedTransactionException e) {
            throw (Exception) e.getCause();
        }
        return result;
    }
});
stepOperations.iterate() is passed an instance of an anonymous subclass of StepContextRepeatCallback.
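stepOperations is a RepeatOperations, which is a RepeatTemplate unless configured otherwise. Its iterate method keeps calling the callback for as long as the callback returns RepeatStatus.CONTINUABLE. The sketch below models only that loop. MiniRepeatStatus, MiniRepeatCallback and MiniRepeatTemplate are hypothetical simplifications; the real RepeatTemplate also consults a completion policy, exception handlers and listeners.

```java
// Hypothetical, simplified model of RepeatStatus and RepeatTemplate.iterate().
enum MiniRepeatStatus {
    CONTINUABLE, FINISHED;

    // Same idea as RepeatStatus.continueIf(boolean)
    static MiniRepeatStatus continueIf(boolean continuable) {
        return continuable ? CONTINUABLE : FINISHED;
    }
}

interface MiniRepeatCallback {
    MiniRepeatStatus doInIteration(int iteration);
}

class MiniRepeatTemplate {
    // Invokes the callback until it stops returning CONTINUABLE;
    // returns how many times it was invoked.
    int iterate(MiniRepeatCallback callback) {
        int count = 0;
        MiniRepeatStatus status;
        do {
            status = callback.doInIteration(count);
            count++;
        } while (status == MiniRepeatStatus.CONTINUABLE);
        return count;
    }
}
```

Suppose each call consumes one chunk of 2 items out of 5 remaining. The callback is then invoked 3 times (2, 2, then 1 item), and the last call reports FINISHED.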
Continuing to debug, after going once around the loop we arrive back at this callback method, StepContextRepeatCallback.doInChunkContext():
result = new TransactionTemplate(transactionManager, transactionAttribute)
        .execute(new ChunkTransactionCallback(chunkContext, semaphore));
We skip over the details of new TransactionTemplate().execute(). Inside it, the callback ChunkTransactionCallback.doInTransaction() is eventually invoked:
@Override
public RepeatStatus doInTransaction(TransactionStatus status) {
    RepeatStatus result = RepeatStatus.CONTINUABLE;
    result = tasklet.execute(contribution, chunkContext);
    return result;
}
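TransactionTemplate.execute runs the callback inside a single transaction. It commits if the callback returns normally and rolls back if the callback throws. ChunkTransactionCallback handles exactly one chunk, so each commit-interval becomes one transaction. The sketch below uses a hypothetical RecordingTxManager that only logs begin/commit/rollback instead of talking to a database. It models the behavior only and does not reproduce Spring's PlatformTransactionManager API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical transaction manager that only records what happens to each transaction.
class RecordingTxManager {
    final List<String> events = new ArrayList<>();
    void begin()    { events.add("begin"); }
    void commit()   { events.add("commit"); }
    void rollback() { events.add("rollback"); }
}

// Hypothetical model of TransactionTemplate.execute(callback):
// commit on normal return, roll back and rethrow on an unchecked exception.
class MiniTransactionTemplate {
    private final RecordingTxManager txManager;

    MiniTransactionTemplate(RecordingTxManager txManager) {
        this.txManager = txManager;
    }

    <T> T execute(Supplier<T> callback) {
        txManager.begin();
        T result;
        try {
            result = callback.get();   // e.g. one chunk: read, process, write
        } catch (RuntimeException | Error e) {
            txManager.rollback();
            throw e;
        }
        txManager.commit();
        return result;
    }
}
```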
This tasklet is of type org.springframework.batch.core.step.item.ChunkOrientedTasklet, which defines the step's processing strategy. Here is ChunkOrientedTasklet.execute():
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

    @SuppressWarnings("unchecked")
    Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
    if (inputs == null) {
        inputs = chunkProvider.provide(contribution);
        if (buffering) {
            chunkContext.setAttribute(INPUTS_KEY, inputs);
        }
    }

    chunkProcessor.process(contribution, inputs);
    chunkProvider.postProcess(contribution, inputs);

    // Allow a message coming back from the processor to say that we
    // are not done yet
    if (inputs.isBusy()) {
        logger.debug("Inputs still busy");
        return RepeatStatus.CONTINUABLE;
    }

    chunkContext.removeAttribute(INPUTS_KEY);
    chunkContext.setComplete();

    logger.debug("Inputs not busy, ended: " + inputs.isEnd());
    return RepeatStatus.continueIf(!inputs.isEnd());
}
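Note the buffering branch. The chunk is stored in chunkContext under INPUTS_KEY and removed only after process completes. If process throws, the attribute stays in place, so a later call on the same chunk context picks up the buffered items instead of reading new ones. The simplified model below uses hypothetical names: a Map as the chunk context, an Iterator as the reader, and a failProcessing flag that simulates a processor error. Whether a failed chunk actually gets retried depends on the step's fault-tolerance setup, which this model does not cover.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of the buffering branch of ChunkOrientedTasklet.execute().
class BufferingTaskletModel {
    static final String INPUTS_KEY = "INPUTS";
    final Map<String, Object> chunkContext = new HashMap<>(); // stands in for ChunkContext attributes
    private final Iterator<String> reader;                    // stands in for the ItemReader
    int reads = 0;                                            // how many items were actually read

    BufferingTaskletModel(Iterator<String> reader) {
        this.reader = reader;
    }

    @SuppressWarnings("unchecked")
    List<String> executeChunk(int chunkSize, boolean failProcessing) {
        List<String> inputs = (List<String>) chunkContext.get(INPUTS_KEY);
        if (inputs == null) {
            // Nothing buffered: read a fresh chunk and buffer it (buffering == true)
            inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
                reads++;
            }
            chunkContext.put(INPUTS_KEY, inputs);
        }
        if (failProcessing) {
            // Simulated process() failure: INPUTS_KEY is left in place
            throw new IllegalStateException("processing failed for " + inputs);
        }
        // Chunk processed successfully: drop the buffer so the next call reads again
        chunkContext.remove(INPUTS_KEY);
        return inputs;
    }
}
```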
Chunk<I> inputs holds the data that is read in. Where does it come from?
inputs = chunkProvider.provide(contribution);
chunkProvider is of type org.springframework.batch.core.step.item.SimpleChunkProvider. Its provide method:
@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {

    final Chunk<I> inputs = new Chunk<I>();
    repeatOperations.iterate(new RepeatCallback() {

        @Override
        public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
            I item = null;
            try {
                item = read(contribution, inputs);
            } catch (SkipOverflowException e) {
                // read() tells us about an excess of skips by throwing an
                // exception
                return RepeatStatus.FINISHED;
            }
            if (item == null) {
                inputs.setEnd();
                return RepeatStatus.FINISHED;
            }
            inputs.add(item);
            contribution.incrementReadCount();
            return RepeatStatus.CONTINUABLE;
        }

    });

    return inputs;
}
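The read loop in provide comes down to this: keep calling read until the chunk is full or read returns null, and remember whether the end was reached. The "chunk is full" check does not live in the callback itself. With the usual builder setup, it comes from the completion policy of repeatOperations, a SimpleCompletionPolicy sized by commit-interval. The sketch folds that check into the loop condition instead. ListReader, MiniChunk and MiniChunkProvider are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical reader following the ItemReader contract: next item, or null when exhausted.
class ListReader<T> {
    private final Iterator<T> it;
    ListReader(List<T> data) { this.it = data.iterator(); }
    T read() { return it.hasNext() ? it.next() : null; }
}

// Hypothetical chunk: the items plus the "end" flag that provide() sets.
class MiniChunk<T> {
    final List<T> items = new ArrayList<>();
    boolean end = false;
}

class MiniChunkProvider {
    // Reads at most commitInterval items; stops early and marks the end
    // when read() returns null (like inputs.setEnd() in provide()).
    static <T> MiniChunk<T> provide(ListReader<T> reader, int commitInterval) {
        MiniChunk<T> inputs = new MiniChunk<>();
        while (inputs.items.size() < commitInterval) {
            T item = reader.read();
            if (item == null) {
                inputs.end = true;
                break;
            }
            inputs.items.add(item);
        }
        return inputs;
    }
}
```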
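Inside provide, the data for one chunk is read item by item with item = read(contribution, inputs), and each item is added to the chunk inputs. Depending on the reader, the rows may come from a JDBC cursor one at a time or be fetched a page at a time through paging, but each read call still returns a single item. Reading for this commit stops in one of two cases: the number of reads configured by commit-interval has been reached, or read returns null, which means the data is exhausted. Next, the inputs data has to be processed, which happens in chunkProcessor.process(contribution, inputs):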
@Override
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {

    // Allow temporary state to be stored in the user data field
    initializeUserData(inputs);

    // If there is no input we don't have to do anything more
    if (isComplete(inputs)) {
        return;
    }

    // Make the transformation, calling remove() on the inputs iterator if
    // any items are filtered. Might throw exception and cause rollback.
    Chunk<O> outputs = transform(contribution, inputs);

    // Adjust the filter count based on available data
    contribution.incrementFilterCount(getFilterCount(inputs, outputs));

    // Adjust the outputs if necessary for housekeeping purposes, and then
    // write them out...
    write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
}
chunkProcessor.process defines two steps. The first is transform, which runs the items read in through the processor; the second is write. The code now goes into transform:
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
    Chunk<O> outputs = new Chunk<O>();
    for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
        final I item = iterator.next();
        O output;
        try {
            output = doProcess(item);
        } catch (Exception e) {
            /*
             * For a simple chunk processor (no fault tolerance) we are done
             * here, so prevent any more processing of these inputs.
             */
            inputs.clear();
            throw e;
        }
        if (output != null) {
            outputs.add(output);
        } else {
            iterator.remove();
        }
    }
    return outputs;
}
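Iterating over the inputs chunk and processing each item produces the outputs chunk. An item whose processing result is null is filtered out: it is removed from inputs and never added to outputs.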
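The filtering rule in transform is simply that a null result drops the item. The hypothetical sketch below uses java.util.function.Function as the processor. Items mapped to null are removed from inputs, as iterator.remove() does, and never reach outputs. The filter count is taken to be the original input size minus the number of outputs. That is this model's reading of getFilterCount, whose body is not shown above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of SimpleChunkProcessor.transform() and the filter count.
class MiniTransform {
    // A null processor result filters the item: it is removed from inputs
    // and not added to outputs. inputs must therefore be a mutable list.
    static <I, O> List<O> transform(List<I> inputs, Function<I, O> processor) {
        List<O> outputs = new ArrayList<>();
        for (Iterator<I> iterator = inputs.iterator(); iterator.hasNext();) {
            I item = iterator.next();
            O output = processor.apply(item);
            if (output != null) {
                outputs.add(output);
            } else {
                iterator.remove();
            }
        }
        return outputs;
    }

    // Assumed filter-count arithmetic: items that went in minus items that came out.
    static int filterCount(int originalInputSize, List<?> outputs) {
        return originalInputSize - outputs.size();
    }
}
```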
output = doProcess(item);
The line above is where the custom processor's process method gets called. Next, let's see how the write method writes the data:
protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
    try {
        doWrite(outputs.getItems());
    } catch (Exception e) {
        /*
         * For a simple chunk processor (no fault tolerance) we are done
         * here, so prevent any more processing of these inputs.
         */
        inputs.clear();
        throw e;
    }
    contribution.incrementWriteCount(outputs.size());
}
write calls doWrite(), which in turn calls the custom writer's write method to write the data.
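Reading and processing work item by item, but the writer is called once per chunk with the whole list. The minimal model below uses hypothetical names. MiniItemWriter only mirrors the shape of ItemWriter.write, which takes a List; unlike the real interface, it declares no checked exception. As in the write method above, a successful write adds outputs.size() to the write count, and a failing write clears inputs and rethrows.

```java
import java.util.List;

// Hypothetical writer with the same shape as ItemWriter.write(List<? extends T>):
// it receives all outputs of a chunk in ONE call.
interface MiniItemWriter<T> {
    void write(List<? extends T> items);
}

// Hypothetical model of SimpleChunkProcessor.write().
class MiniWriteStep {
    int writeCount = 0;

    <I, O> void write(List<I> inputs, List<O> outputs, MiniItemWriter<O> writer) {
        try {
            writer.write(outputs);      // one call for the whole chunk
        } catch (RuntimeException e) {
            inputs.clear();             // no fault tolerance: drop these inputs
            throw e;
        }
        writeCount += outputs.size();   // like contribution.incrementWriteCount(...)
    }
}
```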
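That is the main flow, and the main code, within one commit-interval. How does Spring Batch repeat one commit-interval after another (through RepeatTemplate)? I'll cover that in detail another time.
With a cursor reader, data is read item by item through a cursor to form the inputs chunk, which is essentially a collection. Each item in the inputs chunk is then processed in turn (item processing), producing the outputs chunk. outputs.getItems() turns that chunk into a list, which is handed to the writer as a whole, so the write method's parameter is a collection.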
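Putting the pieces together, the single-threaded read/process/write cycle described above can be modeled in a few lines of plain Java. ChunkLoopModel is a hypothetical helper, not Spring Batch code. Its outer loop plays the role of stepOperations (one pass per chunk and transaction), and its inner loop plays the role of the chunk's repeatOperations. An empty chunk is skipped, matching the early return in process ("If there is no input we don't have to do anything more").

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical end-to-end model of chunk-oriented processing in one thread.
class ChunkLoopModel {
    // Returns one list per writer call, so the chunk boundaries are visible.
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int commitInterval) {
        List<List<O>> writes = new ArrayList<>();
        boolean end = false;
        while (!end) {                                   // one pass = one chunk / transaction
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < commitInterval) {     // read item by item
                if (!reader.hasNext()) {
                    end = true;                          // like inputs.setEnd()
                    break;
                }
                inputs.add(reader.next());
            }
            if (inputs.isEmpty()) {
                continue;                                // nothing read: skip process and write
            }
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {                      // process item by item
                O out = processor.apply(item);
                if (out != null) {                       // null = filtered
                    outputs.add(out);
                }
            }
            writes.add(outputs);                         // the writer gets the whole list
        }
        return writes;
    }
}
```

With commit-interval="2" and five records, the writer is called three times, with lists of 2, 2 and 1 items. With exactly four records, the model needs a third pass that reads nothing before it notices the end, just as the provide loop does.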
================END================