國慶節快到了,要開始休假了,筆者仍是很開心的,國慶快樂!java
廢話少說,直接進入正題。算法
相信你們對XXL-JOB
都很瞭解,故本文對源碼不進行過多介紹,側重的是看源碼過程當中想到的幾個知識點,不必定都對,請大神們批評指正。服務器
XXL-JOB
是一個輕量級分佈式任務調度平臺,其核心設計目標是開發迅速、學習簡單、輕量級、易擴展。現已開放源代碼並接入多家公司線上產品線,開箱即用。XXL-JOB
分爲調度中心、執行器、數據中心,調度中心負責任務管理及調度、執行器管理、日誌管理等,執行器負責任務執行及執行結果回調。時間輪出自Netty
中的HashedWheelTimer
,是一個環形結構,能夠用時鐘來類比,鐘面上有不少bucket
,每個bucket
上能夠存放多個任務,使用一個List
保存該時刻到期的全部任務,同時一個指針隨着時間流逝一格一格轉動,並執行對應bucket
上全部到期的任務。任務經過取模決定應該放入哪一個bucket
。和HashMap
的原理相似,newTask
對應put
,使用List
來解決 Hash 衝突。負載均衡
以上圖爲例,假設一個bucket
是1秒,則指針轉動一輪表示的時間段爲8s,假設當前指針指向 0,此時須要調度一個3s後執行的任務,顯然應該加入到(0+3=3)的方格中,指針再走3s次就能夠執行了;若是任務要在10s後執行,應該等指針走完一輪零2格再執行,所以應放入2,同時將round(1)
保存到任務中。檢查到期任務時只執行round
爲0的,bucket
上其餘任務的round
減1。框架
固然,還有優化的「分層時間輪」的實現,請參考https://cnkirito.moe/timer/。dom
XXL-JOB中的調度方式從Quartz
變成了自研調度的方式,很像時間輪,能夠理解爲有60個bucket
且每一個bucket
爲1秒,可是沒有了round
的概念。async
具體能夠看下圖。分佈式
ringThread
和scheduleThread
,其做用以下。一、scheduleThread:對任務信息進行讀取,預讀將來5s即將觸發的任務,放入時間輪。 二、ringThread:對當前
bucket
和前一個bucket
中的任務取出並執行。ide
// 環狀結構
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
// 任務下次啓動時間(單位爲秒) % 60
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 任務放進時間輪
private void pushTimeRing(int ringSecond, int jobId){
// push async ring
List<Integer> ringItemData = ringData.get(ringSecond);
if (ringItemData == null) {
ringItemData = new ArrayList<Integer>();
ringData.put(ringSecond, ringItemData);
}
ringItemData.add(jobId);
}
複製代碼
// 同時取兩個時間刻度的任務
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
// 避免處理耗時太長,跨過刻度,向前校驗一個刻度;
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// 運行
for (int jobId: ringItemData) {
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
}
複製代碼
XXL-JOB
在執行任務時,任務具體在哪一個執行器上運行是根據路由策略來決定的,其中有一個策略是一致性Hash策略(源碼在ExecutorRouteConsistentHash.java),天然而然想到了一致性Hash算法。可見,一致性Hash算法的關鍵在於Hash算法,保證虛擬節點及Hash結果的均勻性, 而均勻性能夠理解爲減小Hash衝突,Hash衝突的知識點請參考從HashMap,Redis 字典看【Hash】。。。。函數
// jobId轉換爲md5
// 不直接用hashCode() 是由於擴大hash取值範圍,減小衝突
byte[] digest = md5.digest();
// 32位hashCode
long hashCode = ((long) (digest[3] & 0xFF) << 24)
| ((long) (digest[2] & 0xFF) << 16)
| ((long) (digest[1] & 0xFF) << 8)
| (digest[0] & 0xFF);
long truncateHashCode = hashCode & 0xffffffffL;
複製代碼
HashMap
的Hash函數f(key) = hash(key) & (table.length - 1)
// 使用>>> 16的緣由,hashCode()的高位和低位都對f(key)有了必定影響力,使得分佈更加均勻,散列衝突的概率就小了。
hash(key) = (h = key.hashCode()) ^ (h >>> 16)
複製代碼
XXL-JOB的分片任務實現了任務的分佈式執行,實際上是筆者調研的重點,平常開發中不少定時任務都是單機執行,對於後續數據量大的任務最好有一個分佈式的解決方案。
分片任務的路由策略,源代碼做者提出了分片廣播的概念,剛開始還有點摸不清頭腦,看了源碼逐漸清晰了起來。
想必看過源碼的也遇到過這麼一個小插曲,路由策略咋沒實現?以下圖所示。
public enum ExecutorRouteStrategyEnum {
FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
// 說好的實現呢???居然是null
SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
複製代碼
XxlJobTrigger.trigger
函數中的一段代碼。...
// 若是是分片路由,走的是這段邏輯
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList() != null && !group.getRegistryList().isEmpty()
&& shardingParam == null) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
// 最後兩個參數,i是當前機器在執行器集羣當中的index,group.getRegistryList().size()爲執行器總數
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
}
...
複製代碼
JobThread.run
中,看到了以下代碼。// 分片廣播的參數比set進了ShardingUtil
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
...
// 將執行參數傳遞給jobHandler執行
handler.execute(triggerParamTmp.getExecutorParams())
複製代碼
ShardingUtil
,才發現了其中的奧祕,請看代碼。public class ShardingUtil {
// 線程上下文
private static InheritableThreadLocal<ShardingVO> contextHolder = new InheritableThreadLocal<ShardingVO>();
// 分片參數對象
public static class ShardingVO {
private int index; // sharding index
private int total; // sharding total
// 次數省略 get/set
}
// 參數對象注入上下文
public static void setShardingVo(ShardingVO shardingVo){
contextHolder.set(shardingVo);
}
// 從上下文中取出參數對象
public static ShardingVO getShardingVo(){
return contextHolder.get();
}
}
複製代碼
ShardingJobHandler
裏取出了線程上下文中的分片參數,這裏也給個代碼把~@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {
@Override
public ReturnT<String> execute(String param) throws Exception {
// 分片參數
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
XxlJobLogger.log("分片參數:當前分片序號 = {}, 總分片數 = {}", shardingVO.getIndex(), shardingVO.getTotal());
// 業務邏輯
for (int i = 0; i < shardingVO.getTotal(); i++) {
if (i == shardingVO.getIndex()) {
XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
} else {
XxlJobLogger.log("第 {} 片, 忽略", i);
}
}
return SUCCESS;
}
}
複製代碼
index
及total
來作的,簡單來說,就是給出了當前執行器的標識,根據這個標識將任務的數據或者邏輯進行區分,便可實現分佈式運行。execute
傳遞?一、多是由於只有分片任務纔用到這兩個參數 二、IJobHandler只有String類型參數
Quartz
調度的不足,筆者得繼續深刻了解。