揪出XXL-JOB中的細節

前言

國慶節快到了,要開始休假了,筆者仍是很開心的,國慶快樂!java

廢話少說,直接進入正題。算法

相信你們對XXL-JOB都很瞭解,故本文對源碼不進行過多介紹,側重的是看源碼過程當中想到的幾個知識點,不必定都對,請大神們批評指正。服務器

XXL-JOB簡介

  • XXL-JOB是一個輕量級分佈式任務調度平臺,其核心設計目標是開發迅速、學習簡單、輕量級、易擴展。現已開放源代碼並接入多家公司線上產品線,開箱即用。
  • XXL-JOB分爲調度中心、執行器、數據中心,調度中心負責任務管理及調度、執行器管理、日誌管理等,執行器負責任務執行及執行結果回調。

任務調度 - 「類時間輪」的實現

時間輪

時間輪出自Netty中的HashedWheelTimer,是一個環形結構,能夠用時鐘來類比,鐘面上有不少bucket,每個bucket上能夠存放多個任務,使用一個List保存該時刻到期的全部任務,同時一個指針隨着時間流逝一格一格轉動,並執行對應bucket上全部到期的任務。任務經過取模決定應該放入哪一個bucket。和HashMap的原理相似,newTask對應put,使用List來解決 Hash 衝突。負載均衡

以上圖爲例,假設一個bucket是1秒,則指針轉動一輪表示的時間段爲8s,假設當前指針指向 0,此時須要調度一個3s後執行的任務,顯然應該加入到(0+3=3)的方格中,指針再走3s次就能夠執行了;若是任務要在10s後執行,應該等指針走完一輪零2格再執行,所以應放入2,同時將round(1)保存到任務中。檢查到期任務時只執行round爲0的,bucket上其餘任務的round減1。框架

固然,還有優化的「分層時間輪」的實現,請參考https://cnkirito.moe/timer/。dom

XXL-JOB中的「時間輪」

  • XXL-JOB中的調度方式從Quartz變成了自研調度的方式,很像時間輪,能夠理解爲有60個bucket且每一個bucket爲1秒,可是沒有了round的概念。async

  • 具體能夠看下圖。分佈式

  • XXL-JOB中負責任務調度的有兩個線程,分別爲ringThreadscheduleThread,其做用以下。

一、scheduleThread:對任務信息進行讀取,預讀將來5s即將觸發的任務,放入時間輪。 二、ringThread:對當前bucket和前一個bucket中的任務取出並執行。ide

  • 下面結合源代碼看下,爲何說是「類時間輪」,關鍵代碼附上了註解,請你們留意觀看。
// 環狀結構
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

// 任務下次啓動時間(單位爲秒) % 60
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

// 任務放進時間輪
private void pushTimeRing(int ringSecond, int jobId){
        // push async ring
        List<Integer> ringItemData = ringData.get(ringSecond);
        if (ringItemData == null) {
            ringItemData = new ArrayList<Integer>();
            ringData.put(ringSecond, ringItemData);
        }
        ringItemData.add(jobId);
    }
複製代碼
// 同時取兩個時間刻度的任務
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);  
// 避免處理耗時太長,跨過刻度,向前校驗一個刻度;
for (int i = 0; i < 2; i++) {
	List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
	if (tmpData != null) {
		ringItemData.addAll(tmpData);
	}
}
// 運行
for (int jobId: ringItemData) {
	JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
}
複製代碼

一致性Hash路由中的Hash算法

  • 你們也知道,XXL-JOB在執行任務時,任務具體在哪一個執行器上運行是根據路由策略來決定的,其中有一個策略是一致性Hash策略(源碼在ExecutorRouteConsistentHash.java),天然而然想到了一致性Hash算法
  • 一致性Hash算法是爲了解決分佈式系統中負載均衡的問題時候可使用Hash算法讓固定的一部分請求落到同一臺服務器上,這樣每臺服務器固定處理一部分請求(並維護這些請求的信息),起到負載均衡的做用。
  • 普通的餘數hash(hash(好比用戶id)%服務器機器數)算法伸縮性不好,當新增或者下線服務器機器時候,用戶id與服務器的映射關係會大量失效。一致性hash則利用hash環對其進行了改進。
  • 一致性Hash算法在實踐中,當服務器節點比較少的時候會出現上節所說的一致性hash傾斜的問題,一個解決方法是多加機器,可是加機器是有成本的,那麼就加虛擬節點
  • 具體原理請參考https://www.jianshu.com/p/e968c081f563。
  • 下圖爲帶有虛擬節點的Hash環,其中ip1-1是ip1的虛擬節點,ip2-1是ip2的虛擬節點,ip3-1是ip3的虛擬節點。

可見,一致性Hash算法的關鍵在於Hash算法,保證虛擬節點Hash結果的均勻性, 而均勻性能夠理解爲減小Hash衝突,Hash衝突的知識點請參考從HashMap,Redis 字典看【Hash】。。。函數

  • XXL-JOB中的一致性Hash的Hash函數以下。
// jobId轉換爲md5
// 不直接用hashCode() 是由於擴大hash取值範圍,減小衝突
byte[] digest = md5.digest();

// 32位hashCode
long hashCode = ((long) (digest[3] & 0xFF) << 24)
	| ((long) (digest[2] & 0xFF) << 16)
	| ((long) (digest[1] & 0xFF) << 8)
	| (digest[0] & 0xFF);

long truncateHashCode = hashCode & 0xffffffffL;
複製代碼
  • 看到上圖的Hash函數,讓我想到了HashMap的Hash函數
f(key) = hash(key) & (table.length - 1) 
// 使用>>> 16的緣由,hashCode()的高位和低位都對f(key)有了必定影響力,使得分佈更加均勻,散列衝突的概率就小了。
hash(key) = (h = key.hashCode()) ^ (h >>> 16)
複製代碼
  • 同理,將jobId的md5編碼的高低位都對Hash結果有影響,使得Hash衝突的機率減少。

分片任務的實現 - 維護線程上下文

  • XXL-JOB的分片任務實現了任務的分佈式執行,實際上是筆者調研的重點,平常開發中不少定時任務都是單機執行,對於後續數據量大的任務最好有一個分佈式的解決方案。

  • 分片任務的路由策略,源代碼做者提出了分片廣播的概念,剛開始還有點摸不清頭腦,看了源碼逐漸清晰了起來。

  • 想必看過源碼的也遇到過這麼一個小插曲,路由策略咋沒實現?以下圖所示。

public enum ExecutorRouteStrategyEnum {

    FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
    LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
    ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
    RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
    CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
    LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
    LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
    BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
    // 說好的實現呢???居然是null
    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
複製代碼
  • 再繼續追查獲得告終論,待我慢慢道來,首先分片任務執行參數傳遞的是什麼?看XxlJobTrigger.trigger函數中的一段代碼。
...
// 若是是分片路由,走的是這段邏輯
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                && group.getRegistryList() != null && !group.getRegistryList().isEmpty()
                && shardingParam == null) {
            for (int i = 0; i < group.getRegistryList().size(); i++) {
	            // 最後兩個參數,i是當前機器在執行器集羣當中的index,group.getRegistryList().size()爲執行器總數
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
            }
        } 
...
複製代碼
  • 參數通過自研RPC傳遞到執行器,在執行器中具體負責任務執行的JobThread.run中,看到了以下代碼。
// 分片廣播的參數比set進了ShardingUtil
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
...
// 將執行參數傳遞給jobHandler執行
handler.execute(triggerParamTmp.getExecutorParams())
複製代碼
  • 接着看ShardingUtil,才發現了其中的奧祕,請看代碼。
public class ShardingUtil {
	// 線程上下文
    private static InheritableThreadLocal<ShardingVO> contextHolder = new InheritableThreadLocal<ShardingVO>();
	// 分片參數對象
    public static class ShardingVO {

        private int index;  // sharding index
        private int total;  // sharding total
		// 次數省略 get/set
    }
	// 參數對象注入上下文
    public static void setShardingVo(ShardingVO shardingVo){
        contextHolder.set(shardingVo);
    }
	// 從上下文中取出參數對象
    public static ShardingVO getShardingVo(){
        return contextHolder.get();
    }

}
複製代碼
  • 顯而易見,在負責分片任務的ShardingJobHandler裏取出了線程上下文中的分片參數,這裏也給個代碼把~
@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {

	@Override
	public ReturnT<String> execute(String param) throws Exception {

		// 分片參數
		ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
		XxlJobLogger.log("分片參數:當前分片序號 = {}, 總分片數 = {}", shardingVO.getIndex(), shardingVO.getTotal());

		// 業務邏輯
		for (int i = 0; i < shardingVO.getTotal(); i++) {
			if (i == shardingVO.getIndex()) {
				XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
			} else {
				XxlJobLogger.log("第 {} 片, 忽略", i);
			}
		}

		return SUCCESS;
	}

}
複製代碼
  • 由此得出,分佈式實現是根據分片參數indextotal來作的,簡單來說,就是給出了當前執行器的標識,根據這個標識將任務的數據或者邏輯進行區分,便可實現分佈式運行。
  • 題外話:至於爲何用外部注入分片參數的方式,不直接execute傳遞?

一、多是由於只有分片任務纔用到這兩個參數 二、IJobHandler只有String類型參數

看完源碼後的思考

  • 一、通過這次看源代碼,XXL-JOB的設計目標確實符合開發迅速、學習簡單、輕量級、易擴展
  • 二、至於自研RPC尚未具體考量,具體接入應該會考慮公司的RPC框架。
  • 三、做者給出的Quartz調度的不足,筆者得繼續深刻了解。
  • 四、框架中不少對宕機、故障、超時等異常情況的兼容值得學習。
  • 五、Rolling日誌以及日誌系統實現須要繼續瞭解。

參考文獻

相關文章
相關標籤/搜索