Please credit the original source when reposting: https://www.cnblogs.com/dongxiao-yang/p/9403427.html
Flink jobs can be deployed in several ways: standalone, on YARN, Mesos, Kubernetes, and so on. Internally our company has standardized on the flink on yarn single-job mode (each Flink job declares its own Flink cluster on YARN). This article walks through the part of the Flink 1.5.1 source code that submits a YARN single job to a YARN cluster in legacy mode.
A typical single-job submission command looks like this: ./flink run -m yarn-cluster -d -yst -yqu flinkqu -yn 4 -ys 2 -c flinkdemoclass flinkdemo.jar args1 args2 ... (here -yqu selects the YARN queue, -yn the number of TaskManager containers, -ys the number of slots per TaskManager, -c the program entry class, and -d runs the job in detached mode).
The entry class of the flink script is org.apache.flink.client.cli.CliFrontend.
In CliFrontend's main method, loadCustomCommandLines first loads an important helper class used to initialize YARN job submission:
org.apache.flink.yarn.cli.FlinkYarnSessionCli
public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
    List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);

    // Command line interface of the YARN session, with a special initialization here
    // to prefix all options with y/yarn.
    // Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
    //       active CustomCommandLine in order and DefaultCLI isActive always return true.
    final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
    try {
        customCommandLines.add(
            loadCustomCommandLine(flinkYarnSessionCLI,
                configuration,
                configurationDirectory,
                "y",
                "yarn"));
    } catch (NoClassDefFoundError | Exception e) {
        LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
    }

    if (configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE)) {
        customCommandLines.add(new DefaultCLI(configuration));
    } else {
        customCommandLines.add(new LegacyCLI(configuration));
    }

    return customCommandLines;
}
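The "y"/"yarn" strings passed to loadCustomCommandLine above become prefixes for every YARN-specific CLI option, which is why the submit command uses -yqu/-yn/-ys rather than -qu/-n/-s. A minimal illustrative sketch of that idea with Apache Commons CLI (not the actual FlinkYarnSessionCli source; option names and descriptions here are only examples):

import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

public class PrefixedYarnOptionsSketch {

    // Build YARN-specific options whose names are prepended with the given prefixes,
    // e.g. "y" + "qu" -> -yqu and "yarn" + "queue" -> --yarnqueue.
    public static Options buildYarnOptions(String shortPrefix, String longPrefix) {
        Options options = new Options();
        options.addOption(new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue"));
        options.addOption(new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN containers (TaskManagers)"));
        options.addOption(new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager"));
        return options;
    }

    public static void main(String[] args) {
        buildYarnOptions("y", "yarn").getOptions()
            .forEach(o -> System.out.println("-" + o.getOpt() + " / --" + o.getLongOpt()));
    }
}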
Driven by the startup arguments, CliFrontend then runs through run() -> runProgram(); inside runProgram the key YARN-related call is
client = clusterDescriptor.deploySessionCluster(clusterSpecification);
The clusterDescriptor above is the cluster descriptor object produced when the FlinkYarnSessionCli mentioned earlier executes its createClusterDescriptor() method; in this mode the concrete class is org.apache.flink.yarn.LegacyYarnClusterDescriptor, whose parent class is AbstractYarnClusterDescriptor.
Internally, deploySessionCluster calls deployInternal to submit a Flink cluster to YARN.
protected ClusterClient<ApplicationId> deployInternal(
        ClusterSpecification clusterSpecification,
        String applicationName,
        String yarnClusterEntrypoint,
        @Nullable JobGraph jobGraph,
        boolean detached) throws Exception {

    // ------------------ Check if configuration is valid --------------------
    validateClusterSpecification(clusterSpecification);

    if (UserGroupInformation.isSecurityEnabled()) {
        // note: UGI::hasKerberosCredentials inaccurately reports false
        // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
        // so we check only in ticket cache scenario.
        boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

        UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
        if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
                && useTicketCache && !loginUser.hasKerberosCredentials()) {
            LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
            throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
                "does not have Kerberos credentials");
        }
    }

    isReadyForDeployment(clusterSpecification);

    // ------------------ Check if the specified queue exists --------------------

    checkYarnQueues(yarnClient);

    // ------------------ Add dynamic properties to local flinkConfiguraton ------
    Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
    for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
        flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
    }

    // ------------------ Check if the YARN ClusterClient has the requested resources --------------

    // Create application via yarnClient
    final YarnClientApplication yarnApplication = yarnClient.createApplication();
    final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

    Resource maxRes = appResponse.getMaximumResourceCapability();

    final ClusterResourceDescription freeClusterMem;
    try {
        freeClusterMem = getCurrentFreeClusterResources(yarnClient);
    } catch (YarnException | IOException e) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
    }

    final int yarnMinAllocationMB = yarnConfiguration.getInt(yarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);

    final ClusterSpecification validClusterSpecification;
    try {
        validClusterSpecification = validateClusterResources(
            clusterSpecification,
            yarnMinAllocationMB,
            maxRes,
            freeClusterMem);
    } catch (YarnDeploymentException yde) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw yde;
    }

    LOG.info("Cluster specification: {}", validClusterSpecification);

    final ClusterEntrypoint.ExecutionMode executionMode = detached ?
        ClusterEntrypoint.ExecutionMode.DETACHED
        : ClusterEntrypoint.ExecutionMode.NORMAL;

    flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

    ApplicationReport report = startAppMaster(
        flinkConfiguration,
        applicationName,
        yarnClusterEntrypoint,
        jobGraph,
        yarnClient,
        yarnApplication,
        clusterSpecification);

    String host = report.getHost();
    int port = report.getRpcPort();

    // Correctly initialize the Flink config
    flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
    flinkConfiguration.setInteger(JobManagerOptions.PORT, port);

    flinkConfiguration.setString(RestOptions.ADDRESS, host);
    flinkConfiguration.setInteger(RestOptions.PORT, port);

    // the Flink cluster is deployed in YARN. Represent cluster
    return createYarnClusterClient(
        this,
        clusterSpecification.getNumberTaskManagers(),
        clusterSpecification.getSlotsPerTaskManager(),
        report,
        flinkConfiguration,
        true);
}
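One detail worth calling out is the yarnMinAllocationMB read above: YARN only hands out containers in multiples of yarn.scheduler.minimum-allocation-mb, so validateClusterResources effectively rounds the requested JobManager/TaskManager memory up before checking it against the maximum container size and the free cluster resources. A simplified sketch of that rounding (not the actual Flink source; names are illustrative):

public final class ResourceNormalizationSketch {

    // Round a memory request up to the next multiple of the YARN minimum allocation.
    static int normalizeMemoryMB(int requestedMB, int yarnMinAllocationMB) {
        if (yarnMinAllocationMB <= 0) {
            return requestedMB;
        }
        int multiples = (requestedMB + yarnMinAllocationMB - 1) / yarnMinAllocationMB;
        return multiples * yarnMinAllocationMB;
    }

    public static void main(String[] args) {
        int yarnMin = 1024;           // yarn.scheduler.minimum-allocation-mb
        int requestedTmMemory = 1500; // e.g. -ytm 1500
        System.out.println("Effective TaskManager container size: "
            + normalizeMemoryMB(requestedTmMemory, yarnMin) + " MB"); // -> 2048 MB
    }
}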
At the beginning of deployInternal, the YARN cluster's free memory, target queue, and so on are checked; a new application is then requested and startAppMaster is called, declaring the ApplicationMaster entry class: YarnApplicationMasterRunner.
public ApplicationReport startAppMaster(
        Configuration configuration,
        String applicationName,
        String yarnClusterEntrypoint,
        JobGraph jobGraph,
        YarnClient yarnClient,
        YarnClientApplication yarnApplication,
        ClusterSpecification clusterSpecification) throws Exception {

    .....

    setApplicationTags(appContext);

    // add a hook to clean up in case deployment fails
    Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
    Runtime.getRuntime().addShutdownHook(deploymentFailureHook);

    LOG.info("Submitting application master " + appId);
    yarnClient.submitApplication(appContext);

    LOG.info("Waiting for the cluster to be allocated");
    final long startTime = System.currentTimeMillis();
    ApplicationReport report;
}
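After yarnClient.submitApplication, the elided part of the method waits for the cluster to be allocated by repeatedly asking the ResourceManager for the application report. A simplified, illustrative sketch of that kind of polling loop using the plain YARN client API (not the verbatim Flink code):

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;

class WaitForAppMasterSketch {

    static ApplicationReport waitUntilRunning(YarnClient yarnClient, ApplicationId appId) throws Exception {
        while (true) {
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            YarnApplicationState state = report.getYarnApplicationState();
            if (state == YarnApplicationState.RUNNING) {
                return report;                      // AM is up, host/rpcPort are now usable
            }
            if (state == YarnApplicationState.FAILED
                    || state == YarnApplicationState.FINISHED
                    || state == YarnApplicationState.KILLED) {
                throw new RuntimeException("YARN application ended before the Flink cluster came up: " + state);
            }
            Thread.sleep(250);                      // back off before asking the RM again
        }
    }
}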
YarnApplicationMasterRunner then runs inside the YARN cluster as the ApplicationMaster: it talks to the ResourceManager to request the TaskManager containers and starts the JobManager, the web UI, and related services.
protected int runApplicationMaster(Configuration config) {
    ......
    ......

    webMonitor = BootstrapTools.startWebMonitorIfConfigured(
        config,
        highAvailabilityServices,
        new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)),
        new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
        webMonitorTimeout,
        new ScheduledExecutorServiceAdapter(futureExecutor),
        LOG);

    metricRegistry = new MetricRegistryImpl(
        MetricRegistryConfiguration.fromConfiguration(config));

    metricRegistry.startQueryService(actorSystem, null);

    // 2: the JobManager
    LOG.debug("Starting JobManager actor");

    // we start the JobManager with its standard name
    ActorRef jobManager = JobManager.startJobManagerActors(
        config,
        actorSystem,
        futureExecutor,
        ioExecutor,
        highAvailabilityServices,
        metricRegistry,
        webMonitor == null ? Option.empty() : Option.apply(webMonitor.getRestAddress()),
        new Some<>(JobMaster.JOB_MANAGER_NAME),
        Option.<String>empty(),
        getJobManagerClass(),
        getArchivistClass())._1();

    final String webMonitorURL = webMonitor == null ? null : webMonitor.getRestAddress();

    // 3: Flink's Yarn ResourceManager
    LOG.debug("Starting YARN Flink Resource Manager");

    Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
        getResourceManagerClass(),
        config,
        yarnConfig,
        highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
        appMasterHostname,
        webMonitorURL,
        taskManagerParameters,
        taskManagerContext,
        numInitialTaskManagers,
        LOG);

    ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
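The YarnFlinkResourceManager actor created at the end of the snippet is the component that actually negotiates TaskManager containers with the YARN ResourceManager. A generic sketch of that request/allocate handshake using the plain Hadoop AMRMClient API (Flink wraps this inside its resource manager actor; the class and method names below are Hadoop's, not Flink's):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.util.Records;

class RequestTaskManagerContainersSketch {

    static void requestContainers(int numTaskManagers, int memoryMB, int vcores) throws Exception {
        AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
        rmClient.init(new Configuration());
        rmClient.start();

        // Register this process as the ApplicationMaster (host/port/trackingUrl omitted here).
        rmClient.registerApplicationMaster("", 0, "");

        // Resource profile of one TaskManager container.
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(memoryMB);
        capability.setVirtualCores(vcores);

        for (int i = 0; i < numTaskManagers; i++) {
            rmClient.addContainerRequest(
                new ContainerRequest(capability, null, null, Priority.newInstance(0)));
        }
        // Allocated containers are then picked up via rmClient.allocate(progress) and launched
        // with an NMClient; Flink performs this inside the YarnFlinkResourceManager actor.
    }
}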
Meanwhile, after the cluster has been deployed, the Flink client continues from runProgram() into executeProgram():
protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
    logAndSysout("Starting execution of program");

    final JobSubmissionResult result = client.run(program, parallelism);

    if (null == result) {
        throw new ProgramMissingJobException("No JobSubmissionResult returned, please make sure you called " +
            "ExecutionEnvironment.execute()");
    }

    if (result.isJobExecutionResult()) {
        logAndSysout("Program execution finished");
        JobExecutionResult execResult = result.getJobExecutionResult();
        System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
        System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
        Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
        if (accumulatorsResult.size() > 0) {
            System.out.println("Accumulator Results: ");
            System.out.println(AccumulatorHelper.getResultsFormatted(accumulatorsResult));
        }
    } else {
        logAndSysout("Job has been submitted with JobID " + result.getJobID());
    }
}
From ClusterClient.run() -> prog.invokeInteractiveModeForExecution(), control finally enters the main method of the user's Flink job.
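invokeInteractiveModeForExecution essentially boils down to invoking the jar's main(String[]) reflectively inside the client JVM, with a context execution environment already installed. A simplified illustrative sketch of that step (hypothetical helper, not the actual PackagedProgram source):

import java.lang.reflect.Method;

class InvokeMainSketch {

    // Look up and call the user's main(String[]) via reflection, so the user code
    // runs in the client process with the CLI-provided execution environment.
    static void callUserMain(Class<?> entryClass, String[] programArgs) throws Exception {
        Method mainMethod = entryClass.getMethod("main", String[].class);
        mainMethod.invoke(null, (Object) programArgs);
    }
}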
In that main method, the final env.execute() generates the job's execution plan; because the submission is detached, the execution environment involved is the corresponding DetachedEnvironment object.
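A hypothetical user job of the kind submitted with the command at the top might look like the following; the point is simply that env.execute() at the end hands the plan back to the client-side environment (class name, data, and output path are made up):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FlinkDemoJob {

    public static void main(String[] args) throws Exception {
        // The CLI has already installed a context environment (a DetachedEnvironment here),
        // so getExecutionEnvironment() returns it rather than a local environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> words = env.fromElements("flink", "on", "yarn");
        words.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        }).writeAsText(args.length > 0 ? args[0] : "hdfs:///tmp/flink-demo-out");

        // Builds the JobGraph and hands it to the client; with -d the client
        // submits it detached and returns immediately.
        env.execute("flink-on-yarn demo");
    }
}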
The call chain from there is DetachedEnvironment.finalizeExecute() -> ClusterClient.run() -> YarnClusterClient.submitJob() -> ClusterClient.runDetached():
/**
 * Submits a JobGraph detached.
 * @param jobGraph The JobGraph
 * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes).
 * @return JobSubmissionResult
 * @throws ProgramInvocationException
 */
public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {

    waitForClusterToBeReady();

    final ActorGateway jobManagerGateway;
    try {
        jobManagerGateway = getJobManagerGateway();
    } catch (Exception e) {
        throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
    }

    try {
        logAndSysout("Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.");
        JobClient.submitJobDetached(
            new AkkaJobManagerGateway(jobManagerGateway),
            flinkConfig,
            jobGraph,
            Time.milliseconds(timeout.toMillis()),
            classLoader);
        return new JobSubmissionResult(jobGraph.getJobID());
    } catch (JobExecutionException e) {
        throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
    }
}
@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    if (isDetached()) {
        if (newlyCreatedCluster) {
            stopAfterJob(jobGraph.getJobID());
        }
        LOG.info("super.runDetached");
        return super.runDetached(jobGraph, classLoader);
    } else {
        LOG.info("super.run");
        return super.run(jobGraph, classLoader);
    }
}
Finally, the client connects to the JobManager described above and submits the Flink JobGraph to the Flink cluster that has already been brought up on YARN.
Conclusion: in flink on yarn single-job mode, the Flink client first requests a YARN application for the cluster, waits for it to be deployed successfully, and then contacts the JobManager and submits the job to that cluster. The advantage of this mode is that every Flink job gets its own dedicated cluster, which makes resource planning and management easier. The drawback, verified in practice, is that when the AM dies YARN can only restart the original cluster; it cannot restore the Flink JobGraph, so additional HA configuration is required.
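For reference, ZooKeeper-based HA is usually enabled through flink-conf.yaml along the lines of the following sketch (hostnames, paths, and the attempt count are placeholders to adapt to your environment):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.path.root: /flink
# allow YARN to restart the ApplicationMaster more than once
yarn.application-attempts: 10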