在集羣環境下,若每一臺機器都運行同一個定時任務,會導致生產數據一致性問題,因此必須實現一個鎖,保證定時任務在同一時間段只能在一臺機器上運行。
有的同學應該已經想到分佈式鎖了,例如用 Redis 或者 ZooKeeper 來實現分佈式鎖。
下面我介紹一種最簡單的實現定時任務互斥執行的機制,那就是使用數據庫樂觀鎖的原理。
運行環境:Spring MVC + Quartz + MyBatis
package com.test.job; import com.test.common.constants.Constants; import com.test.common.util.BlankUtil; import com.test.common.util.DateUtil; import com.test.common.dao.BaseJobConfigMapper; import com.test.common.dao.BaseJobConfigRecordMapper; import com.test.model.BaseJobConfig; import com.test.model.BaseJobConfigRecord; import com.test.utils.SpringContextUtil; import org.apache.log4j.Logger; import org.quartz.CronTrigger; import org.quartz.JobExecutionContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.scheduling.quartz.QuartzJobBean; import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.DefaultTransactionDefinition; import java.net.InetAddress; import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.Date; @Component public abstract class BaseJob extends QuartzJobBean { private static final Logger logger = Logger.getLogger(BaseJob.class); @Autowired private static BaseJobConfigMapper baseJobConfigMapper; @Autowired private static BaseJobConfigRecordMapper baseJobConfigRecordMapper; @Autowired private static DataSourceTransactionManager transactionManager; public static String IP_STRING = null; static { if (baseJobConfigMapper == null) { baseJobConfigMapper = (BaseJobConfigMapper) SpringContextUtil.getBean("baseJobConfigMapper"); } if (baseJobConfigRecordMapper == null) { baseJobConfigRecordMapper = (BaseJobConfigRecordMapper) SpringContextUtil.getBean("baseJobConfigRecordMapper"); } if (transactionManager == null) { transactionManager = (DataSourceTransactionManager) SpringContextUtil.getBean("transactionManager"); } try { InetAddress ip = InetAddress.getLocalHost(); if 
(!BlankUtil.isBlank(ip)) { IP_STRING = ip.getHostAddress(); logger.info("本機地址" + IP_STRING); } } catch (UnknownHostException e) { logger.error(e.getMessage(), e); } } /** * Job名稱 */ public String JOB_NAME = getJobName(); /** * 重置時間--分鐘 */ public int JOB_RESET_TIME = resetJobTime(); /** * 要調度的具體任務 */ @Override @Transactional protected void executeInternal(JobExecutionContext context) { if (!Constants.IS_DEV_MODE) { //一、先判斷JOB_NAME是否不爲空,爲空則結束 if (!BlankUtil.isBlank(JOB_NAME)) { CronTrigger cTrigger = (CronTrigger) context.getTrigger(); SimpleDateFormat format = new SimpleDateFormat(DateUtil.DEFAULT_DATE_TIME); Date triggerTime = cTrigger.getPreviousFireTime(); String currentTime = format.format(triggerTime); BaseJobConfig selectBaseJobConfig = new BaseJobConfig(); selectBaseJobConfig.setKeyName(JOB_NAME); long numLong = baseJobConfigMapper.selectCount(selectBaseJobConfig); //三、如果沒有,則插入,狀態爲0(待執行) if (numLong <= 0) { BaseJobConfig baseJobConfig = new BaseJobConfig(); baseJobConfig.setKeyName(JOB_NAME); //初始化觸發時間是上一個小時,避免爲當前時間時,被誤認爲已經跑過 baseJobConfig.setSchedulePreTime(DateUtil.getDateByDifferHours(triggerTime, -1)); baseJobConfig.setCreateTime(new Date()); baseJobConfig.setState(0); baseJobConfigMapper.insert(baseJobConfig); } int numInt = 0; //30分鐘未執行完,則重置狀態 numInt = baseJobConfigMapper.updateBaseJobConfigStatusByTimeOut(JOB_NAME, JOB_RESET_TIME); logger.info("[baseJob.doJob]job.key.resertjobName=" + JOB_NAME + ",num=" + numInt); //有狀態爲0的待執行數據,則將狀態修改成1(執行中) numInt = baseJobConfigMapper.updateBaseJobConfigStatusAtStartTime(JOB_NAME, currentTime); if (numInt > 0) { DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); TransactionStatus status = transactionManager.getTransaction(def); Date startTime = new Date(); //執行業務邏輯 try { logger.info("[baseJob.doJob]執行開始jobName=" + JOB_NAME); jobExecute(); transactionManager.commit(status); } catch (Exception e) { logger.error(e.getMessage(), 
e); transactionManager.rollback(status); } finally { //五、業務邏輯執行完後,將狀態修改成0 numInt = baseJobConfigMapper.updateBaseJobConfigStatusAtEndTime(JOB_NAME, currentTime); if (numInt <= 0) { logger.error("[baseJob.doJob]active update error jobName=" + JOB_NAME); } Date endTime = new Date(); //記錄定時任務運行狀況 recordBaseJob(JOB_NAME, triggerTime, startTime, endTime, (endTime.getTime() - startTime.getTime()), IP_STRING); logger.info("[baseJob.doJob]執行結束jobName=" + JOB_NAME + "------" + (endTime.getTime() - startTime.getTime())); } } } else { logger.error("[baseJob.doJob]JOB_NAME is null"); } } } public abstract void jobExecute() throws Exception; public abstract String getJobName(); public int resetJobTime() { return 30; } /** * 功能描述: * 記錄任務運行狀況 * */ private void recordBaseJob(String keyName, Date triggerTime, Date startTime, Date endTime, Long costTime, String ip) { try { BaseJobConfigRecord baseJobConfigRecord = new BaseJobConfigRecord(); baseJobConfigRecord.setKeyName(keyName); baseJobConfigRecord.setTriggerTime(triggerTime); baseJobConfigRecord.setStartTime(startTime); baseJobConfigRecord.setEndTime(endTime); baseJobConfigRecord.setCostTime(costTime); baseJobConfigRecord.setIp(ip); baseJobConfigRecord.setCreateTime(new Date()); baseJobConfigRecordMapper.insert(baseJobConfigRecord); } catch (Exception e) { logger.error(e.getMessage(), e); } } }
定時任務實現類只需要繼承 BaseJob,並實現 jobExecute 和 getJobName 方法,即可實現定時任務互斥執行,如下:
package com.test.job;

import com.test.job.BaseJob;
import org.apache.log4j.Logger;

/**
 * Sample scheduled job showing the minimum a {@link BaseJob} subclass has to provide:
 * the business logic and the job name.
 */
public class TestJob extends BaseJob {

    private static final Logger logger = Logger.getLogger(TestJob.class);

    /** Emits a marker message through the class logger instead of writing to standard output. */
    @Override
    public void jobExecute() throws Exception {
        logger.info("測試定時任務");
    }

    /** @return the fixed job name {@code "TestJob"} */
    @Override
    public String getJobName() {
        return "TestJob";
    }
}
spring.xml 添加如下配置:
<!-- Annotation-driven scheduling, backed by the "scheduler" thread pool declared below -->
<task:annotation-driven
        scheduler="scheduler"
        mode="proxy"/>

<!-- Scheduler thread pool with 10 threads -->
<task:scheduler
        id="scheduler"
        pool-size="10"/>
spring-job.xml 添加如下配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Quartz scheduler factory; every cron trigger that should fire is listed here -->
    <bean id="trigger" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="triggers">
            <list>
                <ref bean="testJobCronTriggerBean"/>
            </list>
        </property>
    </bean>

    <!-- Test job: start -->
    <!-- Job detail pointing at the job class -->
    <bean id="testJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
        <property name="jobClass" value="com.test.job.TestJob"/>
        <property name="durability" value="true"/>
    </bean>

    <!-- Cron trigger for the test job -->
    <bean id="testJobCronTriggerBean" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
        <property name="jobDetail" ref="testJobDetail"/>
        <property name="cronExpression" value="0 0/1 * * * ?"/>
    </bean>
    <!-- Test job: end -->

</beans>
MyBatis 相關代碼如下:
package com.test.common.dao;

import com.test.model.BaseJobConfig;
import org.apache.ibatis.annotations.Param;
import tk.mybatis.mapper.common.Mapper;

/**
 * MyBatis mapper for the job lock table. Besides the generic operations inherited from
 * {@link Mapper}, it declares the three conditional updates that drive the lock state
 * (0 = waiting, 1 = running).
 */
public interface BaseJobConfigMapper extends Mapper<BaseJobConfig> {

    /**
     * Puts a job that has been in the running state for too long back into the waiting state.
     *
     * @param keyName   job name (key of the lock row)
     * @param resetTime age in minutes after which a running job is considered timed out
     * @return number of rows updated
     */
    int updateBaseJobConfigStatusByTimeOut(
            @Param("keyName") String keyName,
            @Param("resetTime") Integer resetTime);

    /**
     * Attempts to take the lock: moves a waiting job to the running state when its recorded
     * schedule time differs from {@code schedulePreTime}, and stores that time on the row.
     *
     * @param keyName         job name (key of the lock row)
     * @param schedulePreTime formatted fire time identifying the current firing
     * @return number of rows updated; 0 means the lock was not obtained
     */
    int updateBaseJobConfigStatusAtStartTime(
            @Param("keyName") String keyName,
            @Param("schedulePreTime") String schedulePreTime);

    /**
     * Releases the lock: moves the running job for the given firing back to the waiting state.
     *
     * @param keyName         job name (key of the lock row)
     * @param schedulePreTime formatted fire time that was used when the lock was taken
     * @return number of rows updated
     */
    int updateBaseJobConfigStatusAtEndTime(
            @Param("keyName") String keyName,
            @Param("schedulePreTime") String schedulePreTime);
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.test.common.dao.BaseJobConfigMapper">

    <resultMap id="BaseResultMap" type="com.test.model.BaseJobConfig">
        <!-- WARNING - @mbg.generated -->
        <id column="key_name" jdbcType="VARCHAR" property="keyName" />
        <result column="key_value" jdbcType="VARCHAR" property="keyValue" />
        <result column="schedule_pre_time" jdbcType="TIMESTAMP" property="schedulePreTime" />
        <result column="actual_pre_time" jdbcType="TIMESTAMP" property="actualPreTime" />
        <result column="state" jdbcType="TINYINT" property="state" />
        <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
    </resultMap>

    <!-- Timed-out run: put a job that has been running longer than resetTime minutes back to waiting -->
    <update id="updateBaseJobConfigStatusByTimeOut">
        <![CDATA[
        update basejob_config
        set state = 0
        where key_name = #{keyName}
          and state = 1
          and actual_pre_time < DATE_SUB(now(), INTERVAL #{resetTime} MINUTE)
        ]]>
    </update>

    <!--
        Job start: take the lock (state 0 -> 1) for a firing that has not been run yet.
        schedule_pre_time is nullable, and "NULL != value" is never true in SQL, so a row whose
        schedule_pre_time is still NULL is matched explicitly; otherwise it could never be locked.
    -->
    <update id="updateBaseJobConfigStatusAtStartTime">
        <![CDATA[
        update basejob_config
        set state = 1,
            actual_pre_time = now(),
            schedule_pre_time = #{schedulePreTime}
        where key_name = #{keyName}
          and state = 0
          and (schedule_pre_time is null or schedule_pre_time != #{schedulePreTime})
        ]]>
    </update>

    <!-- Job end: release the lock (state 1 -> 0) for the firing that took it -->
    <update id="updateBaseJobConfigStatusAtEndTime">
        <![CDATA[
        update basejob_config
        set state = 0
        where key_name = #{keyName}
          and state = 1
          and schedule_pre_time = #{schedulePreTime}
        ]]>
    </update>

</mapper>
數據庫表設計:
-- Lock/configuration table: one row per job, keyed by the job name.
-- state: 1 = running, 0 = waiting.
CREATE TABLE `basejob_config` (
  `key_name` varchar(255) NOT NULL DEFAULT '' COMMENT '參數code',
  `key_value` varchar(255) DEFAULT NULL COMMENT '參數值',
  `schedule_pre_time` datetime DEFAULT NULL COMMENT '上一次計劃運行時間',
  `actual_pre_time` datetime DEFAULT NULL COMMENT '上一次實際執行時間',
  `state` tinyint(4) NOT NULL DEFAULT '0' COMMENT '狀態--1表明正在執行0表明等待執行',
  `create_time` datetime DEFAULT NULL COMMENT '建立時間',
  PRIMARY KEY (`key_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='baseJob配置表';

-- Run history: one row per executed job run, with timing and the address of the executing node.
CREATE TABLE `basejob_config_record` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `key_name` varchar(255) DEFAULT NULL COMMENT '定時任務名稱',
  `trigger_time` datetime DEFAULT NULL COMMENT '定時任務計劃觸發時間',
  `start_time` datetime DEFAULT NULL COMMENT '定時任務開始時間',
  `end_time` datetime DEFAULT NULL COMMENT '定時任務結束時間',
  `cost_time` bigint(20) DEFAULT NULL COMMENT '耗時',
  `ip` varchar(50) DEFAULT NULL COMMENT '運行服務器IP',
  `create_time` datetime DEFAULT NULL COMMENT '建立時間',
  PRIMARY KEY (`id`),
  KEY `idx_basejob_config_record_key_name` (`key_name`) USING BTREE,
  KEY `idx_basejob_config_record_trigger_time` (`trigger_time`) USING BTREE,
  KEY `idx_basejob_config_record_start_time` (`start_time`) USING BTREE,
  KEY `idx_basejob_config_record_end_time` (`end_time`) USING BTREE,
  KEY `idx_basejob_config_record_create_time` (`create_time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='定時任務運行記錄表';
原理就是利用數據庫樂觀鎖對數據庫行記錄進行 update:若能 update 成功,則證明該服務器搶到了鎖,由它執行定時任務;update 不成功的服務器則直接退出。這樣,一個簡單的定時任務分佈式鎖就實現了。