1. ElasticJob 是什麼java
ElasticJob 是一個分佈式調度解決方案,由兩個相互獨立的子項目 ElasticJob-Lite 和 ElasticJob-Cloud 組成。 web
ElasticJob-Lite 定位爲輕量級無中心化解決方案,使用jar的形式提供分佈式任務的協調服務。spring
ElasticJob 已於2020年5月28日成爲 Apache ShardingSphere 的子項目。 apache
ElasticJob特性:api
2. 實例演示服務器
這裏採用最新版本 3.0.0-RC1 session
一、啓動zookeeper服務app
首先,下載zookeeper-3.6.0版本,解壓後複製一份zoo_sample.cfg,重命名未zoo.cfg,保持默認配置便可socket
注意,zookeeper-3.6.0啓動之後會佔用三個端口,其中包括8080哦maven
二、編寫定時任務業務邏輯
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>elasticjob-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<elasticjob-lite.version>3.0.0-RC1</elasticjob-lite.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency> <groupId>org.apache.shardingsphere.elasticjob</groupId> <artifactId>elasticjob-lite-spring-boot-starter</artifactId> <version>${elasticjob-lite.version}</version> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.shardingsphere.elasticjob</groupId> <artifactId>elasticjob-error-handler-dingtalk</artifactId> <version>${elasticjob-lite.version}</version> </dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
elasticjob:
regCenter:
serverLists: 192.168.100.15:2181
namespace: elasticjob-demo
baseSleepTimeMilliseconds: 2000
maxSleepTimeMilliseconds: 4000
maxRetries: 3
jobs:
firstJob:
elasticJobClass: com.example.job.FirstJob
cron: 0/6 * * * * ?
shardingTotalCount: 3
jobErrorHandlerType: DINGTALK
props:
dingtalk:
webhook: https://oapi.dingtalk.com/robot/send?access_token=xxx
secret: ASDF
connectTimeout: 3000
readTimeout: 5000
secondJob:
elasticJobClass: com.example.job.SecondJob
cron: 0/10 * * * * ?
shardingTotalCount: 1
jobErrorHandlerType: DINGTALK
props:
dingtalk:
webhook: https://oapi.dingtalk.com/robot/send?access_token=xxx
secret: ASDF
connectTimeout: 3000
readTimeout: 5000
兩個定時任務
FirstJob.java
package com.example.job;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;
/**
* @author ChengJianSheng
* @date 2021/1/13
*/
@Component
public class FirstJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
switch (shardingContext.getShardingItem()) {
case 0:
// do something by sharding item 0
System.out.println(0);
// int a = 1 / 0;
break;
case 1:
// do something by sharding item 1
System.out.println(1);
break;
case 2:
// do something by sharding item 2
System.out.println(2);
break;
// case n: ...
}
}
}
SecondJob.java
package com.example.job;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;
/**
* @author ChengJianSheng
* @date 2021/1/18
*/
@Component
public class SecondJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println("hello");
}
}
項目結構
運行項目便可
經過 ElasticJob-UI 查看任務
https://shardingsphere.apache.org/elasticjob/current/cn/downloads/
3. 啓動報錯排查
項目啓動過程當中,可能會報以下錯誤
org.apache.zookeeper.ClientCnxn$EndOfStreamException: Unable to read additional data from server sessionid 0x1000bdf48160002, likely server has closed socket
org.apache.shardingsphere.elasticjob.reg.exception.RegException: org.apache.zookeeper.KeeperException$OperationTimeoutException: KeeperErrorCode = OperationTimeout
Caused by: org.apache.zookeeper.KeeperException$OperationTimeoutException: KeeperErrorCode = OperationTimeout
最開始,我覺得是zookeeper版本的問題,後來換了版本也不行,防火牆關了也不行
而後,我懷疑是開發環境問題,因而在本地運行zookeeper,程序連127.0.0.1:2181,竟然能夠了
因而我陷入了沉思,爲今之計,只剩下一個辦法了,打斷點調試
找到了異常拋出的位置,以下圖
baseSleepTimeMilliseconds 表示 等待重試的間隔時間的初始值
maxSleepTimeMilliseconds 表示 等待重試的間隔時間的最大值
maxRetries 表示 最大重試次數
根據代碼中意思,若是在 maxSleepTimeMilliseconds * maxRetries 毫秒內尚未鏈接成功,則鏈接關閉,並拋出操做超時異常
聯想到,鏈接本地zookeeper能夠,連開發環境zk就不行,再加上觀察日誌從鏈接開始到拋異常的時間間隔,我猜到應該是maxSleepTimeMilliseconds設置過短了
因而,application.yml配置文件中將maxSleepTimeMilliseconds設置爲4000,baseSleepTimeMilliseconds設置爲2000
而後好使
回想剛開始報的那些錯,其實根本就尚未連上zookeeper
4. 做業分片
ElasticJob 中任務分片項的概念,使得任務能夠在分佈式的環境下運行,每臺任務服務器只運行分配給該服務器的分片。 隨着服務器的增長或宕機,ElasticJob 會近乎實時的感知服務器數量的變動,從而從新爲分佈式的任務服務器分配更加合理的任務分片項,使得任務能夠隨着資源的增長而提高效率。
任務的分佈式執行,須要將一個任務拆分爲多個獨立的任務項,而後由分佈式的服務器分別執行某一個或幾個分片項。
也就是說,分片是爲了在分佈式環境下高效合理利用任務服務器資源的。簡單地來說,一個定時任務,咱們運行多臺服務器,這意味着有多個實例在執行同一項任務,分片就是爲了告訴這些實例各自該處理那些數據,最大限度的下降數據重複處理的問題,同時加快任務處理速度。每一個任務實例該處理哪些數據,是根據分片項來的,在任務代碼層面,就能夠根據分片項來進行邏輯判斷。
舉例說明,若是做業分爲 4 片,用兩臺服務器執行,則每一個服務器分到 2 片,分別負責做業的 50% 的負載
分片項
ElasticJob 並不直接提供數據處理的功能,而是將分片項分配至各個運行中的做業服務器,開發者須要自行處理分片項與業務的對應關係。 分片項爲數字,始於 0 而終於分片總數減 1。
個性化分片參數
個性化參數能夠和分片項匹配對應關係,用於將分片項的數字轉換爲更加可讀的業務代碼。
合理使用個性化參數可讓代碼更可讀。例如,若是配置爲 0=北京,1=上海,2=廣州,那麼代碼中直接使用北京,上海,廣州的枚舉值便可完成分片項和業務邏輯的對應關係。
分片策略
平均分片策略
根據分片項平均分片。若是做業服務器數量與分片總數沒法整除,多餘的分片將會順序的分配至每個做業服務器。
舉例說明:
奇偶分片策略
根據做業名稱哈希值的奇偶數決定按照做業服務器 IP 升序或是降序的方式分片。
若是做業名稱哈希值是偶數,則按照 IP 地址進行升序分片; 若是做業名稱哈希值是奇數,則按照 IP 地址進行降序分片。 可用於讓服務器負載在多個做業共同運行時分配的更加均勻。
舉例說明:
輪詢分片策略
根據做業名稱輪詢分片。
5. 官方文檔
https://shardingsphere.apache.org/elasticjob/current/cn/features/elastic/
https://shardingsphere.apache.org/elasticjob/current/cn/user-manual/elasticjob-lite/
https://shardingsphere.apache.org/elasticjob/current/cn/user-manual/elasticjob-lite/configuration/
https://shardingsphere.apache.org/elasticjob/current/cn/dev-manual/