寫一個job,其配置文件爲:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
                           http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd"
       default-lazy-init="true">

    <!-- Exports rows queried from the database into a text file. -->
    <batch:job id="batchCreateReconFileJob" job-repository="jobRepository">
        <!-- Single chunk-oriented step: read channel_qptrade rows, run them through the
             processor and write them to the output file, committing every 1000 items. -->
        <batch:step id="batchCreateReconFile0">
            <batch:tasklet>
                <batch:chunk reader="batchCreateReconFileReader"
                             processor="batchCreateReconFileProcessor"
                             writer="batchCreateReconFileWriter"
                             commit-interval="1000"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <!-- Cursor reader over channel_qptrade, limited to [startDate, endDate) on create_time.
         The two bounds are bound as JDBC parameters instead of being spliced into the SQL
         text: they originate from HTTP request parameters, so string interpolation would
         allow SQL injection.
         NOTE(review): assumes the "abstractCursorReader" parent is a JdbcCursorItemReader
         (it must expose the preparedStatementSetter property); confirm. -->
    <bean id="batchCreateReconFileReader" parent="abstractCursorReader" scope="step">
        <property name="dataSource" ref="dataSource" />
        <property name="sql">
            <value>
                <![CDATA[
                    SELECT
                        qp.pay_no AS payNo,
                        qp.channel_pay_no AS channelPayNo,
                        qp.bank_pay_no AS bankPayNo,
                        sign_no AS signNo
                    FROM channel_qptrade qp
                    WHERE create_time >= ?
                      AND create_time < ?
                ]]>
            </value>
        </property>
        <property name="preparedStatementSetter">
            <!-- Parameters are applied in list order to the "?" placeholders above. -->
            <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
                <property name="parameters">
                    <list>
                        <value>#{jobParameters['startDate']}</value>
                        <value>#{jobParameters['endDate']}</value>
                    </list>
                </property>
            </bean>
        </property>
        <property name="rowMapper">
            <bean class="org.springframework.jdbc.core.BeanPropertyRowMapper">
                <property name="mappedClass" value="com.ninefbank.smallpay.clear.vo.ChannelQptradeVO"/>
            </bean>
        </property>
    </bean>

    <!-- Writes one comma-separated line per item to the file named by the
         "outputFilePath" job parameter. -->
    <bean id="batchCreateReconFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
        <property name="resource" value="file:#{jobParameters['outputFilePath']}"></property>
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.FormatterLineAggregator">
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <property name="names" value="payNo,channelPayNo,bankPayNo,signNo"></property>
                    </bean>
                </property>
                <property name="format" value="%s,%s,%s,%s"></property>
            </bean>
        </property>
    </bean>
</beans>
處理類爲:
package com.ninefbank.smallpay.clear.batchReconFile.processer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.item.ItemProcessor; import org.springframework.stereotype.Service; import com.ninefbank.smallpay.clear.vo.ChannelQptradeVO; /** * OuterReconProcessor * * 外部對帳 訂單處理流程 */ @Service("batchCreateReconFileProcessor") public class BatchCreateReconFileProcessor implements ItemProcessor<ChannelQptradeVO, ChannelQptradeVO> { private static Logger logger = LoggerFactory.getLogger(BatchCreateReconFileProcessor.class); /** * * 檢查對帳成功重複狀態 * * 檢測異常,長款 ,短款 狀態 金額 * * @param data * @return * @throws Exception */ @Override public ChannelQptradeVO process(ChannelQptradeVO data) throws Exception { logger.info("批量生成外部對帳文件:{start} {}", new Object[] { data.toString() }); return data; } }
寫一個測試的controller:
package com.ninefbank.smallpay.clear.controller;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.ninefbank.smallpay.clear.constant.ClearConstants;
import com.ninefbank.smallpay.clear.service.IBatchCreateReconFileService;
import com.ninefbank.smallpay.clear.service.ITaskStartInHandService;
import com.ninefbank.smallpay.common.exception.ApplicationException;
import com.ninefbank.smallpay.common.util.DateUtil;

/**
 * Test endpoint for starting a batch job by hand, mapped under
 * {@code /taskBatchCreateReconFileStart}.
 *
 * @version 1.0.0
 */
@Controller
@RequestMapping("/taskBatchCreateReconFileStart")
public class BatchCreateReconFileController {

    private static final Logger logger = LoggerFactory.getLogger(BatchCreateReconFileController.class);

    @Autowired
    private IBatchCreateReconFileService batchCreateReconFileService;

    /**
     * Starts a single job for the given date range.
     *
     * <p>The request parameters are forwarded to the service under the keys
     * {@code jobId}, {@code transDate} and {@code transDate1}.
     *
     * @param jobId      bean name of the job to start; must not be blank
     * @param transDate  start of the date range; must not be blank
     * @param transDate1 end of the date range, forwarded unchanged
     * @return a map whose {@code result} entry is {@code "success"} when the service
     *         reports {@code true}, otherwise {@code "fail"}
     */
    @RequestMapping(value = "/run")
    @ResponseBody
    public Map<String, Object> execute(String jobId, String transDate, String transDate1) {
        Map<String, Object> ret = new HashMap<String, Object>();

        // The service rejects a blank jobId as well; checking it here yields the same
        // "fail" response without the extra call.
        if (StringUtils.isBlank(jobId) || StringUtils.isBlank(transDate)) {
            ret.put("result", "fail");
            return ret;
        }

        Map<String, Object> params = new HashMap<String, Object>();
        params.put("jobId", jobId);
        params.put("transDate", transDate);
        params.put("transDate1", transDate1);

        try {
            boolean started = batchCreateReconFileService.run(params);
            ret.put("result", started ? "success" : "fail");
        } catch (ApplicationException e) {
            // Pass jobId directly: wrapping it in an Object[] next to the throwable selects
            // the (String, Object, Object) overload and logs the array as "[jobId]".
            logger.error("手動觸發任務{}失敗!!!", jobId, e);
            ret.put("result", "fail");
        }
        return ret;
    }
}
裏面涉及到的service:
package com.ninefbank.smallpay.clear.service;

import java.util.Map;

import com.ninefbank.smallpay.common.exception.ApplicationException;

/**
 * Entry point for starting a batch job by hand.
 */
public interface IBatchCreateReconFileService {

    /**
     * Starts the job described by {@code params}.
     *
     * @param params request values for the job
     * @return {@code true} if the job was started, {@code false} otherwise
     * @throws ApplicationException if starting the job fails
     */
    boolean run(Map<String, Object> params) throws ApplicationException;
}
涉及到的實現類:
package com.ninefbank.smallpay.clear.service.impl;

import java.io.File;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.stereotype.Service;

import com.ninefbank.smallpay.clear.constant.ClearConstants;
import com.ninefbank.smallpay.clear.datasync.jobParams.JobParamsDataSync;
import com.ninefbank.smallpay.clear.inner.JobInnerParams;
import com.ninefbank.smallpay.clear.service.IBatchCreateReconFileService;
import com.ninefbank.smallpay.clear.service.ITaskStartInHandService;
import com.ninefbank.smallpay.clear.util.ClearConfigUtils;
import com.ninefbank.smallpay.clear.util.JobParamsUtil;
import com.ninefbank.smallpay.common.exception.ApplicationException;
import com.ninefbank.smallpay.common.util.DateUtil;
import com.ninefbank.smallpay.common.util.SpringContextHolder;

/**
 * Starts Spring Batch jobs by bean name on a background thread pool.
 *
 * <p>Job parameters depend on the job id: the three {@code innerRecon*SetJob} jobs get a
 * one-day {@code [startDate, endDate)} window as dates, {@code batchCreateReconFileJob}
 * gets the caller's start/end strings plus an output file path, and every other job gets
 * the default builder from {@link JobParamsUtil}.
 *
 * @version 1.0.0
 */
@Service("batchCreateReconFileService")
public class BatchCreateReconFileServiceImpl implements IBatchCreateReconFileService {

    private static final Logger logger = LoggerFactory.getLogger(BatchCreateReconFileServiceImpl.class);

    /** Job id that exports query results to a flat file. */
    private static final String RECON_FILE_JOB_ID = "batchCreateReconFileJob";

    /** Key in {@code ClearConfigUtils.CLEAR_PROPS} holding the output directory. */
    private static final String OUTPUT_DIR_PROPERTY = "fy_ftp_file_path";

    /** Name of the generated file inside the output directory. */
    private static final String OUTPUT_FILE_NAME = "fixedLengthOutputFile.txt";

    // Worker thread pool
    public static ExecutorService workThreadPool = Executors.newCachedThreadPool();

    /**
     * Validates the request and queues the job on {@link #workThreadPool}.
     *
     * <p>The job runs asynchronously: a {@code true} result means the job was accepted by
     * the pool, not that it finished. The outcome of the job is only written to the log.
     *
     * @param params must contain {@code jobId} (job bean name) and {@code transDate}
     *               (start date); {@code transDate1} (end date) is additionally required
     *               for {@code batchCreateReconFileJob}
     * @return {@code true} if the job was queued; {@code false} if a required value is
     *         blank or the pool rejected the task
     * @throws ApplicationException declared by the interface; not thrown explicitly here
     */
    @Override
    public boolean run(Map<String, Object> params) throws ApplicationException {
        final String jobId = (String) params.get("jobId");
        final String startDate = (String) params.get("transDate");
        final String endDate = (String) params.get("transDate1");
        logger.info("開始執行任務,jobId:{}", jobId);

        if (StringUtils.isBlank(jobId) || StringUtils.isBlank(startDate)) {
            return false;
        }
        // The export job uses endDate as the upper bound of its query; refuse to start
        // it with a missing bound instead of passing null on as a job parameter.
        if (RECON_FILE_JOB_ID.equals(jobId) && StringUtils.isBlank(endDate)) {
            logger.error("缺少結束日期transDate1,jobId:{}", jobId);
            return false;
        }

        try {
            // Submit the Runnable itself. Wrapping it in a Thread first has no effect on
            // the pool thread that actually executes it.
            workThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    runOnWorker(jobId, startDate, endDate);
                }
            });
            return true;
        } catch (Exception e) {
            logger.error("提交任務失敗,jobId:{}", jobId, e);
            return false;
        }
    }

    /**
     * Runs the job on the current pool thread, naming the thread after the job for the
     * duration of the run so that log lines can be attributed to it.
     */
    private void runOnWorker(String jobId, String startDate, String endDate) {
        Thread worker = Thread.currentThread();
        String previousName = worker.getName();
        worker.setName("Clear." + jobId);
        try {
            call(jobId, startDate, endDate);
        } finally {
            // Pool threads are reused; do not leak this job's name into the next task.
            worker.setName(previousName);
        }
    }

    /**
     * Looks up the job and launcher, runs the job synchronously and logs its exit status.
     * Every failure is logged here so that nothing escapes to the pool's default
     * uncaught-exception handling.
     */
    private void call(String jobId, String startDate, String endDate) {
        String jobName = jobId;
        JobExecution result;
        try {
            Job job = (Job) SpringContextHolder.getBean(jobId);
            jobName = job.getName();
            JobLauncher launcher = SpringContextHolder.getBean("jobLauncher");
            JobParametersBuilder builder = getJPB(jobId, startDate, endDate);
            result = launcher.run(job, builder.toJobParameters());
        } catch (Exception e) {
            logger.error("執行job失敗,job名稱:{}", jobName, e);
            return;
        }

        ExitStatus es = result.getExitStatus();
        String exitCode = es.getExitCode();
        Object[] details = new Object[] { jobName, exitCode, es.getExitDescription() };
        if (ExitStatus.COMPLETED.getExitCode().equals(exitCode)) {
            logger.info("任務執行完成,job名稱:{};exitCode={};exitDesc={}", details);
        } else {
            // A job that did not complete is an error, not debug output.
            logger.error("任務執行失敗,job名稱:{};exitCode={};exitDesc={}", details);
        }
    }

    /** Chooses the job-parameter layout for the given job id. */
    private JobParametersBuilder getJPB(String jobId, String transDate, String endDate) {
        switch (jobId) {
        case "innerReconSetJob":
        case "innerReconOneSetJob":
        case "innerReconTwoSetJob":
            return innerReconParams(transDate);
        case RECON_FILE_JOB_ID:
            return reconFileParams(transDate, endDate);
        default:
            return JobParamsUtil.getJobParametersBuilder();
        }
    }

    /** Builds date parameters covering one day starting at {@code transDate}. */
    private JobParametersBuilder innerReconParams(String transDate) {
        JobParametersBuilder builder = new JobInnerParams().getJobParametersBuilder();
        Date startDate = DateUtil.getDateStartTime(transDate, ClearConstants.DATE_FORMAT_8);
        builder.addDate("startDate", startDate);
        builder.addDate("endDate", DateUtil.addDate(startDate, Calendar.DAY_OF_MONTH, 1));
        return builder;
    }

    /** Builds the string parameters and output path for the file-export job. */
    private JobParametersBuilder reconFileParams(String transDate, String endDate) {
        String outputDir = ClearConfigUtils.CLEAR_PROPS.getProperty(OUTPUT_DIR_PROPERTY);
        if (StringUtils.isBlank(outputDir)) {
            throw new IllegalStateException("Missing configuration property: " + OUTPUT_DIR_PROPERTY);
        }
        JobParametersBuilder builder = new JobInnerParams().getJobParametersBuilder();
        builder.addString("startDate", transDate);
        builder.addString("endDate", endDate);
        // Join with File so a directory without a trailing separator still yields
        // "<dir>/<file>" rather than "<dir><file>".
        builder.addString("outputFilePath", new File(outputDir, OUTPUT_FILE_NAME).getPath());
        return builder;
    }

    /**
     * Wraps a command in a new, unstarted thread whose name is {@code "Clear."} followed
     * by the method name found at index 2 of the current stack trace.
     *
     * <p>Kept for existing callers; {@link #run(Map)} no longer uses it.
     * NOTE(review): index 2 is presumably the caller of this method; confirm on the
     * target JVM.
     *
     * @param command the work the thread should run
     * @return the named, unstarted thread
     */
    public Thread getNamedThread(Runnable command) {
        Thread thread = new Thread(command);
        thread.setName("Clear." + Thread.currentThread().getStackTrace()[2].getMethodName());
        return thread;
    }
}
配置文件中的生成文件路徑爲:
# Directory used for the file generated by batchCreateReconFileJob; the service combines
# this value with the file name "fixedLengthOutputFile.txt".
# In a .properties file "\\" denotes a single backslash.
# NOTE(review): the value has no trailing separator - check how the consumer joins it
# with the file name.
fy_ftp_file_path=F:\\0921\\tmp\\ftp01\\dev\\app
啓動項目後訪問controller就可以了:ip:port/項目名/taskBatchCreateReconFileStart/run?jobId=batchCreateReconFileJob&transDate=2017-06-01&transDate1=2017-09-21