[Original] Big Data Basics: Azkaban (1) Introduction and Source Code Analysis

Azkaban 3.45

I Introduction

1 Official website

https://azkaban.github.io/

Azkaban was implemented at LinkedIn to solve the problem of Hadoop job dependencies. We had jobs that needed to run in order, from ETL jobs to data analytics products.

Initially a single server solution, with the increased number of Hadoop users over the years, Azkaban has evolved to be a more robust solution.

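In other words, Azkaban exists to run dependent Hadoop jobs, such as ETL and data analysis jobs, in the correct order. Despite its evolution from a single-server design, the WebServer in the current version is unfortunately still a single point of failure.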

Azkaban consists of 3 key components:

  • Relational Database (MySQL)
  • AzkabanWebServer
  • AzkabanExecutorServer

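MySQL stores projects, flows, executions and schedules; the WebServer accepts submissions and dispatches flows to executors; the ExecutorServer actually runs them.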

2 Deployment

 

3 Database schema

 

projects: project metadata

project_flows: workflow definitions

execution_flows: workflow instances (executions)

execution_jobs: job instances

triggers: schedule definitions

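PS: Much of the data in these tables is stored encoded. enc_type gives the encoding type (the corresponding enum is EncodingType): 2 means gzip, and any other value means no encoding. Data with enc_type 2 must be decoded with GZIPUtils.transformBytesToObject to recover the original content.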
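To make the enc_type rule concrete, here is a minimal sketch that decodes such a column with the JDK's java.util.zip classes instead of Azkaban's GZIPUtils. It assumes, as described above, that enc_type 2 means gzip-compressed bytes, and further assumes the payload is UTF-8 text (typically JSON); the class EncodedColumn and its methods are hypothetical. In practice the bytes would come from a query such as SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE exec_id=?.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, not Azkaban's GZIPUtils.
// Assumption: enc_type 2 = gzip-compressed UTF-8 bytes, any other value = plain UTF-8 bytes.
final class EncodedColumn {
    static final int ENC_TYPE_GZIP = 2;

    /** Decodes raw column bytes into the original string according to enc_type. */
    static String decode(int encType, byte[] data) {
        try {
            if (encType == ENC_TYPE_GZIP) {
                try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
                    return new String(in.readAllBytes(), StandardCharsets.UTF_8);
                }
            }
            return new String(data, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Gzip-encodes a string (the inverse of decode for enc_type 2); used here only to build sample data. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        // Closing the stream (try-with-resources) flushes the gzip trailer before we read the buffer.
        try (GZIPOutputStream out = new GZIPOutputStream(buf)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) {
        String json = "{\"flowId\":\"demo\"}";
        System.out.println(decode(ENC_TYPE_GZIP, gzip(json))); // prints {"flowId":"demo"}
        System.out.println(decode(1, json.getBytes(StandardCharsets.UTF_8))); // plain: printed as-is
    }
}
```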

4 Concepts

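  • Job: the smallest unit of execution and a node in the DAG, i.e. a task

  • Flow: made up of multiple Jobs, with the dependencies between Jobs configured through the dependencies property, i.e. a workflow

  • Trigger: fires a Flow according to a given cron expression, i.e. a schedule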
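Since a Flow is a DAG of Jobs, an order in which its Jobs may run is a topological order of that graph. The sketch below illustrates this with Kahn's algorithm. It assumes each Job is described by .job-style properties whose optional dependencies key lists upstream Job names separated by commas; FlowDag and its methods are hypothetical names, and Azkaban's own flow loader is not shown here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch: a Flow as a DAG of Jobs, ordered with Kahn's topological sort.
// Assumption: each job is .job-style properties text with an optional comma-separated "dependencies" key.
final class FlowDag {

    /** Reads the upstream job names from one job definition. */
    static List<String> dependenciesOf(String jobProps) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(jobProps));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> deps = new ArrayList<>();
        for (String d : p.getProperty("dependencies", "").split(",")) {
            if (!d.isBlank()) deps.add(d.trim());
        }
        return deps;
    }

    /** Returns job names so that every job comes after all of its dependencies. */
    static List<String> executionOrder(Map<String, String> jobs) {
        Map<String, Integer> indegree = new LinkedHashMap<>();
        Map<String, List<String>> children = new HashMap<>();
        for (String job : jobs.keySet()) {
            indegree.put(job, 0);
            children.put(job, new ArrayList<>());
        }
        for (Map.Entry<String, String> e : jobs.entrySet()) {
            for (String dep : dependenciesOf(e.getValue())) {
                if (!jobs.containsKey(dep)) throw new IllegalArgumentException("unknown dependency: " + dep);
                children.get(dep).add(e.getKey());
                indegree.merge(e.getKey(), 1, Integer::sum);
            }
        }
        // Start with jobs that have no upstream dependencies.
        Deque<String> ready = new ArrayDeque<>();
        indegree.forEach((job, n) -> { if (n == 0) ready.add(job); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String job = ready.poll();
            order.add(job);
            for (String child : children.get(job)) {
                if (indegree.merge(child, -1, Integer::sum) == 0) ready.add(child);
            }
        }
        if (order.size() != jobs.size()) throw new IllegalStateException("cycle detected: not a DAG");
        return order;
    }

    public static void main(String[] args) {
        Map<String, String> jobs = new LinkedHashMap<>();
        jobs.put("report", "type=command\ncommand=echo report\ndependencies=clean");
        jobs.put("extract", "type=command\ncommand=echo extract");
        jobs.put("clean", "type=command\ncommand=echo clean\ndependencies=extract");
        System.out.println(executionOrder(jobs)); // prints [extract, clean, report]
    }
}
```

Kahn's algorithm also detects cycles for free: if some Jobs never reach in-degree zero, the definitions do not form a DAG.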

II Code Analysis

1 Startup process

Web Server

AzkabanWebServer.main

         launch

                  prepareAndStartServer

                          configureRoutes

                                   TriggerManager.start

                          FlowTriggerService.start

                                   recoverIncompleteTriggerInstances

                                            SELECT %s FROM execution_dependencies WHERE trigger_instance_id in (SELECT trigger_instance_id FROM execution_dependencies WHERE dep_status = %s or dep_status = %s or (dep_status = %s and flow_exec_id = %s))

                          FlowTriggerScheduler.start

 

ExecutorManager

         setupExecutors

         loadRunningFlows

QueueProcessorThread.run

ExecutingManagerUpdaterThread.run

Executor Server

AzkabanExecutorServer.main

         launch

                  AzkabanExecutorServer.start

                          insertExecutorEntryIntoDB

 

2 Workflow execution process

The Web Server has two entry points:

ExecuteFlowAction.doAction

ExecutorServlet.ajaxExecuteFlow

 

The Web Server dispatches the flow:

ExecutorManager.submitExecutableFlow

         JdbcExecutorLoader.uploadExecutableFlow

                  INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (?,?,?,?,?,?,?)

         ExecutorLoader.addActiveExecutableReference

                  INSERT INTO active_executing_flows (exec_id, update_time) values (?,?)

         queuedFlows.enqueue

 

QueueProcessorThread.run

         processQueuedFlows

                  ExecutorManager.selectExecutorAndDispatchFlow (get from queuedFlows)

                          selectExecutor

                          dispatch

                                   JdbcExecutorLoader.assignExecutor

                                            UPDATE execution_flows SET executor_id=? where exec_id=?

                                   ExecutorApiGateway.callWithExecutable (calls the Executor Server)
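The submit, queue and dispatch path above is essentially a producer/consumer pattern: submission writes the execution row and enqueues it, and a background thread drains the queue, picks an executor, records executor_id and forwards the flow. The following sketch models that path in memory. Everything in it is hypothetical: DispatchSketch and its methods are illustrative names, the database writes are simulated with maps, the HTTP call is only a comment, and the least-loaded selection policy is an assumption (Azkaban's real executor selection is configurable and more involved).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory model of submit -> queue -> selectExecutor -> dispatch; not Azkaban's API.
final class DispatchSketch {
    record Executor(int id, String host) {}

    private final BlockingQueue<Integer> queuedFlows = new LinkedBlockingQueue<>();
    private final Map<Integer, Integer> execToExecutor = new HashMap<>(); // stands in for execution_flows.executor_id
    private final Map<Integer, Integer> runningCount = new HashMap<>();   // flows assigned to each executor
    private final List<Executor> executors;
    private int nextExecId = 1;

    DispatchSketch(List<Executor> executors) {
        this.executors = executors;
        for (Executor e : executors) runningCount.put(e.id(), 0);
    }

    /** Like submitExecutableFlow: allocate an exec_id (the INSERT), then enqueue it. */
    int submit() {
        int execId = nextExecId++;
        queuedFlows.add(execId);
        return execId;
    }

    /** Like one pass of processQueuedFlows: drain the queue, choose an executor, record the assignment. */
    void processQueuedFlows() {
        List<Integer> batch = new ArrayList<>();
        queuedFlows.drainTo(batch);
        for (int execId : batch) {
            Executor chosen = selectExecutor();
            execToExecutor.put(execId, chosen.id()); // UPDATE execution_flows SET executor_id=? where exec_id=?
            runningCount.merge(chosen.id(), 1, Integer::sum);
            // A real implementation would now send the flow to chosen.host() over HTTP.
        }
    }

    /** Assumed policy: fewest assigned flows wins, ties broken by lowest executor id. */
    private Executor selectExecutor() {
        return executors.stream()
            .min(Comparator.comparingInt((Executor e) -> runningCount.get(e.id()))
                .thenComparingInt(Executor::id))
            .orElseThrow();
    }

    Integer executorOf(int execId) { return execToExecutor.get(execId); }

    public static void main(String[] args) {
        DispatchSketch d = new DispatchSketch(List.of(new Executor(1, "exec-a"), new Executor(2, "exec-b")));
        for (int i = 0; i < 3; i++) d.submit();
        d.processQueuedFlows();
        System.out.println(d.executorOf(1) + " " + d.executorOf(2) + " " + d.executorOf(3)); // prints 1 2 1
    }
}
```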

 

The Executor Server runs the flow:

ExecutorServlet.doGet

         handleAjaxExecute

                  FlowRunnerManager.submitFlow

                          JdbcExecutorLoader.fetchExecutableFlow

                                 SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE exec_id=?

                          FlowPreparer.setup

                          FlowRunner.run

                                   setupFlowExecution

                                   updateFlow

                                            UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? WHERE exec_id=?

                                   runFlow

                                            progressGraph

                                                     runReadyJob

                                                             runExecutableNode

                                                                      JobRunner.run

                                                                               uploadExecutableNode

                                                                                        INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)

                                                                               prepareJob

                                                                               runJob

                                                                                        Job.run (ProcessJob, JavaJob)
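The core of FlowRunner.runFlow is the progressGraph loop: repeatedly find nodes whose upstream Jobs have all finished and run them. A simplified, single-threaded sketch of that idea follows. It is not Azkaban's FlowRunner: GraphProgress is a hypothetical name, the Status enum is reduced to four values, Jobs run synchronously through a caller-supplied predicate instead of being handed to separate JobRunner instances, and cancelling the downstream Jobs of a failed Job is an assumed policy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, single-threaded model of a progressGraph-style loop; not Azkaban's FlowRunner.
final class GraphProgress {
    enum Status { READY, SUCCEEDED, FAILED, CANCELLED }

    /** parents maps each job to its upstream jobs; runJob returns true on success. */
    static Map<String, Status> run(Map<String, List<String>> parents, Predicate<String> runJob) {
        Map<String, Status> status = new LinkedHashMap<>();
        parents.keySet().forEach(j -> status.put(j, Status.READY));
        boolean progressed = true;
        while (progressed) { // keep sweeping until a full pass changes nothing
            progressed = false;
            for (String job : parents.keySet()) {
                if (status.get(job) != Status.READY) continue;
                List<Status> up = new ArrayList<>();
                for (String p : parents.get(job)) up.add(status.get(p));
                if (up.contains(Status.FAILED) || up.contains(Status.CANCELLED)) {
                    status.put(job, Status.CANCELLED); // assumed policy: an upstream job did not succeed
                    progressed = true;
                } else if (up.stream().allMatch(s -> s == Status.SUCCEEDED)) {
                    // Corresponds loosely to runReadyJob -> JobRunner.run.
                    status.put(job, runJob.test(job) ? Status.SUCCEEDED : Status.FAILED);
                    progressed = true;
                }
            }
        }
        return status;
    }

    public static void main(String[] args) {
        Map<String, List<String>> parents = new LinkedHashMap<>();
        parents.put("extract", List.of());
        parents.put("clean", List.of("extract"));
        parents.put("report", List.of("clean"));
        parents.put("audit", List.of("extract"));
        // Pretend "clean" fails.
        System.out.println(run(parents, job -> !job.equals("clean")));
        // prints {extract=SUCCEEDED, clean=FAILED, report=CANCELLED, audit=SUCCEEDED}
    }
}
```

In the example, audit still runs because it does not depend on the failed clean job; Azkaban lets users choose among several failure options, so treat this behavior as just one possible policy.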

 

The Web Server polls flow status:

ExecutingManagerUpdaterThread.run

         getFlowToExecutorMap

         ExecutorApiGateway.callWithExecutionId

         updateExecution
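Grouping running flows by executor, as the name getFlowToExecutorMap suggests, lets the polling thread send one status request per executor rather than one per flow. The sketch below shows that grouping step and one polling round. It is a hypothetical model: StatusPoller is an illustrative name, running flows are given as an exec_id to executor-host map, and callExecutor is a caller-supplied function standing in for ExecutorApiGateway.callWithExecutionId.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

// Hypothetical model of the Web Server's polling round; not Azkaban's API.
final class StatusPoller {
    /** Groups running exec ids by the executor host that runs them (cf. getFlowToExecutorMap). */
    static Map<String, List<Integer>> groupByExecutor(Map<Integer, String> runningFlows) {
        Map<String, List<Integer>> byExecutor = new TreeMap<>();
        for (Map.Entry<Integer, String> e : runningFlows.entrySet()) {
            byExecutor.computeIfAbsent(e.getValue(), host -> new ArrayList<>()).add(e.getKey());
        }
        return byExecutor;
    }

    /** One round: a single call per executor, merging the statuses it reports. */
    static Map<Integer, String> pollOnce(Map<Integer, String> runningFlows,
                                        BiFunction<String, List<Integer>, Map<Integer, String>> callExecutor) {
        Map<Integer, String> updates = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : groupByExecutor(runningFlows).entrySet()) {
            // updateExecution would then apply these statuses to the in-memory flows.
            updates.putAll(callExecutor.apply(e.getKey(), e.getValue()));
        }
        return updates;
    }

    public static void main(String[] args) {
        Map<Integer, String> running = new TreeMap<>(Map.of(101, "exec-a", 102, "exec-b", 103, "exec-a"));
        System.out.println(groupByExecutor(running)); // prints {exec-a=[101, 103], exec-b=[102]}
        // Fake executor: reports every flow it is asked about as RUNNING.
        System.out.println(pollOnce(running,
            (host, ids) -> ids.stream().collect(Collectors.toMap(id -> id, id -> "RUNNING"))));
        // prints {101=RUNNING, 102=RUNNING, 103=RUNNING}
    }
}
```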

 

3 Schedule execution process

TriggerManager.start

         loadTriggers

                  SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM triggers

         TriggerScannerThread.start

                  checkAllTriggers

                          onTriggerTrigger

                                   TriggerAction.doAction

                                            ExecuteFlowAction.doAction
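The TriggerScannerThread loop boils down to: check every trigger's condition, and when it is met run the trigger's action (for a schedule, ExecuteFlowAction, which submits the flow), then move on to the next check time. The sketch below models one scan. It is hypothetical: TriggerScanner is an illustrative name, triggers fire on a fixed period in milliseconds rather than a cron expression, the action is a plain Runnable, and skipping missed periods after a pause is an assumed policy, not necessarily Azkaban's.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of checkAllTriggers -> onTriggerTrigger -> TriggerAction.doAction; not Azkaban's API.
final class TriggerScanner {
    static final class Trigger {
        final String name;
        final long periodMillis;
        long nextCheckTime;
        final Runnable action; // e.g. "submit flow X", like ExecuteFlowAction.doAction

        Trigger(String name, long firstFireTime, long periodMillis, Runnable action) {
            if (periodMillis <= 0) throw new IllegalArgumentException("period must be positive");
            this.name = name;
            this.nextCheckTime = firstFireTime;
            this.periodMillis = periodMillis;
            this.action = action;
        }
    }

    private final List<Trigger> triggers = new ArrayList<>();

    void add(Trigger t) { triggers.add(t); }

    /** One scan at time "now": fire every due trigger, then advance its next check time. */
    List<String> checkAllTriggers(long now) {
        List<String> fired = new ArrayList<>();
        for (Trigger t : triggers) {
            if (now >= t.nextCheckTime) {
                t.action.run();
                fired.add(t.name);
                // Assumed policy: skip missed periods so a long pause does not cause a burst of catch-up runs.
                while (t.nextCheckTime <= now) t.nextCheckTime += t.periodMillis;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        TriggerScanner s = new TriggerScanner();
        int[] runs = {0};
        s.add(new Trigger("daily-etl", 1_000, 500, () -> runs[0]++));
        System.out.println(s.checkAllTriggers(900));   // prints []
        System.out.println(s.checkAllTriggers(1_000)); // prints [daily-etl]
        System.out.println(s.checkAllTriggers(1_200)); // prints []
        System.out.println(s.checkAllTriggers(2_700)); // prints [daily-etl], next check moves to 3000
        System.out.println(runs[0]);                   // prints 2
    }
}
```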

 

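PS: There is also a second, completely independent scheduling mechanism based on Quartz, controlled by azkaban.server.schedule.enable_quartz (default false). Registering a job with Quartz goes like this: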

ProjectManagerServlet.ajaxHandleUpload

         SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=? AND active=true

         ProjectManager.loadAllProjectFlows

                  SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=?

         FlowTriggerScheduler.scheduleAll

                  SELECT MAX(flow_version) FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=?

                  SELECT flow_file FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=? AND flow_version=?

                  registerJob

 

The Quartz job then executes as follows:

FlowTriggerQuartzJob.execute

         FlowTriggerService.startTrigger

                  TriggerInstanceProcessor.processSucceed

                          TriggerInstanceProcessor.executeFlowAndUpdateExecID

                                   ExecutorManager.submitExecutableFlow

 

4 Job execution process

Job is the core interface for tasks; every concrete job type is a subtype of it:

Job

         AbstractJob

                  AbstractProcessJob

                          ProcessJob (shell job)

                                   JavaProcessJob (Java job)

                                            JavaJob
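To show how responsibilities are split along this hierarchy, here is a heavily simplified, hypothetical version of it. Only the class names come from the hierarchy above; the method getCommand, the demo class JobHierarchyDemo and the property keys command, java.class and classpath are assumptions for illustration, and JavaJob is left out. The idea is that ProcessJob knows how to launch an OS process and wait for it, while JavaProcessJob only changes which command line is launched.

```java
import java.util.List;
import java.util.Properties;

// Hypothetical, simplified hierarchy; method names and property keys are assumptions, not Azkaban's code.
interface Job {
    String getId();
    void run() throws Exception;
}

abstract class AbstractJob implements Job {
    private final String id;
    AbstractJob(String id) { this.id = id; }
    public String getId() { return id; }
}

/** Holds job properties shared by every job that launches an OS process. */
abstract class AbstractProcessJob extends AbstractJob {
    protected final Properties props;
    AbstractProcessJob(String id, Properties props) { super(id); this.props = props; }
}

/** Shell job: runs the "command" property through sh. */
class ProcessJob extends AbstractProcessJob {
    ProcessJob(String id, Properties props) { super(id, props); }

    /** The command line to launch; subclasses override only this. */
    List<String> getCommand() {
        String command = props.getProperty("command");
        if (command == null) throw new IllegalArgumentException(getId() + ": missing 'command'");
        return List.of("sh", "-c", command);
    }

    @Override
    public void run() throws Exception {
        Process p = new ProcessBuilder(getCommand()).inheritIO().start();
        int exit = p.waitFor();
        if (exit != 0) throw new IllegalStateException(getId() + " exited with code " + exit);
    }
}

/** Java job: same process machinery, but the command line starts a JVM. */
class JavaProcessJob extends ProcessJob {
    JavaProcessJob(String id, Properties props) { super(id, props); }

    @Override
    List<String> getCommand() {
        String mainClass = props.getProperty("java.class");
        if (mainClass == null) throw new IllegalArgumentException(getId() + ": missing 'java.class'");
        return List.of("java", "-cp", props.getProperty("classpath", "."), mainClass);
    }
}

class JobHierarchyDemo {
    public static void main(String[] args) throws Exception {
        Properties shellProps = new Properties();
        shellProps.setProperty("command", "echo hello from a shell job");
        new ProcessJob("hello", shellProps).run(); // requires a POSIX sh on the PATH

        Properties javaProps = new Properties();
        javaProps.setProperty("java.class", "com.example.Main"); // hypothetical main class
        System.out.println(new JavaProcessJob("javajob", javaProps).getCommand());
        // prints [java, -cp, ., com.example.Main]
    }
}
```

getCommand can be inspected without launching anything, which keeps the process-handling code in one place (ProcessJob.run) while subclasses vary only the command line.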
