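In my experience, the posts you should least miss when learning Flink are the Flink community's own blog series; the articles there do not disappoint. Strongly recommended: https://ververica.cn/developers-resources/.
This is my first attempt at writing a source-code walkthrough, and I will try to tie the principles to the flow of the actual implementation. There are still several points I have not figured out. Covering everything in a single post would take too long, but I am afraid of forgetting things later, so I am writing down what I have now and will fill in the gaps after reading more of the source. My apologies to readers who see the incomplete version!
If anything in the post is inaccurate, please leave a comment to point it out.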
This source-code series is based on Flink 1.9.
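The overall flow of submitting a job in Flink on Yarn mode is shown in the figure below (the figure comes from the Flink community; see Ref [1] for the link).
Figure 1: Flink runtime architecture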
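Flink uses the classic master-slave model: in the figure, the AM (ApplicationMaster) is the master and the TaskManagers are the slaves.
Inside the AM, the Dispatcher receives jobs submitted by the client and starts the corresponding JobManager; the JobManager receives the job, assigns tasks, and manages the TaskManagers; the ResourceManager is mainly responsible for requesting and allocating resources.
One point to note: Flink has its own ResourceManager and TaskManager. Even in on-Yarn mode, Flink keeps its own resource management architecture. Some component names are the same as Yarn's, but Yarn here is only a resource provider; in standalone mode the resource provider would be physical or virtual machines instead.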
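1) Running the Flink program acts as the client: it optimizes the user code into a JobGraph and asks the ApplicationsManager inside Yarn's ResourceManager for resources to start the AM (ApplicationMaster). The AM runs in a container on one of Yarn's NodeManager nodes;
2) Once the AM is up, it starts the Dispatcher and the ResourceManager. The Dispatcher starts the JobManager, and the ResourceManager starts the SlotManager, which manages and allocates slots;
3) The JobManager requests resources from the ResourceManager (RM) in order to run the job. At first no TaskManager has been started, so the RM requests resources from Yarn and, once it gets them, starts TaskManagers in those containers. The slots of each started TaskManager register with the SlotManager, which then assigns them to the tasks that need resources, i.e. it registers the slot information with the JobManager. The JobManager then submits the tasks to the corresponding slots for execution. The biggest difference between session mode and per-job mode in Flink on Yarn is that in session mode, by the time a job is submitted, the RM has already requested a fixed amount of resources from Yarn and its TaskManagers are already running.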
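The on-demand behaviour in step 3 can be illustrated with a toy model in plain Java. This is only a sketch and not Flink code: the class `ToySlotManager`, its methods, and the slot naming scheme are all hypothetical, and the call that "starts a TaskManager" merely stands in for the RM requesting a container from Yarn.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model: none of these names exist in Flink.
public class ToySlotManager {
    private final int slotsPerTaskManager;
    private final Deque<String> freeSlots = new ArrayDeque<>();
    private final List<String> startedTaskManagers = new ArrayList<>();

    public ToySlotManager(int slotsPerTaskManager) {
        if (slotsPerTaskManager < 1) {
            throw new IllegalArgumentException("need at least one slot per TaskManager");
        }
        this.slotsPerTaskManager = slotsPerTaskManager;
    }

    // A slot request from the job side. If no slot is free, a new
    // TaskManager is "started" first (standing in for a container request to Yarn).
    public String requestSlot() {
        if (freeSlots.isEmpty()) {
            startTaskManager();
        }
        return freeSlots.removeFirst();
    }

    // The new TaskManager registers all of its slots as free.
    private void startTaskManager() {
        String taskManager = "tm-" + startedTaskManagers.size();
        startedTaskManagers.add(taskManager);
        for (int i = 0; i < slotsPerTaskManager; i++) {
            freeSlots.addLast(taskManager + "/slot-" + i);
        }
    }

    public int taskManagerCount() {
        return startedTaskManagers.size();
    }

    public static void main(String[] args) {
        ToySlotManager slotManager = new ToySlotManager(2);
        for (int i = 0; i < 3; i++) {
            System.out.println(slotManager.requestSlot());
        }
        System.out.println("TaskManagers started: " + slotManager.taskManagerCount());
    }
}
```

With two slots per TaskManager, the third request is the one that forces a second TaskManager to be started, which mirrors the idea that TaskManagers only appear once a job actually asks for slots.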
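The detailed resource allocation process is shown in the figure below:
Figure 2: Slot management, from Ref [1]
For a more detailed analysis of the process I strongly recommend Ref [2], written by a Flink expert at Alibaba. The source-code analysis later in this post also relies heavily on that article.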
Job submission command
./flink run -m yarn-cluster ./flinkExample.jar
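The entry class of the flink script is org.apache.flink.client.cli.CliFrontend.
1) The main() method of the CliFrontend class loads the Flink configuration and some global settings and then, based on the command-line action run, calls run() -> runProgram() -> deployJobCluster(). The code is as follows: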
private <T> void runProgram(
        CustomCommandLine<T> customCommandLine,
        CommandLine commandLine,
        RunOptions runOptions,
        PackagedProgram program) throws ProgramInvocationException, FlinkException {
    final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);

    try {
        final T clusterId = customCommandLine.getClusterId(commandLine);

        final ClusterClient<T> client;

        // directly deploy the job if the cluster is started in job mode and detached
        if (clusterId == null && runOptions.getDetachedMode()) {
            int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();

            // Build the JobGraph
            final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);

            final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
            // Submit the job to Yarn
            client = clusterDescriptor.deployJobCluster(
                clusterSpecification,
                jobGraph,
                runOptions.getDetachedMode());

            logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
            // ......................
        } else {
            // ........
        }
        // (rest of the method omitted)
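2) Submitting the job calls deployJobCluster() in the YarnClusterDescriptor class -> deployInternal() in the AbstractYarnClusterDescriptor class. This method blocks until the ApplicationMaster/JobManager has been deployed successfully on Yarn. The key call inside it is startAppMaster(). The code is as follows: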
protected ClusterClient<ApplicationId> deployInternal(
        ClusterSpecification clusterSpecification,
        String applicationName,
        String yarnClusterEntrypoint,
        @Nullable JobGraph jobGraph,
        boolean detached) throws Exception {

    // 1. Verify that the cluster is reachable
    // 2. Check whether security authentication is enabled for the user
    // 3. Check that the configuration and vcores satisfy what the Flink cluster requests
    // 4. Check that the specified queue exists
    // 5. Check that the available memory satisfies what the Flink JobManager and TaskManager need
    // ....................................

    // Entry
    ApplicationReport report = startAppMaster(
        flinkConfiguration,
        applicationName,
        yarnClusterEntrypoint,
        jobGraph,
        yarnClient,
        yarnApplication,
        validClusterSpecification);

    // 6. Obtain the port and address of the Flink cluster
    // ..........................................
}
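3) The startAppMaster() method lives in the AbstractYarnClusterDescriptor class. It mainly uploads the configuration and related files to distributed storage such as HDFS and submits the application to Yarn. An annotated version of the source follows: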
public ApplicationReport startAppMaster(
        Configuration configuration,
        String applicationName,
        String yarnClusterEntrypoint,
        JobGraph jobGraph,
        YarnClient yarnClient,
        YarnClientApplication yarnApplication,
        ClusterSpecification clusterSpecification) throws Exception {

    // .......................

    // 1. Upload logback.xml and log4j.properties from the conf directory

    // 2. Upload the jars found under the FLINK_PLUGINS_DIR and FLINK_LIB_DIR environment variables
    addEnvironmentFoldersToShipFiles(systemShipFiles);
    // ...........
    // 3. Configure high availability for the application by setting the number of AM attempts (default 1)
    // 4. Upload ship files and user jars
    // 5. Set the slots and heap memory for the TaskManager
    // 6. Upload flink-conf.yaml
    // 7. Serialize the JobGraph and upload it
    // 8. Check login credentials

    // .................

    // Build the Java command that starts the AM container
    final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
        yarnClusterEntrypoint,
        hasLogback,
        hasLog4j,
        hasKrb5,
        clusterSpecification.getMasterMemoryMB());

    // 9. Bind the environment parameters, classpath, and environment variables used to start the AM

    // ..........................

    final String customApplicationName = customName != null ? customName : applicationName;
    // 10. Set the application name, application type, and the ContainerLaunchContext of the submitted application
    appContext.setApplicationName(customApplicationName);
    appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(capability);

    if (yarnQueue != null) {
        appContext.setQueue(yarnQueue);
    }

    setApplicationNodeLabel(appContext);

    setApplicationTags(appContext);

    // 11. Delete yarnFilesDir if deployment fails
    // add a hook to clean up in case deployment fails
    Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
    Runtime.getRuntime().addShutdownHook(deploymentFailureHook);

    LOG.info("Submitting application master " + appId);

    // Entry
    yarnClient.submitApplication(appContext);

    LOG.info("Waiting for the cluster to be allocated");
    final long startTime = System.currentTimeMillis();
    ApplicationReport report;
    YarnApplicationState lastAppState = YarnApplicationState.NEW;
    // 12. Block until the application is RUNNING
    loop: while (true) {
        // ...................
        // Fetch the application report through YarnClient every 250 ms
        Thread.sleep(250);
    }
    // ...........................
    // 13. Deployment succeeded, so remove the shutdown hook
    // since deployment was successful, remove the hook
    ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
    return report;
}
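The failure hook that startAppMaster() registers before submitting and removes after a successful deployment follows a general JVM pattern. Below is a minimal sketch of that pattern using only `Runtime.addShutdownHook` and `Runtime.removeShutdownHook` from the JDK; the class `DeploymentHookSketch` and its method are hypothetical and do not reproduce Flink's `DeploymentFailureHook` or `ShutdownHookUtil`.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the "register cleanup hook, remove it on success" pattern.
public class DeploymentHookSketch {

    // Registers a cleanup hook, runs the deployment, and removes the hook only
    // if the deployment reports success. Returns true when the hook was removed.
    // If the deployment fails, the hook stays registered and the cleanup
    // runs when the JVM shuts down.
    public static boolean deployWithCleanupHook(BooleanSupplier deployment, Runnable cleanup) {
        Thread failureHook = new Thread(cleanup, "deployment-failure-hook");
        Runtime.getRuntime().addShutdownHook(failureHook);

        boolean deployed = deployment.getAsBoolean();
        if (deployed) {
            // removeShutdownHook returns true if the hook was registered and is now removed.
            return Runtime.getRuntime().removeShutdownHook(failureHook);
        }
        return false;
    }

    public static void main(String[] args) {
        boolean removed = deployWithCleanupHook(
            () -> true,
            () -> System.out.println("cleaning up uploaded files"));
        System.out.println("hook removed: " + removed);
    }
}
```

The point of the design is that cleanup is tied to JVM exit rather than to a catch block, so it also covers the case where the client process is killed while it is still waiting for the cluster.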
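4) The entry point for submitting the application is the submitApplication() method in YarnClientImpl. It submits the application request to Yarn by calling submitApplication through an instance of ClientRMService, and then loops until the application has been submitted successfully (any state other than NEW and NEW_SAVING counts as success). The method is short, so it is not expanded here.
At this point the client-side flow is complete and the application request has reached Yarn's ResourceManager. The rest of this post focuses on how the Flink cluster starts up.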
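The waiting loop described in step 4 can be sketched as follows. This is a simplified stand-in, not the YarnClientImpl source: the enum `AppState` only mirrors a few state names, and `waitUntilSubmitted`, the poll interval, and the poll limit are hypothetical parameters chosen for illustration.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of "poll until the state is neither NEW nor NEW_SAVING".
public class SubmitPollSketch {
    public enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FAILED, KILLED }

    // States in which the submission is still considered to be in progress.
    private static final Set<AppState> WAITING = EnumSet.of(AppState.NEW, AppState.NEW_SAVING);

    // Polls the report source until the state leaves the waiting set,
    // sleeping pollMillis between polls and giving up after maxPolls attempts.
    public static AppState waitUntilSubmitted(Supplier<AppState> reportSource, long pollMillis, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            AppState state = reportSource.get();
            if (!WAITING.contains(state)) {
                return state;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for submission", e);
            }
        }
        throw new IllegalStateException("application still not submitted after " + maxPolls + " polls");
    }
}
```

A caller would pass a supplier that fetches the latest application report; in a test it can simply iterate over a fixed list of states.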
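1) The submitApplication() method of the ClientRMService class first checks whether the application has already been submitted (by applicationId), whether the Yarn queue is null, and so on. It then hands the request to RMAppManager (the component inside the YARN RM that manages application life cycles). If the submission succeeds it logs Application with id {applicationId.getId()} submitted by user {user}. An annotated version follows: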
public SubmitApplicationResponse submitApplication(
        SubmitApplicationRequest request) throws YarnException {
    ApplicationSubmissionContext submissionContext = request
        .getApplicationSubmissionContext();
    ApplicationId applicationId = submissionContext.getApplicationId();

    // ApplicationSubmissionContext needs to be validated for safety - only
    // those fields that are independent of the RM's configuration will be
    // checked here, those that are dependent on RM configuration are validated
    // in RMAppManager.

    // 1. Check whether the application has already been submitted
    // 2. Check whether the submitted queue is null; if so, use the default queue (default)
    // 3. Check whether an application name is set; if not, use the default (N/A)
    // 4. Check whether an application type is set; if not, use the default (YARN);
    //    if it is set and longer than the allowed length (20), it is truncated
    // .............................

    try {
        // call RMAppManager to submit application directly
        rmAppManager.submitApplication(submissionContext,
            System.currentTimeMillis(), user);

        // Submission succeeded
        LOG.info("Application with id " + applicationId.getId() +
            " submitted by user " + user);
        RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
            "ClientRMService", applicationId);
    } catch (YarnException e) {
        // An exception is thrown on failure
    }
    // ..................
}
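The defaulting rules listed in checks 2 to 4 of the comments above can be written down as a few small functions. The sketch below is not the ClientRMService code; the class `SubmissionDefaultsSketch` and its method names are hypothetical, and the default values and the length limit of 20 are taken from the comments in this post rather than verified against a specific Hadoop version.

```java
// Hypothetical sketch of the defaulting rules described in the comments above.
public class SubmissionDefaultsSketch {
    static final String DEFAULT_QUEUE = "default";
    static final String DEFAULT_NAME = "N/A";
    static final String DEFAULT_TYPE = "YARN";
    static final int MAX_TYPE_LENGTH = 20; // limit as stated in this post

    // A missing queue falls back to the default queue.
    public static String queueOrDefault(String queue) {
        return queue == null ? DEFAULT_QUEUE : queue;
    }

    // A missing application name falls back to "N/A".
    public static String nameOrDefault(String name) {
        return name == null ? DEFAULT_NAME : name;
    }

    // A missing type falls back to "YARN"; an overlong type is truncated.
    public static String typeOrDefault(String type) {
        if (type == null) {
            return DEFAULT_TYPE;
        }
        return type.length() > MAX_TYPE_LENGTH ? type.substring(0, MAX_TYPE_LENGTH) : type;
    }

    public static void main(String[] args) {
        System.out.println(queueOrDefault(null));
        System.out.println(nameOrDefault(null));
        System.out.println(typeOrDefault("Apache Flink"));
    }
}
```

Note that the type "Apache Flink" set by startAppMaster() is 12 characters long, so under these rules it passes through unchanged.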
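2) The submitApplication() method of the RMAppManager class mainly creates the RMApp and requests the AM container from the ResourceScheduler. Everything from here until the AM container is started on a NodeManager is done by Yarn itself. The details are not analyzed here and will be covered in a later post; only the entry point is given. The code is as follows: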
protected void submitApplication(
        ApplicationSubmissionContext submissionContext, long submitTime,
        String user) throws YarnException {
    ApplicationId applicationId = submissionContext.getApplicationId();

    // 1. Create the RMApp; an exception is thrown if one with the same applicationId already exists
    RMAppImpl application =
        createAndPopulateNewRMApp(submissionContext, submitTime, user);
    ApplicationId appId = submissionContext.getApplicationId();

    // The security mode is either simple or kerberos and is set in the configuration file
    // Kerberos enabled
    if (UserGroupInformation.isSecurityEnabled()) {
        // ..................
    } else {
        // Simple mode
        // Dispatcher is not yet started at this time, so these START events
        // enqueued should be guaranteed to be first processed when dispatcher
        // gets started.
        // 2. Submit the application to the ResourceScheduler (the pluggable resource scheduler);
        //    I am not yet sure this is exactly what happens here
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppEvent(applicationId, RMAppEventType.START));
    }
}
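The English comment in the code above says that START events enqueued before the dispatcher starts are processed first once it does start. A single-threaded toy version of that idea is sketched below. It is an assumption-laden simplification: `ToyEventDispatcher` and its methods are hypothetical, events are plain strings, and the asynchronous dispatching thread a real dispatcher would use is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded model of "enqueue now, process once started".
public class ToyEventDispatcher {
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>();
    private boolean started = false;

    // Events are always enqueued; they are only processed once the dispatcher runs.
    public void handle(String event) {
        pending.add(event);
        if (started) {
            drain();
        }
    }

    // Starting the dispatcher processes everything queued so far, in FIFO order.
    public void start() {
        started = true;
        drain();
    }

    private void drain() {
        String event;
        while ((event = pending.poll()) != null) {
            processed.add(event);
        }
    }

    // Returns a copy of the events processed so far, in processing order.
    public List<String> processed() {
        return new ArrayList<>(processed);
    }
}
```

Because the queue is FIFO, an event handed over before `start()` is guaranteed to be handled before any event that arrives afterwards.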
3) In Flink's per-job mode, the entry point that the AM container loads and runs is the main() method of YarnJobClusterEntrypoint. An annotated version follows:
public static void main(String[] args) {
    // startup checks and logging
    // 1. Log environment information such as the user, environment variables, Java version, and JVM arguments
    EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
    // 2. Register handlers for the various signals; they write the signal to the log
    SignalHandler.register(LOG);
    // 3. Register a JVM shutdown safeguard hook so that JVM exit cannot be blocked by other shutdown hooks
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    Map<String, String> env = System.getenv();

    final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
    Preconditions.checkArgument(
        workingDirectory != null,
        "Working directory variable (%s) not set",
        ApplicationConstants.Environment.PWD.key());

    try {
        // 4. Log information about the user that Yarn runs as
        YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
    } catch (IOException e) {
        LOG.warn("Could not log YARN environment information.", e);
    }
    // 5. Load the Flink configuration
    Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);

    YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(
        configuration,
        workingDirectory);
    // 6. Entry: create and start the various internal services
    ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
}
4) The subsequent call chain is runClusterEntrypoint() -> startCluster() -> runCluster() in the ClusterEntrypoint class. This part is fairly simple, so the focus here is on the runCluster() method:
// ClusterEntrypoint.java
private void runCluster(Configuration configuration) throws Exception {
    synchronized (lock) {
        initializeServices(configuration);

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
        // 1. Create the factory for the Dispatcher and ResourceManager components;
        //    this includes re-creating the JobGraph from a local file
        final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
        // 2. Entry: create and start the cluster components, passing in the services initialized above
        //    (RpcService, HAServices, BlobServer, HeartbeatServices, MetricRegistry, ExecutionGraphStore, etc.)
        clusterComponent = dispatcherResourceManagerComponentFactory.create(
            configuration,
            commonRpcService,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            archivedExecutionGraphStore,
            new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
            this);

        // ............
    }
}
5) The create() method starts many of Flink's components. The ones most relevant to job submission are the Dispatcher and the ResourceManager. The code is as follows:
public DispatcherResourceManagerComponent<T> create(
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        ArchivedExecutionGraphStore archivedExecutionGraphStore,
        MetricQueryServiceRetriever metricQueryServiceRetriever,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    LeaderRetrievalService dispatcherLeaderRetrievalService = null;
    LeaderRetrievalService resourceManagerRetrievalService = null;
    WebMonitorEndpoint<U> webMonitorEndpoint = null;
    ResourceManager<?> resourceManager = null;
    JobManagerMetricGroup jobManagerMetricGroup = null;
    T dispatcher = null;

    try {
        dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

        resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

        final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
            rpcService,
            DispatcherGateway.class,
            DispatcherId::fromUuid,
            10,
            Time.milliseconds(50L));

        final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
            rpcService,
            ResourceManagerGateway.class,
            ResourceManagerId::fromUuid,
            10,
            Time.milliseconds(50L));

        final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
            configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
            configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
            "DispatcherRestEndpoint");

        final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
        final MetricFetcher metricFetcher = updateInterval == 0
            ? VoidMetricFetcher.INSTANCE
            : MetricFetcherImpl.fromConfiguration(
                configuration,
                metricQueryServiceRetriever,
                dispatcherGatewayRetriever,
                executor);

        webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
            configuration,
            dispatcherGatewayRetriever,
            resourceManagerGatewayRetriever,
            blobServer,
            executor,
            metricFetcher,
            highAvailabilityServices.getWebMonitorLeaderElectionService(),
            fatalErrorHandler);

        log.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start();

        final String hostname = getHostname(rpcService);

        jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
            metricRegistry,
            hostname,
            ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
        // 1. This returns a new YarnResourceManager
        /* Call chain: AbstractDispatcherResourceManagerComponentFactory
                       -> ActiveResourceManagerFactory
                       -> YarnResourceManagerFactory
         */
        resourceManager = resourceManagerFactory.createResourceManager(
            configuration,
            ResourceID.generate(),
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            fatalErrorHandler,
            new ClusterInformation(hostname, blobServer.getPort()),
            webMonitorEndpoint.getRestBaseUrl(),
            jobManagerMetricGroup);

        final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
        // 2. The JobGraph instance is obtained here by deserialization; this returns a new MiniDispatcher
        dispatcher = dispatcherFactory.createDispatcher(
            configuration,
            rpcService,
            highAvailabilityServices,
            resourceManagerGatewayRetriever,
            blobServer,
            heartbeatServices,
            jobManagerMetricGroup,
            metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
            archivedExecutionGraphStore,
            fatalErrorHandler,
            historyServerArchivist);

        log.debug("Starting ResourceManager.");
        // Start the ResourceManager; this goes through the following stages:
        // leader election -> (in ResourceManager.java)
        //     -> grantLeadership(...)
        //     -> tryAcceptLeadership(...)
        //     -> start of the SlotManager
        resourceManager.start();
        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

        log.debug("Starting Dispatcher.");

        // Start the Dispatcher; this goes through the following stages:
        // leader election -> (in Dispatcher.java) grantLeadership -> tryAcceptLeadershipAndRunJobs
        //     -> createJobManagerRunner -> startJobManagerRunner -> jobManagerRunner.start()
        //
        // -> (in JobManagerRunner.java) start() -> leaderElectionService.start(...)
        // -> grantLeadership(...) -> verifyJobSchedulingStatusAndStartJobManager(...)
        // -> startJobMaster(leaderSessionId); startJobMaster here is presumably what starts the JobManager
        //
        // -> (in JobManagerRunner.java) jobMasterService.start(...)
        // -> (JobMaster.java) startJobExecution(...)
        //     -> { startJobMasterServices(), which starts the slotPool -> resourceManagerLeaderRetriever.start(...) }
        // -> startJobExecution(...) ->
        dispatcher.start();
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

        return createDispatcherResourceManagerComponent(
            dispatcher,
            resourceManager,
            dispatcherLeaderRetrievalService,
            resourceManagerRetrievalService,
            webMonitorEndpoint,
            jobManagerMetricGroup);

    } catch (Exception exception) {
        // clean up all started components
        // ..............
    }
}
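The call chains in the comments above share one shape: a component's start() registers it with a leader election service, and the real work (for example starting the SlotManager) happens later inside the grantLeadership callback. The sketch below shows only that shape. Everything in it is hypothetical: `LeaderCallbackSketch`, `Contender`, `ImmediateElectionService`, and `ToyResourceManager` are illustrative names, and the election here grants leadership immediately, which is an assumption that roughly corresponds to a setup with a single candidate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch of "start() -> election -> grantLeadership() -> start services".
public class LeaderCallbackSketch {

    // Callback interface implemented by anything that can become leader.
    public interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    // Toy election with a single candidate: leadership is granted right away.
    public static class ImmediateElectionService {
        public void start(Contender contender) {
            contender.grantLeadership(UUID.randomUUID());
        }
    }

    // Toy component that only starts its internal service once it is leader.
    public static class ToyResourceManager implements Contender {
        public final List<String> log = new ArrayList<>();
        public UUID leaderSessionId;

        public void start(ImmediateElectionService election) {
            log.add("start");
            election.start(this);
        }

        @Override
        public void grantLeadership(UUID sessionId) {
            log.add("grantLeadership");
            this.leaderSessionId = sessionId;
            log.add("slotManager started");
        }
    }

    public static void main(String[] args) {
        ToyResourceManager resourceManager = new ToyResourceManager();
        resourceManager.start(new ImmediateElectionService());
        System.out.println(resourceManager.log);
    }
}
```

Keeping service start-up inside the callback means a component that never wins the election never starts its services, which is why the traces above all pass through grantLeadership before anything job-related happens.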
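6) After that, the slotPool in the JobManager requests resources from the SlotManager, and the SlotManager in turn requests them from Yarn's ResourceManager. Once the resources are granted, TaskManagers are started and their slot information is registered with the SlotManager and the slotPool. The details are not expanded here and are left for a later analysis.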
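This post still has many rough edges. Improving it will take further reading of the source code and a clearer understanding of the design and architecture. I also plan to verify the flow against the detailed logs of a job submitted in Flink's per-job mode.
If anything here is unclear, I strongly recommend the references below; if anything is wrong, please leave a comment to point it out. Thank you!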
Ref
[1] https://files.alicdn.com/tpsservice/7bb8f513c765b97ab65401a1b78c8cb8.pdf
[2] https://yq.aliyun.com/articles/719262?spm=a2c4e.11153940.0.0.3ea9469ei7H3Wx#