在對調度系統架構說明以前,咱們先來認識一下調度系統經常使用的名詞前端
DAG: 全稱Directed Acyclic Graph,簡稱DAG。工做流中的Task任務以有向無環圖的形式組裝起來,從入度爲零的節點進行拓撲遍歷,直到無後繼節點爲止。舉例以下圖: java
流程定義:經過拖拽任務節點並創建任務節點的關聯所造成的可視化DAGgit
流程實例:流程實例是流程定義的實例化,能夠經過手動啓動或定時調度生成github
任務實例:任務實例是流程定義中任務節點的實例化,標識着具體的任務執行狀態算法
任務類型: 目前支持有SHELL、SQL、SUB_PROCESS、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT,同時計劃支持動態插件擴展,注意:其中子 SUB_PROCESS 也是一個單獨的流程定義,是能夠單獨啓動執行的數據庫
調度方式: 系統支持基於cron表達式的定時調度和手動調度。命令類型支持:啓動工做流、從當前節點開始執行、恢復被容錯的工做流、恢復暫停流程、從失敗節點開始執行、補數、調度、重跑、暫停、中止、恢復等待線程。其中 恢復被容錯的工做流 和 恢復等待線程 兩種命令類型是由調度內部控制使用,外部沒法調用json
定時調度:系統採用 quartz 分佈式調度器,並同時支持cron表達式可視化的生成api
依賴:系統不僅僅支持 DAG 簡單的前驅和後繼節點之間的依賴,同時還提供任務依賴節點,支持流程間的自定義任務依賴網絡
優先級 :支持流程實例和任務實例的優先級,若是流程實例和任務實例的優先級不設置,則默認是先進先出架構
郵件告警:支持 SQL任務 查詢結果郵件發送,流程實例運行結果郵件告警及容錯告警通知
失敗策略:對於並行運行的任務,若是有任務失敗,提供兩種失敗策略處理方式,繼續是指無論並行運行任務的狀態,直到流程失敗結束。結束是指一旦發現失敗任務,則同時Kill掉正在運行的並行任務,流程失敗結束
補數:補歷史數據,支持區間並行和串行兩種補數方式
MasterServer
MasterServer採用分佈式無中心設計理念,MasterServer主要負責 DAG 任務切分、任務提交監控,並同時監聽其它MasterServer和WorkerServer的健康狀態。 MasterServer服務啓動時向Zookeeper註冊臨時節點,經過監聽Zookeeper臨時節點變化來進行容錯處理。
Distributed Quartz分佈式調度組件,主要負責定時任務的啓停操做,當quartz調起任務後,Master內部會有線程池具體負責處理任務的後續操做
MasterSchedulerThread是一個掃描線程,定時掃描數據庫中的 command 表,根據不一樣的命令類型進行不一樣的業務操做
MasterExecThread主要是負責DAG任務切分、任務提交監控、各類不一樣命令類型的邏輯處理
MasterTaskExecThread主要負責任務的持久化
WorkerServer
WorkerServer也採用分佈式無中心設計理念,WorkerServer主要負責任務的執行和提供日誌服務。WorkerServer服務啓動時向Zookeeper註冊臨時節點,並維持心跳。
FetchTaskThread主要負責不斷從Task Queue中領取任務,並根據不一樣任務類型調用TaskScheduleThread對應執行器。
LoggerServer是一個RPC服務,提供日誌分片查看、刷新和下載等功能
ZooKeeper
ZooKeeper服務,系統中的MasterServer和WorkerServer節點都經過ZooKeeper來進行集羣管理和容錯。另外系統還基於ZooKeeper進行事件監聽和分佈式鎖。 咱們也曾經基於Redis實現過隊列,不過咱們但願EasyScheduler依賴到的組件儘可能地少,因此最後仍是去掉了Redis實現。
Task Queue
提供任務隊列的操做,目前隊列也是基於Zookeeper來實現。因爲隊列中存的信息較少,沒必要擔憂隊列裏數據過多的狀況,實際上咱們壓測過百萬級數據存隊列,對系統穩定性和性能沒影響。
Alert
提供告警相關接口,接口主要包括告警兩種類型的告警數據的存儲、查詢和通知功能。其中通知功能又有郵件通知和**SNMP(暫未實現)**兩種。
API
API接口層,主要負責處理前端UI層的請求。該服務統一提供RESTful api向外部提供請求服務。 接口包括工做流的建立、定義、查詢、修改、發佈、下線、手工啓動、中止、暫停、恢復、從該節點開始執行等等。
UI
系統的前端頁面,提供系統的各類可視化操做界面,詳見**系統使用手冊**部分。
中心化的設計理念比較簡單,分佈式集羣中的節點按照角色分工,大致上分爲兩種角色:
中心化思想設計存在的問題:
在去中心化設計裏,一般沒有Master/Slave的概念,全部的角色都是同樣的,地位是平等的,全球互聯網就是一個典型的去中心化的分佈式系統,聯網的任意節點設備down機,都只會影響很小範圍的功能。
去中心化設計的核心設計在於整個分佈式系統中不存在一個區別於其餘節點的」管理者」,所以不存在單點故障問題。但因爲不存在」 管理者」節點因此每一個節點都須要跟其餘節點通訊才獲得必需要的機器信息,而分佈式系統通訊的不可靠行,則大大增長了上述功能的實現難度。
實際上,真正去中心化的分佈式系統並很少見。反而動態中心化分佈式系統正在不斷涌出。在這種架構下,集羣中的管理者是被動態選擇出來的,而不是預置的,而且集羣在發生故障的時候,集羣的節點會自發的舉行"會議"來選舉新的"管理者"去主持工做。最典型的案例就是ZooKeeper及Go語言實現的Etcd。
EasyScheduler的去中心化是Master/Worker註冊到Zookeeper中,實現Master集羣和Worker集羣無中心,並使用Zookeeper分佈式鎖來選舉其中的一臺Master或Worker爲「管理者」來執行任務。
EasyScheduler使用ZooKeeper分佈式鎖來實現同一時刻只有一臺Master執行Scheduler,或者只有一臺Worker執行任務的提交。
對於啓動新Master來打破僵局,彷佛有點差強人意,因而咱們提出瞭如下三種方案來下降這種風險:
注意:Master Scheduler線程在獲取Command的時候是FIFO的方式執行的。
因而咱們選擇了第三種方式來解決線程不足的問題。
容錯分爲服務宕機容錯和任務重試,服務宕機容錯又分爲Master容錯和Worker容錯兩種狀況
服務容錯設計依賴於ZooKeeper的Watcher機制,實現原理如圖:
其中Master監控其餘Master和Worker的目錄,若是監聽到remove事件,則會根據具體的業務邏輯進行流程實例容錯或者任務實例容錯。
Master Scheduler線程一旦發現任務實例爲」 須要容錯」狀態,則接管任務並進行從新提交。
注意:因爲」 網絡抖動」可能會使得節點短期內失去和ZooKeeper的心跳,從而發生節點的remove事件。對於這種狀況,咱們使用最簡單的方式,那就是節點一旦和ZooKeeper發生超時鏈接,則直接將Master或Worker服務停掉。
這裏首先要區分任務失敗重試、流程失敗恢復、流程失敗重跑的概念:
接下來講正題,咱們將工做流中的任務節點分了兩種類型。
一種是業務節點,這種節點都對應一個實際的腳本或者處理語句,好比Shell節點,MR節點、Spark節點、依賴節點等。
還有一種是邏輯節點,這種節點不作實際的腳本或語句處理,只是整個流程流轉的邏輯處理,好比子流程節等。
每個業務節點均可以配置失敗重試的次數,當該任務節點失敗,會自動重試,直到成功或者超過配置的重試次數。邏輯節點不支持失敗重試。可是邏輯節點裏的任務支持重試。
若是工做流中有任務失敗達到最大重試次數,工做流就會失敗中止,失敗的工做流能夠手動進行重跑操做或者流程恢復操做
在早期調度設計中,若是沒有優先級設計,採用公平調度設計的話,會遇到先行提交的任務可能會和後繼提交的任務同時完成的狀況,而不能作到設置流程或者任務的優先級,所以咱們對此進行了從新設計,目前咱們設計以下:
具體實現是根據任務實例的json解析優先級,而後把流程實例優先級_流程實例id_任務優先級_任務id信息保存在ZooKeeper任務隊列中,當從任務隊列獲取的時候,經過字符串比較便可得出最須要優先執行的任務
- 任務的優先級也分爲5級,依次爲HIGHEST、HIGH、MEDIUM、LOW、LOWEST。以下圖
複製代碼
因爲Web(UI)和Worker不必定在同一臺機器上,因此查看日誌不能像查詢本地文件那樣。有兩種方案:
將日誌放到ES搜索引擎上
經過gRPC通訊獲取遠程日誌信息
介於考慮到儘量的EasyScheduler的輕量級性,因此選擇了gRPC實現遠程訪問日誌信息。
/** * task log appender */
public class TaskLogAppender extends FileAppender<ILoggingEvent {
...
@Override
protected void append(ILoggingEvent event) {
if (currentlyActiveFile == null){
currentlyActiveFile = getFile();
}
String activeFile = currentlyActiveFile;
// thread name: taskThreadName-processDefineId_processInstanceId_taskInstanceId
String threadName = event.getThreadName();
String[] threadNameArr = threadName.split("-");
// logId = processDefineId_processInstanceId_taskInstanceId
String logId = threadNameArr[1];
...
super.subAppend(event);
}
}
複製代碼
以/流程定義id/流程實例id/任務實例id.log的形式生成日誌
過濾匹配以TaskLogInfo開始的線程名稱:
TaskLogFilter實現以下:
/** * task log filter */
public class TaskLogFilter extends Filter<ILoggingEvent {
@Override
public FilterReply decide(ILoggingEvent event) {
if (event.getThreadName().startsWith("TaskLogInfo-")){
return FilterReply.ACCEPT;
}
return FilterReply.DENY;
}
}
複製代碼
本文從調度出發,初步介紹了大數據分佈式工做流調度系統--EasyScheduler的架構原理及實現思路。