[源碼解析]Oozie前因後果以內部執行

[源碼解析]Oozie前因後果以內部執行

0x00 摘要

Oozie由Cloudera公司貢獻給Apache的基於工做流引擎的開源框架,是用於Hadoop平臺的開源的工做流調度引擎,用來管理Hadoop做業,進行。本文是系列的第二篇,介紹Oozie的內部執行階段。html

前文[源碼解析]Oozie的前因後果 --- (1)提交任務階段 已經爲你們展現了用戶提交一個Oozie Job以後作了什麼,本文將沿着一個Workflow的執行流程爲你們繼續剖析Oozie接下來作什麼。java

大體以下:shell

  • 在Oozie中準備Yarn Application Master
  • 介紹新舊兩版本的Yarn Application Master區別
  • 介紹Hive on Yarn
  • Tez是如何亂入到這個流程中的
  • Java on Yarn會是如何執行
  • Yarn Job結束以後如何返回Oozie

0x01 Oozie階段

1.1 ActionStartXCommand

咱們假設Workflow在start以後,就進入到了一個Hive命令。數據庫

ActionStartXCommand的主要做用就是和Yarn交互,最後提交一個Yarn Application Masterapache

ActionStartXCommand是 WorkflowXCommand的子類。重點函數仍是loadState和execute。api

public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext> {
    private String jobId = null;
    protected String actionId = null;
    protected WorkflowJobBean wfJob = null;
    protected WorkflowActionBean wfAction = null;
    private JPAService jpaService = null;
    private ActionExecutor executor = null;
    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
    private List<JsonBean> insertList = new ArrayList<JsonBean>();
    protected ActionExecutorContext context = null;  
}

loadState 的做用就是從數據庫中獲取 WorkflowJobBean 和 WorkflowActionBean 信息session

protected void loadState() throws CommandException {
    try {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService != null) {
            if (wfJob == null) {
                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
            }
            this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, actionId);
        }
    }
}

execute函數以下。其主要業務就是executor.start(context, wfAction); 這裏的executor是HiveActionExecutor。併發

@Override
protected ActionExecutorContext execute() throws CommandException {
    Configuration conf = wfJob.getWorkflowInstance().getConf();
    try {
        if(!caught) {
            // 這裏是業務重點,就是啓動任務
            executor.start(context, wfAction);
          
            if (wfAction.isExecutionComplete()) {
                if (!context.isExecuted()) {
                    failJob(context);
                } else {
                    wfAction.setPending();
                    if (!(executor instanceof ControlNodeActionExecutor)) {
                        queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
                    }
                    else {
                        execSynchronous = true;
                    }
                }
            }
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
        }
    }
    finally {
            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
            ......
            if (execSynchronous) {
                // Changing to synchronous call from asynchronous queuing to prevent
                // undue delay from ::start:: to action due to queuing
                callActionEnd();
            }
        }
    }
    return null;
}

ActionExecutor.start是異步的,還須要檢查Action執行狀態來推動流程,oozie經過兩種方式來檢查任務是否完成。app

  • 回調:當一個任務和一個計算被啓動後,會爲任務提供一個回調url,該任務執行完成後,會執行回調來通知oozie框架

  • 輪詢:在任務執行回調失敗的狀況下,不管任何緣由,都支持以輪詢的方式進行查詢。

oozie提供這兩種方式來控制任務。後續咱們會再提到。

1.2 HiveActionExecutor

上面代碼中 executor.start(context, wfAction); 就是啓動任務。

HiveActionExecutor繼承 ScriptLanguageActionExecutor,ScriptLanguageActionExecutor繼承 JavaActionExecutor,因此後續不少函數執行的是JavaActionExecutor中的函數。

public class HiveActionExecutor extends ScriptLanguageActionExecutor {}

ActionExecutor.start就是執行的JavaActionExecutor.start()。

其會檢查文件系統,好比hdfs是否是支持,Action Dir是否ready,而後會submitLauncher。

public void start(Context context, WorkflowAction action) throws ActionExecutorException {
        FileSystem actionFs = context.getAppFileSystem();
        prepareActionDir(actionFs, context);
        submitLauncher(actionFs, context, action); // 這裏是業務
        check(context, action);
}

submitLauncher主要功能是:

  • 1)對於某些類型job,調用injectActionCallback配置回調Action
  • 2)配置 action job
  • 3)調用createLauncherConf配置LauncherAM, 即Application Master
    • 3.1)配置回調conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, callback);
    • 3.2)設置"launcher Main Class"。LauncherHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
  • 4)調用HadoopAccessorService.createYarnClient來建立一個YarnClient
  • 5)調用UserGroupInformation繼續配置
  • 6)調用yarnClient.createApplication建立一個YarnClientApplication
  • 7)記錄ApplicationId
  • 8)調用createAppSubmissionContext創建Yarn App的執行環境
    • 8.1)appContext.setApplicationType("Oozie Launcher");
    • 8.2)設置容器信息 ContainerLaunchContext
    • 8.3)vargs.add(LauncherAM.class.getCanonicalName()); 好比設置AM啓動類
    • 8.4)return appContext;
  • 9)提交App,yarnClient.submitApplication(appContext); appContext就是前面return的。

具體代碼以下:

public void submitLauncher(final FileSystem actionFs, final Context context, final WorkflowAction action)throws ActionExecutorException {
    YarnClient yarnClient = null;
    try {
        // action job configuration
        Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
        setupActionConf(actionConf, context, actionXml, appPathRoot);
        addAppNameContext(context, action);
        setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
				// 配置回調Action
        injectActionCallback(context, actionConf);

        Configuration launcherConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
        yarnClient = createYarnClient(context, launcherConf);
      
        //繼續配置各類Credentials
        if (UserGroupInformation.isSecurityEnabled()) {
           ......
        }

        if (alreadyRunning && !isUserRetry) {
          ......
        }
        else {
            YarnClientApplication newApp = yarnClient.createApplication();
            ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
            ApplicationSubmissionContext appContext =
                    createAppSubmissionContext(appId, launcherConf, context, actionConf, action, credentials, actionXml);
            // 這裏正式與 Yarn 交互。
            yarnClient.submitApplication(appContext);

            launcherId = appId.toString();
            ApplicationReport appReport = yarnClient.getApplicationReport(appId);
            consoleUrl = appReport.getTrackingUrl();
        }

        String jobTracker = launcherConf.get(HADOOP_YARN_RM);
        context.setStartData(launcherId, jobTracker, consoleUrl);
    }
}

protected YarnClient createYarnClient(Context context, Configuration jobConf) throws HadoopAccessorException {
        String user = context.getWorkflow().getUser();
        return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf);
}

0x2 舊版本LauncherMapper

這裏咱們有必要提一下舊版本的實現:LauncherMapper。

網上關於Oozie的文章不少都是基於舊版本,因此基本都提到了 LauncherMapper,好比:

Oozie本質就是一個做業協調工具(底層原理是經過將xml語言轉換成mapreduce程序來作,但只是在集中map端作處理,避免shuffle的過程)。

Oozie執行Action時,即ActionExecutor(最主要的子類是JavaActionExecutor,hive、spark等action都是這個類的子類),JavaActionExecutor首先會提交一個LauncherMapper(map任務)到yarn,其中會執行LauncherMain(具體的action是其子類,好比JavaMain、SparkMain等),spark任務會執行SparkMain,在SparkMain中會調用org.apache.spark.deploy.SparkSubmit來提交任務。其實訴個人map任務就是識別你是什麼樣的任務(hive,shell,spark等),並經過該任務來啓動任務所須要的環境來提交任務。提供了提交任務的接口(如hive任務,啓動hive客戶端或beeline等)

從文檔看,OOZIE-2918 Delete LauncherMapper and its test (asasvari via pbacsko) 這時候被移除了。

咱們從舊版本代碼中大體看看LauncherMapper的實現。

LauncherMapper繼承了 import org.apache.hadoop.mapred.Mapper;,實現了 map 函數。其內部就是調用用戶代碼的主函數。

import org.apache.hadoop.mapred.Mapper;

public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
   @Override
    public void map(K1 key, V1 value, OutputCollector<K2, V2> collector, Reporter reporter) throws IOException {
        SecurityManager initialSecurityManager = System.getSecurityManager();
        try {
            else {
                String mainClass = getJobConf().get(CONF_OOZIE_ACTION_MAIN_CLASS);

                    new LauncherSecurityManager();
                    setupHeartBeater(reporter);
                    setupMainConfiguration();
                    // Propagating the conf to use by child job.
                    propagateToHadoopConf();

                    executePrepare();
                    Class klass = getJobConf().getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
                    Method mainMethod = klass.getMethod("main", String[].class);
                    mainMethod.invoke(null, (Object) args);
             }
        }
    }
}

在LauncherMapperHelper中,會設置LauncherMapper爲啓動函數。

public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir, String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
        launcherConf.setMapperClass(LauncherMapper.class);
}

在 JavaActionExecutor 中有 org.apache.hadoop.mapred.JobClient

import org.apache.hadoop.mapred.JobClient;

public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
            jobClient = createJobClient(context, launcherJobConf);
            LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, prepareXML);

            // Set the launcher Main Class
            LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml)); 
            LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
            ......
  
            runningJob = jobClient.submitJob(launcherJobConf);  // 這裏進行了提交
}

綜上所述,舊版本 LauncherMapper 實現了一個 import org.apache.hadoop.mapred.Mapper;,具體是org.apache.hadoop.mapred.JobClient 負責與hadoop交互

0x3 新版本Yarn Application Master

新版本的Oozie是和Yarn深度綁定的,因此咱們須要先介紹Yarn。

3. 1 YARN簡介

YARN 是 Hadoop 2.0 中的資源管理系統,它的基本設計思想是將 MRv1 中的 JobTracker拆分紅了兩個獨立的服務:一個全局的資源管理器 ResourceManager 和每一個應用程序特有的ApplicationMaster。 其中 ResourceManager 負責整個系統的資源管理和分配, 而 ApplicationMaster負責單個應用程序的管理。

YARN 整體上仍然是 Master/Slave 結構,在整個資源管理框架中,ResourceManager 爲Master,NodeManager 爲 Slave,ResourceManager 負責對各個 NodeManager 上的資源進行統一管理和調度。

當用戶提交一個應用程序時,須要提供一個用以跟蹤和管理這個程序的ApplicationMaster, 它負責向 ResourceManager 申請資源,並要求 NodeManager 啓動能夠佔用必定資源的任務。 因爲不一樣的ApplicationMaster 被分佈到不一樣的節點上,所以它們之間不會相互影響。

3.2 ApplicationMaster

用戶提交的每一個應用程序均包含一個 AM,主要功能包括:

  • 與 RM 調度器協商以獲取資源(用 Container 表示);
  • 將獲得的任務進一步分配給內部的任務;
  • 與 NM 通訊以啓動 / 中止任務;
  • 監控全部任務運行狀態,並在任務運行失敗時從新爲任務申請資源以重啓任務。

當用戶向 YARN 中提交一個應用程序後, YARN 將分兩個階段運行該應用程序 :

  • 第一個階段是啓動 ApplicationMaster ;
  • 第二個階段是由 ApplicationMaster 建立應用程序, 爲它申請資源, 並監控它的整個運行過程, 直到運行完成。

工做流程分爲如下幾個步驟:

  1. 用 戶 向 YARN 中 提 交 應 用 程 序, 其 中 包 括 ApplicationMaster 程 序、 啓 動ApplicationMaster 的命令、 用戶程序等。
  2. ResourceManager 爲 該 應 用程 序 分 配 第 一 個 Container, 並 與 對應 的 NodeManager 通訊,要求它在這個 Container 中啓動應用程序的 ApplicationMaster。
  3. ApplicationMaster 首 先 向 ResourceManager 注 冊, 這 樣 用 戶 可 以 直 接 通 過ResourceManage 查看應用程序的運行狀態, 而後它將爲各個任務申請資源, 並監控它的運行狀態, 直到運行結束, 即重複步驟 4~7。
  4. ApplicationMaster 採用輪詢的方式經過 RPC 協議向 ResourceManager 申請和領取資源。
  5. 一旦 ApplicationMaster 申請到資源後, 便與對應的 NodeManager 通訊, 要求它啓動任務。
  6. NodeManager 爲任務設置好運行環境(包括環境變量、 JAR 包、 二進制程序等) 後, 將任務啓動命令寫到一個腳本中, 並經過運行該腳本啓動任務。
  7. 各個任務經過某個 RPC 協議向 ApplicationMaster 彙報本身的狀態和進度, 以讓 ApplicationMaster 隨時掌握各個任務的運行狀態,從而能夠在任務失敗時從新啓動任務。在應用程序運行過程當中,用戶可隨時經過RPC向ApplicationMaster查詢應用程序的當前運行狀態。
  8. 應用程序運行完成後,ApplicationMaster 向 ResourceManager 註銷並關閉本身。

3.3 LauncherAM

LauncherAM就是Oozie的ApplicationMaster實現。LauncherAM.main就是Yarn調用之處。

public class LauncherAM {
  
    public static void main(String[] args) throws Exception {
        final LocalFsOperations localFsOperations = new LocalFsOperations();
        final Configuration launcherConf = readLauncherConfiguration(localFsOperations);
        UserGroupInformation.setConfiguration(launcherConf);
        // MRAppMaster adds this call as well, but it's included only in Hadoop 2.9+
        // SecurityUtil.setConfiguration(launcherConf);
        UserGroupInformation ugi = getUserGroupInformation(launcherConf);
        // Executing code inside a doAs with an ugi equipped with correct tokens.
        ugi.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
                  LauncherAM launcher = new LauncherAM(new AMRMClientAsyncFactory(),
                        new AMRMCallBackHandler(),
                        new HdfsOperations(new SequenceFileWriterFactory()),
                        new LocalFsOperations(),
                        new PrepareActionsHandler(new LauncherURIHandlerFactory(null)),
                        new LauncherAMCallbackNotifierFactory(),
                        new LauncherSecurityManager(),
                        sysenv.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()),
                        launcherConf);
                    launcher.run();
                    return null;
            }
        });
    }  
}

launcher.run主要完成

經過registerWithRM調用AMRMClientAsync來註冊到Resource Manager

  • executePrepare / setupMainConfiguration 完成初始化,準備和配置
  • runActionMain會根據配置調用具體的main函數,好比HiveMain
    • Class<?> klass = launcherConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, null);
    • Method mainMethod = klass.getMethod("main", String[].class);
    • mainMethod.invoke(null, (Object) mainArgs);
  • 調用uploadActionDataToHDFS同步HDFS
  • 調用unregisterWithRM從RM解綁
  • 調用LauncherAMCallbackNotifier.notifyURL通知Oozie

具體代碼以下:

public void run() throws Exception {
    try {
        actionDir = new Path(launcherConf.get(OOZIE_ACTION_DIR_PATH));
        registerWithRM(amrmCallBackHandler);
        // Run user code without the AM_RM_TOKEN so users can't request containers
        UserGroupInformation ugi = getUserGroupInformation(launcherConf, AMRMTokenIdentifier.KIND_NAME);

        ugi.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
                executePrepare(errorHolder);
                setupMainConfiguration();
                runActionMain(errorHolder); // 會根據配置調用具體的main函數,好比HiveMain
                return null;
            }
        });
    } 
    finally {
        try {
            actionData.put(ACTION_DATA_FINAL_STATUS, actionResult.toString());
            hdfsOperations.uploadActionDataToHDFS(launcherConf, actionDir, actionData);
        } finally {
            try {
                unregisterWithRM(actionResult, errorHolder.getErrorMessage());
            } finally {
                LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf);
                cn.notifyURL(actionResult);
            }
        }
    }
}

可是你會發現,對比以前所說的ApplicationMaster應該實現的功能,LauncherAM 作得恁少了點,這是個疑問! 咱們在後續研究中會爲你們揭開這個祕密。

0x4 Hive on Yarn

上文提到,runActionMain會根據配置調用具體的main函數。咱們假設是hive action,則對應的是HiveMain。

Hive job的入口函數是在HIVE_MAIN_CLASS_NAME配置的。

public class HiveActionExecutor extends ScriptLanguageActionExecutor {
    private static final String HIVE_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.HiveMain";

	  @Override
    public List<Class<?>> getLauncherClasses() {
        List<Class<?>> classes = new ArrayList<Class<?>>();
        classes.add(Class.forName(HIVE_MAIN_CLASS_NAME)); // 這裏配置了 HiveMain
        return classes;
    }  
}

HiveMain後續調用以下

HiveMain.main ----> run ----> runHive ----> CliDriver.main(args);

最後調用 org.apache.hadoop.hive.cli.CliDriver 完成了hive操做,大體有:

  • 設定參數;
  • 若是有腳本,則設定腳本路徑;
  • 若是有以前的yarn child jobs,殺掉;
  • 執行hive;
  • 寫log;

具體以下:

public class HiveMain extends LauncherMain {
    public static void main(String[] args) throws Exception {
        run(HiveMain.class, args);
    }
  
   @Override
    protected void run(String[] args) throws Exception {
        Configuration hiveConf = setUpHiveSite();
        List<String> arguments = new ArrayList<String>();

        String logFile = setUpHiveLog4J(hiveConf);
        arguments.add("--hiveconf");
        arguments.add("hive.log4j.file=" + new File(HIVE_L4J_PROPS).getAbsolutePath());
        arguments.add("--hiveconf");
        arguments.add("hive.exec.log4j.file=" + new File(HIVE_EXEC_L4J_PROPS).getAbsolutePath());

        //setting oozie workflow id as caller context id for hive
        String callerId = "oozie:" + System.getProperty(LauncherAM.OOZIE_JOB_ID);
        arguments.add("--hiveconf");
        arguments.add("hive.log.trace.id=" + callerId);

        String scriptPath = hiveConf.get(HiveActionExecutor.HIVE_SCRIPT);
        String query = hiveConf.get(HiveActionExecutor.HIVE_QUERY);
        if (scriptPath != null) {
            ......
            // print out current directory & its contents
            File localDir = new File("dummy").getAbsoluteFile().getParentFile();
            String[] files = localDir.list();

            // Prepare the Hive Script
            String script = readStringFromFile(scriptPath);
            arguments.add("-f");
            arguments.add(scriptPath);
        } else if (query != null) {
            String filename = createScriptFile(query);
            arguments.add("-f");
            arguments.add(filename);
        } 

        // Pass any parameters to Hive via arguments
        ......
        String[] hiveArgs = ActionUtils.getStrings(hiveConf, HiveActionExecutor.HIVE_ARGS);
        for (String hiveArg : hiveArgs) {
            arguments.add(hiveArg);
        }
        LauncherMain.killChildYarnJobs(hiveConf);

        try {
            runHive(arguments.toArray(new String[arguments.size()]));
        }
        finally {
            writeExternalChildIDs(logFile, HIVE_JOB_IDS_PATTERNS, "Hive");
        }
    }  
}

所以咱們能看到,Oozie ApplicationMaster 在被Yarn調用以後,就是經過org.apache.hadoop.hive.cli.CliDriver 給Hive發送命令讓其執行,沒有什麼再和ResourceManager / NodeManager 交互的過程,這真的很奇怪。這個祕密要由下面的Tez來解答。

0x5 Tez計算框架

Tez是Apache開源的支持DAG做業的計算框架,它直接源於MapReduce框架,核心思想是將Map和Reduce兩個操做進一步拆分,即Map被拆分紅Input、Processor、Sort、Merge和Output, Reduce被拆分紅Input、Shuffle、Sort、Merge、Processor和Output等,這樣,這些分解後的元操做能夠任意靈活組合,產生新的操做,這些操做通過一些控制程序組裝後,可造成一個大的DAG做業。

Tez有如下特色:

  • Apache二級開源項目
  • 運行在YARN之上
  • 適用於DAG(有向圖)應用(同Impala、Dremel和Drill同樣,可用於替換Hive/Pig等)

能夠看到,Tez也是和Yarn深度綁定的。

5.1 DAGAppMaster

首先咱們就找到了Tez對應的Application Master,即Tez DAG Application Master

public class DAGAppMaster extends AbstractService {
  public String submitDAGToAppMaster(DAGPlan dagPlan,
      Map<String, LocalResource> additionalResources) throws TezException {
      startDAG(dagPlan, additionalResources);
    }
  }  
}

咱們能看到提交Application Master代碼。

public class TezYarnClient extends FrameworkClient {
  @Override
  public ApplicationId submitApplication(ApplicationSubmissionContext appSubmissionContext)
      throws YarnException, IOException, TezException {
   	ApplicationId appId= yarnClient.submitApplication(appSubmissionContext);
    ApplicationReport appReport = getApplicationReport(appId);
    return appId;
  }
}

這裏是創建Application Master context 代碼,設置了Application Maste類和Container。

public static ApplicationSubmissionContext createApplicationSubmissionContext(
      ApplicationId appId, DAG dag, String amName,
      AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
      Credentials sessionCreds, boolean tezLrsAsArchive,
      TezApiVersionInfo apiVersionInfo,
      ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker)
      throws IOException, YarnException {

    // Setup the command to run the AM
    List<String> vargs = new ArrayList<String>(8);
    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");

    String amOpts = constructAMLaunchOpts(amConfig.getTezConfiguration(), capability);
    vargs.add(amOpts);

    // 這裏設置了 Application Master
    vargs.add(TezConstants.TEZ_APPLICATION_MASTER_CLASS);

    // 這裏設置了命令行參數 
    Vector<String> vargsFinal = new Vector<String>(8);
    // Final command
    StringBuilder mergedCommand = new StringBuilder();
    for (CharSequence str : vargs) {
      mergedCommand.append(str).append(" ");
    }
    vargsFinal.add(mergedCommand.toString());

    // 設置了container
    // Setup ContainerLaunchContext for AM container
    ContainerLaunchContext amContainer =
        ContainerLaunchContext.newInstance(amLocalResources, environment,
            vargsFinal, serviceData, securityTokens, acls);

    // Set up the ApplicationSubmissionContext
    ApplicationSubmissionContext appContext = Records
        .newRecord(ApplicationSubmissionContext.class);

    appContext.setAMContainerSpec(amContainer);

    return appContext;
}

5.2 與Resource Manager交互

這裏只摘要部分代碼,能看到Tez實現了與Yarn Resource Manager交互

YarnTaskSchedulerService實現了AMRMClientAsync.CallbackHandler,其功能是處理由Resource Manager收到的消息,其實現了方法

import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

public class YarnTaskSchedulerService extends TaskScheduler
                             implements AMRMClientAsync.CallbackHandler {
  @Override
  public void onContainersAllocated(List<Container> containers) {
      if (!shouldReuseContainers) {
        List<Container> modifiableContainerList = Lists.newLinkedList(containers);
        assignedContainers = assignNewlyAllocatedContainers(
            modifiableContainerList);
      } 
    }
    // upcall to app must be outside locks
    informAppAboutAssignments(assignedContainers);
  }

  @Override
  public void onContainersCompleted(List<ContainerStatus> statuses) {
    synchronized (this) {
      for(ContainerStatus containerStatus : statuses) {
        ContainerId completedId = containerStatus.getContainerId();
        HeldContainer delayedContainer = heldContainers.get(completedId);

        Object task = releasedContainers.remove(completedId);
        appContainerStatus.put(task, containerStatus);
        continue;
       }

        // not found in released containers. check currently allocated containers
        // no need to release this container as the RM has already completed it
        task = unAssignContainer(completedId, false);
        if (delayedContainer != null) {
          heldContainers.remove(completedId);
          Resources.subtract(allocatedResources, delayedContainer.getContainer().getResource());
        } 
        if(task != null) {
          // completion of a container we have allocated currently
          // an allocated container completed. notify app. This will cause attempt to get killed
          appContainerStatus.put(task, containerStatus);
          continue;
        }
      }
    }

    // upcall to app must be outside locks
    for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
      getContext().containerCompleted(entry.getKey(), entry.getValue());
    }
  }
}
  • onContainersAllocated : 當有新的Container 可使用。這裏時啓動container 的代碼。
  • onContainersCompleted 是Container 運行結束。 在onContainersCompleted 中,若是是失敗的Container,咱們須要從新申請並啓動Container,成功的將作記錄既能夠。

由此咱們能夠看到,Oozie是一個甩手掌櫃,他只管啓動Hive,具體後續如何與RM交互,則徹底由Tez搞定。這就解答了以前咱們全部疑惑

最後總結下新流程:

  1. Oozie提交LauncherAM到Yarn;
  2. LauncherAM運行HiveMain,其調用CliDriver.main給Hive提交任務;
  3. Hive on Tez,因此Tez準備DAGAppMaster;
  4. Yarn與Tez交互:Tez提交DAGAppMaster到Yarn,Tez解析運行Hive命令;
  5. Hive運行結束後,調用回調 url 通知Oozie;

原諒我用這種辦法畫圖,由於我最討厭看到一篇好文,結果發現圖沒了......

+---------+                       +----------+                       +-----------+
|         | 1-submit LauncherAM   |          | 2.CliDriver.main      |           |  
|         |---------------------->| HiveMain |---------------------> |           |
|         |                       |          |                       |           |--+
| [Oozie] |                       |  [Yarn]  |                       |   [Hive]  |  | 3.Run 
|         |                       |          |                       |           |  | Hive     
|         | 5-notifyURL of Oozie  |          | 4-submit DAGAppMaster |           |<-+
|         |<----------------------|          | <-------------------->|    Tez    |
|         |                       |          |                       |           |
+---------+                       +----------+                       +-----------+

0x6 Java on Yarn

下面咱們看看若是Oozie執行一個Java程序,是如何進行的。

Java程序的主執行函數是 JavaMain,這個就簡單多了,就是直接調用用戶的Java主函數。

public class JavaMain extends LauncherMain {
    public static final String JAVA_MAIN_CLASS = "oozie.action.java.main";

   /**
    * @param args Invoked from LauncherAM:run()
    * @throws Exception in case of error when running the application
    */
    public static void main(String[] args) throws Exception {
        run(JavaMain.class, args);
    }

    @Override
    protected void run(String[] args) throws Exception {

        Configuration actionConf = loadActionConf();
        setYarnTag(actionConf);
        setApplicationTags(actionConf, TEZ_APPLICATION_TAGS);
        setApplicationTags(actionConf, SPARK_YARN_TAGS);

        LauncherMain.killChildYarnJobs(actionConf);

        Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class);
        Method mainMethod = klass.getMethod("main", String[].class);
        mainMethod.invoke(null, (Object) args);
    }
}

0x7 Yarn job 執行結束

7.1 檢查任務機制

前面提到,ActionExecutor.start是異步的,還須要檢查Action執行狀態來推動流程,oozie經過兩種方式來檢查任務是否完成。

  • 回調:當一個任務和一個計算被啓動後,會爲任務提供一個回調url,該任務執行完成後,會執行回調來通知oozie
  • 輪詢:在任務執行回調失敗的狀況下,不管任何緣由,都支持以輪詢的方式進行查詢。

oozie提供這兩種方式來控制任務。

7.2 回調機制

LauncherAM 在用戶程序執行完成以後,會作以下調用,以通知Oozie。這就用到了「回調」機制。

LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf);
                cn.notifyURL(actionResult);

Oozie的CallbackServlet會響應這個調用。能夠看到,DagEngine.processCallback是Oozie處理程序結束之處。

public class CallbackServlet extends JsonRestServlet {
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        String queryString = request.getQueryString();
        CallbackService callbackService = Services.get().get(CallbackService.class);

        String actionId = callbackService.getActionId(queryString);

        DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();

        dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), null);
        }
    }
}

DagEngine.processCallback主要是使用CompletedActionXCommand來進行。能夠看到這個命令是放到 CallableQueueService 的 queue中,因此下面咱們須要介紹 CallableQueueService

public void processCallback(String actionId, String externalStatus, Properties actionData)
          throws DagEngineException {
      XCallable<Void> command = new CompletedActionXCommand(actionId, externalStatus,
      actionData, HIGH_PRIORITY);
      if (!Services.get().get(CallableQueueService.class).queue(command)) {
          LOG.warn(XLog.OPS, "queue is full or system is in SAFEMODE, ignoring callback");
      }
}

7.3 異步執行

7.3.1 CallableQueueService

Oozie 使用 CallableQueueService 來異步執行操做;

public class CallableQueueService implements Service, Instrumentable {
    private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
    private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
    private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<>();
    private Set<String> interruptTypes;
    private int interruptMapMaxSize;
    private int maxCallableConcurrency;
    private int queueAwaitTerminationTimeoutSeconds;
    private int queueSize;
    private PriorityDelayQueue<CallableWrapper<?>> queue;
    private ThreadPoolExecutor executor;
    private Instrumentation instrumentation;
    private boolean newImpl = false;
    private AsyncXCommandExecutor asyncXCommandExecutor; 
  
    public void init(Services services) {
          queue = new PollablePriorityDelayQueue<CallableWrapper<?>>(PRIORITIES,
                    MAX_CALLABLE_WAITTIME_MS,
                    TimeUnit.MILLISECONDS,
                    queueSize) {
                @Override
                protected boolean eligibleToPoll(QueueElement<?> element) {
                    if (element != null) {
                        CallableWrapper wrapper = (CallableWrapper) element;
                        if (element.getElement() != null) {
                            return callableReachMaxConcurrency(wrapper.getElement());
                        }
                    }
                    return false;
                }
            };  
    }
}

特色:

  • 加入執行隊列的任務多是能夠當即被吊起的,也多是將來某個時間才觸發的。
  • 執行線程池根據 任務的執行時間和任務的優先級別來選取任務吊起。
  • 執行線程池的任務隊列大小可配置,當到達隊列最大值,線程池將再也不接收任務。

7.3.3 PriorityDelayQueue

線程池選取的隊列是oozie自定義的隊列 PriorityDelayQueue:

特色:

根據隊列中元素的延時時間以及其執行優先級出隊列:

實現策略:

PriorityDelayQueue 中爲每一個優先級別的任務設置一個 延時隊列 DelayQueue
由於使用的是jdk自帶的延時隊列 DelayQueue,能夠保證的是若是任務在該隊列中的延時時間知足條件,咱們
經過poll()方法便可獲得知足延時條件的任務,若是 poll()獲得的是null,說明該隊列的中任務沒有知足時間條件的任務。

如何編排多個優先級的隊列:
每次從PriorityDelayQueue去選取任務,都優先從最高優先級的隊列來poll出任務,若是最高的優先級隊列中沒有知足條件的任務,則次優先級隊列poll出任務,若是仍未獲取
將按照隊列優先等級以此類推。
餓死現象:假如高優先級中的任務在每次獲取的時候都知足條件,這樣容易將低優先級的隊列中知足條件的任務活活餓死,爲了防止這種狀況的產生,在每次選取任務以前,遍歷
低優先級隊列任務,若是任務早已經知足出隊列條件,若是超時時間超過了咱們設定的最大值,咱們會爲這個任務提升優先級,將這個任務優先級加一,添加到上個優先級隊列中進行
排隊。

7.3.3 PollablePriorityDelayQueue

特色:

在從隊列中選取任務的時候,先判斷知足時間的任務是否知足併發等限制,若是知足再從隊列中取出,而不是像PriorityDelayQueue那樣,先取出若是不知足併發等限制,再將該任務從新放置回去。

任務類型:

使用線程池異步執行任務,任務和任務之間是無序的,針對具體的業務場景,可能執行的單元是須要串序執行的。oozie中封裝了 CompositeCallable 和 通常的 XCallable的任務類型,前者是XCallable的一個集合,它能保證的是這個集合裏面的XCallable是順序執行的。

7.4 跳轉下一個操做

CompletedActionXCommand 當Workflow command結束時候會執行,且只執行一次。對於程序結束,會在異步隊列中加入一個 ActionCheckXCommand。

public class CompletedActionXCommand extends WorkflowXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {
        if (this.wfactionBean.getStatus() == WorkflowActionBean.Status.PREP) {
           .....
        } else {    // RUNNING
            ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(this.wfactionBean.getType());
            // this is done because oozie notifications (of sub-wfs) is send
            // every status change, not only on completion.
            if (executor.isCompleted(externalStatus)) {
                queue(new ActionCheckXCommand(this.wfactionBean.getId(), getPriority(), -1));
            }
        }
        return null;
    }  
}

異步調用到ActionCheckXCommand,其主要做用是:

  • 若是有重試機制,則作相應配置
  • 調用 executor.check(context, wfAction); 來檢查環境信息
  • 更新數據庫中的任務信息
  • 由於已經結束了,因此用ActionEndXCommand來執行結束
public class ActionCheckXCommand extends ActionXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {

        ActionExecutorContext context = null;
        boolean execSynchronous = false;
        try {
            boolean isRetry = false; // 若是有重試機制,則作相應配置
            if (wfAction.getRetries() > 0) {
                isRetry = true;
            }
            boolean isUserRetry = false;
            context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
          
            executor.check(context, wfAction); // 檢查環境信息

            if (wfAction.isExecutionComplete()) {
                if (!context.isExecuted()) {
                    failJob(context);
                    generateEvent = true;
                } else {
                    wfAction.setPending();
                    execSynchronous = true;
                }
            }
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
            updateList.add(new UpdateEntry<WorkflowJobQuery> (WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
                    wfJob));
        }
        finally {
                // 更新數據庫中的任務信息
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
                if (generateEvent && EventHandlerService.isEnabled()) {
                    generateEvent(wfAction, wfJob.getUser());
                }
                if (execSynchronous) {
                    // 用ActionEndXCommand來執行結束
                    new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
                }
        }
        return null;
    }
}

調用到 JavaActionExecutor.check

  • 根據配置信息創建 yarnClient = createYarnClient(context, jobConf);
  • 獲取程序報告信息 ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
  • 獲取程序數據 Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);
  • 設置各類信息
@Override
public void check(Context context, WorkflowAction action) throws ActionExecutorException {
    boolean fallback = false;
    YarnClient yarnClient = null;
    try {
        Element actionXml = XmlUtils.parseXml(action.getConf());
        Configuration jobConf = createBaseHadoopConf(context, actionXml);
        FileSystem actionFs = context.getAppFileSystem();
        yarnClient = createYarnClient(context, jobConf); // 根據配置信息創建
        FinalApplicationStatus appStatus = null;
        try {
            final String effectiveApplicationId = findYarnApplicationId(context, action);
            final ApplicationId applicationId = ConverterUtils.toApplicationId(effectiveApplicationId);
            final ApplicationReport appReport = yarnClient.getApplicationReport(applicationId); // 獲取程序報告信息
            final YarnApplicationState appState = appReport.getYarnApplicationState();
            if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED
                    || appState == YarnApplicationState.KILLED) {
                appStatus = appReport.getFinalApplicationStatus();
            }
        } 
        if (appStatus != null || fallback) {
            Path actionDir = context.getActionDir();
            // load sequence file into object
            Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);   // 獲取程序數據
            if (fallback) {
                String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS);
                if (finalStatus != null) {
                    appStatus = FinalApplicationStatus.valueOf(finalStatus);
                } else {
                    context.setExecutionData(FAILED, null);
                }
            }

            String externalID = actionData.get(LauncherAM.ACTION_DATA_NEW_ID);  // MapReduce was launched
            if (externalID != null) {
                context.setExternalChildIDs(externalID);
             }

           // Multiple child IDs - Pig or Hive action
            String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
            if (externalIDs != null) {
                context.setExternalChildIDs(externalIDs);
             }

            // 設置各類信息
            context.setExecutionData(appStatus.toString(), null);
            if (appStatus == FinalApplicationStatus.SUCCEEDED) {
                if (getCaptureOutput(action) && LauncherHelper.hasOutputData(actionData)) {
                    context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
                            .get(LauncherAM.ACTION_DATA_OUTPUT_PROPS)));
                }
                else {
                    context.setExecutionData(SUCCEEDED, null);
                }
                if (LauncherHelper.hasStatsData(actionData)) {
                    context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS));
                }
                getActionData(actionFs, action, context);
            }
            else {
                ......
                context.setExecutionData(FAILED_KILLED, null);
            }
        }
    }
    finally {
        if (yarnClient != null) {
            IOUtils.closeQuietly(yarnClient);
        }
    }
}

ActionEndXCommand會進行結束和跳轉:

  • 調用Executor來完成結束操做 executor.end(context, wfAction);
  • 更新數據庫的job信息 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete
  • 用 SignalXCommand 來進行跳轉,進行下一個Action的執行
public class ActionEndXCommand extends ActionXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {

        Configuration conf = wfJob.getWorkflowInstance().getConf();

        if (!(executor instanceof ControlNodeActionExecutor)) {
            maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
            retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
        }

        executor.setMaxRetries(maxRetries);
        executor.setRetryInterval(retryInterval);

        boolean isRetry = false;
        if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
                || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
            isRetry = true;
        }
        boolean isUserRetry = false;
        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
        try {
          
            executor.end(context, wfAction); // 調用Executor來完成結束操做

            if (!context.isEnded()) {
                failJob(context);
            } else {
                wfAction.setRetries(0);
                wfAction.setEndTime(new Date());

                boolean shouldHandleUserRetry = false;
                Status slaStatus = null;
                switch (wfAction.getStatus()) {
                    case OK:
                        slaStatus = Status.SUCCEEDED;
                        break;
                    ......
                }
                if (!shouldHandleUserRetry || !handleUserRetry(context, wfAction)) {
                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus,
                            SlaAppType.WORKFLOW_ACTION);
                    if(slaEvent != null) {
                        insertList.add(slaEvent);
                    }
                }
            }
            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
            DagELFunctions.setActionInfo(wfInstance, wfAction);
            wfJob.setWorkflowInstance(wfInstance);

            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction));
            wfJob.setLastModifiedTime(new Date());
            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
        }
        finally {
            try { 
                // 更新數據庫的job信息
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
            }
            if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
                generateEvent(wfAction, wfJob.getUser());
            }
            new SignalXCommand(jobId, actionId).call(); // 進行跳轉,進行下一個Action的執行
        }
        return null;
    }  
}

0xFF 參考

大數據之Oozie——源碼分析(一)程序入口

什麼是Oozie——大數據任務調度框架

Oozie基礎小結

【原創】大數據基礎之Oozie(1)簡介、源代碼解析

【原創】大叔經驗分享(6)Oozie如何查看提交到Yarn上的任務日誌

Oozie和Azkaban的技術選型和對比

Oozie-TransitionXCommand

Oozie-Service-CallableQueueService

YARN基本框架分析

Oozie任務調度阻塞及內存優化方法

相關文章
相關標籤/搜索