Azkaban 3.45
https://azkaban.github.io/
Azkaban was implemented at LinkedIn to solve the problem of Hadoop job dependencies. We had jobs that needed to run in order, from ETL jobs to data analytics products.
Initially a single server solution, with the increased number of Hadoop users over the years, Azkaban has evolved to be a more robust solution.
(Unfortunately, the WebServer in the current version is still a single point of failure.)
Azkaban consists of 3 key components: MySQL, WebServer, and ExecutorServer.
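The main MySQL tables:
projects: projects
project_flows: workflow definitions
execution_flows: workflow instances
execution_jobs: job instances
triggers: schedule definitions
PS: Much of the data in these tables is encoded. enc_type is the encoding type (the corresponding enum is EncodingType): 2 means gzip, any other value means unencoded. For 2, call GZIPUtils.transformBytesToObject to recover the original string.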
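To make the enc_type rule concrete, here is a minimal sketch of the decoding step using only the JDK. It does not call Azkaban's GZIPUtils; the class EncTypeDecodeSketch and its decode and gzip helpers are hypothetical names, and it assumes the blob holds UTF-8 text.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class EncTypeDecodeSketch {
    // Follows the rule described above: 2 means gzip, anything else is treated as unencoded.
    static final int ENC_TYPE_GZIP = 2;

    // Hypothetical helper: turn a blob column value back into the original string.
    static String decode(int encType, byte[] blob) {
        if (encType != ENC_TYPE_GZIP) {
            return new String(blob, StandardCharsets.UTF_8);
        }
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(blob));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Only used to build sample input for the demo.
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        String json = "{\"flowId\":\"demo\"}";
        System.out.println(decode(2, gzip(json)));                               // gzip-encoded row
        System.out.println(decode(1, json.getBytes(StandardCharsets.UTF_8)));    // unencoded row
    }
}
```

Both calls print the same JSON string, which is the behaviour you would expect when reading a flow_data or json column by hand.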
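- Job: the smallest unit of execution, a node in the DAG, i.e. a task
- Flow: made up of multiple Jobs, with each Job's dependencies declared through the dependencies property, i.e. a workflow
- Trigger: fires a Flow according to the given cron expression, i.e. a schedule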
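The Job/Flow relationship above can be sketched as a small dependency graph. This is an illustration only, not Azkaban's data model: FlowDagSketch, addJob, and executionOrder are hypothetical names, and the ordering rule (run a job once all of its dependencies are done) is the usual topological approach rather than anything taken from the Azkaban source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FlowDagSketch {
    // job name -> names of the jobs it depends on
    private final Map<String, List<String>> dependencies = new LinkedHashMap<>();

    FlowDagSketch addJob(String name, String... dependsOn) {
        dependencies.put(name, Arrays.asList(dependsOn));
        return this;
    }

    // Repeatedly pick jobs whose dependencies have all finished.
    List<String> executionOrder() {
        List<String> done = new ArrayList<>();
        while (done.size() < dependencies.size()) {
            boolean progressed = false;
            for (Map.Entry<String, List<String>> e : dependencies.entrySet()) {
                if (!done.contains(e.getKey()) && done.containsAll(e.getValue())) {
                    done.add(e.getKey());
                    progressed = true;
                }
            }
            if (!progressed) {
                throw new IllegalStateException("cycle or missing dependency");
            }
        }
        return done;
    }

    public static void main(String[] args) {
        FlowDagSketch flow = new FlowDagSketch()
                .addJob("report", "transform")
                .addJob("transform", "extract")
                .addJob("extract");
        System.out.println(flow.executionOrder()); // [extract, transform, report]
    }
}
```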
AzkabanWebServer.main
launch
prepareAndStartServer
configureRoutes
TriggerManager.start
FlowTriggerService.start
recoverIncompleteTriggerInstances
SELECT %s FROM execution_dependencies WHERE trigger_instance_id in (SELECT trigger_instance_id FROM execution_dependencies WHERE dep_status = %s or dep_status = %s or (dep_status = %s and flow_exec_id = %s))
FlowTriggerScheduler.start
ExecutorManager
setupExecutors
loadRunningFlows
QueueProcessorThread.run
ExecutingManagerUpdaterThread.run
AzkabanExecutorServer.main
launch
AzkabanExecutorServer.start
insertExecutorEntryIntoDB
The Web Server has two entry points:
ExecuteFlowAction.doAction
ExecutorServlet.ajaxExecuteFlow
Web Server dispatches the flow:
ExecutorManager.submitExecutableFlow
JdbcExecutorLoader.uploadExecutableFlow
INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (?,?,?,?,?,?,?)
ExecutorLoader.addActiveExecutableReference
INSERT INTO active_executing_flows (exec_id, update_time) values (?,?)
queuedFlows.enqueue
QueueProcessorThread.run
processQueuedFlows
ExecutorManager.selectExecutorAndDispatchFlow (get from queuedFlows)
selectExecutor
dispatch
JdbcExecutorLoader.assignExecutor
UPDATE execution_flows SET executor_id=? where exec_id=?
ExecutorApiGateway.callWithExecutable (calls the Executor Server)
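The submit, enqueue, select, and dispatch steps in the trace above follow a queue-and-worker shape, which can be sketched roughly as below. Everything here is a simplification under stated assumptions: DispatchSketch and its methods are hypothetical, the queue is drained synchronously rather than by a background thread, and the "pick the least-loaded executor" rule is an assumption for illustration, not Azkaban's actual selection logic.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

class DispatchSketch {
    private final Queue<Integer> queuedFlows = new ArrayDeque<>();            // exec ids waiting for an executor
    private final Map<String, Integer> executorLoad = new LinkedHashMap<>();  // executor -> number of assigned flows
    private final Map<Integer, String> assignment = new LinkedHashMap<>();    // exec id -> executor

    void addExecutor(String name) {
        executorLoad.put(name, 0);
    }

    // Corresponds loosely to the "submit" step: the flow is only queued here.
    void submit(int execId) {
        queuedFlows.add(execId);
    }

    // Corresponds loosely to the queue processor: take each queued flow and assign an executor.
    void processQueuedFlows() {
        Integer execId;
        while ((execId = queuedFlows.poll()) != null) {
            String executor = selectExecutor();
            assignment.put(execId, executor);              // stand-in for the UPDATE ... SET executor_id
            executorLoad.merge(executor, 1, Integer::sum);
        }
    }

    // Assumed rule: choose the executor with the fewest assigned flows.
    private String selectExecutor() {
        return executorLoad.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElseThrow(() -> new IllegalStateException("no executor available"));
    }

    int loadOf(String executor) {
        return executorLoad.get(executor);
    }

    String executorOf(int execId) {
        return assignment.get(execId);
    }

    public static void main(String[] args) {
        DispatchSketch d = new DispatchSketch();
        d.addExecutor("executor-a");
        d.addExecutor("executor-b");
        for (int id = 1; id <= 4; id++) {
            d.submit(id);
        }
        d.processQueuedFlows();
        System.out.println("a=" + d.loadOf("executor-a") + ", b=" + d.loadOf("executor-b"));
    }
}
```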
Executor Server runs the flow:
ExecutorServlet.doGet
handleAjaxExecute
FlowRunnerManager.submitFlow
JdbcExecutorLoader.fetchExecutableFlow
SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE exec_id=?
FlowPreparer.setup
FlowRunner.run
setupFlowExecution
updateFlow
UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? WHERE exec_id=?
runFlow
progressGraph
runReadyJob
runExecutableNode
JobRunner.run
uploadExecutableNode
INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)
prepareJob
runJob
Job.run (ProcessJob, JavaJob)
Web Server polls the flow status:
ExecutingManagerUpdaterThread.run
getFlowToExecutorMap
ExecutorApiGateway.callWithExecutionId
updateExecution
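The polling loop above can be pictured as: for every running flow, ask its executor for the current status, record it, and stop tracking flows that have finished. The sketch below shows one pass of such a loop. StatusPollerSketch, its Status enum, and the BiFunction standing in for the remote call are all hypothetical; the real updater runs in its own thread and talks to the Executor Server over HTTP.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiFunction;

class StatusPollerSketch {
    enum Status { RUNNING, SUCCEEDED, FAILED }

    private final Map<Integer, String> runningFlows = new HashMap<>();  // exec id -> executor
    private final Map<Integer, Status> lastKnown = new HashMap<>();     // exec id -> last reported status
    private final BiFunction<String, Integer, Status> statusCall;       // stand-in for the remote status call

    StatusPollerSketch(BiFunction<String, Integer, Status> statusCall) {
        this.statusCall = statusCall;
    }

    void track(int execId, String executor) {
        runningFlows.put(execId, executor);
        lastKnown.put(execId, Status.RUNNING);
    }

    // One polling pass: refresh every running flow and drop the finished ones.
    void pollOnce() {
        Iterator<Map.Entry<Integer, String>> it = runningFlows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, String> e = it.next();
            Status status = statusCall.apply(e.getValue(), e.getKey());
            lastKnown.put(e.getKey(), status);
            if (status != Status.RUNNING) {
                it.remove();
            }
        }
    }

    boolean isRunning(int execId) {
        return runningFlows.containsKey(execId);
    }

    Status lastStatus(int execId) {
        return lastKnown.get(execId);
    }

    public static void main(String[] args) {
        // Fake executor: flow 1 has finished, everything else is still running.
        StatusPollerSketch poller = new StatusPollerSketch(
                (executor, execId) -> execId == 1 ? Status.SUCCEEDED : Status.RUNNING);
        poller.track(1, "executor-a");
        poller.track(2, "executor-b");
        poller.pollOnce();
        System.out.println("flow 1: " + poller.lastStatus(1) + ", still tracked: " + poller.isRunning(1));
        System.out.println("flow 2: " + poller.lastStatus(2) + ", still tracked: " + poller.isRunning(2));
    }
}
```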
TriggerManager.start
loadTriggers
SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM triggers
TriggerScannerThread.start
checkAllTriggers
onTriggerTrigger
TriggerAction.doAction
ExecuteFlowAction.doAction
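The trigger path above boils down to a scanner that periodically checks each loaded trigger and, when its condition is met, runs its action (here, submitting a flow). A rough sketch follows. It is not Azkaban code: TriggerScanSketch and its members are hypothetical, a fixed period replaces the cron expression to keep the example short, and the current time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

class TriggerScanSketch {
    static class Trigger {
        final String flowId;
        final long periodMs;   // assumption: fixed period instead of a cron expression
        long nextFireMs;

        Trigger(String flowId, long firstFireMs, long periodMs) {
            this.flowId = flowId;
            this.nextFireMs = firstFireMs;
            this.periodMs = periodMs;
        }
    }

    private final List<Trigger> triggers = new ArrayList<>();
    final List<String> submitted = new ArrayList<>();   // records which flows were "executed"

    void loadTrigger(Trigger t) {
        triggers.add(t);
    }

    // One scan: fire every trigger that is due, then schedule its next run.
    void checkAllTriggers(long nowMs) {
        for (Trigger t : triggers) {
            if (nowMs >= t.nextFireMs) {
                submitted.add(t.flowId);            // stand-in for the action that executes the flow
                t.nextFireMs = nowMs + t.periodMs;
            }
        }
    }

    public static void main(String[] args) {
        TriggerScanSketch scanner = new TriggerScanSketch();
        scanner.loadTrigger(new Trigger("daily-etl", 1000, 1000));
        for (long now = 500; now <= 2000; now += 500) {
            scanner.checkAllTriggers(now);
        }
        System.out.println(scanner.submitted); // [daily-etl, daily-etl]
    }
}
```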
PS: There is also a second, completely independent scheduling mechanism, controlled by azkaban.server.schedule.enable_quartz (default false). The following registers a job with Quartz:
ProjectManagerServlet.ajaxHandleUpload
SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=? AND active=true
ProjectManager.loadAllProjectFlows
SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=?
FlowTriggerScheduler.scheduleAll
SELECT MAX(flow_version) FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=?
SELECT flow_file FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=? AND flow_version=?
registerJob
The following is the Quartz job execution:
FlowTriggerQuartzJob.execute
FlowTriggerService.startTrigger
TriggerInstanceProcessor.processSucceed
TriggerInstanceProcessor.executeFlowAndUpdateExecID
ExecutorManager.submitExecutableFlow
Job is the core interface for tasks; all concrete job types implement it:
Job
AbstractJob
AbstractProcessJob
ProcessJob (shell job)
JavaProcessJob (Java job)
JavaJob
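As a rough picture of how such a hierarchy fits together, here is a stripped-down sketch with one interface, one abstract base class, and one shell-style implementation. The types are deliberately prefixed with Sketch because they are not Azkaban's classes and their signatures are assumptions; the example also assumes a POSIX sh is available on the PATH.

```java
import java.io.IOException;

class JobHierarchyDemo {
    public static void main(String[] args) {
        SketchJob job = new SketchProcessJob("hello", "echo hello from a shell job");
        job.run();
    }
}

// Core contract: every job has an id and can be run.
interface SketchJob {
    String getId();

    void run();
}

// Shared bookkeeping lives in the abstract base class.
abstract class SketchAbstractJob implements SketchJob {
    private final String id;

    protected SketchAbstractJob(String id) {
        this.id = id;
    }

    @Override
    public String getId() {
        return id;
    }
}

// Shell-style job: runs a command in a child process and fails on a non-zero exit code.
class SketchProcessJob extends SketchAbstractJob {
    private final String command;

    SketchProcessJob(String id, String command) {
        super(id);
        this.command = command;
    }

    @Override
    public void run() {
        try {
            Process process = new ProcessBuilder("sh", "-c", command).inheritIO().start();
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                throw new IllegalStateException(getId() + " exited with code " + exitCode);
            }
        } catch (IOException e) {
            throw new IllegalStateException("could not start " + getId(), e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(getId() + " was interrupted", e);
        }
    }
}
```

A Java-style job would follow the same pattern, building a java command line instead of a shell command before starting the child process.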