用Elastic-Job可解決分佈式重複執行問題
如果業務工程採用集羣化的部署,可能會多次重複執行定時任務而導致系統的業務邏輯錯誤,並產生系統故障。
job.properties
# Job definitions consumed as ${...} placeholders by job.xml.
# Keys prefixed "simple." describe the first job; keys prefixed "simple1." the second.
simple.id=recommendJob
simple.class=com.tf56.mk.job.RecommendJob
# Runs every day at 23:00.
simple.cron=0 0 23 * * ? *
## Alternative schedules kept for reference:
##0 0 23 * * ? *
##0 0/1 * * * ?
## Sharding: multi-threaded processing is configured according to the shard count.
## Sharding improves efficiency when processing data in batches.
simple.shardingTotalCount=1
## Sharding item parameters: make the shards easier to understand and, in some cases,
## make job processing easier.
simple.shardingItemParameters=0=Beijing
## Whether to monitor the job's running state. Not recommended when executions are short
## or closely spaced, because it costs some efficiency.
simple.monitorExecution=true
## Failover. Mostly useful when running on multiple servers.
simple.failover=true
## Job description.
simple.description=MK簡單做業
## Whether the job is prevented from starting. Useful during deployment: keep jobs
## disabled first, then start them together once deployment has finished.
simple.disabled=false
## Whether the local configuration may overwrite the registry-center configuration.
## If it may, the local configuration takes precedence every time the job starts.
simple.overwrite=true
## Job monitor port. Enabling it is recommended: it makes dumping job information and
## troubleshooting easier, especially for distributed jobs.
simple.monitorPort=9888
## Listener invoked before and after job execution; conceptually similar to an interceptor.
listener.simple=com.tf56.mk.job.listener.SimpleListener

simple1.id=projectJob
simple1.class=com.tf56.mk.job.ProjectJob
##0 0 1 1/1 * ?  runs daily at 01:00
##0 0/5 * * * ?  runs every 5 minutes
simple1.cron=0 0/5 * * * ?
job.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring context for the Elastic-Job scheduled jobs.
  Declares the ZooKeeper registry center and two simple jobs; every ${...} value is
  resolved from the property files matched by classpath:config/*.properties.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.dangdang.com/schema/ddframe/reg
                           http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                           http://www.dangdang.com/schema/ddframe/job
                           http://www.dangdang.com/schema/ddframe/job/job.xsd">

    <!-- Loads every property file under config/ so the placeholders below can be resolved. -->
    <context:property-placeholder location="classpath:config/*.properties" />

    <!-- Registry center shared by both jobs through registry-center-ref="regCenter". -->
    <reg:zookeeper id="regCenter"
                   server-lists="${serverLists}"
                   namespace="${namespace}"
                   base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}"
                   max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}"
                   max-retries="${maxRetries}" />

    <!-- First job: configured entirely from the "simple." keys. -->
    <job:simple id="${simple.id}" class="${simple.class}"
                registry-center-ref="regCenter"
                sharding-total-count="${simple.shardingTotalCount}"
                cron="${simple.cron}"
                sharding-item-parameters="${simple.shardingItemParameters}"
                monitor-execution="${simple.monitorExecution}"
                monitor-port="${simple.monitorPort}"
                failover="${simple.failover}"
                description="${simple.description}"
                disabled="${simple.disabled}"
                overwrite="${simple.overwrite}">
        <job:listener class="${listener.simple}" />
        <job:event-log />
    </job:simple>

    <!--
      Second job: id, class and cron come from the "simple1." keys; the remaining settings
      are shared with the first job.
      The id now uses the simple1.id property (value "projectJob") instead of a hard-coded
      literal, matching how the first job is declared.
      NOTE(review): both jobs were configured with the same monitor port. A TCP port can
      normally be bound by only one listener per host, so the second job's monitor is
      unlikely to start. Define simple1.monitorPort to give this job its own port; while
      that key is absent the value falls back to simple.monitorPort, i.e. the previous
      behaviour.
    -->
    <job:simple id="${simple1.id}" class="${simple1.class}"
                registry-center-ref="regCenter"
                sharding-total-count="${simple.shardingTotalCount}"
                cron="${simple1.cron}"
                sharding-item-parameters="${simple.shardingItemParameters}"
                monitor-execution="${simple.monitorExecution}"
                monitor-port="${simple1.monitorPort:${simple.monitorPort}}"
                failover="${simple.failover}"
                description="${simple.description}"
                disabled="${simple.disabled}"
                overwrite="${simple.overwrite}">
        <job:listener class="${listener.simple}" />
        <job:event-log />
    </job:simple>

</beans>
reg.properties
# Registry-center settings; each key is referenced as a ${...} placeholder by the
# <reg:zookeeper> element in job.xml.
serverLists=mt-zookeeper-vip:2181
namespace=esjob-monkeyKingService
# NOTE(review): the three values below feed the base-sleep-time-milliseconds,
# max-sleep-time-milliseconds and max-retries attributes; presumably they control the
# connection retry policy (confirm against the Elastic-Job documentation).
baseSleepTimeMilliseconds=1000
maxSleepTimeMilliseconds=3000
maxRetries=3
package com.tf56.mk.job; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.tf56.mk.common.util.PropertiesUtil; import com.tf56.mk.dao.RequestRecommendDao; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; @Component public class RecommendJob implements SimpleJob { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired RequestRecommendDao requestRecommendDao; /** * * @param arg0 */ @Override public void execute(ShardingContext arg0) { logger.info("ES Job Doing--doRecommendJob"); Map map = new HashMap(); map.put("mkExpertRecommendJobTime",Long.parseLong(PropertiesUtil.getPropertieValue("mkExpertRecommendJobTime"))); map.put("mkExpertRecommendNum",Long.parseLong(PropertiesUtil.getPropertieValue("mkExpertRecommendNum"))); requestRecommendDao.doRecommendJob(map); } }
package com.tf56.mk.job.listener; import com.dangdang.ddframe.job.executor.ShardingContexts; import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SimpleListener implements ElasticJobListener { private static Logger logger = LoggerFactory.getLogger(SimpleListener.class); public void beforeJobExecuted(final ShardingContexts shardingContexts) { logger.info("beforeJobExecuted:" + shardingContexts.getJobName()); } public void afterJobExecuted(final ShardingContexts shardingContexts) { logger.info("afterJobExecuted:" + shardingContexts.getJobName()); } }
pom.xml
<!-- Build properties: source encoding, Java level and the Elastic-Job version. -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.7</java.version>
    <!-- Referenced as ${elastic-job.version} by the elastic-job-lite dependencies. -->
    <elastic-job.version>2.0.1</elastic-job.version>
</properties>
<!-- Elastic-Job Lite: core library plus its Spring integration, both at ${elastic-job.version}. -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>${elastic-job.version}</version>
    <exclusions>
        <!-- NOTE(review): the reason for this exclusion is not stated here; presumably it
             avoids a clash with another javax.annotation artifact. Confirm before removing. -->
        <exclusion>
            <groupId>org.eclipse.jetty.orbit</groupId>
            <artifactId>javax.annotation</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>${elastic-job.version}</version>
</dependency>
JobConfig.java 引入xml配置
/**
 * Spring configuration class with no beans of its own; it only imports the XML
 * definitions found at {@code classpath:spring/job.xml} into the application context.
 */
@Configuration
@ImportResource(locations = "classpath:spring/job.xml")
public class JobConfig {
}