Flink on YARN: partial source code analysis

Please credit the original source when reposting: https://www.cnblogs.com/dongxiao-yang/p/9403427.html

 

Flink jobs can be deployed in several ways: standalone, on YARN, Mesos, Kubernetes, and so on. Internally we uniformly use the Flink on YARN single-job mode (each Flink job declares its own Flink cluster on YARN). This article walks through part of the Flink 1.5.1 source code for submitting a YARN single job to a YARN cluster in legacy mode.

A typical single-job submission command looks like this: ./flink run -m yarn-cluster -d -yst -yqu flinkqu -yn 4 -ys 2 -c flinkdemoclass flinkdemo.jar args1 args2 ...
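
Roughly, per the Flink 1.5 CLI: -m yarn-cluster selects the YARN per-job deployment, -d detaches the client after submission, -yqu names the YARN queue, -yn sets the number of TaskManager containers, -ys the slots per TaskManager, -yst starts Flink in streaming mode, and -c specifies the program's entry class.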

The entry class of the flink script is org.apache.flink.client.cli.CliFrontend.

In CliFrontend's main method, loadCustomCommandLines is first called to load an important utility class for initializing YARN job submission:

org.apache.flink.yarn.cli.FlinkYarnSessionCli
    public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
        List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);

        //    Command line interface of the YARN session, with a special initialization here
        //    to prefix all options with y/yarn.
        //    Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
        //          active CustomCommandLine in order and DefaultCLI isActive always return true.
        final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
        try {
            customCommandLines.add(
                loadCustomCommandLine(flinkYarnSessionCLI,
                    configuration,
                    configurationDirectory,
                    "y",
                    "yarn"));
        } catch (NoClassDefFoundError | Exception e) {
            LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
        }

        if (configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE)) {
            customCommandLines.add(new DefaultCLI(configuration));
        } else {
            customCommandLines.add(new LegacyCLI(configuration));
        }

        return customCommandLines;
    }
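
The ordering tip in the comments refers to CliFrontend.getActiveCustomCommandLine, which simply returns the first registered CustomCommandLine that reports itself active. A condensed sketch of that selection logic (simplified from the 1.5.x source):

    public CustomCommandLine<?> getActiveCustomCommandLine(CommandLine commandLine) {
        for (CustomCommandLine<?> cli : customCommandLines) {
            // the first CLI whose isActive(..) returns true wins, which is why
            // DefaultCLI/LegacyCLI (always active) must be registered last
            if (cli.isActive(commandLine)) {
                return cli;
            }
        }
        throw new IllegalStateException("No command-line client available.");
    }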

 

Driven by the startup arguments, CliFrontend proceeds through run() -> runProgram(); inside runProgram, the key YARN-related call is

client = clusterDescriptor.deploySessionCluster(clusterSpecification);

The clusterDescriptor above is the cluster descriptor object produced by FlinkYarnSessionCli's createClusterDescriptor() method; in this mode the concrete class is org.apache.flink.yarn.LegacyYarnClusterDescriptor, whose parent class is AbstractYarnClusterDescriptor.

deploySessionCluster in turn calls deployInternal to submit a Flink cluster to YARN.

protected ClusterClient<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached) throws Exception {

        // ------------------ Check if configuration is valid --------------------
        validateClusterSpecification(clusterSpecification);

        if (UserGroupInformation.isSecurityEnabled()) {
            // note: UGI::hasKerberosCredentials inaccurately reports false
            // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
            // so we check only in ticket cache scenario.
            boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

            UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
            if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
                && useTicketCache && !loginUser.hasKerberosCredentials()) {
                LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
                throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
                    "does not have Kerberos credentials");
            }
        }

        isReadyForDeployment(clusterSpecification);

        // ------------------ Check if the specified queue exists --------------------

        checkYarnQueues(yarnClient);

        // ------------------ Add dynamic properties to local flinkConfiguraton ------
        Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
        for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
            flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
        }

        // ------------------ Check if the YARN ClusterClient has the requested resources --------------

        // Create application via yarnClient
        final YarnClientApplication yarnApplication = yarnClient.createApplication();
        final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

        Resource maxRes = appResponse.getMaximumResourceCapability();

        final ClusterResourceDescription freeClusterMem;
        try {
            freeClusterMem = getCurrentFreeClusterResources(yarnClient);
        } catch (YarnException | IOException e) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
        }

        final int yarnMinAllocationMB = yarnConfiguration.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);

        final ClusterSpecification validClusterSpecification;
        try {
            validClusterSpecification = validateClusterResources(
                clusterSpecification,
                yarnMinAllocationMB,
                maxRes,
                freeClusterMem);
        } catch (YarnDeploymentException yde) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw yde;
        }

        LOG.info("Cluster specification: {}", validClusterSpecification);

        final ClusterEntrypoint.ExecutionMode executionMode = detached ?
            ClusterEntrypoint.ExecutionMode.DETACHED
            : ClusterEntrypoint.ExecutionMode.NORMAL;

        flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

        ApplicationReport report = startAppMaster(
            flinkConfiguration,
            applicationName,
            yarnClusterEntrypoint,
            jobGraph,
            yarnClient,
            yarnApplication,
            clusterSpecification);

        String host = report.getHost();
        int port = report.getRpcPort();

        // Correctly initialize the Flink config
        flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
        flinkConfiguration.setInteger(JobManagerOptions.PORT, port);

        flinkConfiguration.setString(RestOptions.ADDRESS, host);
        flinkConfiguration.setInteger(RestOptions.PORT, port);

        // the Flink cluster is deployed in YARN. Represent cluster
        return createYarnClusterClient(
            this,
            clusterSpecification.getNumberTaskManagers(),
            clusterSpecification.getSlotsPerTaskManager(),
            report,
            flinkConfiguration,
            true);
    }

At the start of deployInternal, the available memory, queue, and other properties of the YARN cluster are checked; an application is then requested and startAppMaster is called, declaring the AM entry class YarnApplicationMasterRunner.

 

public ApplicationReport startAppMaster(
            Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification) throws Exception {
        // ...

        setApplicationTags(appContext);

        // add a hook to clean up in case deployment fails
        Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        yarnClient.submitApplication(appContext);

        LOG.info("Waiting for the cluster to be allocated");
        final long startTime = System.currentTimeMillis();
        ApplicationReport report;
        // ...
}
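
The // ... above elides the tail of startAppMaster, which waits for YARN to allocate the cluster by polling the application report. A condensed sketch of that wait loop, paraphrased from the 1.5.x source (log and error texts shortened):

        YarnApplicationState lastAppState = YarnApplicationState.NEW;
        loop: while (true) {
            report = yarnClient.getApplicationReport(appId);
            YarnApplicationState appState = report.getYarnApplicationState();
            switch (appState) {
                case FAILED:
                case FINISHED:
                case KILLED:
                    // deployment failed: surface YARN's diagnostics to the caller
                    throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
                        + appState + " during deployment. Diagnostics: " + report.getDiagnostics());
                case RUNNING:
                    LOG.info("YARN application has been deployed successfully.");
                    break loop;
                default:
                    if (appState != lastAppState) {
                        LOG.info("Deploying cluster, current state " + appState);
                    }
                    lastAppState = appState;
                    Thread.sleep(250);
            }
        }
        // startTime (captured above) is used in the real code to warn when
        // the deployment takes longer than 60 seconds.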

 

On the YARN cluster, YarnApplicationMasterRunner acts as the ApplicationMaster: it talks to the ResourceManager to request the corresponding TaskManager containers, and starts the JobManager, the web UI, and other services.

    protected int runApplicationMaster(Configuration config) {
        // ...
    webMonitor = BootstrapTools.startWebMonitorIfConfigured(
                config,
                highAvailabilityServices,
                new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)),
                new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
                webMonitorTimeout,
                new ScheduledExecutorServiceAdapter(futureExecutor),
                LOG);

            metricRegistry = new MetricRegistryImpl(
                MetricRegistryConfiguration.fromConfiguration(config));

            metricRegistry.startQueryService(actorSystem, null);

            // 2: the JobManager
            LOG.debug("Starting JobManager actor");

            // we start the JobManager with its standard name
            ActorRef jobManager = JobManager.startJobManagerActors(
                config,
                actorSystem,
                futureExecutor,
                ioExecutor,
                highAvailabilityServices,
                metricRegistry,
                webMonitor == null ? Option.empty() : Option.apply(webMonitor.getRestAddress()),
                new Some<>(JobMaster.JOB_MANAGER_NAME),
                Option.<String>empty(),
                getJobManagerClass(),
                getArchivistClass())._1();

            final String webMonitorURL = webMonitor == null ? null : webMonitor.getRestAddress();

            // 3: Flink's Yarn ResourceManager
            LOG.debug("Starting YARN Flink Resource Manager");

            Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
                getResourceManagerClass(),
                config,
                yarnConfig,
                highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                appMasterHostname,
                webMonitorURL,
                taskManagerParameters,
                taskManagerContext,
                numInitialTaskManagers,
                LOG);

            ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
        // ...
    }
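
The numInitialTaskManagers passed to YarnFlinkResourceManager here corresponds to the container count requested with -yn: the resource manager actor asks YARN for that many TaskManager containers and re-requests any that are lost.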
 

Meanwhile, once the cluster has been submitted, the Flink client moves from runProgram() into executeProgram():

    protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
        logAndSysout("Starting execution of program");

        final JobSubmissionResult result = client.run(program, parallelism);

        if (null == result) {
            throw new ProgramMissingJobException("No JobSubmissionResult returned, please make sure you called " +
                "ExecutionEnvironment.execute()");
        }

        if (result.isJobExecutionResult()) {
            logAndSysout("Program execution finished");
            JobExecutionResult execResult = result.getJobExecutionResult();
            System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
            System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
            Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
            if (accumulatorsResult.size() > 0) {
                System.out.println("Accumulator Results: ");
                System.out.println(AccumulatorHelper.getResultsFormatted(accumulatorsResult));
            }
        } else {
            logAndSysout("Job has been submitted with JobID " + result.getJobID());
        }
    }

From ClusterClient.run() -> prog.invokeInteractiveModeForExecution(), execution truly enters the user Flink job's main method.

In the main method, the final env.execute() generates the job's execution plan and hands it to the corresponding DetachedEnvironment object.
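
To make the entry point concrete, a minimal user program (class and job names here are illustrative, not from the original article):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FlinkDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements("a", "b", "c").print();

            // In this detached YARN mode the environment is a DetachedEnvironment,
            // so execute() records the plan and returns instead of blocking until
            // the job finishes.
            env.execute("flink-demo");
        }
    }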

The call chain is DetachedEnvironment.finalizeExecute() -> ClusterClient.run() -> YarnClusterClient.submitJob() -> ClusterClient.runDetached():

    /**
     * Submits a JobGraph detached.
     * @param jobGraph The JobGraph
     * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes).
     * @return JobSubmissionResult
     * @throws ProgramInvocationException
     */
    public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {

        waitForClusterToBeReady();

        final ActorGateway jobManagerGateway;
        try {
            jobManagerGateway = getJobManagerGateway();
        } catch (Exception e) {
            throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
        }

        try {
            logAndSysout("Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.");
            JobClient.submitJobDetached(
                new AkkaJobManagerGateway(jobManagerGateway),
                flinkConfig,
                jobGraph,
                Time.milliseconds(timeout.toMillis()),
                classLoader);
            return new JobSubmissionResult(jobGraph.getJobID());
        } catch (JobExecutionException e) {
            throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
        }
    }

In YarnClusterClient, submitJob chooses between the detached and blocking submission paths:

    @Override
    public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        if (isDetached()) {
            if (newlyCreatedCluster) {
                stopAfterJob(jobGraph.getJobID());
            }
            LOG.info("super.runDetached");
            return super.runDetached(jobGraph, classLoader);
        } else {
            LOG.info("super.run");
            return super.run(jobGraph, classLoader);
        }
    }
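
Note the stopAfterJob(jobGraph.getJobID()) call above: for a newly created per-job cluster it instructs the JobManager to shut the whole YARN application down once that job reaches a terminal state, which is what makes the single-job cluster ephemeral.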

 

Finally, the client connects to the JobManager described above and submits the Flink JobGraph to the Flink cluster already provisioned on YARN.

 

Conclusion: in Flink on YARN single-job mode, the client first requests a YARN application, waits for the cluster to be deployed successfully, and then contacts the JobManager to submit the job to it. The advantage of this mode is that each Flink job gets a dedicated cluster, which makes resource planning and management easier. The drawback, confirmed in practice, is that when the AM dies YARN can only restart the original cluster and cannot restore the Flink JobGraph, so additional HA configuration is needed.
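
For illustration, the kind of flink-conf.yaml HA settings this refers to (host names and values here are placeholders):

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
    high-availability.storageDir: hdfs:///flink/ha/
    yarn.application-attempts: 10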
