Elastic-Job定時任務

用Elastic-Job可解決分佈式重複執行問題java

若是業務工程採用集羣化的部署,可能會屢次重複執行定時任務而致使系統的業務邏輯錯誤,併產生系統故障。spring

job.propertiesapi

simple.id=recommendJob
simple.class=com.tf56.mk.job.RecommendJob
#天天23點執行
simple.cron=0 0 23 * * ? *
##0 0 23 * * ? *
##0 0/1 * * * ?
## 分片,會根據分片數配置多線程處理。在批量數據處理時,分片處理會提升效率。
simple.shardingTotalCount=1
## 分片參數配置,使得分片更易於理解,某些時候可易於做業處理。
simple.shardingItemParameters=0=Beijing
## 是否監控做業運行狀態。在任務執行時間短、任務間隔時間短等狀況下,不建議開啓,多少會影響效率。
simple.monitorExecution=true
## 失效轉移。在多服務器時比較有用。
simple.failover=true
## 做業描述
simple.description=MK簡單做業
## 做業是否禁止啓動。可用於部署做業時,先禁止啓動,部署結束後統一啓動。
simple.disabled=false
## 本地配置是否可覆蓋註冊中心配置。若是可覆蓋,每次啓動做業都以本地配置爲準
simple.overwrite=true
## 做業監控端口。建議開啓,方便dump做業信息,排查問題,特別是分佈式做業狀況下。
simple.monitorPort=9888
## 做業前置、後置處理監聽器。有點像是攔截器的概念。
listener.simple=com.tf56.mk.job.listener.SimpleListener


simple1.id=projectJob
simple1.class=com.tf56.mk.job.ProjectJob
##0 0 1 1/1 * ?  每日1點執行
##0 0/5 * * * ?  每5分鐘執行
simple1.cron=0 0/5 * * * ?

job.xml服務器

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/context
                        http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd
                        ">
    <context:property-placeholder location="classpath:config/*.properties" />

    <reg:zookeeper id="regCenter" server-lists="${serverLists}"
                   namespace="${namespace}" base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}"
                   max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}" max-retries="${maxRetries}" />

    <job:simple id="${simple.id}" class="${simple.class}"
                registry-center-ref="regCenter" sharding-total-count="${simple.shardingTotalCount}"
                cron="${simple.cron}" sharding-item-parameters="${simple.shardingItemParameters}"
                monitor-execution="${simple.monitorExecution}" monitor-port="${simple.monitorPort}"
                failover="${simple.failover}" description="${simple.description}"
                disabled="${simple.disabled}" overwrite="${simple.overwrite}">
        <job:listener class="${listener.simple}" />
        <job:event-log />
    </job:simple>

    <job:simple id="projectJob" class="${simple1.class}"
                registry-center-ref="regCenter" sharding-total-count="${simple.shardingTotalCount}"
                cron="${simple1.cron}" sharding-item-parameters="${simple.shardingItemParameters}"
                monitor-execution="${simple.monitorExecution}" monitor-port="${simple.monitorPort}"
                failover="${simple.failover}" description="${simple.description}"
                disabled="${simple.disabled}" overwrite="${simple.overwrite}">
        <job:listener class="${listener.simple}" />
        <job:event-log />
    </job:simple>

</beans>

reg.properites多線程

serverLists=mt-zookeeper-vip:2181
namespace=esjob-monkeyKingService
baseSleepTimeMilliseconds=1000
maxSleepTimeMilliseconds=3000
maxRetries=3

 

package com.tf56.mk.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.tf56.mk.common.util.PropertiesUtil;
import com.tf56.mk.dao.RequestRecommendDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Component
public class RecommendJob implements SimpleJob {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    RequestRecommendDao requestRecommendDao;

    /**
     * 
     * @param arg0
     */
    @Override
    public void execute(ShardingContext arg0) {
        logger.info("ES Job Doing--doRecommendJob");
        Map map = new HashMap();
        map.put("mkExpertRecommendJobTime",Long.parseLong(PropertiesUtil.getPropertieValue("mkExpertRecommendJobTime")));
        map.put("mkExpertRecommendNum",Long.parseLong(PropertiesUtil.getPropertieValue("mkExpertRecommendNum")));
        requestRecommendDao.doRecommendJob(map);
    }
}

 

package com.tf56.mk.job.listener;

import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleListener implements ElasticJobListener {
    private static Logger logger = LoggerFactory.getLogger(SimpleListener.class);

    public void beforeJobExecuted(final ShardingContexts shardingContexts) {
        logger.info("beforeJobExecuted:" + shardingContexts.getJobName());
    }

    public void afterJobExecuted(final ShardingContexts shardingContexts) {
        logger.info("afterJobExecuted:" + shardingContexts.getJobName());
    }
}

pom.xmleclipse

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.7</java.version>
    <elastic-job.version>2.0.1</elastic-job.version>
</properties>
<!-- esjob -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>${elastic-job.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.eclipse.jetty.orbit</groupId>
            <artifactId>javax.annotation</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>${elastic-job.version}</version>
</dependency>

JobConfig.java 引入xml配置分佈式

@Configuration
@ImportResource(locations = {"classpath:spring/job.xml"})
public class JobConfig {
}
相關文章
相關標籤/搜索