分佈式任務調度框架,結合zookeeper技術解決quartz框架在分佈式系統中重複的定時任務致使的不可預見的錯誤java
pommysql
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>com.github.kuhn-he</groupId> <artifactId>elastic-job-lite-spring-boot-starter</artifactId> <version>2.1.5</version> </dependency> </dependencies>
application.ymlgit
# zk配置 elaticjob: zookeeper: server-lists: localhost:2181 namespace: elastic-job-demo
# 數據源配置
省略
SimpleJobgithub
@ElasticSimpleJob(cron = "0/10 * * * * ?", jobName = "test123", shardingTotalCount = 3, jobParameter = "測試參數", shardingItemParameters = "0=Beijing,1=Shanghai,2=Guangzhou") @Component public class MySimpleJob implements SimpleJob { private static final Logger logger = LoggerFactory.getLogger(MySimpleJob.class); @Override public void execute(ShardingContext shardingContext) { logger.info(String.format("------Thread ID: %s, 任務總片數: %s, " + "當前分片項: %s,當前參數: %s," + "當前任務名稱: %s,當前任務參數: %s,"+ "當前任務的id: %s", //獲取當前線程的id Thread.currentThread().getId(), //獲取任務總片數 shardingContext.getShardingTotalCount(), //獲取當前分片項 shardingContext.getShardingItem(), //獲取當前的參數 shardingContext.getShardingParameter(), //獲取當前的任務名稱 shardingContext.getJobName(), //獲取當前任務參數 shardingContext.getJobParameter(), //獲取任務的id shardingContext.getTaskId() )); } }
默認使用:基於平均分配算法的分片策略web
/* * Copyright 1999-2015 dangdang.com. * <p> * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * </p> */ package io.elasticjob.lite.api.strategy.impl; import io.elasticjob.lite.api.strategy.JobInstance; import io.elasticjob.lite.api.strategy.JobShardingStrategy; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; /** * 基於平均分配算法的分片策略. * * <p> * 若是分片不能整除, 則不能整除的多餘分片將依次追加到序號小的服務器. * 如: * 1. 若是有3臺服務器, 分紅9片, 則每臺服務器分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8]. * 2. 若是有3臺服務器, 分紅8片, 則每臺服務器分到的分片是: 1=[0,1,6], 2=[2,3,7], 3=[4,5]. * 3. 若是有3臺服務器, 分紅10片, 則每臺服務器分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]. * </p> * * @author zhangliang */ public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy { @Override public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) { if (jobInstances.isEmpty()) { return Collections.emptyMap(); } Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount); addAliquant(jobInstances, shardingTotalCount, result); return result; } private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) { Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1); int itemCountPerSharding = shardingTotalCount / shardingUnits.size(); int count = 0; for (JobInstance each : shardingUnits) { List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1); for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) { shardingItems.add(i); } result.put(each, shardingItems); count++; } return result; } private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) { int aliquant = shardingTotalCount % shardingUnits.size(); int count = 0; for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) { if (count < aliquant) { entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count); } count++; } } }
第一步:去下載包
https://github.com/miguangying/elastic-job-lite-console#elastic-job-lite-console算法
第二步:解壓縮
解壓縮elastic-job-lite-console-${version}.tar.gz並執行bin\start.sh,windows平臺執行start.bat。打開瀏覽器訪問http://localhost:8899/便可訪問控制檯。spring
8899爲默認端口號,可經過啓動腳本輸入-p自定義端口號。elastic-job-lite-console-${version}.tar.gz可經過mvn install編譯獲取。sql
第三步:登陸
提供兩種帳戶,管理員及訪客,管理員擁有所有操做權限,訪客僅擁有察看權限。默認管理員用戶名和密碼是root/root,訪客用戶名和密碼是guest/guest,可經過conf\auth.properties修改管理員及訪客用戶名及密碼。express