[Original] Big Data Basics: Oozie (1) Introduction and Source Code Analysis

Oozie 4.3

I Introduction

 

 

1 Official website

http://oozie.apache.org/

Apache Oozie Workflow Scheduler for Hadoop

A workflow scheduler for the Hadoop ecosystem

Overview

Oozie is a workflow scheduler system to manage Apache Hadoop jobs.

Oozie Workflow jobs are Directed Acyclical Graphs (DAGs) of actions.

Oozie Coordinator jobs are recurrent Oozie Workflow jobs triggered by time (frequency) and data availability.

Oozie is integrated with the rest of the Hadoop stack supporting several types of Hadoop jobs out of the box (such as Java map-reduce, Streaming map-reduce, Pig, Hive, Sqoop and Distcp) as well as system specific jobs (such as Java programs and shell scripts).

Oozie is a scalable, reliable and extensible system.

 

2 Deployment

 

3 Database table structure

 

 

wf_jobs: workflow instances

wf_actions: action (task) instances

coord_jobs: coordinator (schedule) instances

coord_actions: coordinator action instances

4 Concepts

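•  Control Node: a node that marks the start or end of a workflow or decides its execution path (start, end, kill, decision, fork/join)

•  Action Node: a computation task executed by the workflow; supported types include HDFS, MapReduce, Java, Shell, SSH, Pig, Hive, E-Mail, Sub-Workflow, Sqoop, and Distcp. In short, a task

•  Workflow: a set of Control Nodes and Action Nodes wired together. In short, a workflow

•  Coordinator: triggers a workflow according to the specified cron schedule. In short, a schedule

•  Bundle: manages a group of Coordinator jobs as a batch, so they can be started and stopped together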
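The relationship between control nodes and action nodes can be illustrated with a toy model. This is only a sketch: every type and name in it (WorkflowSketch, Node, the "extract" and "load" actions) is hypothetical and none of it is Oozie code, and decision and fork/join nodes are left out to keep it short. Each action node has one transition for success and one for failure, and the walk stops at an end or kill node.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: every type here is hypothetical and none is an Oozie class.
class WorkflowSketch {
    enum Kind { START, ACTION, END, KILL }

    static final class Node {
        final String name;
        final Kind kind;
        final String ok;     // next node on success (null for END and KILL)
        final String error;  // next node on failure (only used by ACTION)

        Node(String name, Kind kind, String ok, String error) {
            this.name = name;
            this.kind = kind;
            this.ok = ok;
            this.error = error;
        }
    }

    private final Map<String, Node> nodes = new HashMap<>();

    WorkflowSketch add(String name, Kind kind, String ok, String error) {
        nodes.put(name, new Node(name, kind, ok, error));
        return this;
    }

    // Follows transitions from the given node until END or KILL is reached.
    // failingActions simulates which action nodes fail.
    List<String> run(String start, Set<String> failingActions) {
        List<String> path = new ArrayList<>();
        Node current = nodes.get(start);
        while (current != null) {
            path.add(current.name);
            if (current.kind == Kind.END || current.kind == Kind.KILL) {
                break;
            }
            boolean failed = current.kind == Kind.ACTION
                    && failingActions.contains(current.name);
            current = nodes.get(failed ? current.error : current.ok);
        }
        return path;
    }

    // A two-action workflow: start -> extract -> load -> end, with "fail" as the kill node.
    static WorkflowSketch example() {
        return new WorkflowSketch()
                .add("start", Kind.START, "extract", null)
                .add("extract", Kind.ACTION, "load", "fail")
                .add("load", Kind.ACTION, "end", "fail")
                .add("end", Kind.END, null, null)
                .add("fail", Kind.KILL, null, null);
    }

    public static void main(String[] args) {
        WorkflowSketch wf = example();
        System.out.println(wf.run("start", Collections.<String>emptySet()));
        System.out.println(wf.run("start", Collections.singleton("load")));
    }
}
```

In this model a Coordinator would simply call run repeatedly on a schedule, and a Bundle would hold several such schedules.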

II Code analysis

1 Startup process

Load all configured services:

ServicesLoader.contextInitialized

         Services.init

                  Services.loadServices (oozie.services, oozie.services.ext)
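The loading step above can be sketched as follows. This is a simplified illustration, not the actual Services.loadServices implementation: the Service interface and the two service classes are hypothetical stand-ins, and only the two configuration keys (oozie.services and oozie.services.ext) come from the call tree. The sketch reads both comma-separated lists, instantiates each class by reflection, and initializes them in the listed order; the real loader may do more than this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Sketch only: Service, ClockService and QueueService are hypothetical stand-ins.
class ServiceLoadingSketch {
    interface Service {
        void init();
    }

    static class ClockService implements Service {
        boolean initialized;
        public void init() { initialized = true; }
    }

    static class QueueService implements Service {
        boolean initialized;
        public void init() { initialized = true; }
    }

    // Reads class names from both keys, creates each service reflectively
    // and calls init() on it, preserving the configured order.
    static List<Service> loadServices(Properties conf) {
        String names = conf.getProperty("oozie.services", "") + ","
                + conf.getProperty("oozie.services.ext", "");
        List<Service> services = new ArrayList<>();
        for (String name : names.split(",")) {
            String className = name.trim();
            if (className.isEmpty()) {
                continue;
            }
            try {
                Service service = (Service) Class.forName(className)
                        .getDeclaredConstructor().newInstance();
                service.init();
                services.add(service);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot load service " + className, e);
            }
        }
        return services;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("oozie.services", ClockService.class.getName());
        conf.setProperty("oozie.services.ext", QueueService.class.getName());
        System.out.println(loadServices(conf).size() + " services loaded");
    }
}
```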

 

Service hierarchy:

Service

         org.apache.oozie.service.SchedulerService,

         org.apache.oozie.service.InstrumentationService,

         org.apache.oozie.service.MemoryLocksService,

         org.apache.oozie.service.UUIDService,

         org.apache.oozie.service.ELService,

         org.apache.oozie.service.AuthorizationService,

         org.apache.oozie.service.UserGroupInformationService,

         org.apache.oozie.service.HadoopAccessorService,

         org.apache.oozie.service.JobsConcurrencyService,

         org.apache.oozie.service.URIHandlerService,

         org.apache.oozie.service.DagXLogInfoService,

         org.apache.oozie.service.SchemaService,

         org.apache.oozie.service.LiteWorkflowAppService,

         org.apache.oozie.service.JPAService,

         org.apache.oozie.service.StoreService,

         org.apache.oozie.service.SLAStoreService,

         org.apache.oozie.service.DBLiteWorkflowStoreService,

         org.apache.oozie.service.CallbackService,

         org.apache.oozie.service.ActionService,

         org.apache.oozie.service.ShareLibService,

         org.apache.oozie.service.CallableQueueService,

         org.apache.oozie.service.ActionCheckerService,

         org.apache.oozie.service.RecoveryService,

         org.apache.oozie.service.PurgeService,

         org.apache.oozie.service.CoordinatorEngineService,

         org.apache.oozie.service.BundleEngineService,

         org.apache.oozie.service.DagEngineService,

         org.apache.oozie.service.CoordMaterializeTriggerService,

         org.apache.oozie.service.StatusTransitService,

         org.apache.oozie.service.PauseTransitService,

         org.apache.oozie.service.GroupsService,

         org.apache.oozie.service.ProxyUserService,

         org.apache.oozie.service.XLogStreamingService,

         org.apache.oozie.service.JvmPauseMonitorService,

         org.apache.oozie.service.SparkConfigurationService

 

2 Core engines

BaseEngine

         DagEngine (executes workflows)

         CoordinatorEngine (executes coordinators)

         BundleEngine (executes bundles)

 

3 Workflow submission and execution

DagEngine.submitJob | submitJobFromCoordinator (submit the workflow)

         SubmitXCommand.call

                  execute

                          LiteWorkflowAppService.parseDef (parse the definition into a WorkflowApp)

                                   LiteWorkflowLib.parseDef

                                            LiteWorkflowAppParser.validateAndParse

                                                     parse

                          WorkflowLib.createInstance (create the WorkflowInstance)

                          BatchQueryExecutor.executeBatchInsertUpdateDelete (save the WorkflowJobBean to wf_jobs)

         StartXCommand.call

                  SignalXCommand.call

                          execute

                                   WorkflowInstance.start

                                            LiteWorkflowInstance.start

                                                     signal

                                                             NodeHandler.enter

                                                                      ActionNodeHandler.enter

                                                                               start

                                                                                        LiteWorkflowStoreService.liteExecute (add the WorkflowActionBean to ACTIONS_TO_START)

                                   WorkflowStoreService.getActionsToStart (take the actions from ACTIONS_TO_START)

                                            ActionStartXCommand.call

                                                     ActionExecutor.start

                                                     WorkflowNotificationXCommand.call

                                            BatchQueryExecutor.executeBatchInsertUpdateDelete (save the WorkflowActionBean to wf_actions)

 

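ActionExecutor.start is asynchronous, so the action's status still has to be checked afterwards to drive the workflow forward. There are two cases:

Case 1, the Oozie server is running normally: the job end notification (callback) is used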

CallbackServlet.doGet

         DagEngine.processCallback

                  CompletedActionXCommand.call

                          ActionCheckXCommand.call

                                   ActionExecutor.check

                                   ActionEndXCommand.call

                                            SignalXCommand.call

Case 2, the Oozie server has been restarted: ActionCheckerService is used

ActionCheckerService.init

         ActionCheckRunnable.run

                  runWFActionCheck (GET_RUNNING_ACTIONS, oozie.service.ActionCheckerService.action.check.delay=600)

                          ActionCheckXCommand.call

                                   ActionExecutor.check

                                   ActionEndXCommand.call

                                            SignalXCommand.call

                  runCoordActionCheck
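The two paths above end in the same check, which the following sketch tries to make explicit. It is a simplified, single-threaded simulation under stated assumptions: ActionCheckSketch and all of its fields are hypothetical, the external system is a plain map standing in for the cluster, and time is passed in as seconds instead of being read from a clock. Only the idea is taken from the call trees: a callback triggers an immediate check, while a periodic poll re-checks actions that are still running and were last checked at least the configured delay ago (600 seconds in the listing above).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: a simulation of "callback versus periodic check", not Oozie code.
class ActionCheckSketch {
    enum Status { RUNNING, DONE }

    // Simulated external system: action id -> has the job finished?
    final Map<String, Boolean> externalFinished = new HashMap<>();
    // What the server has recorded for each action, and when it was last checked.
    final Map<String, Status> recorded = new HashMap<>();
    final Map<String, Long> lastCheckSeconds = new HashMap<>();
    // Actions for which the workflow was signalled to move on, in order.
    final List<String> signalled = new ArrayList<>();

    void start(String id, long nowSeconds) {
        recorded.put(id, Status.RUNNING);
        lastCheckSeconds.put(id, nowSeconds);
    }

    // Shared by both paths: look at the external status; if the job has finished,
    // end the action and signal the workflow.
    void check(String id, long nowSeconds) {
        lastCheckSeconds.put(id, nowSeconds);
        if (recorded.get(id) == Status.RUNNING
                && Boolean.TRUE.equals(externalFinished.get(id))) {
            recorded.put(id, Status.DONE);
            signalled.add(id);
        }
    }

    // Path 1: the external job notifies the server when it ends.
    void onCallback(String id, long nowSeconds) {
        check(id, nowSeconds);
    }

    // Path 2: periodically re-check running actions that have not been checked recently.
    void poll(long nowSeconds, long delaySeconds) {
        for (String id : new ArrayList<>(recorded.keySet())) {
            if (recorded.get(id) == Status.RUNNING
                    && nowSeconds - lastCheckSeconds.get(id) >= delaySeconds) {
                check(id, nowSeconds);
            }
        }
    }

    public static void main(String[] args) {
        ActionCheckSketch server = new ActionCheckSketch();
        server.start("a", 0);
        server.start("b", 0);
        server.externalFinished.put("a", true);
        server.onCallback("a", 10);          // callback arrives for "a"
        server.externalFinished.put("b", true);
        server.poll(600, 600);               // callback for "b" was lost; the poll finds it
        System.out.println(server.signalled);
    }
}
```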

 

4 Coordinator submission and execution

CoordinatorEngine.submitJob (submit the coordinator)

         CoordSubmitXCommand.call

                  submit

                          submitJob

                                   storeToDB

                                            CoordJobQueryExecutor.insert (save the CoordinatorJobBean to coord_jobs)

                                   queueMaterializeTransitionXCommand

                                            CoordMaterializeTransitionXCommand.call

                                                     execute

                                                              materialize

                                                                      materializeActions

                                                                               CoordCommandUtils.materializeOneInstance (create the CoordinatorActionBean)

                                                                               storeToDB

                                                             performWrites

                                                                      BatchQueryExecutor.executeBatchInsertUpdateDelete (save the CoordinatorActionBean to coord_actions)

                                                                      CoordActionInputCheckXCommand.call

                                                                               CoordActionReadyXCommand.call

                                                                                        CoordActionStartXCommand.call

                                                                                                DagEngine.submitJobFromCoordinator

A scheduled task also triggers materialization:

CoordMaterializeTriggerService.init

         CoordMaterializeTriggerRunnable.run

                  CoordMaterializeTriggerService.runCoordJobMatLookup

                          materializeCoordJobs (GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION)

                                   CoordMaterializeTransitionXCommand.call
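Materialization can be thought of as turning a coordinator's time range and frequency into concrete action instances, a batch at a time. The sketch below shows only that arithmetic and is not the CoordMaterializeTransitionXCommand logic: the class, the method, and all parameter names are hypothetical, the frequency is a fixed duration rather than a cron expression, and data-availability checks are ignored.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Sketch only: hypothetical names, fixed-interval frequency, no data dependencies.
class MaterializeSketch {
    // Returns the nominal times that fall before both the coordinator end time and
    // the materialization horizon, skipping the instances created by earlier runs.
    static List<Instant> materialize(Instant start, Instant end, Duration frequency,
                                     int alreadyMaterialized, Instant horizon) {
        if (frequency.isZero() || frequency.isNegative()) {
            throw new IllegalArgumentException("frequency must be positive");
        }
        Instant limit = horizon.isBefore(end) ? horizon : end;
        Instant next = start.plus(frequency.multipliedBy(alreadyMaterialized));
        List<Instant> nominalTimes = new ArrayList<>();
        while (next.isBefore(limit)) {
            nominalTimes.add(next);
            next = next.plus(frequency);
        }
        return nominalTimes;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2020-01-01T00:00:00Z");
        Instant end = Instant.parse("2020-01-02T00:00:00Z");
        // First run: materialize everything due before 13:00.
        List<Instant> first = materialize(start, end, Duration.ofHours(6), 0,
                Instant.parse("2020-01-01T13:00:00Z"));
        System.out.println(first);
        // A later run continues where the first one stopped.
        System.out.println(materialize(start, end, Duration.ofHours(6), first.size(), end));
    }
}
```

Each returned nominal time corresponds to one row that would be written to coord_actions and later submitted as a workflow.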

 

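5 Distributed mode

Some internal tasks must run as a single instance only. In a single-server deployment Oozie guarantees this with MemoryLocksService; in a multi-server deployment it uses ZKLocksService. To enable ZooKeeper, the following services have to be enabled: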

org.apache.oozie.service.ZKLocksService,

org.apache.oozie.service.ZKXLogStreamingService,

org.apache.oozie.service.ZKJobsConcurrencyService,

org.apache.oozie.service.ZKUUIDService

oozie.zookeeper.connection.string must also be configured.
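The "only one instance at a time" guarantee can be pictured as a named lock that a task must acquire before it runs. The sketch below is an assumption-laden illustration, not the MemoryLocksService or ZKLocksService API: LockService, InMemoryLockService and runExclusively are hypothetical names. The in-memory variant only protects a single JVM, which is why a multi-server setup needs a lock that lives outside the process, such as one backed by ZooKeeper.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: hypothetical interfaces, not Oozie's lock services.
class SingleRunnerSketch {
    interface LockService {
        boolean tryLock(String name);
        void unlock(String name);
    }

    // Holds lock names in a concurrent set; valid only within one JVM.
    static class InMemoryLockService implements LockService {
        private final Set<String> held = ConcurrentHashMap.newKeySet();

        public boolean tryLock(String name) {
            return held.add(name);   // false if the name is already held
        }

        public void unlock(String name) {
            held.remove(name);
        }
    }

    // Runs the task only if the named lock is free; returns whether it ran.
    static boolean runExclusively(LockService locks, String taskName, Runnable task) {
        if (!locks.tryLock(taskName)) {
            return false;
        }
        try {
            task.run();
            return true;
        } finally {
            locks.unlock(taskName);
        }
    }

    public static void main(String[] args) {
        LockService locks = new InMemoryLockService();
        boolean ran = runExclusively(locks, "purge", () -> {
            // A second attempt while the first is still running is rejected.
            boolean nested = runExclusively(locks, "purge", () -> { });
            System.out.println("nested attempt ran: " + nested);
        });
        System.out.println("outer attempt ran: " + ran);
    }
}
```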

6 Action execution

ActionExecutor is the core abstract base class for action execution; every concrete action type is a subclass of it.

ActionExecutor

         JavaActionExecutor

         SshActionExecutor

         FsActionExecutor

         SubWorkflowActionExecutor

 

JavaActionExecutor is the most important of these subclasses; many other action executors extend it (for example HiveActionExecutor and SparkActionExecutor).

JavaActionExecutor.start

         prepareActionDir

         submitLauncher

                  JobClient.getJob

                  injectLauncherCallback

                          ActionExecutor.Context.getCallbackUrl

                                   job.end.notification.url

                  createLauncherConf

                          LauncherMapperHelper.setupLauncherInfo

                  JobClient.submitJob

         check

 

When JavaActionExecutor runs, it submits a map task to YARN, namely the LauncherMapper:

LauncherMapper.map

         LauncherMain.main

 

LauncherMain is the class that actually executes the concrete action:

LauncherMain

         JavaMain

         HiveMain

         Hive2Main

         SparkMain

         ShellMain

         SqoopMain
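The division of labour described above (the executor prepares a launcher configuration, the launcher then invokes an action-specific main class) can be sketched in plain Java. This is an illustration only and not the LauncherMapper or LauncherMain code: every class here is a hypothetical stand-in, the configuration key sketch.launcher.main.class is invented for the example, and everything runs in one JVM rather than as a job on YARN.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Sketch only: hypothetical stand-ins for the executor / launcher / main split.
class LauncherSketch {
    // Invented key for this example; not an Oozie property.
    static final String MAIN_CLASS_KEY = "sketch.launcher.main.class";
    // Collects output so the effect of the launched main class is observable.
    static final StringBuilder LOG = new StringBuilder();

    // Stand-in for an action-specific main class such as a shell runner.
    static class ShellLikeMain {
        public static void main(String[] args) {
            LOG.append("shell:").append(String.join(" ", args));
        }
    }

    // Stand-in for the executor base class: subclasses only say which main class to run.
    static abstract class Executor {
        abstract Class<?> mainClass();

        Map<String, String> createLauncherConf() {
            Map<String, String> conf = new HashMap<>();
            conf.put(MAIN_CLASS_KEY, mainClass().getName());
            return conf;
        }
    }

    static class ShellLikeExecutor extends Executor {
        Class<?> mainClass() {
            return ShellLikeMain.class;
        }
    }

    // Stand-in for the launcher task: look up the configured class and call its main method.
    static void launch(Map<String, String> conf, String... args) {
        try {
            Class<?> main = Class.forName(conf.get(MAIN_CLASS_KEY));
            Method method = main.getMethod("main", String[].class);
            method.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot launch main class", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new ShellLikeExecutor().createLauncherConf();
        launch(conf, "echo", "hi");
        System.out.println(LOG);
    }
}
```

Adding a new action type in this model means adding one executor subclass and one main class, which mirrors how the executor and LauncherMain lists above line up.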
