最近項目須要用到批處理,而後在網上搜了一下,搜到一篇蠻不錯的文章,轉載並和你們分享一下。html
轉載:https://blog.csdn.net/william_jm/article/details/78964538java
1簡介 1.1概述 大數據時代,數據的收集、處理、存儲、分析、挖掘、檢索、展現,環環相扣。其中數據處理環節是一個典型的批處理場景——按期對海量數據進行格式化,各類業務規範校驗,複雜的業務邏輯處理,並經過事務的方式處理到本身的數據庫中,同時還應該具有高效率,無人工干預能力。 Spring Batch的出現,很好的應對了該類需求。Spring Batch是一個輕量級的綜合性批處理框架,能夠應用於企業級大數據量處理系統。SpringBatch能夠提供大量的,可重複的數據處理功能,包括日誌/跟蹤(tracing),事務管理,任務處理(processing)統計,任務重啓,忽略(skip),和資源管理等功能。此外還提供了許多高級服務和特性,使之可以經過優化(optimization)和分片技術(partitioning techniques)來高效地執行超大型數據集的批處理任務。須要注意的是,Spring Batch並不提供定時之類的功能,那是quartz,Tivoli,Control-M等調度框架作的事情,它們是協做關係,而不是取代。 1.2背景 在微服務架構討論的如火如荼之際,基於Java的批處理框架卻無人問津。即便企業中一直都有批處理的需求,但因缺少一個標準的、可重用的批處理框架,導致項目/產品中出現大量一次編寫,一次使用的代碼片斷,以及不少其餘不一樣的臨時解決方案。 SpringSource和Accenture(埃森哲)聯手協做,致力於改善這種情況。埃森哲在實現批處理架構上有着豐富的產業實踐經驗,SpringSource有深刻的技術開發積累,背靠Spring框架提供的編程模型,強強聯合,勢必創造出高質量的、市場承認的企業級java解決方案——SpringBatch,基於埃森哲數十年寶貴的經驗並基於最新的軟件平臺(如COBOL/Mainframe,C++/Unix 及如今很是流行的Java平臺)來構建的項目。Spring Batch將來將會由開源社區提交者來驅動項目的開發、加強、以及將來的路線圖。而埃森哲諮詢公司與SpringSource合做的目標是促進軟件處理方法、框架和工具的標準化改進。 1.3場景 典型的批處理流程是讀數據、處理數據、寫數據的三步式架構——從數據庫、文件或隊列中讀取大量數據,而後經過業務規則處理數據,最後將處理完的數據按需求方式寫(數據庫、文件等)。一般Spring Batch工做在離線模式下,不須要用戶干預、就能自動進行基本的批處理迭代,進行相似事務方式的處理。 1.3.1 適用業務 Ø 按期提交批處理任務(日終處理) Ø 併發批處理:並行執行任務 Ø 分階段,企業消息驅動處理 Ø 高併發批處理任務 Ø 失敗後手動或定時重啓 Ø 按順序處理依賴任務 (使用工做流驅動的批處理插件) Ø 局部處理:跳過記錄(例如在回滾時) Ø 完整的批處理事務:由於可能有小數據量的批處理或存在存儲過程/腳本 1.3.2核心能力 Ø 利用Spring編程模式:使開發者專一於業務邏輯,讓框架解決基礎功能 Ø 明確劃分在批處理基礎架構、執行環境、應用 Ø 通用的核心服務以接口形式提供 Ø 提供簡單的默認實現,以實現核心執行接口的「開箱即用」 Ø 易於配置、定製和擴展服務 Ø 核心服務很容易擴展與替換,且不會影響基礎層 Ø 簡單部署模型 2關鍵架構與領域術語 2.1層次架構 Spring Batch的架構設計是充分考慮了系統的可擴展性和各種終端開發的普適性。下圖2.1.1是Spring Batch的層次架構示意圖。mysql
圖2.1.1-SpringBatch層次架構圖 Spring Batch架構主要分爲三類高級組件: 應用層(Application), 核心層(Core) 和基礎架構層(Infrastructure)。 應用層(Application):指開發人員編寫的全部批處理業務做業和自定義代碼。 核心層(Core):指加載和控制批處理做業所必需的核心類。含JobLauncher,Job和 Step的實現。 基礎架構層(Infrastructure):應用層與核心層都構建在基礎架構層之上。基礎架構包括通用的readers(ItemReader)和writers(ItemWriter),以及services (如重試模塊 RetryTemplate),能夠被應用層和核心層所使用。 2.2領域術語 Step:表示做業Job中的一個完整業務邏輯步驟,一個Job能夠有一個或者多個Step組成。 StepExecution:表示試運行一個步驟step的句柄。只有步驟step真的獲得運行纔會被建立。 Job(做業):做業是封裝整個批處理過程的實體。一個簡單的做業須要配置做業名、有序的步驟step、及是否重啓。 JobInstance(做業實例):一個做業實例與其要加載的數據無硬性關聯,這徹底是由數據讀入器ItemReader決定。好比:是否使用同一個做業實例,是由ItemReader根據前一次執行的狀態位(state)決定。用新的JobInstance意味從開頭讀取數據,用已有的表示從上次結束的地方開始。 JobParameter(做業參數):是指一個批量做業開始的參數集。同時,能夠用於標識JobInstance的惟一性。因此能夠認爲JobInstance=Job+JobParameter。 JobExecution:表示試運行一個做業的句柄。 以下圖2.2.1所示,Job比如是容器,能夠包含多個業務邏輯步驟step與多個JobInstance,來組織做業的執行(亦能夠保證做業的重啓),而JobExecution則是致力於記錄執行狀態。每一次執行中JobExecution和step都會進行數據信息傳輸,好比:commitCount、rollbackCount、startTime、endTime等,這些都會記錄進StepExecution。 圖2.2.1-批處理框架web
運行期的模型 JobLauncher(做業調度器):是Spring Batch框架基礎設施層提供運行Job的能力。對於將給定Job名稱和做Job Parameters的Job,由Java程序、命令行或者其它調度框架(如Quartz)中調用JobLauncher執行Job。 JobRepository(做業倉庫):來存儲Job執行期的元數據(這裏的元數據是指Job Instance、Job Execution、Job Parameters、Step Execution、Execution Context等數據)。有兩種默認實現——內存或數據庫。若將元數據存放在數據庫中,能夠隨時監控批處理Job的執行狀態。Job執行結果是成功仍是失敗,而且使失敗Job從新啓動Job成爲可能。 ItemReader:是對step的輸入的抽象,每次只讀入一條記錄,讀取完全部記錄後,則返回null。 ItemProcessor:是對每條記錄按業務邏輯處理的抽象。 ItemWriter:是對step的輸出的抽象,每次只能夠提供給一次批做業或記錄隊(chunk)。 下圖2.2.2顯示了完整的SpringBatch領域概念模型。JobLancaster啓動Job,Job可有多個Step組合,每個step對應一個ItemReader、ItemProcessor及ItemWriter,而JobRepository記錄Job執行信息。spring
2.2.2-Spring Batch領域概念模型sql
3實戰演習 光說不練假把式,這個章節就讓咱們一塊兒實戰操練下。 3.1What I’ll build 定時天天凌晨1點,按業務需求將TEST_TASK_PROPERTY表和DQP_TEST_FILE表數據彙總整合到表DQP_REPORT_A,即將結果數據表彙總到統計表中。 3.2What you’ll need ● Eclipse ● JDK 1.7 or later ● Maven 3.0 3.3Set up the project 本工程是由maven構建,使用SpringBoot簡化複雜的依賴配置及部署,使用Quartz做爲任務調度框架,SpringBatch做爲批處理框架,數據持久化使用JPA。 3.3.1pom.xml文件 [html] view plain copy 120.
121. <projectxmlnsprojectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
122. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">
123. 4.0.0
124. com.william.lab.springboot.springbatch
125. springbatch
126. 0.0.1-SNAPSHOT
127. jar
128. springbatch
129. Testproject for Spring Boot + Spring Batch + Quartz
130.
131.
132. org.springframework.boot
133. spring-boot-starter-parent
134. 1.5.6.RELEASE
135.
136.
137.
138.
139. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
140. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
141. <java.version>1.7</java.version>
142.
143.
144.
145.
146. org.springframework.boot
147. spring-boot-starter-batch
148.
149.
150. org.springframework.boot
151. spring-boot-starter-data-jpa
152.
153.
154. slf4j-api
155. org.slf4j
156.
157.
158. jboss-logging
159. org.jboss.logging
160.
161.
162.
163.
164. org.springframework.boot
165. spring-boot-starter-web
166.
167.
168. log4j-over-slf4j
169. org.slf4j
170.
171.
172.
173.
174. mysql
175. mysql-connector-java
176. runtime
177.
178.
179. org.springframework
180. spring-context-support
181.
182.
183. org.springframework
184. spring-tx
185.
186.
187. org.quartz-scheduler
188. quartz
189. 2.2.1
190.
191.
192. slf4j-api
193. org.slf4j
194.
195.
196.
197.
198. org.quartz-scheduler
199. quartz-jobs
200. 2.2.1
201.
202.
203. commons-lang
204. commons-lang
205. 2.6
206.
207.
208.
209. com.jcraft
210. jsch
211. 0.1.54
212.
213.
214. commons-io
215. commons-io
216. 2.4
217.
218.
219. commons-net
220. commons-net
221. 3.1
222.
223.
224. org.springframework.boot
225. spring-boot-starter-test
226. test
227.
228.
229.
230.
231.
232.
233. org.springframework.boot
234. spring-boot-maven-plugin
235.
236.
237.
238.
3.3.2Batch做業模塊配置 [java] view plain copy 50. @Configuration
51. @EnableBatchProcessing
52. public class BatchConfiguration {
53. @Autowired
54. private JobBuilderFactoryjobBuilderFactory;
55. @Autowired
56. private StepBuilderFactorystepBuilderFactory;
57. @PersistenceUnit
58. private EntityManagerFactory emf;
59.
60. @StepScope
61. publicJpaPagingItemReader reader() {
62. JpaPagingItemReaderreader = new JpaPagingItemReader();
63. reader.setQueryString("selectnew TestReport(ttp.taskId, tra.fileId, ttp.ruleId,sum( tra.count))"
64. + " fromTestFile tra,TestTaskProperty ttp WHERE ttp.taskId=tra.taskId AND ttp.beginTimeBETWEEN ?1 AND ?2 "
65. + "GROUP BYttp.taskId, tra.fileId, ttp.ruleId");
66. Map<String, Object>parameterValues = new HashMap<>();
67. parameterValues.put("1",CommonUtils.getTimeSection(0, 0, 0));
68. parameterValues.put("2",CommonUtils.getTimeSection(23, 59, 59));
69. reader.setParameterValues(parameterValues);
70. reader.setEntityManagerFactory(emf);
71. reader.setPageSize(Integer.MAX_VALUE);
72. return reader;
73. }
74.
75. @Bean
76. public TestFileProcessor processor(){
77. return newTestFileProcessor();
78. }
79.
80. @Bean
81. publicJpaItemWriter writer() {
82. JpaItemWriterwriter = new JpaItemWriter();
83. writer.setEntityManagerFactory(emf);
84. return writer;
85. }
86.
87. @Bean
88. public Step step() {
89. returnstepBuilderFactory.get("step").<TestReport, TestReport>chunk(10).reader(reader()).processor(processor())
90. .writer(writer()).build();
91. }
92.
93. @Bean
94. public Job importUserJob(JobRepositoryjobRepository) {
95. returnjobBuilderFactory.get("importUserJob").incrementer(newRunIdIncrementer()).repository(jobRepository)
96. .flow(step()).end().build();
97. }
98. }
在Spring的體系中@EnableBatchProcessing 註釋的工做原理與其它的帶有 @Enable * 的註釋相似。在這種狀況下, @EnableBatchProcessing 提供了構建批處理任務的基本配置。在這個基本的配置中,除了建立了一個StepScope的實例,還能夠將一系列可用的bean進行自動裝配: JobRepositorybean 名稱 "jobRepository" JobLauncher bean名稱"jobLauncher" JobRegistry bean名稱"jobRegistry" PlatformTransactionManagerbean名稱 "transactionManager" JobBuilderFactorybean名稱"jobBuilders" StepBuilderFactorybean名稱"stepBuilders" 這種配置的核心接口是BatchConfigurer。它爲以上所述的bean提供了默認的實現方式,並要求在context中提供一個bean,即DataSource。數據庫鏈接池由被JobRepository使用。 注意只有一個配置類須要有@ enablebatchprocessing註釋。只要有一個類添加了這個註釋,則以上全部的bean都是可使用的。 3.3.2.1做業Job和步驟Step Step()方法是組合特定業務需求步驟的,如上章節介紹,是由reader、processor和writer組成。importUserJob()方法提供的是組合業務做業的,由Step組成,並能夠由jobRepository()方法將做業持久化。 3.3.2.2做業處理單元reader、writer、processor reader()方法是讀取數據的方法,這裏實例化是JpaPagingItemReader()方法。JpaPagingItemReader容許您聲明一個JPQL語句,並傳入一個 EntityManagerFactory。而後就和其餘的 ItemReader 同樣,每次調用它的 read 方法都會返回一個 item。當須要更多實體,則內部就會自動發生分頁。 writer()方法是將處理結果持久化進數據庫的,其中JpaItemWriter是 JPA EntityManager aware 的,用來處理事務性工做,而執行實際的寫入工做是委託另外一個非jpa相關的(non-"jpa aware") ItemWriter作的。 processor()方法是業務數據處理方法,以下代碼段,處理了簡單業務邏輯。 [java] view plain copy 10. public class TestFileProcessor implementsItemProcessor<TestReport, TestReport> {
11. private static final Logger log=LoggerFactory.getLogger(TestFileProcessor.class);
12. @Override
13. public TestReport process(finalTestReport testReport) throws Exception {
14. testReport.setTimeSection(CommonUtils.getTimeSection(0,0, 0));
15. log.info("StatisticResult 【" +testReport + "】");
16. return testReport;
17. }
18. }
3.3.3Quartz調度模塊配置 3.3.3.1Trigger觸發器 [java] view plain copy 55. @Component("cronTriggerFactoryBean")
56. public class CronTriggerFactoryBean {
57. @Autowired
58. private SchedulerFactoryBeanschedulerFactoryBean;
59. /** 60. * 添加或修改一個定時任務 61. */
62. public void createNewTask(Stringexpression, int taskId) throws SchedulerException {
63. TriggerKey triggerKey =TriggerKey.triggerKey("TASK-" + taskId, "JOB-" +taskId);
64. CronTrigger trigger = null;
65. // 不存在,建立一個
66. JobKey jobKey = newJobKey("TASK-" + taskId, "JOB-" + taskId);
67. JobDetail jobDetail = JobBuilder.newJob(SpringQuartzJob.class).withIdentity(jobKey).build();
68. // 稽覈任務基礎信息
69. jobDetail.getJobDataMap().put("taskId",taskId);
70. // 表達式調度構建器
71. CronScheduleBuildercronScheduleBuilder = null;
72. cronScheduleBuilder =CronScheduleBuilder.cronSchedule(expression);
73. // 按cronExpression表達式構建一個新的trigger
74. trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).startAt(newDate()).withSchedule(cronScheduleBuilder).build();
75. // 加入任務隊列
76. Scheduler scheduler =schedulerFactoryBean.getScheduler();
77. scheduler.scheduleJob(jobDetail,trigger);
78. scheduler.rescheduleJob(triggerKey,trigger);
79. }
80. }
81. 這是一個簡單生成周期任務觸發器類,由任務配置接口傳入任務執行週期表達式(cron表達式)和任務編號等基礎信息,創建CronTrigger定時觸發器,調度quartz做業類。
82. 3.3.3.2
83.
84. @Component("springQuartzJob")
85. public class SpringQuartzJob extends QuartzJobBean {
86. @Autowired
87. Job importUserJob;
88. @Autowired
89. private JobLauncher jobLauncher;
90. @Override
91. public void executeInternal(finalJobExecutionContext context) throws JobExecutionException {
92. System.out.println("TestJobStart:" + Thread.currentThread().getId());
93. try {
94. init();
95. JobParameters jobParameters= new JobParametersBuilder().addLong("time", System.currentTimeMillis())
96. .toJobParameters();
97. JobExecution result =jobLauncher.run(importUserJob, jobParameters);
98. } catch (Exception e) {
99. e.printStackTrace();
100. }
101. System.out.println("Job1End");
102. }
103.
104. public void init() {
105. importUserJob =(Job) MyApplicationContextUtil.getBeanObj("importUserJob");
106. jobLauncher =(JobLauncher) MyApplicationContextUtil.getBeanObj("jobLauncher",JobLauncher.class);
107. }
108. }
JobParameters相似與Quartz中的JobDataMap,傳遞做業須要的數據。 jobLauncher.run()方法是經過做業Job和做業參數JobParameters來惟一標識做業倉庫中已有的做業,並執行做業。 3.3.3.3ApplicationContextAware [java] view plain copy 20. public class MyApplicationContextUtil implementsApplicationContextAware {
21. private staticApplicationContext context;
22. public static void setContext(ApplicationContextcontext) {
23. MyApplicationContextUtil.context= context;
24. }
25. @Override
26. public void setApplicationContext(ApplicationContextcontext) throws BeansException {
27. this.context =context;
28. }
29. public staticApplicationContext getContext() {
30. return context;
31. }
32. public final staticObject getBeanObj(String beanName) {
33. return context.getBean(beanName);
34. }
35. public final static Object getBeanObj(StringbeanName, Class<?> requiredType) {
36. return context.getBean(beanName,requiredType);
37. }
38. }
MyApplicationContextUtil繼承了ApplicationContextAware接口,實現public void setApplicationContext(ApplicationContext context)throwsBeansException方法,獲取spring配置上下文ApplicationContext,用於經過bean名字獲取bean方法public final static ObjectgetBeanObj(StringbeanName)。 3.3.4SpringbatchApplication啓動類 [java] view plain copy 22. @SpringBootApplication
23. @PropertySource(value = {"./application.properties" })
24. publicclass SpringbatchApplication {
25. publicstatic ConfigurableApplicationContext ctx;
26. publicstatic void main(String[] args) {
27. ctx= SpringApplication.run(new Object[] { QuartzResource.class}, args);
28. }
29. @Bean
30. publicSchedulerFactoryBean schedulerFactoryBean() throws Exception {
31. SchedulerFactoryBeanschedulerFactoryBean = new SchedulerFactoryBean();
32. PropertiesquartzProperties = new Properties();
33. FileInputStream in = newFileInputStream("./src/main/resources/quartz.properties");
34. quartzProperties.load(in);
35. schedulerFactoryBean.setQuartzProperties(quartzProperties);
36. returnschedulerFactoryBean;
37. }
38. @Bean
39. publicMyApplicationContextUtil myApplicationContextUtil() {
40. returnnew MyApplicationContextUtil();
41. }
42. }
public SchedulerFactoryBean schedulerFactoryBean()throwsException方法是用於初始化quartz配置信息quartz.properties。 3.3.5一個建立定時任務的web接口 [java] view plain copy 20. @RestController
21. @ComponentScan(basePackages= { "com.william.lab.springboot.springbatch.springbatch" })
22. @RequestMapping("/quartz")
23. public class QuartzResource {
24. private Logger LOGGER =LoggerFactory.getLogger(QuartzResource.class);
25. @Autowired
26. private CronTriggerFactoryBeancronTriggerFactoryBean;
27.
28. final int CREATE_ID = 17;
29.
30. @RequestMapping(value ="/get/{taskId}", method = RequestMethod.GET)
31. public void createTask(@PathVariable("taskId")String taskId) throws SchedulerException {
32. String str[] =taskId.split(",");
33. for (int i = 0; i< str.length; i++) {
34. int taskIdx =Integer.parseInt(str[i]);
35. cronTriggerFactoryBean.createNewTask("00/1 * * * ?", 1);
36. }
37. }
38. }
這是一個簡單的接口,用戶能夠經過此接口定義quartz調度batch做業任務。 3.3.6配置文件application.properties與quartz.properties 3.3.6.1application.properties [plain] view plain copy 23. # Tomcatport
24. server.port=18080
25. #Spring Batch
26. spring.batch.job.enabled=false
27. # MySQL DB
28. spring.datasource.url=jdbc:mysql://localhost:3306/william_lab?useUnicode=true&characterEncoding=UTF-8
29. spring.datasource.username=root
30. spring.datasource.password=123456
31. spring.datasource.driver-class-name=com.mysql.jdbc.Driver
32. # log config
33. logging.config=file:./src/main/resources/logback-spring.xml
34. #database pool
35. spring.datasource.tomcat.max-idle=15
36. spring.datasource.tomcat.max-wait=1000
37. spring.datasource.tomcat.maxActive=50
38. spring.datasource.tomcat.min-idle=5
39. spring.datasource.tomcat.initial-size=10
40. spring.datasource.tomcat.validation-query=SELECT1
41. spring.datasource.tomcat.test-on-borrow=false
42. spring.datasource.tomcat.test-while-idle=true
43. spring.datasource.tomcat.time-between-eviction-runs-millis=18800
44. spring.datasource.tomcat.jdbc-interceptors=ConnectionState;SlowQueryReport(threshold=0)
注意:當配置文件裏定義spring.batch.job.enabled爲true,或者沒定義(默認爲true)的時候,會初始化一個JobLauncherCommandLineRunner的bean,自動執行batch配置好的做業Job。鑑於咱們將batch的做業Job調度任務交由Quartz調度,因此設置爲false,這樣工程啓動後只會初始化batch做業配置,但不執行。 3.3.6.2quartz.properties [plain] view plain copy 27. # Configure MainScheduler Properties
28. org.quartz.scheduler.instanceName:DQPScheduler
29. org.quartz.scheduler.instanceId:AUTO
30. org.quartz.scheduler.skipUpdateCheck:false
31. # Configure ThreadPool
32. org.quartz.threadPool.class:org.quartz.simpl.SimpleThreadPool
33. org.quartz.threadPool.threadCount:1000
34. org.quartz.threadPool.threadPriority:5
35. # ConfigureJobStore
36. org.quartz.jobStore.misfireThreshold:60000
37. org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
38. org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
39. org.quartz.jobStore.useProperties:false
40. org.quartz.jobStore.dataSource:dqpDS
41. org.quartz.jobStore.tablePrefix:dqp_qrtz_
42. org.quartz.jobStore.isClustered:false
43. # Configure Datasources
44. org.quartz.dataSource.dqpDS.driver:com.mysql.jdbc.Driver
45. org.quartz.dataSource.dqpDS.URL:jdbc:mysql://localhost:3306/william_lab?useUnicode=true&characterEncoding=UTF-8&useSSL=false&autoReconnect=true
46. org.quartz.dataSource.dqpDS.user:root
47. org.quartz.dataSource.dqpDS.password:123456
48. org.quartz.dataSource.dqpDS.maxConnections:100
49. org.quartz.dataSource.dqpDS.validationQuery=select1
50. org.quartz.dataSource.dqpDS.idleConnectionValidationSeconds=60
51. org.quartz.dataSource.dqpDS.validateOnCheckout=true
52. org.quartz.dataSource.dqpDS.discardIdleConnectionsSeconds=60
注意:最後4行配置是保證quartz的數據庫鏈接池中,無效連接的釋放。 4總結 Spring Batch將整個批處理做業流程分了3個基礎階段:讀數據、業務處理、歸檔結果數據,且提供了許多讀數據接口(文件,jpa,jdbc、MongDB等),一樣寫數據接口也很豐富(文件,jpa,jdbc、MongDB等),還有日誌、監控、任務重啓與跳過等特性。而開發者只須要關注事務的粒度,日誌監控,執行方式,資源管理,讀數據,處理數據,寫數據的解耦等方面。可是,Spring Batch未提供關於批處理任務調度的功能,所以如何週期性的調用批處理任務須要本身想辦法解決,就Java來講,Quartz是一個不錯的解決方案,或者寫腳本處理之。數據庫