本文首發於 vivo互聯網技術 微信公衆號
連接: https://mp.weixin.qq.com/s/l4vuYpNRjKxQRkRTDhyg2Q
做者:陳王榮html
分佈式任務調度框架幾乎是每一個大型應用必備的工具,本文介紹了任務調度框架使用的需求背景和痛點,對業界廣泛使用的開源分佈式任務調度框架的使用進行了探究實踐,並分析了這幾種框架的優劣勢和對自身業務的思考。java
(1)時間驅動處理場景:整點發送優惠券,天天更新收益,天天刷新標籤數據和人羣數據。mysql
(2)批量處理數據:按月批量統計報表數據,批量更新短信狀態,實時性要求不高。spring
(3)異步執行解耦:活動狀態刷新,異步執行離線查詢,與內部邏輯解耦。sql
(1)任務執行監控告警能力。數據庫
(2)任務可靈活動態配置,無需重啓。微信
(3)業務透明,低耦合,配置精簡,開發方便。架構
(4)易測試。併發
(5)高可用,無單點故障。app
(6)任務不可重複執行,防止邏輯異常。
(7)大任務的分發並行處理能力。
Timer缺陷:
Timer底層是使用單線程來處理多個Timer任務,這意味着全部任務實際上都是串行執行,前一個任務的延遲會影響到以後的任務的執行。
因爲單線程的緣故,一旦某個定時任務在運行時,產生未處理的異常,那麼不只當前這個線程會中止,全部的定時任務都會中止。
因爲上述缺陷,儘可能不要使用Timer, idea中也會明確提示,使用ScheduledThreadPoolExecutor替代Timer 。
ScheduledExecutorService對於Timer的缺陷進行了修補,首先ScheduledExecutorService內部實現是ScheduledThreadPool線程池,能夠支持多個任務併發執行。
對於某一個線程執行的任務出現異常,也會處理,不會影響其餘線程任務的執行,另外ScheduledExecutorService是基於時間間隔的延遲,執行不會因爲系統時間的改變發生變化。
固然,ScheduledExecutorService也有本身的侷限性:只能根據任務的延遲來進行調度,沒法知足基於絕對時間和日曆調度的需求。
spring task 是spring自主開發的輕量級定時任務框架,不須要依賴其餘額外的包,配置較爲簡單。
此處使用註解配置
Spring Task 自己不支持持久化,也沒有推出官方的分佈式集羣模式,只能靠開發者在業務應用中本身手動擴展實現,沒法知足可視化,易配置的需求。
Quartz框架是Java領域最著名的開源任務調度工具,也是目前事實上的定時任務標準,幾乎所有的開源定時任務框架都是基於Quartz核心調度構建而成。
核心組件和架構
關鍵概念
(1)Scheduler:任務調度器,是執行任務調度的控制器。本質上是一個計劃調度容器,註冊了所有Trigger和對應的JobDetail, 使用線程池做爲任務運行的基礎組件,提升任務執行效率。
(2)Trigger:觸發器,用於定義任務調度的時間規則,告訴任務調度器何時觸發任務,其中CronTrigger是基於cron表達式構建的功能強大的觸發器。
(3)Calendar:日曆特定時間點的集合。一個trigger能夠包含多個Calendar,可用於排除或包含某些時間點。
(4)JobDetail:是一個可執行的工做,用來描述Job實現類及其它相關的靜態信息,如Job的名稱、監聽器等相關信息。
(5)Job:任務執行接口,只有一個execute方法,用於執行真正的業務邏輯。
(6)JobStore:任務存儲方式,主要有RAMJobStore和JDBCJobStore,RAMJobStore是存儲在JVM的內存中,有丟失和數量受限的風險,JDBCJobStore是將任務信息持久化到數據庫中,支持集羣。
(1)關於Quartz的基本使用
(2)業務使用要知足動態修改和重啓不丟失, 通常須要使用數據庫進行保存。
Quartz自己支持JDBCJobStore,可是其配置的數據表比較多,官方推薦配置可參照官方文檔,超過10張表,業務使用比較重。
(3)組件化
(4)擴展
集羣模式
經過故障轉移和負載均衡實現了任務的高可用性,經過數據庫的鎖機制來確保任務執行的惟一性,可是集羣特性僅僅只是用來HA,節點數量的增長並不會提高單個任務的執行效率,不能實現水平擴展。
Quartz插件
能夠對特定須要進行擴展,好比增長觸發器和任務執行日誌,任務依賴串行處理場景,可參考:quartz插件——實現任務之間的串行調度
(1)須要把任務信息持久化到業務數據表,和業務有耦合。
(2)調度邏輯和執行邏輯並存於同一個項目中,在機器性能固定的狀況下,業務和調度之間不可避免地會相互影響。
(3)quartz集羣模式下,是經過數據庫獨佔鎖來惟一獲取任務,任務執行並無實現完善的負載均衡機制。
XXL-JOB是一個輕量級分佈式任務調度平臺,主打特色是平臺化,易部署,開發迅速、學習簡單、輕量級、易擴展,代碼仍在持續更新中。
「調度中心」是任務調度控制檯,平臺自身並不承擔業務邏輯,只是負責任務的統一管理和調度執行,而且提供任務管理平臺, 「執行器」 負責接收「調度中心」的調度並執行,可直接部署執行器,也能夠將執行器集成到現有業務項目中。 經過將任務的調度控制和任務的執行解耦,業務使用只須要關注業務邏輯的開發。
主要提供了任務的動態配置管理、任務監控和統計報表以及調度日誌幾大功能模塊,支持多種運行模式和路由策略,可基於對應執行器機器集羣數量進行簡單分片數據處理。
2.1.0版本前核心調度模塊都是基於quartz框架,2.1.0版本開始自研調度組件,移除quartz依賴 ,使用時間輪調度。
詳細配置和介紹參考官方文檔。
示例1:實現簡單任務配置,只須要繼承IJobHandler 抽象類,並聲明註解
@JobHandler(value="offlineTaskJobHandler") ,實現業務邏輯便可。(注:這次引入了dubbo,後文介紹)。
@JobHandler(value="offlineTaskJobHandler") @Component public class OfflineTaskJobHandler extends IJobHandler { @Reference(check = false,version = "cms-dev",group="cms-service") private OfflineTaskExecutorFacade offlineTaskExecutorFacade; @Override public ReturnT<String> execute(String param) throws Exception { XxlJobLogger.log(" offlineTaskJobHandler start."); try { offlineTaskExecutorFacade.executeOfflineTask(); } catch (Exception e) { XxlJobLogger.log("offlineTaskJobHandler-->exception." , e); return FAIL; } XxlJobLogger.log("XXL-JOB, offlineTaskJobHandler end."); return SUCCESS; } }
示例2:分片廣播任務。
@JobHandler(value="shardingJobHandler") @Service public class ShardingJobHandler extends IJobHandler { @Override public ReturnT<String> execute(String param) throws Exception { // 分片參數 ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); XxlJobLogger.log("分片參數:當前分片序號 = {}, 總分片數 = {}", shardingVO.getIndex(), shardingVO.getTotal()); // 業務邏輯 for (int i = 0; i < shardingVO.getTotal(); i++) { if (i == shardingVO.getIndex()) { XxlJobLogger.log("第 {} 片, 命中分片開始處理", i); } else { XxlJobLogger.log("第 {} 片, 忽略", i); } } return SUCCESS; } }
(1)引入dubbo-spring-boot-starter和業務facade jar包依賴。
<dependency> <groupId>com.alibaba.spring.boot</groupId> <artifactId>dubbo-spring-boot-starter</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>com.demo.service</groupId> <artifactId>xxx-facade</artifactId> <version>1.9-SNAPSHOT</version> </dependency>
(2)配置文件加入dubbo消費端配置(可根據環境定義多個配置文件,經過profile切換)。
## Dubbo 服務消費者配置 spring.dubbo.application.name=xxl-job spring.dubbo.registry.address=zookeeper://zookeeper.xyz:2183 spring.dubbo.port=20880 spring.dubbo.version=demo spring.dubbo.group=demo-service
(3)代碼中經過@Reference注入facade接口便可。
@Reference(check = false,version = "demo",group="demo-service") private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
(4)啓動程序加入@EnableDubboConfiguration註解。
@SpringBootApplication @EnableDubboConfiguration public class XxlJobExecutorApplication { public static void main(String[] args) { SpringApplication.run(XxlJobExecutorApplication.class, args); } }
內置了平臺項目,方便了開發者對任務的管理和執行日誌的監控,並提供了一些便於測試的功能。
(1)任務監控和報表的優化。
(2)任務報警方式的擴展,好比加入告警中心,提供內部消息,短信告警。
(3)對實際業務內部執行出現異常狀況下的不一樣監控告警和重試策略。
Elastic-Job是一個分佈式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。
Elastic-Job-Lite定位爲輕量級無中心化解決方案,使用jar包的形式提供分佈式任務的協調服務。
Elastic-Job-Cloud使用Mesos + Docker的解決方案,額外提供資源治理、應用分發以及進程隔離等服務。
惋惜的是已經兩年沒有迭代更新記錄。
2.5.3.1 demo使用
(1)安裝zookeeper,配置註冊中心config,配置文件加入註冊中心zk的配置。
@Configuration @ConditionalOnExpression("'${regCenter.serverList}'.length() > 0") public class JobRegistryCenterConfig { @Bean(initMethod = "init") public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) { return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace)); } }
spring.application.name=demo_elasticjob regCenter.serverList=localhost:2181 regCenter.namespace=demo_elasticjob spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8 spring.datasource.username=user spring.datasource.password=pwd
(2)配置數據源config,並配置文件中加入數據源配置。
@Getter @Setter @NoArgsConstructor @AllArgsConstructor @ToString @Configuration @ConfigurationProperties(prefix = "spring.datasource") public class DataSourceProperties { private String url; private String username; private String password; @Bean @Primary public DataSource getDataSource() { DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl(url); dataSource.setUsername(username); dataSource.setPassword(password); return dataSource; } }
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8 spring.datasource.username=user spring.datasource.password=pwd
(3)配置事件config。
@Configuration public class JobEventConfig { @Autowired private DataSource dataSource; @Bean public JobEventConfiguration jobEventConfiguration() { return new JobEventRdbConfiguration(dataSource); } }
(4)爲了便於靈活配置不一樣的任務觸發事件,加入ElasticSimpleJob註解。
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface ElasticSimpleJob { @AliasFor("cron") String value() default ""; @AliasFor("value") String cron() default ""; String jobName() default ""; int shardingTotalCount() default 1; String shardingItemParameters() default ""; String jobParameter() default ""; }
(5)對配置進行初始化。
@Configuration @ConditionalOnExpression("'${elaticjob.zookeeper.server-lists}'.length() > 0") public class ElasticJobAutoConfiguration { @Value("${regCenter.serverList}") private String serverList; @Value("${regCenter.namespace}") private String namespace; @Autowired private ApplicationContext applicationContext; @Autowired private DataSource dataSource; @PostConstruct public void initElasticJob() { ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace)); regCenter.init(); Map<String, SimpleJob> map = applicationContext.getBeansOfType(SimpleJob.class); for (Map.Entry<String, SimpleJob> entry : map.entrySet()) { SimpleJob simpleJob = entry.getValue(); ElasticSimpleJob elasticSimpleJobAnnotation = simpleJob.getClass().getAnnotation(ElasticSimpleJob.class); String cron = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value()); SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(simpleJob.getClass().getName(), cron, elasticSimpleJobAnnotation.shardingTotalCount()).shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters()).build(), simpleJob.getClass().getCanonicalName()); LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build(); JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource); SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration, jobEventRdbConfiguration); jobScheduler.init(); } } }
(6)實現 SimpleJob接口,按上文中方法整合dubbo, 完成業務邏輯。
@ElasticSimpleJob( cron = "*/10 * * * * ?", jobName = "OfflineTaskJob", shardingTotalCount = 2, jobParameter = "測試參數", shardingItemParameters = "0=A,1=B") @Component public class MySimpleJob implements SimpleJob { Logger logger = LoggerFactory.getLogger(OfflineTaskJob.class); @Reference(check = false, version = "cms-dev", group = "cms-service") private OfflineTaskExecutorFacade offlineTaskExecutorFacade; @Override public void execute(ShardingContext shardingContext) { offlineTaskExecutorFacade.executeOfflineTask(); logger.info(String.format("Thread ID: %s, 做業分片總數: %s, " + "當前分片項: %s.當前參數: %s," + "做業名稱: %s.做業自定義參數: %s" , Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobName(), shardingContext.getJobParameter() )); } }
(1)Saturn:Saturn是惟品會開源的一個分佈式任務調度平臺,在Elastic Job的基礎上進行了改造。
(2)SIA-TASK:是宜信開源的分佈式任務調度平臺。
業務思考:
豐富任務監控數據和告警策略。
接入統一登陸和權限控制。
對於併發場景不是特別高的系統來講,xxl-job配置部署簡單易用,不須要引入多餘的組件,同時提供了可視化的控制檯,使用起來很是友好,是一個比較好的選擇。但願直接利用開源分佈式框架能力的系統,建議根據自身的狀況來進行合適的選型。