定時任務分佈式鎖的簡單實現

在集羣環境下,若每一臺機器都運行同一個定時任務,會導致生產數據一致性問題,因此必須實現一個鎖,保證定時任務在同一時間段只能在一臺機器上運行。

有的同學應該已經想到分佈式鎖了,例如用 Redis 或者 ZooKeeper 來實現分佈式鎖。

下面我介紹一種最簡單的實現定時任務互斥執行的機制,那就是使用數據庫樂觀鎖的原理。

運行環境:Spring MVC + Quartz + MyBatis

package com.test.job;


import com.test.common.constants.Constants;
import com.test.common.util.BlankUtil;
import com.test.common.util.DateUtil;
import com.test.common.dao.BaseJobConfigMapper;
import com.test.common.dao.BaseJobConfigRecordMapper;
import com.test.model.BaseJobConfig;
import com.test.model.BaseJobConfigRecord;
import com.test.utils.SpringContextUtil;
import org.apache.log4j.Logger;
import org.quartz.CronTrigger;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Base class for Quartz jobs that must run on only one node of a cluster per firing.
 *
 * <p>Mutual exclusion relies on conditional UPDATE statements issued through
 * {@link BaseJobConfigMapper}: a node may run the business logic only when
 * {@code updateBaseJobConfigStatusAtStartTime} reports at least one affected row.
 * When the business logic finishes (successfully or not) the state is switched back via
 * {@code updateBaseJobConfigStatusAtEndTime} and one run record is written.
 *
 * <p>Design notes:
 * <ul>
 *   <li>Collaborators are looked up lazily from {@link SpringContextUtil} on first use.
 *       Quartz creates job instances itself, and Spring does not inject {@code static}
 *       fields, so injection annotations on these fields would have no effect. Resolving the
 *       beans in a static initializer would make one failed lookup break class
 *       initialisation for good.</li>
 *   <li>{@link #executeInternal} is deliberately not annotated with {@code @Transactional}:
 *       the lock-state updates have to become visible to the other nodes before the business
 *       logic ends. NOTE(review): this presumes each mapper call commits on its own when no
 *       transaction is active - confirm against the MyBatis/Spring setup.</li>
 * </ul>
 */
@Component
public abstract class BaseJob extends QuartzJobBean {

    private static final Logger logger = Logger.getLogger(BaseJob.class);

    // Lazily resolved Spring beans; see lookupConfigMapper() and friends.
    // A duplicate lookup under contention is harmless: the same bean is fetched again.
    private static volatile BaseJobConfigMapper baseJobConfigMapper;

    private static volatile BaseJobConfigRecordMapper baseJobConfigRecordMapper;

    private static volatile DataSourceTransactionManager transactionManager;

    /**
     * Address of the local host as resolved at class-initialisation time, or {@code null}
     * when it could not be determined. Stored with every run record.
     */
    public static String IP_STRING = null;

    static {
        try {
            InetAddress ip = InetAddress.getLocalHost();
            if (!BlankUtil.isBlank(ip)) {
                IP_STRING = ip.getHostAddress();
                logger.info("本機地址" + IP_STRING);
            }
        } catch (UnknownHostException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Job name, used as the key of the lock row.
     *
     * <p>Initialised from {@link #getJobName()} while the instance is being constructed, so
     * overrides of that method must not depend on subclass instance fields.
     */
    public String JOB_NAME = getJobName();

    /**
     * Reset time in minutes, initialised from {@link #resetJobTime()}. It is passed to
     * {@code updateBaseJobConfigStatusByTimeOut} before every lock attempt.
     */
    public int JOB_RESET_TIME = resetJobTime();


    /**
     * Entry point invoked by Quartz for every firing.
     *
     * <p>Steps: skip entirely in dev mode; require a non-blank job name; make sure the lock row
     * exists; reset a timed-out lock; try to take the lock for this firing; when the lock was
     * obtained, run {@link #jobExecute()} inside a programmatic transaction, then release the
     * lock and record the run.
     *
     * @param context Quartz execution context; only its trigger's previous fire time is read
     */
    @Override
    protected void executeInternal(JobExecutionContext context) {
        if (Constants.IS_DEV_MODE) {
            return;
        }

        // 1. Without a job name there is no lock row to work with.
        if (BlankUtil.isBlank(JOB_NAME)) {
            logger.error("[baseJob.doJob]JOB_NAME is null");
            return;
        }

        // getPreviousFireTime() is declared on the generic trigger type, so no cast to
        // CronTrigger is needed and other trigger kinds work as well.
        Date triggerTime = context.getTrigger().getPreviousFireTime();
        if (triggerTime == null) {
            // Formatting a null date would fail with a NullPointerException; bail out with a clear message.
            logger.error("[baseJob.doJob]trigger fire time is null jobName=" + JOB_NAME);
            return;
        }
        // SimpleDateFormat is not thread-safe, hence one instance per invocation.
        String currentTime = new SimpleDateFormat(DateUtil.DEFAULT_DATE_TIME).format(triggerTime);

        BaseJobConfigMapper configMapper = lookupConfigMapper();

        // 2./3. Create the lock row (state 0, waiting) when it does not exist yet.
        ensureConfigRow(configMapper, triggerTime);

        // Reset the state of a run that has been marked as running for longer than JOB_RESET_TIME minutes.
        int numInt = configMapper.updateBaseJobConfigStatusByTimeOut(JOB_NAME, JOB_RESET_TIME);
        logger.info("[baseJob.doJob]job.key.resertjobName=" + JOB_NAME + ",num=" + numInt);

        // 4. Try to switch the row from 0 (waiting) to 1 (running) for this firing.
        numInt = configMapper.updateBaseJobConfigStatusAtStartTime(JOB_NAME, currentTime);
        if (numInt <= 0) {
            // No row updated: the lock was not obtained for this firing.
            return;
        }

        runWithLock(configMapper, triggerTime, currentTime);
    }

    /**
     * Business logic of the concrete job. Runs inside a transaction that is committed on normal
     * return and rolled back when an exception escapes.
     *
     * @throws Exception any failure; it is logged and the transaction is rolled back
     */
    public abstract void jobExecute() throws Exception;

    /**
     * @return the name identifying this job's lock row; a blank value disables the job
     */
    public abstract String getJobName();

    /**
     * @return the reset time in minutes used to initialise {@link #JOB_RESET_TIME}; 30 by default
     */
    public int resetJobTime() {
        return 30;
    }

    /**
     * Inserts the lock row for this job when none exists yet.
     *
     * <p>The count and the insert are two separate statements, so several nodes firing at the
     * same moment can all see "no row" and all try to insert. The insert failure of the losing
     * nodes is logged instead of aborting the firing.
     */
    private void ensureConfigRow(BaseJobConfigMapper configMapper, Date triggerTime) {
        BaseJobConfig probe = new BaseJobConfig();
        probe.setKeyName(JOB_NAME);
        if (configMapper.selectCount(probe) > 0) {
            return;
        }

        BaseJobConfig row = new BaseJobConfig();
        row.setKeyName(JOB_NAME);
        // Start one hour before the current trigger time so the very first firing is not
        // mistaken for one that has already run.
        row.setSchedulePreTime(DateUtil.getDateByDifferHours(triggerTime, -1));
        row.setCreateTime(new Date());
        row.setState(0);
        try {
            configMapper.insert(row);
        } catch (RuntimeException e) {
            // Most likely another node inserted the same key first. A genuine database problem
            // is expected to show up again on the UPDATE statements that follow.
            logger.warn("[baseJob.doJob]insert config row failed jobName=" + JOB_NAME, e);
        }
    }

    /**
     * Runs the business logic while this node holds the lock, then always releases the lock
     * and records the run.
     */
    private void runWithLock(BaseJobConfigMapper configMapper, Date triggerTime, String currentTime) {
        Date startTime = new Date();
        DataSourceTransactionManager txManager = null;
        TransactionStatus status = null;
        try {
            // The transaction is opened inside the try block so that a failure to open it
            // still reaches the finally block and releases the lock.
            txManager = lookupTransactionManager();
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
            status = txManager.getTransaction(def);

            logger.info("[baseJob.doJob]執行開始jobName=" + JOB_NAME);
            jobExecute();
            txManager.commit(status);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            // Covers exceptions and errors alike; a no-op after a commit attempt.
            rollbackIfActive(txManager, status);

            // 5. Business logic is done: switch the state back to 0.
            int numInt = configMapper.updateBaseJobConfigStatusAtEndTime(JOB_NAME, currentTime);
            if (numInt <= 0) {
                logger.error("[baseJob.doJob]active update error jobName=" + JOB_NAME);
            }

            Date endTime = new Date();
            long costTime = endTime.getTime() - startTime.getTime();

            // Record how this run went.
            recordBaseJob(JOB_NAME, triggerTime, startTime, endTime, costTime, IP_STRING);

            logger.info("[baseJob.doJob]執行結束jobName=" + JOB_NAME + "------" + costTime);
        }
    }

    /**
     * Rolls the transaction back unless it has already been completed.
     *
     * <p>A transaction must not be rolled back after a commit attempt, even a failed one, so
     * the completion flag is checked first. Rollback failures are logged and not rethrown so
     * that the caller can still release the lock.
     */
    private static void rollbackIfActive(DataSourceTransactionManager txManager, TransactionStatus status) {
        if (txManager == null || status == null || status.isCompleted()) {
            return;
        }
        try {
            txManager.rollback(status);
        } catch (RuntimeException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Stores one run record; failures are logged and never propagated.
     *
     * @param keyName     job name
     * @param triggerTime fire time taken from the trigger
     * @param startTime   time the business logic started
     * @param endTime     time the business logic ended
     * @param costTime    difference between end and start time in milliseconds
     * @param ip          address of the host that ran the job
     */
    private void recordBaseJob(String keyName, Date triggerTime, Date startTime, Date endTime, Long costTime, String ip) {
        try {
            BaseJobConfigRecord baseJobConfigRecord = new BaseJobConfigRecord();
            baseJobConfigRecord.setKeyName(keyName);
            baseJobConfigRecord.setTriggerTime(triggerTime);
            baseJobConfigRecord.setStartTime(startTime);
            baseJobConfigRecord.setEndTime(endTime);
            baseJobConfigRecord.setCostTime(costTime);
            baseJobConfigRecord.setIp(ip);
            baseJobConfigRecord.setCreateTime(new Date());
            lookupRecordMapper().insert(baseJobConfigRecord);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }

    }

    /** Returns the config mapper bean, resolving it on first use. */
    private static BaseJobConfigMapper lookupConfigMapper() {
        BaseJobConfigMapper mapper = baseJobConfigMapper;
        if (mapper == null) {
            mapper = (BaseJobConfigMapper) SpringContextUtil.getBean("baseJobConfigMapper");
            baseJobConfigMapper = mapper;
        }
        return mapper;
    }

    /** Returns the run-record mapper bean, resolving it on first use. */
    private static BaseJobConfigRecordMapper lookupRecordMapper() {
        BaseJobConfigRecordMapper mapper = baseJobConfigRecordMapper;
        if (mapper == null) {
            mapper = (BaseJobConfigRecordMapper) SpringContextUtil.getBean("baseJobConfigRecordMapper");
            baseJobConfigRecordMapper = mapper;
        }
        return mapper;
    }

    /** Returns the transaction manager bean, resolving it on first use. */
    private static DataSourceTransactionManager lookupTransactionManager() {
        DataSourceTransactionManager manager = transactionManager;
        if (manager == null) {
            manager = (DataSourceTransactionManager) SpringContextUtil.getBean("transactionManager");
            transactionManager = manager;
        }
        return manager;
    }

}

定時任務實現類只需要繼承 BaseJob,實現 jobExecute 和 getJobName 方法,即可實現定時任務互斥執行,如下:

package com.test.job;

import com.test.job.BaseJob;
import org.apache.log4j.Logger;

/**
 * Sample job showing how to build on {@link BaseJob}: it supplies a job name and a trivial
 * piece of business logic.
 */
public class TestJob extends BaseJob {

    private static final Logger logger = Logger.getLogger(TestJob.class);

    /**
     * Business logic of the sample job: emits a single marker message.
     *
     * @throws Exception declared by the base contract; this implementation throws nothing itself
     */
    @Override
    public void jobExecute() throws Exception {
        // Goes through the class logger rather than System.out so the output follows the
        // log4j configuration like every other job message.
        logger.info("測試定時任務");
    }

    /**
     * @return the fixed name {@code "TestJob"}
     */
    @Override
    public String getJobName() {
        return "TestJob";
    }
}

spring.xml 添加如下配置:

 <!-- Annotation-driven scheduled tasks: use the "scheduler" bean declared below (pool-size 10), in proxy mode -->
    <task:annotation-driven scheduler="scheduler" mode="proxy"/>
    <task:scheduler id="scheduler" pool-size="10"/>

spring-job.xml 添加如下配置:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Quartz wiring: one scheduler factory, plus a job detail and a cron trigger per job. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd">


    <!-- Scheduler factory; every trigger that should be scheduled must be listed under "triggers" -->
    <bean id="trigger" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="triggers">
            <list>
                <ref bean="testJobCronTriggerBean"/>
            </list>
        </property>
    </bean>

    <!-- Test job: start -->
    <!-- Job detail pointing at the job class com.test.job.TestJob -->
    <bean id="testJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
        <property name="jobClass" value="com.test.job.TestJob"/>
        <property name="durability" value="true"/>

    </bean>

    <!-- Cron trigger for the test job; the expression fires at second 0 of every minute -->
    <bean id="testJobCronTriggerBean"
          class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
        <property name="jobDetail" ref="testJobDetail"/>
        <property name="cronExpression" value="0 0/1 * * * ?"/>
    </bean>
    <!-- Test job: end -->

</beans>

MyBatis 相關代碼如下:

package com.test.common.dao;

import com.test.model.BaseJobConfig;
import org.apache.ibatis.annotations.Param;
import tk.mybatis.mapper.common.Mapper;


/**
 * MyBatis mapper for the {@code basejob_config} table. Besides the generic operations
 * inherited from {@code Mapper}, it declares the three conditional updates that drive the
 * job lock; their SQL lives in the mapper XML of the same namespace.
 */
public interface BaseJobConfigMapper extends Mapper<BaseJobConfig> {

    /**
     * Resets {@code state} from 1 to 0 for the given key when {@code actual_pre_time} is more
     * than {@code resetTime} minutes in the past.
     *
     * @param keyName   job key ({@code key_name})
     * @param resetTime timeout in minutes
     * @return number of rows updated
     */
    int updateBaseJobConfigStatusByTimeOut(@Param("keyName") String keyName, @Param("resetTime") Integer resetTime);

    /**
     * Marks the job as running: sets {@code state} to 1, {@code actual_pre_time} to the
     * database's current time and {@code schedule_pre_time} to the given value, but only for
     * a row of that key that is in state 0 and whose {@code schedule_pre_time} differs from
     * the given value.
     *
     * @param keyName         job key ({@code key_name})
     * @param schedulePreTime scheduled fire time of the current run, as a formatted string
     * @return number of rows updated; 0 means the lock was not obtained
     */
    int updateBaseJobConfigStatusAtStartTime(@Param("keyName") String keyName, @Param("schedulePreTime") String schedulePreTime);

    /**
     * Marks the job as waiting again: sets {@code state} back to 0 for the row of that key
     * that is in state 1 and whose {@code schedule_pre_time} equals the given value.
     *
     * @param keyName         job key ({@code key_name})
     * @param schedulePreTime scheduled fire time that was used when the run was started
     * @return number of rows updated
     */
    int updateBaseJobConfigStatusAtEndTime(@Param("keyName") String keyName, @Param("schedulePreTime") String schedulePreTime);
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.test.common.dao.BaseJobConfigMapper">
  <resultMap id="BaseResultMap" type="com.test.model.BaseJobConfig">
    <!--
      WARNING - @mbg.generated
    -->
    <id column="key_name" jdbcType="VARCHAR" property="keyName" />
    <result column="key_value" jdbcType="VARCHAR" property="keyValue" />
    <result column="schedule_pre_time" jdbcType="TIMESTAMP" property="schedulePreTime" />
    <result column="actual_pre_time" jdbcType="TIMESTAMP" property="actualPreTime" />
    <result column="state" jdbcType="TINYINT" property="state" />
    <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
  </resultMap>



  <!-- Run timed out: reset the state to 0 when the row has been in state 1 for more than #{resetTime} minutes -->
  <update id="updateBaseJobConfigStatusByTimeOut">
    <![CDATA[
      update basejob_config set state = 0 where key_name =  #{keyName} and state = 1 and actual_pre_time <  DATE_SUB(now(), INTERVAL #{resetTime} MINUTE)
    ]]>
  </update>

  <!--
    Job start: take the lock by switching state 0 -> 1 for a firing that has not run yet.
    schedule_pre_time is nullable, and "NULL != value" evaluates to NULL (not true), so a row
    with a NULL schedule_pre_time is matched explicitly; otherwise it could never be locked.
  -->
  <update id="updateBaseJobConfigStatusAtStartTime">
    <![CDATA[
      update basejob_config set state = 1,actual_pre_time = now(),schedule_pre_time = #{schedulePreTime} where key_name = #{keyName} and state = 0 and (schedule_pre_time is null or schedule_pre_time != #{schedulePreTime})
    ]]>
  </update>

  <!-- Job end: release the lock by switching state 1 -> 0 for the firing that took it -->
  <update id="updateBaseJobConfigStatusAtEndTime">
    <![CDATA[
      update basejob_config set state = 0 where key_name = #{keyName} and state = 1 and schedule_pre_time = #{schedulePreTime}
    ]]>
  </update>


</mapper>

數據庫表設計:

-- Job lock/config table: one row per job, keyed by key_name (primary key).
--   schedule_pre_time: previous scheduled run time; actual_pre_time: previous actual run time.
--   state: 1 = running, 0 = waiting to run (see the column comment).
CREATE TABLE `basejob_config` (
  `key_name` varchar(255) NOT NULL DEFAULT '' COMMENT '參數code',
  `key_value` varchar(255) DEFAULT NULL COMMENT '參數值',
  `schedule_pre_time` datetime DEFAULT NULL COMMENT '上一次計劃運行時間',
  `actual_pre_time` datetime DEFAULT NULL COMMENT '上一次實際執行時間',
  `state` tinyint(4) NOT NULL DEFAULT '0' COMMENT '狀態--1表明正在執行0表明等待執行',
  `create_time` datetime DEFAULT NULL COMMENT '建立時間',
  PRIMARY KEY (`key_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='baseJob配置表';

-- Job run history: one row per run, with an auto-increment id.
--   Columns: job name, planned trigger time, start and end time, elapsed time (cost_time),
--   IP of the server that ran the job, and row creation time.
--   Secondary indexes exist on key_name and on each of the time columns.
CREATE TABLE `basejob_config_record` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `key_name` varchar(255) DEFAULT NULL COMMENT '定時任務名稱',
  `trigger_time` datetime DEFAULT NULL COMMENT '定時任務計劃觸發時間',
  `start_time` datetime DEFAULT NULL COMMENT '定時任務開始時間',
  `end_time` datetime DEFAULT NULL COMMENT '定時任務結束時間',
  `cost_time` bigint(20) DEFAULT NULL COMMENT '耗時',
  `ip` varchar(50) DEFAULT NULL COMMENT '運行服務器IP',
  `create_time` datetime DEFAULT NULL COMMENT '建立時間',
  PRIMARY KEY (`id`),
  KEY `idx_basejob_config_record_key_name` (`key_name`) USING BTREE,
  KEY `idx_basejob_config_record_trigger_time` (`trigger_time`) USING BTREE,
  KEY `idx_basejob_config_record_start_time` (`start_time`) USING BTREE,
  KEY `idx_basejob_config_record_end_time` (`end_time`) USING BTREE,
  KEY `idx_basejob_config_record_create_time` (`create_time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='定時任務運行記錄表';

原理就是利用數據庫樂觀鎖對數據庫行記錄進行 update:若 update 成功,則證明該服務器搶到了鎖,由它執行定時任務;update 不成功的服務器則直接退出。這樣,一個簡單的定時任務分佈式鎖就實現了。

相關文章
相關標籤/搜索