Oozie 4.3
The workflow scheduler of the Hadoop ecosystem
Oozie is a workflow scheduler system to manage Apache Hadoop jobs.
Oozie Workflow jobs are Directed Acyclical Graphs (DAGs) of actions.
Oozie Coordinator jobs are recurrent Oozie Workflow jobs triggered by time (frequency) and data availability.
Oozie is integrated with the rest of the Hadoop stack supporting several types of Hadoop jobs out of the box (such as Java map-reduce, Streaming map-reduce, Pig, Hive, Sqoop and Distcp) as well as system specific jobs (such as Java programs and shell scripts).
Oozie is a scalable, reliable and extensible system.
Core tables:
wf_jobs: workflow instances
wf_actions: action (task) instances
coord_jobs: coordinator instances
coord_actions: coordinator action instances
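- Control Node: nodes that mark the start and end of a workflow and decide its execution path (start, end, kill, decision, fork/join)
- Action Node: a computation task executed by the workflow; supported types include HDFS, MapReduce, Java, Shell, SSH, Pig, Hive, E-Mail, Sub-Workflow, Sqoop and Distcp. In short, a task.
- Workflow: a flow made up of control nodes and a series of action nodes. In short, the workflow.
- Coordinator: triggers a workflow according to a specified cron schedule. In short, the scheduler.
- Bundle: manages Coordinator jobs as a group so that they can be started and stopped together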
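Because a workflow is a DAG of control and action nodes, a definition whose transitions form a loop has to be rejected. The sketch below models nodes and their transitions with a hypothetical `Node` record (not an Oozie class) and detects cycles with a depth-first search. It only illustrates the idea and does not reproduce Oozie's own parser.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a workflow graph: each node lists the nodes it can
// transition to (ok/error targets, fork branches, decision cases). Not Oozie's own classes.
class WorkflowDagSketch {

    // Node kinds mirroring the control-node / action-node split described above.
    enum Kind { START, END, KILL, DECISION, FORK, JOIN, ACTION }

    record Node(String name, Kind kind, List<String> next) {}

    static Map<String, Node> graph(Node... nodes) {
        Map<String, Node> byName = new LinkedHashMap<>();
        for (Node n : nodes) byName.put(n.name(), n);
        return byName;
    }

    // True if no cycle is reachable from 'start' (depth-first search with three states).
    static boolean isAcyclic(Map<String, Node> nodes, String start) {
        return visit(nodes, start, new HashMap<>());
    }

    // state: absent = unvisited, 1 = on the current DFS path, 2 = fully explored
    private static boolean visit(Map<String, Node> nodes, String name, Map<String, Integer> state) {
        Integer s = state.get(name);
        if (s != null) return s == 2;   // 1 means we reached a node on the current path: a cycle
        Node node = nodes.get(name);
        if (node == null) throw new IllegalArgumentException("unknown transition target: " + name);
        state.put(name, 1);
        for (String succ : node.next()) {
            if (!visit(nodes, succ, state)) return false;
        }
        state.put(name, 2);
        return true;
    }

    public static void main(String[] args) {
        Map<String, Node> wf = graph(
            new Node("start", Kind.START, List.of("fork-1")),
            new Node("fork-1", Kind.FORK, List.of("hive-a", "shell-b")),
            new Node("hive-a", Kind.ACTION, List.of("join-1", "fail")),
            new Node("shell-b", Kind.ACTION, List.of("join-1", "fail")),
            new Node("join-1", Kind.JOIN, List.of("end")),
            new Node("fail", Kind.KILL, List.of()),
            new Node("end", Kind.END, List.of()));
        System.out.println(isAcyclic(wf, "start")); // true
    }
}
```

A fork whose branches meet again at a join is still acyclic; only a transition that leads back to a node on the current path counts as a cycle.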
Loading all configured services:
ServicesLoader.contextInitialized
  Services.init
    Services.loadServices (oozie.services, oozie.services.ext)
Service hierarchy:
Service
  org.apache.oozie.service.SchedulerService,
  org.apache.oozie.service.InstrumentationService,
  org.apache.oozie.service.MemoryLocksService,
  org.apache.oozie.service.UUIDService,
  org.apache.oozie.service.ELService,
  org.apache.oozie.service.AuthorizationService,
  org.apache.oozie.service.UserGroupInformationService,
  org.apache.oozie.service.HadoopAccessorService,
  org.apache.oozie.service.JobsConcurrencyService,
  org.apache.oozie.service.URIHandlerService,
  org.apache.oozie.service.DagXLogInfoService,
  org.apache.oozie.service.SchemaService,
  org.apache.oozie.service.LiteWorkflowAppService,
  org.apache.oozie.service.JPAService,
  org.apache.oozie.service.StoreService,
  org.apache.oozie.service.SLAStoreService,
  org.apache.oozie.service.DBLiteWorkflowStoreService,
  org.apache.oozie.service.CallbackService,
  org.apache.oozie.service.ActionService,
  org.apache.oozie.service.ShareLibService,
  org.apache.oozie.service.CallableQueueService,
  org.apache.oozie.service.ActionCheckerService,
  org.apache.oozie.service.RecoveryService,
  org.apache.oozie.service.PurgeService,
  org.apache.oozie.service.CoordinatorEngineService,
  org.apache.oozie.service.BundleEngineService,
  org.apache.oozie.service.DagEngineService,
  org.apache.oozie.service.CoordMaterializeTriggerService,
  org.apache.oozie.service.StatusTransitService,
  org.apache.oozie.service.PauseTransitService,
  org.apache.oozie.service.GroupsService,
  org.apache.oozie.service.ProxyUserService,
  org.apache.oozie.service.XLogStreamingService,
  org.apache.oozie.service.JvmPauseMonitorService,
  org.apache.oozie.service.SparkConfigurationService
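`oozie.services.ext` lets a deployment add services or replace default ones. The sketch below merges the two lists, assuming each service reports the interface it implements (modelled on `Service.getInterface()`). Base class names are instantiated by reflection first, then extension names, and an extension registered under the same interface replaces the base entry. `LockService`, `MemoryLocks` and `ZkLocks` are hypothetical stand-ins, not Oozie classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ServicesSketch {

    // Each service names the interface it implements; that is the key used for replacement.
    interface Service {
        Class<? extends Service> getInterface();
        void init();
    }

    interface LockService extends Service {}

    static class MemoryLocks implements LockService {
        public Class<? extends Service> getInterface() { return LockService.class; }
        public void init() { }
    }

    static class ZkLocks implements LockService {
        public Class<? extends Service> getInterface() { return LockService.class; }
        public void init() { }
    }

    // Splits a comma-separated class list, trimming whitespace and skipping empty entries.
    static List<String> splitClassNames(String csv) {
        List<String> names = new ArrayList<>();
        if (csv == null) return names;
        for (String part : csv.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) names.add(name);
        }
        return names;
    }

    // Instantiates base services, then ext services; an ext service registered under the
    // same interface replaces the base one. Services are initialized after the merge.
    static Map<Class<? extends Service>, Service> loadServices(String services, String servicesExt) {
        List<String> names = splitClassNames(services);
        names.addAll(splitClassNames(servicesExt));
        Map<Class<? extends Service>, Service> registry = new LinkedHashMap<>();
        for (String name : names) {
            try {
                Service s = (Service) Class.forName(name).getDeclaredConstructor().newInstance();
                registry.put(s.getInterface(), s);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("cannot instantiate service " + name, e);
            }
        }
        for (Service s : registry.values()) s.init();
        return registry;
    }

    public static void main(String[] args) {
        Map<Class<? extends Service>, Service> reg =
            loadServices(MemoryLocks.class.getName(), ZkLocks.class.getName());
        System.out.println(reg.get(LockService.class).getClass().getSimpleName()); // ZkLocks
    }
}
```

Keying the registry by interface rather than by class name means callers can look up "the lock service" without knowing which implementation the configuration selected.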
BaseEngine
  DagEngine (executes workflows)
  CoordinatorEngine (executes coordinators)
  BundleEngine (executes bundles)
DagEngine.submitJob | submitJobFromCoordinator (submits a workflow)
  SubmitXCommand.call
    execute
      LiteWorkflowAppService.parseDef (parses the definition into a WorkflowApp)
        LiteWorkflowLib.parseDef
          LiteWorkflowAppParser.validateAndParse
            parse
      WorkflowLib.createInstance (creates the WorkflowInstance)
      BatchQueryExecutor.executeBatchInsertUpdateDelete (saves the WorkflowJobBean to wf_jobs)
  StartXCommand.call
    SignalXCommand.call
      execute
        WorkflowInstance.start
          LiteWorkflowInstance.start
            signal
              NodeHandler.enter
                ActionNodeHandler.enter
                  start
                    LiteWorkflowStoreService.liteExecute (adds the WorkflowActionBean to ACTIONS_TO_START)
        WorkflowStoreService.getActionsToStart (takes the actions from ACTIONS_TO_START)
        ActionStartXCommand.call
          ActionExecutor.start
        WorkflowNotificationXCommand.call
        BatchQueryExecutor.executeBatchInsertUpdateDelete (saves the WorkflowActionBean to wf_actions)
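The key point of this chain is that signalling never runs an action itself. ActionNodeHandler only records the action in ACTIONS_TO_START; SignalXCommand then reads that list and starts each action through ActionStartXCommand. The toy sketch below shows that handoff. `Instance`, `signalCommand` and the string command queue are hypothetical simplifications: a single ok-transition per node, no forks, no persistence.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SignalSketch {

    // Toy workflow instance: each node has one "ok" successor; the end node has none.
    static final class Instance {
        private final Map<String, String> okTransitions;
        private final Set<String> actionNodes;
        private final List<String> actionsToStart = new ArrayList<>(); // plays the role of ACTIONS_TO_START

        Instance(Map<String, String> okTransitions, Set<String> actionNodes) {
            this.okTransitions = okTransitions;
            this.actionNodes = actionNodes;
        }

        // Moves on from 'from': control nodes are passed through immediately, while the
        // first action node reached is only recorded, not executed.
        void signal(String from) {
            String node = okTransitions.get(from);
            while (node != null) {
                if (actionNodes.contains(node)) {
                    actionsToStart.add(node);
                    return;
                }
                node = okTransitions.get(node);
            }
        }

        List<String> drainActionsToStart() {
            List<String> drained = new ArrayList<>(actionsToStart);
            actionsToStart.clear();
            return drained;
        }
    }

    // Stand-in for the signal command: signal the instance, then queue one start command
    // per recorded action (a real engine would also persist the action beans).
    static List<String> signalCommand(Instance wf, String from, Deque<String> commandQueue) {
        wf.signal(from);
        List<String> toStart = wf.drainActionsToStart();
        for (String action : toStart) commandQueue.add("ActionStart:" + action);
        return toStart;
    }

    public static void main(String[] args) {
        Instance wf = new Instance(
            Map.of("start", "hive-a", "hive-a", "shell-b", "shell-b", "end"),
            Set.of("hive-a", "shell-b"));
        Deque<String> queue = new ArrayDeque<>();
        signalCommand(wf, "start", queue);   // hive-a recorded and queued
        signalCommand(wf, "hive-a", queue);  // after hive-a completes: shell-b
        signalCommand(wf, "shell-b", queue); // reaches end: nothing to start
        System.out.println(queue); // [ActionStart:hive-a, ActionStart:shell-b]
    }
}
```

Separating "decide what runs next" from "actually start it" keeps the graph walk fast and side-effect free. Slow submissions then happen in their own queued commands.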
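ActionExecutor.start is asynchronous, so Oozie still has to check each action's execution status to move the workflow forward. There are two cases:
Case 1, the Oozie server keeps running normally: Hadoop's job-end notification (JobEndNotification) calls back into Oozie.
CallbackServlet.doGet
  DagEngine.processCallback
    CompletedActionXCommand.call
      ActionCheckXCommand.call
        ActionExecutor.check
        ActionEndXCommand.call
          SignalXCommand.call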
Case 2, the Oozie server has been restarted: ActionCheckerService polls the running actions.
ActionCheckerService.init
  ActionCheckRunnable.run
    runWFActionCheck (GET_RUNNING_ACTIONS, oozie.service.ActionCheckerService.action.check.delay=600)
      ActionCheckXCommand.call
        ActionExecutor.check
        ActionEndXCommand.call
          SignalXCommand.call
    runCoordActionCheck
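Both paths end in the same check, end and signal sequence. They differ only in what triggers the check: a push for one action, or a periodic pull over actions that have not been checked for a while. The sketch below shows the two triggers. It assumes an `externalDone` predicate in place of querying Hadoop, and a delay in seconds like `action.check.delay=600`. All class and method names are hypothetical, and times are passed explicitly instead of using a real timer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

class ActionStatusSketch {

    private final Predicate<String> externalDone;                       // stand-in for asking Hadoop/YARN
    private final Map<String, Long> lastCheck = new LinkedHashMap<>();  // running action -> last check (s)
    private final List<String> ended = new ArrayList<>();               // "path:actionId" per ended action

    ActionStatusSketch(Predicate<String> externalDone) { this.externalDone = externalDone; }

    void started(String actionId, long nowSec) { lastCheck.put(actionId, nowSec); }

    // Push path: a completion callback arrives for one action.
    void onCallback(String actionId, long nowSec) {
        if (lastCheck.containsKey(actionId)) check(actionId, nowSec, "callback");
    }

    // Pull path: re-check every running action that has not been checked for delaySec.
    void periodicCheck(long nowSec, long delaySec) {
        for (String id : new ArrayList<>(lastCheck.keySet())) {
            if (nowSec - lastCheck.get(id) >= delaySec) check(id, nowSec, "poll");
        }
    }

    // Stand-in for the check command: end the action if it finished (the end and signal
    // commands would follow here), otherwise just remember when it was last checked.
    private void check(String actionId, long nowSec, String path) {
        if (externalDone.test(actionId)) {
            lastCheck.remove(actionId);
            ended.add(path + ":" + actionId);
        } else {
            lastCheck.put(actionId, nowSec);
        }
    }

    List<String> ended() { return ended; }

    Set<String> running() { return lastCheck.keySet(); }

    public static void main(String[] args) {
        ActionStatusSketch tracker = new ActionStatusSketch(id -> !id.equals("a3"));
        tracker.started("a1", 0);
        tracker.started("a2", 0);
        tracker.started("a3", 0);
        tracker.onCallback("a1", 100);   // a1 ends through the callback
        tracker.periodicCheck(700, 600); // a2 ends through polling; a3 is still running
        System.out.println(tracker.ended() + " " + tracker.running()); // [callback:a1, poll:a2] [a3]
    }
}
```

The poll acts as a safety net: a callback lost while the server was down only delays completion until the next check, instead of leaving the action stuck.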
CoordinatorEngine.submitJob (submits a coordinator)
  CoordSubmitXCommand.call
    submit
      submitJob
        storeToDB
          CoordJobQueryExecutor.insert (saves the CoordinatorJobBean to coord_jobs)
        queueMaterializeTransitionXCommand
          CoordMaterializeTransitionXCommand.call
            execute
              materialize
                materializeActions
                  CoordCommandUtils.materializeOneInstance (creates a CoordinatorActionBean)
                  storeToDB
              performWrites
                BatchQueryExecutor.executeBatchInsertUpdateDelete (saves the CoordinatorActionBean to coord_actions)
            CoordActionInputCheckXCommand.call
              CoordActionReadyXCommand.call
                CoordActionStartXCommand.call
                  DagEngine.submitJobFromCoordinator
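Materialization turns a coordinator job into concrete actions, one per nominal time. The simplified sketch below assumes a fixed frequency in minutes and a materialization window. It ignores cron-style frequencies, time zones and throttling, and the record and method names are hypothetical. Action ids use the `<coordJobId>@<actionNumber>` form of coordinator action ids; the job id `0000001-C` is made up.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

class MaterializeSketch {

    record CoordAction(String id, int actionNumber, Instant nominalTime) {}

    // Nominal times are jobStart + k * frequency; only those inside [windowStart, windowEnd)
    // and before jobEnd become actions. Action numbers count from 1 at jobStart.
    static List<CoordAction> materialize(String jobId, Instant jobStart, Instant jobEnd,
                                         long frequencyMinutes, Instant windowStart, Instant windowEnd) {
        if (frequencyMinutes <= 0) throw new IllegalArgumentException("frequency must be positive");
        Duration step = Duration.ofMinutes(frequencyMinutes);
        List<CoordAction> actions = new ArrayList<>();
        Instant nominal = jobStart;
        int actionNumber = 1;
        while (nominal.isBefore(jobEnd) && nominal.isBefore(windowEnd)) {
            if (!nominal.isBefore(windowStart)) {
                actions.add(new CoordAction(jobId + "@" + actionNumber, actionNumber, nominal));
            }
            nominal = nominal.plus(step);
            actionNumber++;
        }
        return actions;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2024-01-01T00:00:00Z");
        Instant end = Instant.parse("2024-01-02T00:00:00Z");
        // The first pass covers the first three hours; the next pass continues from there.
        List<CoordAction> first = materialize("0000001-C", start, end, 60,
            start, start.plus(Duration.ofHours(3)));
        List<CoordAction> second = materialize("0000001-C", start, end, 60,
            start.plus(Duration.ofHours(3)), start.plus(Duration.ofHours(5)));
        System.out.println(first.size() + " " + second.get(0).id()); // 3 0000001-C@4
    }
}
```

Materializing in windows rather than all at once keeps long-running or open-ended coordinators from flooding coord_actions with far-future rows.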
A periodic timer task triggers materialization:
CoordMaterializeTriggerService.init
  CoordMaterializeTriggerRunnable.run
    CoordMaterializeTriggerService.runCoordJobMatLookup
      materializeCoordJobs (GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION)
        CoordMaterializeTransitionXCommand.call
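The periodic lookup only picks jobs that still have something to materialize soon. The rough filter below works in that spirit. It assumes a job qualifies when it is PREP or RUNNING and its next materialization time falls before both `now + lookahead` and the job's end time; if nothing has been materialized yet, the start time is used instead. This is an illustrative simplification, not the actual GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION query, and the record and enum are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

class MatTriggerSketch {

    enum Status { PREP, RUNNING, SUSPENDED, SUCCEEDED }

    // nextMaterialized == null means nothing has been materialized yet.
    record CoordJob(String id, Status status, Instant start, Instant end, Instant nextMaterialized) {}

    // Ids of jobs whose next nominal time falls before now + lookahead and before the job's end.
    static List<String> jobsToMaterialize(List<CoordJob> jobs, Instant now, Duration lookahead) {
        Instant cutoff = now.plus(lookahead);
        return jobs.stream()
            .filter(j -> j.status() == Status.PREP || j.status() == Status.RUNNING)
            .filter(j -> {
                Instant next = j.nextMaterialized() != null ? j.nextMaterialized() : j.start();
                return next.isBefore(cutoff) && next.isBefore(j.end());
            })
            .map(CoordJob::id)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T00:00:00Z");
        Instant dayEnd = now.plus(Duration.ofDays(1));
        List<CoordJob> jobs = List.of(
            new CoordJob("job-A", Status.PREP, now, dayEnd, null),                              // never materialized
            new CoordJob("job-B", Status.RUNNING, now, dayEnd, now.plus(Duration.ofHours(2))),  // not due yet
            new CoordJob("job-C", Status.SUSPENDED, now, dayEnd, null),                         // status excluded
            new CoordJob("job-D", Status.RUNNING, now, now.plus(Duration.ofMinutes(30)),
                         now.plus(Duration.ofMinutes(30))));                                    // reached its end
        System.out.println(jobsToMaterialize(jobs, now, Duration.ofHours(1))); // [job-A]
    }
}
```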
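Some internal tasks must have only one running instance at a time. In a single-server Oozie deployment this is guaranteed by MemoryLocksService; in a multi-server deployment it is guaranteed by ZKLocksService. To use ZooKeeper, enable the following services: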
org.apache.oozie.service.ZKLocksService,
org.apache.oozie.service.ZKXLogStreamingService,
org.apache.oozie.service.ZKJobsConcurrencyService,
org.apache.oozie.service.ZKUUIDService
You also need to configure oozie.zookeeper.connection.string.
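Within a single server, exclusivity can be as simple as one lock per resource name. The sketch below uses `ReentrantLock.tryLock` with a timeout. The class and method names are hypothetical and only loosely follow MemoryLocksService. Because the locks live in one JVM, two Oozie servers would never see each other's locks, which is why a multi-server setup needs the ZooKeeper-based services.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class MemoryLocksSketch {

    // One lock per resource name (e.g. a job id). Entries are never removed in this sketch.
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Returns the acquired lock, or null if it was not free within waitMillis.
    // ReentrantLock is reentrant, so a thread that already holds it gets it again.
    ReentrantLock tryLock(String resource, long waitMillis) {
        ReentrantLock lock = locks.computeIfAbsent(resource, r -> new ReentrantLock());
        try {
            return lock.tryLock(waitMillis, TimeUnit.MILLISECONDS) ? lock : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Demo helper: tries to take the lock from a separate thread, then releases it.
    boolean lockableFromAnotherThread(String resource, long waitMillis) {
        AtomicBoolean acquired = new AtomicBoolean(false);
        Thread other = new Thread(() -> {
            ReentrantLock lock = tryLock(resource, waitMillis);
            if (lock != null) {
                acquired.set(true);
                lock.unlock();
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired.get();
    }

    public static void main(String[] args) {
        MemoryLocksSketch locks = new MemoryLocksSketch();
        ReentrantLock held = locks.tryLock("0000001-W", 0);
        System.out.println(locks.lockableFromAnotherThread("0000001-W", 50)); // false: still held
        held.unlock();
        System.out.println(locks.lockableFromAnotherThread("0000001-W", 50)); // true: released
    }
}
```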
ActionExecutor is the core abstract base class for running actions; every concrete action type is implemented as a subclass of it:
ActionExecutor
  JavaActionExecutor
  SshActionExecutor
  FsActionExecutor
  SubWorkflowActionExecutor
JavaActionExecutor is the most important of these subclasses; many other executors extend it (for example HiveActionExecutor and SparkActionExecutor).
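The split between synchronous executors and launcher-based ones is what makes the callback and check machinery necessary. The loose Java sketch below uses hypothetical classes. The real ActionExecutor also declares `end` and `kill`, and its methods take a `Context` and a `WorkflowAction`. Here a synchronous executor finishes inside `start`, while a launcher-style one reports RUNNING until a later `check` sees a final status.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class ExecutorSketch {

    // Loosely modelled lifecycle; not the real ActionExecutor signatures.
    abstract static class Executor {
        abstract String start(String actionId);            // returns the external status after starting
        abstract String check(String actionId);            // re-reads the external status
        abstract boolean isCompleted(String externalStatus);
    }

    // Synchronous style: all work happens inside start(), so it is complete right away.
    static class SyncExecutor extends Executor {
        String start(String actionId) { return "OK"; }
        String check(String actionId) { return "OK"; }
        boolean isCompleted(String externalStatus) { return true; }
    }

    // Launcher style: start() only submits; the outcome shows up in later check() calls.
    static class LauncherExecutor extends Executor {
        private final Map<String, String> external = new HashMap<>(); // simulated cluster job state

        String start(String actionId) {
            external.put(actionId, "RUNNING");
            return "RUNNING";
        }

        String check(String actionId) { return external.getOrDefault(actionId, "UNKNOWN"); }

        boolean isCompleted(String externalStatus) {
            return Set.of("SUCCEEDED", "FAILED", "KILLED").contains(externalStatus);
        }

        // Test hook standing in for the cluster finishing the job.
        void simulateFinish(String actionId) { external.put(actionId, "SUCCEEDED"); }
    }

    public static void main(String[] args) {
        Executor sync = new SyncExecutor();
        LauncherExecutor launcher = new LauncherExecutor();
        System.out.println(sync.isCompleted(sync.start("fs-1")));              // true
        System.out.println(launcher.isCompleted(launcher.start("java-1")));    // false
        launcher.simulateFinish("java-1");
        System.out.println(launcher.isCompleted(launcher.check("java-1")));    // true
    }
}
```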
JavaActionExecutor.start
  prepareActionDir
  submitLauncher
    JobClient.getJob
    injectLauncherCallback
      ActionExecutor.Context.getCallbackUrl
      job.end.notification.url (set to the callback URL)
    createLauncherConf
      LauncherMapperHelper.setupLauncherInfo
    JobClient.submitJob
  check
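injectLauncherCallback relies on Hadoop's job-end notification: the URL stored in job.end.notification.url may contain `$jobStatus`, which Hadoop replaces with the job's final status before calling the URL. The sketch below shows that round trip. It assumes a callback URL with `id` and `status` query parameters; the host, port and action id are made up, and `fillIn` only simulates what Hadoop does.

```java
import java.net.URI;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

class CallbackUrlSketch {

    // Placeholder that Hadoop substitutes with the final job status when it notifies.
    static final String STATUS_PLACEHOLDER = "$jobStatus";

    // Builds the value one would put into job.end.notification.url for one action.
    static String callbackUrl(String baseUrl, String actionId) {
        return baseUrl + "?id=" + URLEncoder.encode(actionId, StandardCharsets.UTF_8)
            + "&status=" + STATUS_PLACEHOLDER;
    }

    // Simulates the notifying side substituting the placeholder.
    static String fillIn(String template, String jobStatus) {
        return template.replace(STATUS_PLACEHOLDER, jobStatus);
    }

    // What a callback servlet would read back: the query parameters, URL-decoded.
    static Map<String, String> parseQuery(String url) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = URI.create(url).getRawQuery();
        if (query == null) return params;
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            params.put(URLDecoder.decode(key, StandardCharsets.UTF_8),
                       URLDecoder.decode(value, StandardCharsets.UTF_8));
        }
        return params;
    }

    public static void main(String[] args) {
        String template = callbackUrl("http://oozie-host:11000/oozie/callback", "0000001-W@java-node");
        Map<String, String> received = parseQuery(fillIn(template, "SUCCEEDED"));
        System.out.println(received); // {id=0000001-W@java-node, status=SUCCEEDED}
    }
}
```

Encoding the action id matters because workflow action ids contain `@`, which must survive the trip through the URL.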
When JavaActionExecutor runs, it submits a map task to YARN, the LauncherMapper:
LauncherMapper.map
  LauncherMain.main
LauncherMain is the base of the classes that actually execute each concrete action type:
LauncherMain
  JavaMain
  HiveMain
  Hive2Main
  SparkMain
  ShellMain
  SqoopMain
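The launcher task does not need to know about Hive, Spark or Shell. It only has to locate the configured LauncherMain subclass and call its static `main`. The sketch below shows that reflective dispatch, assuming the main class name is already known. `EchoMain` is a hypothetical stand-in for a class such as ShellMain, and `runMain` is not an Oozie method.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class LauncherSketch {

    // Hypothetical stand-in for a LauncherMain subclass; it just records its arguments.
    static class EchoMain {
        static final List<String> calls = new ArrayList<>();

        public static void main(String[] args) { calls.add(String.join(" ", args)); }
    }

    // Looks up the public static main(String[]) of the named class and invokes it.
    static void runMain(String mainClassName, String[] args) {
        try {
            Method main = Class.forName(mainClassName).getMethod("main", String[].class);
            main.invoke(null, (Object) args); // cast so the array is passed as one argument
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("launching " + mainClassName + " failed", e);
        }
    }

    public static void main(String[] args) {
        runMain(EchoMain.class.getName(), new String[] {"echo", "hello"});
        System.out.println(EchoMain.calls); // [echo hello]
    }
}
```

Supporting a new action type then mostly means shipping another main class. The generic launcher itself stays unchanged.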