By yyz940922 原創
項目模塊 (除去 .git, .github, .idea, docs 等):
flink-annotations: flink 註解
org.apache.flink.annotation 註解類
Experimental.java (實驗性註解)
package org.apache.flink.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

/**
 * Annotation to mark classes for experimental use.
 *
 * <p>Classes with this annotation are neither battle-tested nor stable, and may be changed or
 * removed in future versions.
 *
 * <p>This annotation also excludes classes with evolving interfaces / signatures
 * annotated with {@link Public} and {@link PublicEvolving}.
 *
 * <p>It can be placed on types, methods, fields and constructors.
 */
@Documented
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR })
@Public
public @interface Experimental {
}
package org.apache.flink.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

/**
 * Interface to mark methods within stable, public APIs as an internal developer API.
 *
 * <p>Developer APIs are stable but internal to Flink and might change across releases.
 *
 * <p>It can be placed on types, methods and constructors.
 */
@Documented
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR })
@Public
public @interface Internal {
}
package org.apache.flink.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

/**
 * Annotation for marking classes as public, stable interfaces.
 *
 * <p>Classes, methods and fields with this annotation are stable across minor releases
 * (1.0, 1.1, 1.2). In other words, applications using @Public annotated classes will compile
 * against newer versions of the same major release.
 *
 * <p>Only major releases (1.0, 2.0, 3.0) can break interfaces with this annotation.
 *
 * <p>The annotation itself can only be placed on types.
 */
@Documented
@Target(ElementType.TYPE)
@Public
public @interface Public {}
package org.apache.flink.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

/**
 * Annotation to mark classes and methods for public use, but with evolving interfaces.
 *
 * <p>Classes and methods with this annotation are intended for public use and have
 * stable behavior. However, their interfaces and signatures are not considered to be stable
 * and might be changed across versions.
 *
 * <p>This annotation also excludes methods and classes with evolving interfaces /
 * signatures within classes annotated with {@link Public}.
 *
 * <p>It can be placed on types, methods, fields and constructors.
 */
@Documented
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR })
@Public
public @interface PublicEvolving {
}
package org.apache.flink.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

/**
 * This annotation declares that a function, field, constructor, or entire type, is only visible
 * for testing purposes.
 *
 * <p>This annotation is typically attached when for example a method should be {@code private}
 * (because it is not intended to be called externally), but cannot be declared private, because
 * some tests need to have access to it.
 *
 * <p>It can be placed on types, methods, fields and constructors.
 */
@Documented
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR })
@Internal
public @interface VisibleForTesting {}
org.apache.flink.annotation.docs 註解文件類
package org.apache.flink.annotation.docs;

import org.apache.flink.annotation.Internal;

import java.lang.annotation.Target;

/**
 * A class that specifies a group of config options. The name of the group will be
 * used as the basis for the filename of the generated html file, as defined in
 * {@link ConfigOptionsDocGenerator}.
 *
 * <p>The empty {@code @Target({})} means this annotation cannot be placed on any program
 * element directly; it can only appear as a member value of another annotation.
 *
 * @see ConfigGroups
 */
@Target({})
@Internal
public @interface ConfigGroup {

    /** Name of the group; the basis for the filename of the generated html file. */
    String name();

    /** Key prefix of the group; per {@link ConfigGroups}, options are matched against it. */
    String keyPrefix();
}
package org.apache.flink.annotation.docs;

import org.apache.flink.annotation.Internal;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Annotation used on classes containing config options that enables the separation
 * of options into different tables based on key prefixes.
 *
 * <p>A config option is assigned to a {@link ConfigGroup} if the option key matches the
 * group prefix.
 *
 * <p>If a key matches multiple prefixes the longest matching prefix takes priority. An
 * option is never assigned to multiple groups.
 *
 * <p>Options that don't match any group are implicitly added to a default group.
 *
 * <p>The annotation is retained at runtime and can only be placed on types.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Internal
public @interface ConfigGroups {

    /** The groups declared for the annotated class; empty by default. */
    ConfigGroup[] groups() default {};
}
package org.apache.flink.annotation.docs;

import org.apache.flink.annotation.Internal;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Collection of annotations to modify the behavior of the documentation generators.
 *
 * <p>This class is a pure holder for nested types: it is final and its only constructor is
 * private, so it cannot be instantiated or extended.
 */
public final class Documentation {

    /**
     * Annotation used on config option fields to override the documented default.
     */
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    @Internal
    public @interface OverrideDefault {
        /** The text to document as the default instead of the option's actual default. */
        String value();
    }

    /**
     * Annotation used on config option fields to include them in the "Common
     * Options" section.
     *
     * <p>The {@link CommonOption#position()} argument controls the position in the
     * generated table, with lower values being placed at the top. Fields with the
     * same position are sorted alphabetically by key.
     */
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    @Internal
    public @interface CommonOption {
        // Predefined position values, spaced 10 apart. Judging by the names, each one
        // presumably stands for a topic area of the table (memory, parallelism/slots,
        // fault tolerance, high availability, security) - confirm against the generator.
        int POSITION_MEMORY = 10;
        int POSITION_PARALLELISM_SLOTS = 20;
        int POSITION_FAULT_TOLERANCE = 30;
        int POSITION_HIGH_AVAILABILITY = 40;
        int POSITION_SECURITY = 50;

        /**
         * Position in the generated table. The default is {@code Integer.MAX_VALUE}, i.e. the
         * largest possible value; since lower values are placed at the top, fields without
         * an explicit position end up after all explicitly positioned ones.
         */
        int position() default Integer.MAX_VALUE;
    }

    /**
     * Annotation used on table config options for adding meta data labels.
     *
     * <p>The {@link TableOption#execMode()} argument indicates the execution mode
     * the config works for (batch, streaming or both).
     */
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    @Internal
    public @interface TableOption {
        /** The execution mode the annotated option works for. */
        ExecMode execMode();
    }

    /**
     * The execution mode the config works for.
     *
     * <p>{@link #toString()} returns the human-readable label passed to the constructor
     * ("Batch", "Streaming", "Batch and Streaming") rather than the constant name.
     */
    public enum ExecMode {

        BATCH("Batch"), STREAMING("Streaming"), BATCH_STREAMING("Batch and Streaming");

        private final String name;

        ExecMode(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    /**
     * Annotation used on config option fields to exclude the config option from
     * documentation.
     */
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    @Internal
    public @interface ExcludeFromDocumentation {
        /**
         * The optional reason why the config option is excluded from documentation.
         * Defaults to the empty string.
         */
        String value() default "";
    }

    // Not instantiable: this class only groups the nested types above.
    private Documentation(){
    }
}
flink-clients
org.apache.flink.client
package org.apache.flink.client; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import java.io.File; import java.io.IOException; import java.net.URISyntaxException; import java.net.URL; import java.util.List; import java.util.jar.JarFile; /** * Utility functions for Flink client. */ public enum ClientUtils { ; // 檢查Jar文件 public static void checkJarFile(URL jar) throws IOException { File jarFile; try { jarFile = new File(jar.toURI()); } catch (URISyntaxException e) { throw new IOException("JAR file path is invalid '" + jar + '\''); } // jar 文件不存在 if (!jarFile.exists()) { throw new IOException("JAR file does not exist '" + jarFile.getAbsolutePath() + '\''); } // jar 文件不可讀取 if (!jarFile.canRead()) { throw new IOException("JAR file can't be read '" + jarFile.getAbsolutePath() + '\''); } try (JarFile ignored = new JarFile(jarFile)) { // verify that we can open the Jar file 驗證jar文件可否打開 } catch (IOException e) { throw new IOException("Error while opening jar file '" + jarFile.getAbsolutePath() + '\'', e); } } // 建立用戶代碼加載器 public static ClassLoader buildUserCodeClassLoader(List<URL> jars, List<URL> classpaths, ClassLoader parent) { URL[] urls = new URL[jars.size() + classpaths.size()]; for (int i = 0; i < jars.size(); i++) { urls[i] = jars.get(i); } for (int i = 0; i < classpaths.size(); i++) { urls[i + jars.size()] = classpaths.get(i); } return FlinkUserCodeClassLoaders.parentFirst(urls, parent); } }
package org.apache.flink.client; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.PlanExecutor; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.minicluster.JobExecutorService; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.runtime.minicluster.RpcServiceSharing; import static org.apache.flink.util.Preconditions.checkNotNull; /** * A PlanExecutor that runs Flink programs on a local embedded Flink runtime instance. * 一個會在本地嵌入式運行實例上跑Flink程序的計劃執行器。 * * <p>By simply calling the {@link #executePlan(org.apache.flink.api.common.Plan)} * method, this executor still start up and shut down again immediately after the program * finished.</p> * 經過調用executePlan() 方法, 該執行器就會啓動執行程序並在程序結束時迅速關閉。 * * <p>To use this executor to execute many dataflow programs that constitute one job * together,then this executor needs to be explicitly started, to keep running across * several executions.</p> * 當使用該執行器執行一個有多個數據流程序組成的任務時, 該執行器須要被明確地啓動來穿過許多的執行程 * 序。 */ public class LocalExecutor extends PlanExecutor { /** Custom user configuration for the execution. 
客戶用戶配置文件*/ private final Configuration baseConfiguration; public LocalExecutor() { this(new Configuration()); } public LocalExecutor(Configuration conf) { this.baseConfiguration = checkNotNull(conf); } // 根據配置文件信息建立工做執行服務 private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception { // 若是配置文件中沒有捆綁端口, 就將端口設置爲0 if (!configuration.contains(RestOptions.BIND_PORT)) { configuration.setString(RestOptions.BIND_PORT, "0"); } // 使用了構造器模式, 生成不變的最小集羣配置文件 final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() .setConfiguration(configuration) .setNumTaskManagers( // 任務控制器數量 configuration.getInteger( ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)) .setRpcServiceSharing(RpcServiceSharing.SHARED) // 設置遠程服務共享 .setNumSlotsPerTaskManager( // 設置每一個任務管理器的插槽數量 configuration.getInteger( TaskManagerOptions.NUM_TASK_SLOTS, 1)) .build(); // 初始化最小集羣 final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration); miniCluster.start(); configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort()); return miniCluster; } /** * Executes the given program on a local runtime and waits for the job to finish. * 在本地運行程序並等待程序結束 * * <p>If the executor has not been started before, this starts the executor and shuts * it down after the job finished. If the job runs in session mode, the executor is * kept alive until no more references to the executor exist.</p> * 若是該執行器此前沒有被啓動過, 那麼此次啓動會在任務完成後自動關閉該執行器 * * @param plan The plan of the program to execute. * 執行的計劃 * @return The net runtime of the program, in milliseconds. * 網絡運行時間(毫秒) * * @throws Exception Thrown, if either the startup of the local execution context, or * the execution caused an exception. 
* 開啓本地執行內容或執行自己致使的異常 */ @Override public JobExecutionResult executePlan(Plan plan) throws Exception { // 查看方法非空 checkNotNull(plan); // 初始化任務執行器的服務配置(不變) final Configuration jobExecutorServiceConfiguration = configureExecution(plan); try (final JobExecutorService executorService = createJobExecutorService(jobExecutorServiceConfiguration)) { // 初始化優化程序 Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration); OptimizedPlan op = pc.compile(plan); // 初始化任務視圖生成器 JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration); // 初始化任務視圖 JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId()); // 執行任務阻塞, 返回任務執行結果 return executorService.executeJobBlocking(jobGraph); } } // 配置執行 private Configuration configureExecution(final Plan plan) { // 初始化執行器配置 final Configuration executorConfiguration = createExecutorServiceConfig(plan); // 設置計劃並行數 setPlanParallelism(plan, executorConfiguration); return executorConfiguration; } // 建立執行器服務配置 private Configuration createExecutorServiceConfig(final Plan plan) { final Configuration newConfiguration = new Configuration(); // 將添加的配置選項的任務槽數量做爲key, 將計劃的最大並行數做爲value添加至新配置文件 newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism()); // 將全部基本配置信息添加至新配置中 newConfiguration.addAll(baseConfiguration); return newConfiguration; } // 設置並行數 private void setPlanParallelism(final Plan plan, final Configuration executorServiceConfig) { // TODO: Set job's default parallelism to max number of slots // 將任務的默認並行數設置爲任務槽的最大數(任務管理器槽數 * 任務管理器數量) final int slotsPerTaskManager = executorServiceConfig.getInteger( TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism()); final int numTaskManagers = executorServiceConfig.getInteger( ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers); } }
package org.apache.flink.client; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.PlanExecutor; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import java.net.InetSocketAddress; import java.net.URL; import java.util.Collections; import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; /** * The RemoteExecutor is a {@link org.apache.flink.api.common.PlanExecutor} that takes the program and ships it to a remote Flink cluster for execution. * 遠程執行器是一個獲取程序並將它運送至一個遠程Flink集羣去執行的計劃執行器 * * <p>The RemoteExecutor is pointed at the JobManager and gets the program and (if * necessary) the set of libraries that need to be shipped together with the program.</p> * 遠程執行器指向任務管理器並獲取要執行的程序。(若是須要的話),它會將 所需的lib文件同程序一塊兒裝 * 載。 * * <p>The RemoteExecutor is used in the {@link * org.apache.flink.api.java.RemoteEnvironment} to remotely execute program parts.</p> * 遠程執行器在遠程環境中被用於遠程執行部分程序 */ public class RemoteExecutor extends PlanExecutor { // jar文件列表 private final List<URL> jarFiles; // 全局類路徑列表 private final List<URL> globalClasspaths; // 客戶端配置 private final Configuration clientConfiguration; private int defaultParallelism = 1; public RemoteExecutor(String hostname, int port) { this(hostname, port, new Configuration(), Collections.emptyList(), Collections.emptyList()); } public RemoteExecutor( String hostname, int port, Configuration clientConfiguration, List<URL> jarFiles, List<URL> globalClasspaths) { this(new InetSocketAddress(hostname, port), clientConfiguration, jarFiles, globalClasspaths); } public RemoteExecutor( InetSocketAddress inet, Configuration clientConfiguration, List<URL> 
jarFiles, List<URL> globalClasspaths) { this.clientConfiguration = clientConfiguration; this.jarFiles = jarFiles; this.globalClasspaths = globalClasspaths; clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName()); clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort()); clientConfiguration.setInteger(RestOptions.PORT, inet.getPort()); } // ------------------------------------------------------------------------ // Properties // ------------------------------------------------------------------------ /** * Sets the parallelism that will be used when neither the program does not define * any parallelism at all. * 當沒程序沒有定義任何的並行量時, 須要設置被使用的並行量 * * @param defaultParallelism The default parallelism for the executor. * 執行器的默認並行量 */ public void setDefaultParallelism(int defaultParallelism) { if (defaultParallelism < 1) { throw new IllegalArgumentException("The default parallelism must be at least one"); } this.defaultParallelism = defaultParallelism; } /** * Gets the parallelism that will be used when neither the program does not define * any parallelism at all. * 獲取默認並行量 * * @return The default parallelism for the executor. */ public int getDefaultParallelism() { return defaultParallelism; } // ------------------------------------------------------------------------ // Executing programs // ------------------------------------------------------------------------ @Override public JobExecutionResult executePlan(Plan plan) throws Exception { // 查看計劃是否爲空 checkNotNull(plan); try (ClusterClient<?> client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor")) { // 類加載器, 雙親委派 ClassLoader classLoader = ClientUtils.buildUserCodeClassLoader(jarFiles, globalClasspaths, getClass().getClassLoader()); // 返回任務提交結果 return client.run( plan, jarFiles, globalClasspaths, classLoader, defaultParallelism, SavepointRestoreSettings.none()).getJobExecutionResult(); } } }
org.apache.flink.client.cli
CustomCommandLine 自定義命令行
package org.apache.flink.client.cli;

import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.util.FlinkException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;

import javax.annotation.Nullable;

/**
 * Custom command-line interface to load hooks for the command-line interface.
 *
 * @param <T> type of the cluster id handled by this command line
 */
public interface CustomCommandLine<T> {

    /**
     * Signals whether the custom command-line wants to execute or not.
     * @param commandLine The command-line options
     * @return True if the command-line wants to run, False otherwise
     */
    boolean isActive(CommandLine commandLine);

    /**
     * Gets the unique identifier of this CustomCommandLine.
     * @return A unique identifier
     */
    String getId();

    /**
     * Adds custom options to the existing run options.
     * @param baseOptions The existing options.
     */
    void addRunOptions(Options baseOptions);

    /**
     * Adds custom options to the existing general options.
     * @param baseOptions The existing options.
     */
    void addGeneralOptions(Options baseOptions);

    /**
     * Create a {@link ClusterDescriptor} from the given configuration,
     * configuration directory and the command line.
     *
     * @param commandLine containing command line options relevant for the ClusterDescriptor
     * @return ClusterDescriptor
     * @throws FlinkException if the ClusterDescriptor could not be created
     */
    ClusterDescriptor<T> createClusterDescriptor(CommandLine commandLine) throws FlinkException;

    /**
     * Returns the cluster id if a cluster id was specified on the command line,
     * otherwise it returns null.
     *
     * <p>A cluster id identifies a running cluster, e.g. the Yarn application id
     * for a Flink cluster running on Yarn.
     *
     * @param commandLine containing command line options relevant for the cluster id retrieval
     * @return Cluster id identifying the cluster to deploy jobs to or null
     */
    @Nullable
    T getClusterId(CommandLine commandLine);

    /**
     * Returns the {@link ClusterSpecification} specified by the configuration and
     * the command line options. This specification can be used to deploy a new
     * Flink cluster.
     *
     * @param commandLine containing command line options relevant for the ClusterSpecification
     * @return ClusterSpecification for a new Flink cluster
     * @throws FlinkException if the ClusterSpecification could not be created
     */
    ClusterSpecification getClusterSpecification(CommandLine commandLine) throws FlinkException;

    /**
     * Parses the given arguments against this command line's general options followed by its
     * run options.
     *
     * @param args the raw command-line arguments
     * @param stopAtNonOptions passed through to {@code CliFrontendParser.parse}; presumably
     *        makes parsing stop at the first argument that is not an option - confirm there
     * @return the parsed command line
     * @throws CliArgsException if the arguments cannot be parsed
     */
    default CommandLine parseCommandLineOptions(String[] args, boolean stopAtNonOptions) throws CliArgsException {
        final Options options = new Options();
        // collect the general options first, then the run options
        addGeneralOptions(options);
        addRunOptions(options);
        return CliFrontendParser.parse(options, args, stopAtNonOptions);
    }
}
AbstractCustomCommandLine 抽象自定義命令行(抽象類)
package org.apache.flink.client.cli; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.util.FlinkException; import org.apache.flink.util.NetUtils; import org.apache.flink.util.Preconditions; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import java.net.InetSocketAddress; import static org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig; /** * Base class for {@link CustomCommandLine} implementations which specify a JobManager * address and a ZooKeeper namespace. * 用戶命令行接口的基本類, 實現該接口時須要列出一個任務管理器地址及Zookeeper名稱空間 */ public abstract class AbstractCustomCommandLine<T> implements CustomCommandLine<T> { // Option構造器: 選項(縮寫名), 選項(長名), 是否有參數, 描述 protected final Option zookeeperNamespaceOption = new Option("z", "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode"); protected final Option addressOption = new Option("m", "jobmanager", true, "Address of the JobManager (master) to which to connect. " + "Use this flag to connect to a different JobManager than the one specified in the configuration."); protected final Configuration configuration; protected AbstractCustomCommandLine(Configuration configuration) { // 調用前置條件類中的檢查非空方法來檢查當前配置, 檢查完畢後, 將當前配置設置爲沒法修改狀態 this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration)); } public Configuration getConfiguration() { return configuration; } @Override public void addRunOptions(Options baseOptions) { // nothing to add here } @Override public void addGeneralOptions(Options baseOptions) { baseOptions.addOption(addressOption); baseOptions.addOption(zookeeperNamespaceOption); } /** * Override configuration settings by specified command line options. 
* 經過具體的命令行選項重寫配置文件設定 * * @param commandLine containing the overriding values * commandLine 參數包含了重寫的值 * @return Effective configuration with the overridden configuration settings * 返回帶有重寫的配置設置的有效配置文件 */ // applyCommandLineOptionsToConfiguration: 將命令行選項應用到配置中 protected Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException { final Configuration resultingConfiguration = new Configuration(configuration); // 若是命令行有地址選項 if (commandLine.hasOption(addressOption.getOpt())) { // 獲取端口 String addressWithPort = commandLine.getOptionValue(addressOption.getOpt()); // 獲取地址 InetSocketAddress jobManagerAddress = NetUtils.parseHostPortAddress(addressWithPort); // 設置配置中的任務管理器地址 setJobManagerAddressInConfig(resultingConfiguration, jobManagerAddress); } // 若是命令行有Zookeeper選項地址 if (commandLine.hasOption(zookeeperNamespaceOption.getOpt())) { // 獲取Zookeeper的名稱空間 String zkNamespace = commandLine.getOptionValue(zookeeperNamespaceOption.getOpt()); // 將高可用集羣選項的集羣Id做爲key, 將Zookeeper名稱空間做爲value添加至結果配置中 // 這實際上是個裝飾器模式(setString方法內部調用的是setValueInternal(key.key(), value)), 有點坑 resultingConfiguration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); } return resultingConfiguration; } }
```java
CliArgsException 客戶端參數異常
package org.apache.flink.client.cli;

/**
 * Special exception that is thrown when the command line parsing fails.
 */
public class CliArgsException extends Exception {

    private static final long serialVersionUID = 1L;

    /**
     * Creates the exception with a detail message.
     *
     * @param message description of the parsing failure
     */
    public CliArgsException(String message) {
        super(message);
    }

    /**
     * Creates the exception with a detail message and the underlying cause.
     *
     * @param message description of the parsing failure
     * @param cause the exception that caused the failure
     */
    public CliArgsException(String message, Throwable cause) {
        super(message, cause);
    }
}
CliFrontend
package org.apache.flink.client.cli; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.ProgramMissingJobException; import org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.costs.DefaultCostEstimator; import org.apache.flink.optimizer.plan.FlinkPlan; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.StreamingPlan; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.security.SecurityConfiguration; 
import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.ShutdownHookUtil; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileNotFoundException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.UndeclaredThrowableException; import java.net.InetSocketAddress; import java.net.URL; import java.text.SimpleDateFormat; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * Implementation of a simple command line frontend for executing * programs. 
* <p>Recognized actions: run, info, list, cancel, stop and savepoint.
 */
public class CliFrontend {

    private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);

    // actions
    private static final String ACTION_RUN = "run";
    private static final String ACTION_INFO = "info";
    private static final String ACTION_LIST = "list";
    private static final String ACTION_CANCEL = "cancel";
    private static final String ACTION_STOP = "stop";
    private static final String ACTION_SAVEPOINT = "savepoint";

    // configuration dir parameters (fallback locations)
    private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
    private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";

    // --------------------------------------------------------------------------------------------

    /** Configuration this frontend was created with. */
    private final Configuration configuration;

    /** Custom command lines whose options are accepted in addition to the built-in ones. */
    private final List<CustomCommandLine<?>> customCommandLines;

    /** General and run options contributed by all custom command lines. */
    private final Options customCommandLineOptions;

    /** Client timeout, as derived from the configuration by {@code AkkaUtils}. */
    private final Duration clientTimeout;

    /** Value of {@code CoreOptions.DEFAULT_PARALLELISM} read from the configuration. */
    private final int defaultParallelism;

    /**
     * Creates the frontend, initializes the file systems and collects the options of all custom
     * command lines.
     *
     * @param configuration configuration to use; must not be null
     * @param customCommandLines custom command lines to register; must not be null
     */
    public CliFrontend(
            Configuration configuration,
            List<CustomCommandLine<?>> customCommandLines) {
        this.configuration = Preconditions.checkNotNull(configuration);
        this.customCommandLines = Preconditions.checkNotNull(customCommandLines);

        FileSystem.initialize(
            this.configuration,
            PluginUtils.createPluginManagerFromRootFolder(this.configuration));

        // gather the options into a local first, then publish them through the final field
        final Options collectedOptions = new Options();
        for (CustomCommandLine<?> registered : this.customCommandLines) {
            registered.addGeneralOptions(collectedOptions);
            registered.addRunOptions(collectedOptions);
        }
        this.customCommandLineOptions = collectedOptions;

        this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration);
        this.defaultParallelism = this.configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
    }
-------------------------------------------------------------------------------------------- // Getter & Setter // -------------------------------------------------------------------------------------------- /** * Getter which returns a copy of the associated configuration. * getter 返回一個相關配置文件的拷貝 * @return Copy of the associated configuration */ public Configuration getConfiguration() { Configuration copiedConfiguration = new Configuration(); copiedConfiguration.addAll(configuration); return copiedConfiguration; } public Options getCustomCommandLineOptions() { return customCommandLineOptions; } // -------------------------------------------------------------------------------------------- // Execute Actions 執行行爲 // -------------------------------------------------------------------------------------------- /** * Executions the run action. 執行運行行爲 * * @param args Command line arguments for the run action. */ protected void run(String[] args) throws Exception { LOG.info("Running 'run' command."); // 經過客戶端前端解析器獲取命令選項 final Options commandOptions = CliFrontendParser.getRunCommandOptions(); // 經過客戶端前端解析器獲取命令行選項(解析器合併命令選項和客戶端命令行選項) final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions); // 經過客戶端前端解析器解析 命令行選項, args: 參數 是否在沒有選項時中止(true) final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true); final RunOptions runOptions = new RunOptions(commandLine); // evaluate help flag 評估幫助標識 // 若是運行選項時打印幫助 if (runOptions.isPrintHelp()) { CliFrontendParser.printHelpForRun(customCommandLines); return; } // 若是運行選項不是python if (!runOptions.isPython()) { // Java program should be specified a JAR file // java 程序應該被規定爲一個jar文件 // 若是運行選項的jar包參數爲空, 就報該錯誤 if (runOptions.getJarFilePath() == null) { throw new CliArgsException("Java program should be specified a JAR file."); } } // 打包的程序 final PackagedProgram program; try { // 向日志中添加日誌信息 LOG.info("Building program from JAR file"); // 根據運行選項來建立該程序 program = 
buildProgram(runOptions); } catch (FileNotFoundException e) { throw new CliArgsException("Could not build the program from JAR file.", e); } // 獲取活躍的客戶端命令行 final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine); try { runProgram(customCommandLine, commandLine, runOptions, program); } finally { // 最終, 刪除提取出的資料 program.deleteExtractedLibraries(); } } // 運行程序方法: 參數: 客戶端命令行, 命令行, 運行選項, 打包的程序 // 會跑出程序調用異常 private <T> void runProgram( CustomCommandLine<T> customCommandLine, CommandLine commandLine, RunOptions runOptions, PackagedProgram program) throws ProgramInvocationException, FlinkException { // 經過命令行獲取集羣的描述器 final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine); try { // 經過命令行獲取集羣的Id final T clusterId = customCommandLine.getClusterId(commandLine); // 集羣的客戶 final ClusterClient<T> client; // directly deploy the job if the cluster is started in job mode and detached // 若是集羣已經以工做模式啓動而且主線程與子線程分離(分離狀態), 那麼直接部署任務就好了 // 若是集羣id爲空 且 集羣已經處於分離狀態 if (clusterId == null && runOptions.getDetachedMode()) { // 若是運行選項的並行數爲 -1, 就將並行數設置爲默認並行數, 不然就使用當前選項的並行數 int parallelism = runOptions.getParallelism() == -1 ? 
defaultParallelism : runOptions.getParallelism(); // 根據打包的程序, 配置文件 和 並行數 建立任務圖 final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism); // 經過命令行獲取最終的集羣規格 final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine); // 經過集羣描述器部署集羣 client = clusterDescriptor.deployJobCluster( clusterSpecification, jobGraph, runOptions.getDetachedMode()); // 分離模式 // 登陸並打印輸出 任務已被提交 logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID()); try { // 關閉客戶端 client.close(); } catch (Exception e) { LOG.info("Could not properly shut down the client.", e); } } else { // 關閉掛鉤: 用於序退出的時候,作一些Check,保證已經開始的操做X的原子性, 優雅地關閉程序 // 當程序即將退出時,查看當前是否有操做X在執行中: // 若是有,等待其完成而後退出。且期間再也不接受新的操做X。若是操做X執行之間過長,終止並回滾全部狀態。 // 若是沒有,則能夠當即退出。 final Thread shutdownHook; if (clusterId != null) { client = clusterDescriptor.retrieve(clusterId); shutdownHook = null; } else { // also in job mode we have to deploy a session cluster because the job might consist of multiple parts (e.g. 
when using collect) // 在任務模式下, 因爲任務可能由許多部分組成, 咱們必須部署一個會話集羣 // 集羣規格 final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine); // 部署會話集羣 client = clusterDescriptor.deploySessionCluster(clusterSpecification); // if not running in detached mode, add a shutdown hook to shut down cluster if client exits // 若是沒有在分離狀態下運行, 當客戶端存在時, 添加一個關閉掛鉤來關閉集羣 // there's a race-condition here if cli is killed before shutdown hook is installed // 若是客戶端在被打包到本地倉庫以前就已經被殺死, 就會產生一個競態(爭) 條件 // 若是分離狀態模式不存在 且 運行選項已經經過相關聯的exit方法關閉 if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) { // 經過客戶端的要關閉的集羣名稱, 客戶端類的名稱 和日誌 來初始化 關閉掛鉤 shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG); } else { // 不然就將關閉掛鉤設置爲空 shutdownHook = null; } } try { // 設置分離狀態 client.setDetached(runOptions.getDetachedMode()); // 根據保存點配置進行debug, 並寫入到日誌 LOG.debug("{}", runOptions.getSavepointRestoreSettings()); // 用戶並行量 int userParallelism = runOptions.getParallelism(); LOG.debug("User parallelism is set to {}", userParallelism); // 若是用戶並行量等於執行配置類中的默認並行量 if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) { // 就將用戶並行量設置成默認並行量 userParallelism = defaultParallelism; } // 執行程序 executeProgram(program, client, userParallelism); } finally { // 若是集羣id爲空 而且 沒有被分離 if (clusterId == null && !client.isDetached()) { // terminate the cluster only if we have started it before and if it's not detached // 只有在集羣已經啓動過 且 它沒有被分離 的狀況下才終止集羣 try { client.shutDownCluster(); } catch (final Exception e) { LOG.info("Could not properly terminate the Flink cluster.", e); } if (shutdownHook != null) { // we do not need the hook anymore as we have just tried to shutdown the cluster. 
// ∵咱們已經嘗試過關閉集羣了, ∴ 咱們不須要關閉掛鉤了(將它移除) ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG); } } try { client.close(); } catch (Exception e) { LOG.info("Could not properly shut down the client.", e); } } } } finally { try { clusterDescriptor.close(); } catch (Exception e) { LOG.info("Could not properly close the cluster descriptor.", e); } } } /** * Executes the info action. * 執行通知行爲 * * @param args Command line arguments for the info action. */ protected void info(String[] args) throws CliArgsException, FileNotFoundException, ProgramInvocationException { LOG.info("Running 'info' command."); // 經過客戶端前端解析器獲取通知相關命令行選項 final Options commandOptions = CliFrontendParser.getInfoCommandOptions(); // 經過客戶端前端解析器解析命令行 true|false: 是否在沒有命令行選項時中止解析 final CommandLine commandLine = CliFrontendParser.parse(commandOptions, args, true); InfoOptions infoOptions = new InfoOptions(commandLine); // evaluate help flag 評估幫助標籤 if (infoOptions.isPrintHelp()) { CliFrontendParser.printHelpForInfo(); return; } // 若是通知選項的jar文件路徑爲空 if (infoOptions.getJarFilePath() == null) { throw new CliArgsException("The program JAR file was not specified."); } // -------- build the packaged program ------------- // 建立打包的程序 LOG.info("Building program from JAR file"); final PackagedProgram program = buildProgram(infoOptions); try { // 獲取並行量 int parallelism = infoOptions.getParallelism(); // 若是執行配置文件的默認並行量與該並行量一致, 就將其設置爲默認並行量 if (ExecutionConfig.PARALLELISM_DEFAULT == parallelism) { parallelism = defaultParallelism; } LOG.info("Creating program plan dump"); // 建立 程序的 進程的內存鏡像(dump) // 經過 數據統計, 默認開銷預估器 和 配置文件 來初始化 優化程序 Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration); // 經過 優化程序, 打包的程序, 並行數 來 初始化 flinkr任務 FlinkPlan flinkPlan = ClusterClient.getOptimizedPlan(compiler, program, parallelism); // json任務 String jsonPlan = null; // 若是flink任務 是 優化過的任務類 if (flinkPlan instanceof OptimizedPlan) { // 經過json格式的plan dump(計劃程序內存鏡像) 
將優化器計劃轉化爲JSON格式 jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON((OptimizedPlan) flinkPlan); // 若是flink 任務 爲流式計算任務 } else if (flinkPlan instanceof StreamingPlan) { // 將流式任務轉化爲JSON格式 jsonPlan = ((StreamingPlan) flinkPlan).getStreamingPlanAsJSON(); } // 若是json計劃不爲空, 就執行文件 if (jsonPlan != null) { System.out.println("----------------------- Execution Plan -----------------------"); System.out.println(jsonPlan); System.out.println("--------------------------------------------------------------"); } else { System.out.println("JSON plan could not be generated."); } // 獲取程序狀態的描述 String description = program.getDescription(); // 若是描述不爲空 if (description != null) { System.out.println(); System.out.println(description); } // 若是描述爲空 else { System.out.println(); System.out.println("No description provided."); } } finally { // 刪除提取出的資料 program.deleteExtractedLibraries(); } } /** * Executes the list action. 執行計劃列表 * * @param args Command line arguments for the list action. * args 爲列表行爲的參數 */ protected void list(String[] args) throws Exception { LOG.info("Running 'list' command."); // 經過客戶端前端解析器獲取命令行選項 final Options commandOptions = CliFrontendParser.getListCommandOptions(); // 經過客戶端前端解析器 合併 命令行選項 和 客戶端命令行選項 final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions); // 經過命令解析器來解析命令行選項 false爲若是沒有選項是否中止解析的標籤 final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, false); // 初始化列表選項 ListOptions listOptions = new ListOptions(commandLine); // evaluate help flag 評估幫助標籤 // 若是選項是打印幫助, 就經過解析器打印幫助列表 if (listOptions.isPrintHelp()) { CliFrontendParser.printHelpForList(customCommandLines); return; } // 是否展現運行 final boolean showRunning; // 展現是否已有安排 final boolean showScheduled; // 是否展現全部 final boolean showAll; // print running and scheduled jobs if not option supplied // 若是沒有提供選項, 默認會打印運行狀態和已安排的任務, 不然就按照配置的選項執行 if (!listOptions.showRunning() && !listOptions.showScheduled() && !listOptions.showAll()) { 
showRunning = true; showScheduled = true; showAll = false; } else { showRunning = listOptions.showRunning(); showScheduled = listOptions.showScheduled(); showAll = listOptions.showAll(); } // 獲取活躍的命令行 final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine); // 運行集羣的行爲 runClusterAction( activeCommandLine, commandLine, clusterClient -> listJobs(clusterClient, showRunning, showScheduled, showAll)); } // 任務列表 private <T> void listJobs( // 集羣客戶端 ClusterClient<T> clusterClient, boolean showRunning, boolean showScheduled, boolean showAll) throws FlinkException { // 自定任務狀態信息的一個集合: 任務詳情 Collection<JobStatusMessage> jobDetails; try { /** * Future: 異步計算的將來結果 * CompletableFuture能夠用於建立異步調用集羣的列表任務 * JDK5新增了Future接口,用於描述一個異步計算的結果。雖然 Future 以及相關使用方法提供了異步執行任務的能力,可是對於結果的獲取倒是很不方便,只能經過阻塞或者輪詢的方式獲得任務的結果。阻塞的方式顯然和咱們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的 CPU 資源,並且也不能及時地獲得計算結果。 * 在Java8中,CompletableFuture提供了很是強大的Future的擴展功能,能夠幫助咱們簡化異步編程的複雜性,而且提供了函數式編程的能力,能夠經過回調的方式處理計算結果,也提供了轉換和組合 CompletableFuture 的方法 */ CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = clusterClient.listJobs(); // 登陸並打印 logAndSysout("Waiting for response..."); // 從future中獲取任務細節 jobDetails = jobDetailsFuture.get(); } catch (Exception e) { // 經過剝離執行異常來獲取致使該異常的緣由 Throwable cause = ExceptionUtils.stripExecutionException(e); throw new FlinkException("Failed to retrieve job list.", cause); } LOG.info("Successfully retrieved list of jobs"); // 運行的任務列表 final List<JobStatusMessage> runningJobs = new ArrayList<>(); // 已經安排好的任務列表 final List<JobStatusMessage> scheduledJobs = new ArrayList<>(); // 已經終止的任務列表 final List<JobStatusMessage> terminatedJobs = new ArrayList<>(); // 遍歷任務細節列表 jobDetails.forEach(details -> { // 若是該任務細節的任務狀態爲初始狀態 if (details.getJobState() == JobStatus.CREATED) { // 就將其添加到已安排好的任務列表 scheduledJobs.add(details); // 若是 該任務細節的任務狀態不是全局終止狀態 } else if (!details.getJobState().isGloballyTerminalState()) { // 就將該任務細節添加至運行中的任務列表中 runningJobs.add(details); } else { // 不然就將該任務添加至終止的任務列表中 
terminatedJobs.add(details); } }); // 若是展現運行標籤爲 true 或 展現全部標籤爲 true if (showRunning || showAll) { // 若是任務運行列表爲空 if (runningJobs.size() == 0) { System.out.println("No running jobs."); } else { System.out.println("------------------ Running/Restarting Jobs -------------------"); // 打印運行中的任務狀態信息 printJobStatusMessages(runningJobs); System.out.println("--------------------------------------------------------------"); } } // 若是展現已安排的任務標籤爲 true 或 展現全部的任務標籤爲 true if (showScheduled || showAll) { // 若是已安排好的任務列表爲空 if (scheduledJobs.size() == 0) { System.out.println("No scheduled jobs."); } else { System.out.println("----------------------- Scheduled Jobs -----------------------"); // 打印已安排好的任務列表的狀態信息 printJobStatusMessages(scheduledJobs); System.out.println("--------------------------------------------------------------"); } } // 若是顯示全部的標籤爲 true if (showAll) { // 若是終止的任務列表不爲空 if (terminatedJobs.size() != 0) { System.out.println("---------------------- Terminated Jobs -----------------------"); // 就打印終止的任務列表的狀態信息 printJobStatusMessages(terminatedJobs); System.out.println("--------------------------------------------------------------"); } } } // 打印任務狀態信息 private static void printJobStatusMessages(List<JobStatusMessage> jobs) { // 簡單日期類型 SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss"); // 比較任務狀態信息的起始時間 Comparator<JobStatusMessage> startTimeComparator = (o1, o2) -> (int) (o1.getStartTime() - o2.getStartTime()); // 以 任務狀態爲key, 任務狀態信息列表爲value的Map的 比較器 // CASE_INSENSITIVE_ORDER 非敏感命令狀態比較器, 位於JAVA的String類中 // CaseInsensitiveComparator是排序方法的實現類,在compare中先按照字符串長度取短的那個字符串的長度做爲條件,而後循環判斷兩個字符串的第一個字符的ASCII碼大小,作出遞增排序,若是兩個字符串第一個字符的ASCII碼一致,則判斷第二個字符,以此類推,經過這種方式將字符串經過首字母的ASCII碼進行排序。 Comparator<Map.Entry<JobStatus, List<JobStatusMessage>>> statusComparator = (o1, o2) -> String.CASE_INSENSITIVE_ORDER.compare(o1.getKey().toString(), o2.getKey().toString()); // 獲取匹配任務狀態和任務狀態信息列表的map, 再經過util包中的stream類(流, jdk1.8新特性)中收集方法來收集根據任務狀態進行分組後的收集器類。實際上是將分組後的數據打包成一個收集器對象, 
再經過collect方法對它進行解包。 Map<JobStatus, List<JobStatusMessage>> jobsByState = jobs.stream().collect(Collectors.groupingBy(JobStatusMessage::getJobState)); // 對任務狀態根據狀態比較器進行狀態比較 // 再獲取排序後的map的value // 再遍歷扁平化的map列表, 獲取流並根據起始時間比較器進行排序 jobsByState.entrySet().stream() .sorted(statusComparator) .map(Map.Entry::getValue).flatMap(List::stream).sorted(startTimeComparator) .forEachOrdered(job -> System.out.println(dateFormat.format(new Date(job.getStartTime())) + " : " + job.getJobId() + " : " + job.getJobName() + " (" + job.getJobState() + ")")); } /** * Executes the STOP action. * 執行中止行爲 * @param args Command line arguments for the stop action. */ protected void stop(String[] args) throws Exception { LOG.info("Running 'stop-with-savepoint' command."); // 經過客戶端前端解析器獲取命令行選項 final Options commandOptions = CliFrontendParser.getStopCommandOptions(); // 經過客戶端前端解析器 合併 命令行選項 和 客戶端命令行選項 final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions); // 經過命令解析器來解析命令行選項 false爲若是沒有選項是否中止解析的標籤 final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, false); // 初始化中止選項 final StopOptions stopOptions = new StopOptions(commandLine); // 若是選項是打印幫助 if (stopOptions.isPrintHelp()) { CliFrontendParser.printHelpForStop(customCommandLines); return; } // 已清理的參數數組 final String[] cleanedArgs = stopOptions.getArgs(); // 獲取目標目錄 若是若是中止選項有保存點標籤 且 已清理的參數不爲空, // 就獲取目標目錄, 不然返回空 final String targetDirectory = stopOptions.hasSavepointFlag() && cleanedArgs.length > 0 ? stopOptions.getTargetDirectory() : null; // the default savepoint location is going to be used in this case. // 返回空的狀況下會使用默認的保存點 // 若是清理的參數列表不等於0, jobId就等於該列表的第一個字符串的解析結果, 不然就等於中止選項的目標目錄的解析結果 final JobID jobId = cleanedArgs.length != 0 ? parseJobId(cleanedArgs[0]) : parseJobId(stopOptions.getTargetDirectory()); // 獲取 是否要設置提前結束事件的時間 的 標籤 final boolean advanceToEndOfEventTime = stopOptions.shouldAdvanceToEndOfEventTime(); logAndSysout((advanceToEndOfEventTime ? 
"Draining job " : "Suspending job ") + "\"" + jobId + "\" with a savepoint."); // 獲取活躍的消費者命令行 final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine); // 執行集羣行爲 runClusterAction( activeCommandLine, commandLine, clusterClient -> { final String savepointPath; try { // 根據任務id, 是否提早結束任務 和 目標文件目錄 獲取保存點路徑 savepointPath = clusterClient.stopWithSavepoint(jobId, advanceToEndOfEventTime, targetDirectory); } catch (Exception e) { throw new FlinkException("Could not stop with a savepoint job \"" + jobId + "\".", e); } logAndSysout("Savepoint completed. Path: " + savepointPath); }); } /** * Executes the CANCEL action. * 執行取消操做 * * @param args Command line arguments for the cancel action. */ protected void cancel(String[] args) throws Exception { LOG.info("Running 'cancel' command."); // 經過客戶端前端解析器獲取命令行選項 final Options commandOptions = CliFrontendParser.getCancelCommandOptions(); // 經過客戶端前端解析器 合併 命令行選項 和 客戶端命令行選項 final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions); // 經過命令解析器來解析命令行選項 false爲若是沒有選項是否中止解析的標籤 final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, false); // 取消選項 CancelOptions cancelOptions = new CancelOptions(commandLine); // evaluate help flag 評估 幫助標籤 if (cancelOptions.isPrintHelp()) { CliFrontendParser.printHelpForCancel(customCommandLines); return; } // 獲取活躍的客戶端命令行 final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine); // 獲取已清理的參數 final String[] cleanedArgs = cancelOptions.getArgs(); // 若是取消選項有保存點 if (cancelOptions.isWithSavepoint()) { // 這個過期警告就是個唬人的, 照樣能執行...... logAndSysout("DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. 
Use \"stop\" instead."); // 任務 ID final JobID jobId; // 目標文件目錄 final String targetDirectory; // 若是任務參數不爲空 if (cleanedArgs.length > 0) { // 就將已清除的參數數組的第一個字符串進行解析獲取任務id jobId = parseJobId(cleanedArgs[0]); // 獲取保存點目標目錄 targetDirectory = cancelOptions.getSavepointTargetDirectory(); } else { // 不然就將任務id設置爲取消選項的保存點目標文件目錄 jobId = parseJobId(cancelOptions.getSavepointTargetDirectory()); // 將目標目錄設置爲空 targetDirectory = null; } // 若是目標目錄爲空 if (targetDirectory == null) { logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory."); } else { logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.'); } // 運行集羣行爲(保存點) runClusterAction( activeCommandLine, commandLine, clusterClient -> { final String savepointPath; try { savepointPath = clusterClient.cancelWithSavepoint(jobId, targetDirectory); } catch (Exception e) { throw new FlinkException("Could not cancel job " + jobId + '.', e); } logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.'); }); } else { final JobID jobId; // 若是清理的參數不爲空 if (cleanedArgs.length > 0) { jobId = parseJobId(cleanedArgs[0]); } else { throw new CliArgsException("Missing JobID. Specify a JobID to cancel a job."); } logAndSysout("Cancelling job " + jobId + '.'); // 運行集羣行爲(取消任務) runClusterAction( activeCommandLine, commandLine, clusterClient -> { try { clusterClient.cancel(jobId); } catch (Exception e) { throw new FlinkException("Could not cancel job " + jobId + '.', e); } }); logAndSysout("Cancelled job " + jobId + '.'); } } /** * Executes the SAVEPOINT action. * 執行保存點行爲 * @param args Command line arguments for the savepoint action. 
*/ protected void savepoint(String[] args) throws Exception { LOG.info("Running 'savepoint' command."); // 經過客戶端前端解析器獲取保存點命令選項 final Options commandOptions = CliFrontendParser.getSavepointCommandOptions(); // 經過客戶端前端解析器 合併 命令行選項 和 客戶端命令行選項 final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions); // 經過命令解析器來解析命令行選項 false爲若是沒有選項是否中止解析的標籤 final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, false); // 初始化保存點選項 final SavepointOptions savepointOptions = new SavepointOptions(commandLine); // evaluate help flag 評估幫助標識 if (savepointOptions.isPrintHelp()) { CliFrontendParser.printHelpForSavepoint(customCommandLines); return; } // 獲取活躍的客戶端命令行 final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine); // 若是保存點選項的是否釋放本機資源標籤爲 true if (savepointOptions.isDispose()) { runClusterAction( activeCommandLine, commandLine, // 向任務管理器發送釋放保存點資源請求 clusterClient -> disposeSavepoint(clusterClient, savepointOptions.getSavepointPath())); } else { // 獲取已清理的參數 String[] cleanedArgs = savepointOptions.getArgs(); final JobID jobId; // 若是已清理的參數不爲空 且 數組長度大於等於1 if (cleanedArgs.length >= 1) { // 就將任務Id 設置爲已清理的參數String數組的第一個參數 String jobIdString = cleanedArgs[0]; // 解析該String類的任務Id成任務Id // parseJobId內部調用JobID.fromHexString方法將16進制字符串轉化爲byte數組並傳入JobId構造器來初始化JobId對象 jobId = parseJobId(jobIdString); } else { throw new CliArgsException("Missing JobID. " + "Specify a Job ID to trigger a savepoint."); } // 保存點目錄 final String savepointDirectory; // 若是已清理的參數數組長度大於等於2 if (cleanedArgs.length >= 2) { // 就將任務Id 設置爲已清理的參數String數組的第二個參數 savepointDirectory = cleanedArgs[1]; } else { savepointDirectory = null; } // Print superfluous arguments 打印多餘的參數 if (cleanedArgs.length >= 3) { logAndSysout("Provided more arguments than required. 
Ignoring not needed arguments."); } // 運行集羣行爲 runClusterAction( activeCommandLine, commandLine, clusterClient -> triggerSavepoint(clusterClient, jobId, savepointDirectory)); } } /** * Sends a SavepointTriggerMessage to the job manager. * 將保存點啓動信息發送給任務管理器 */ private String triggerSavepoint(ClusterClient<?> clusterClient, JobID jobId, String savepointDirectory) throws FlinkException { logAndSysout("Triggering savepoint for job " + jobId + '.'); // 這裏又使用了CompletableFuture來實現異步調用 CompletableFuture<String> savepointPathFuture = clusterClient.triggerSavepoint(jobId, savepointDirectory); logAndSysout("Waiting for response..."); final String savepointPath; try { // 從future中獲取保存點路徑 savepointPath = savepointPathFuture.get(); } catch (Exception e) { Throwable cause = ExceptionUtils.stripExecutionException(e); throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.", cause); } logAndSysout("Savepoint completed. Path: " + savepointPath); logAndSysout("You can resume your program from this savepoint with the run command."); return savepointPath; } /** * Sends a SavepointDisposalRequest to the job manager. */ private void disposeSavepoint(ClusterClient<?> clusterClient, String savepointPath) throws FlinkException { Preconditions.checkNotNull(savepointPath, "Missing required argument: savepoint path. 
" + "Usage: bin/flink savepoint -d <savepoint-path>"); logAndSysout("Disposing savepoint '" + savepointPath + "'."); final CompletableFuture<Acknowledge> disposeFuture = clusterClient.disposeSavepoint(savepointPath); logAndSysout("Waiting for response..."); try { disposeFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS); } catch (Exception e) { throw new FlinkException("Disposing the savepoint '" + savepointPath + "' failed.", e); } logAndSysout("Savepoint '" + savepointPath + "' disposed."); } // -------------------------------------------------------------------------------------------- // Interaction with programs and JobManager // 與程序和任務管理器交互 // -------------------------------------------------------------------------------------------- protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException { logAndSysout("Starting execution of program"); final JobSubmissionResult result = client.run(program, parallelism); if (null == result) { throw new ProgramMissingJobException("No JobSubmissionResult returned, please make sure you called " + "ExecutionEnvironment.execute()"); } if (result.isJobExecutionResult()) { logAndSysout("Program execution finished"); JobExecutionResult execResult = result.getJobExecutionResult(); System.out.println("Job with JobID " + execResult.getJobID() + " has finished."); System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms"); Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults(); if (accumulatorsResult.size() > 0) { System.out.println("Accumulator Results: "); System.out.println(AccumulatorHelper.getResultsFormatted(accumulatorsResult)); } } else { logAndSysout("Job has been submitted with JobID " + result.getJobID()); } } /** * Creates a Packaged program from the given command line options. 
* 根據給出的命令行選項建立一個打包好的程序 * @return A PackagedProgram (upon success) */ PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException { String[] programArgs = options.getProgramArgs(); String jarFilePath = options.getJarFilePath(); List<URL> classpaths = options.getClasspaths(); // Get assembler class // 獲取彙編程序類 String entryPointClass = options.getEntryPointClassName(); File jarFile = null; if (options.isPython()) { // If the job is specified a jar file if (jarFilePath != null) { jarFile = getJarFile(jarFilePath); } // If the job is Python Shell job, the entry point class name is PythonGateWayServer. // 若是任務是python shell 任務, 進入點的類名應該爲 Python 網關服務器 // Otherwise, the entry point class of python job is PythonDriver // 不然, 進入點的類名稱應該爲Python 驅動器 if (entryPointClass == null) { entryPointClass = "org.apache.flink.client.python.PythonDriver"; } } else { if (jarFilePath == null) { throw new IllegalArgumentException("Java program should be specified a JAR file."); } jarFile = getJarFile(jarFilePath); } PackagedProgram program = entryPointClass == null ? new PackagedProgram(jarFile, classpaths, programArgs) : new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs); program.setSavepointRestoreSettings(options.getSavepointRestoreSettings()); return program; } /** * Gets the JAR file from the path. * 經過路徑獲取jar文件 * * @param jarFilePath The path of JAR file * @return The JAR file * @throws FileNotFoundException The JAR file does not exist. 
*/ private File getJarFile(String jarFilePath) throws FileNotFoundException { File jarFile = new File(jarFilePath); // Check if JAR file exists // 檢查 jar 文件是否存在, 是否爲文件類 if (!jarFile.exists()) { throw new FileNotFoundException("JAR file does not exist: " + jarFile); } else if (!jarFile.isFile()) { throw new FileNotFoundException("JAR file is not a file: " + jarFile); } return jarFile; } // -------------------------------------------------------------------------------------------- // Logging and Exception Handling // 日誌和異常處理 // -------------------------------------------------------------------------------------------- /** * Displays an exception message for incorrect command line arguments. * 展現不正確命令行參數致使的異常消息 * * @param e The exception to display. * @return The return code for the process. */ private static int handleArgException(CliArgsException e) { LOG.error("Invalid command line arguments.", e); System.out.println(e.getMessage()); System.out.println(); System.out.println("Use the help option (-h or --help) to get help on the command."); return 1; } /** * Displays an optional exception message for incorrect program parametrization. * 展現了不正確的程序參數化會致使的選擇異常信息 * @param e The exception to display. * @return The return code for the process. */ private static int handleParametrizationException(ProgramParametrizationException e) { LOG.error("Program has not been parametrized properly.", e); System.err.println(e.getMessage()); return 1; } /** * Displays a message for a program without a job to execute. * 展現了沒有任務執行會致使的錯誤消息 * @return The return code for the process. */ private static int handleMissingJobException() { System.err.println(); System.err.println("The program didn't contain a Flink job. " + "Perhaps you forgot to call execute() on the execution environment."); return 1; } /** * Displays an exception message. * * 展現異常消息 * @param t The exception to display. * @return The return code for the process. 
*/ // 處理錯誤 private static int handleError(Throwable t) { LOG.error("Error while running the command.", t); System.err.println(); System.err.println("------------------------------------------------------------"); System.err.println(" The program finished with the following exception:"); System.err.println(); // 若是致使異常的緣由是無效的程序異常 if (t.getCause() instanceof InvalidProgramException) { // 打印錯誤消息 System.err.println(t.getCause().getMessage()); // 獲取棧追蹤信息(數組) StackTraceElement[] trace = t.getCause().getStackTrace(); // 遍歷棧追蹤元素數組 for (StackTraceElement ele: trace) { // 打印元素信息 System.err.println("\t" + ele); // 若是遍歷到了main方法, 就退出 if (ele.getMethodName().equals("main")) { break; } } } else { t.printStackTrace(); } return 1; } // 寫入日誌並打印 private static void logAndSysout(String message) { LOG.info(message); System.out.println(message); } // -------------------------------------------------------------------------------------------- // Internal methods 內部方法 // -------------------------------------------------------------------------------------------- // 解析任務Id private JobID parseJobId(String jobIdString) throws CliArgsException { if (jobIdString == null) { throw new CliArgsException("Missing JobId"); } final JobID jobId; try { // 將16進制的任務id字符串轉化爲Byte數組再傳入JobId構造器中初始化JobId對象 jobId = JobID.fromHexString(jobIdString); } catch (IllegalArgumentException e) { throw new CliArgsException(e.getMessage()); } return jobId; } /** * Retrieves the {@link ClusterClient} from the given {@link CustomCommandLine} and runs the given {@link ClusterAction} against it. * 從已得的客戶端命令行選項取回集羣客戶端並運行已獲取的集羣行爲與它進行競爭 * * @param activeCommandLine to create the {@link ClusterDescriptor} from * 集羣描述器由參數活躍的命令行建立 * @param commandLine containing the parsed command line options * commandLine參數 包含 解析過的命令行選項 * * @param clusterAction the cluster action to run against the retrieved {@link ClusterClient}. 
* 集羣行爲參數爲與取回的集羣客戶端進行競爭 * @param <T> type of the cluster id 集羣類型 * @throws FlinkException if something goes wrong 異常 */ private <T> void runClusterAction(CustomCommandLine<T> activeCommandLine, CommandLine commandLine, ClusterAction<T> clusterAction) throws FlinkException { final ClusterDescriptor<T> clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine); // 獲取集羣Id final T clusterId = activeCommandLine.getClusterId(commandLine); // 若是集羣id'爲空 if (clusterId == null) { throw new FlinkException("No cluster id was specified. Please specify a cluster to which " + "you would like to connect."); } else { try { // 經過集羣描述器獲取集羣客戶端 final ClusterClient<T> clusterClient = clusterDescriptor.retrieve(clusterId); try { clusterAction.runAction(clusterClient); } finally { try { clusterClient.close(); } catch (Exception e) { LOG.info("Could not properly shut down the cluster client.", e); } } } finally { try { clusterDescriptor.close(); } catch (Exception e) { LOG.info("Could not properly close the cluster descriptor.", e); } } } } /** * Internal interface to encapsulate cluster actions which are executed via the {@link ClusterClient}. * 封裝了集羣內部行爲的內部接口, 能夠經過集羣客戶端執行 * * @param <T> type of the cluster id */ @FunctionalInterface private interface ClusterAction<T> { /** * Run the cluster action with the given {@link ClusterClient}. * 根據已得的集羣客戶端來運行集羣行爲 * @param clusterClient to run the cluster action against * @throws FlinkException if something goes wrong */ void runAction(ClusterClient<T> clusterClient) throws FlinkException; } // -------------------------------------------------------------------------------------------- // Entry point for executable 可執行程序的進入點 // -------------------------------------------------------------------------------------------- /** * Parses the command line arguments and starts the requested action. * 解析命令行參數並開啓請求的行爲 * * @param args command line arguments of the client. 
* @return The return code of the program */ public int parseParameters(String[] args) { // check for action 檢查行爲(是否存在) if (args.length < 1) { CliFrontendParser.printHelp(customCommandLines); System.out.println("Please specify an action."); return 1; } // get action 獲取行爲 String action = args[0]; // remove action from parameters 從參數中移除行爲 final String[] params = Arrays.copyOfRange(args, 1, args.length); try { // do action 執行行爲 run,list,inform,cancel,stop,savepoint, // -h,--help, -v, --version, 默認爲無效行爲, 並打印有效行爲選項 switch (action) { case ACTION_RUN: run(params); return 0; case ACTION_LIST: list(params); return 0; case ACTION_INFO: info(params); return 0; case ACTION_CANCEL: cancel(params); return 0; case ACTION_STOP: stop(params); return 0; case ACTION_SAVEPOINT: savepoint(params); return 0; case "-h": case "--help": CliFrontendParser.printHelp(customCommandLines); return 0; case "-v": case "--version": String version = EnvironmentInformation.getVersion(); String commitID = EnvironmentInformation.getRevisionInformation().commitId; System.out.print("Version: " + version); System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN) ? "" : ", Commit ID: " + commitID); return 0; default: System.out.printf("\"%s\" is not a valid action.\n", action); System.out.println(); System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\"."); System.out.println(); System.out.println("Specify the version option (-v or --version) to print Flink version."); System.out.println(); System.out.println("Specify the help option (-h or --help) to get help on the command."); return 1; } } catch (CliArgsException ce) { return handleArgException(ce); } catch (ProgramParametrizationException ppe) { return handleParametrizationException(ppe); } catch (ProgramMissingJobException pmje) { return handleMissingJobException(); } catch (Exception e) { return handleError(e); } } /** * Submits the job based on the arguments. 
* 基於參數提交任務 */ public static void main(final String[] args) { EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args); // 1. find the configuration directory // 查找配置文件目錄 final String configurationDirectory = getConfigurationDirectoryFromEnv(); // 2. load the global configuration // 加載全局配置 final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory); // 3. load the custom command lines // 加載客戶端命令行 final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines( configuration, configurationDirectory); try { final CliFrontend cli = new CliFrontend( configuration, customCommandLines); // 經過安全工具包將安全配置文件存儲到本地maven倉庫 SecurityUtils.install(new SecurityConfiguration(cli.configuration)); // 安全運行參數解析並獲取返回的代碼, runSecured方法可能須要實現Callable接口 int retCode = SecurityUtils.getInstalledContext() .runSecured(() -> cli.parseParameters(args)); System.exit(retCode); } catch (Throwable t) { final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class); // 打印: 運行命令行時發生致命的錯誤:xxx LOG.error("Fatal error while running command line interface.", strippedThrowable); strippedThrowable.printStackTrace(); System.exit(31); } } // -------------------------------------------------------------------------------------------- // Miscellaneous Utilities // 各類各樣的工具包 // -------------------------------------------------------------------------------------------- // 從環境中獲取配置文件目錄 public static String getConfigurationDirectoryFromEnv() { // 獲取環境位置 String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); // 若是位置不爲空 if (location != null) { // 若是該位置有文件存在, 就返回該位置, 不然就報運行時錯誤 if (new File(location).exists()) { return location; } else { throw new RuntimeException("The configuration directory '" + location + "', specified in the '" + ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable, does not exist."); } } // 若是位置爲空但存在應變計劃1 else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) { // 
將位置設置爲應變計劃1 location = CONFIG_DIRECTORY_FALLBACK_1; } // 若是位置爲空但存在應變計劃2 else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) { // 將位置設置爲應變計劃2 location = CONFIG_DIRECTORY_FALLBACK_2; } // 若是位置爲空且沒有應變計劃1和應變計劃, 就拋出運行時異常 else { throw new RuntimeException("The configuration directory was not specified. " + "Please specify the directory containing the configuration file through the '" + ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable."); } return location; } /** * Writes the given job manager address to the associated configuration object. * 將已得的任務管理器地址寫入到相關聯的配置文件對象 * * @param address Address to write to the configuration * @param config The configuration to write to */ static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) { // 我其實很不明白它爲什麼要用setString, setInteger來命名這樣的方法, 明明內部是將任務管理器地址 和 剩餘的選項 的地址 和 端口的 k,v信息 存儲到配置文件中, 取這樣的名字比較讓人困惑, 並且design pattern中說過 儘可能少用裝飾器(Decorator) 模式, 容易形成困惑。 config.setString(JobManagerOptions.ADDRESS, address.getHostString()); config.setInteger(JobManagerOptions.PORT, address.getPort()); config.setString(RestOptions.ADDRESS, address.getHostString()); config.setInteger(RestOptions.PORT, address.getPort()); } // 根據配置文件和配置文件目錄加載客戶端命令行, 返回客戶端命令行的一個列表 public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) { List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2); // Command line interface of the YARN session, with a special initialization here to prefix all options with y/yarn. // Hadoop YARN(Yet Another resource negotiator) 協調器 的命令行接口, 經過特殊的初始化在全部選項前添加前綴 y 獲取 yarn // Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the active CustomCommandLine in order and DefaultCLI isActive always return true. 
// 因爲getActiveCustomCommandLine()[獲取活躍的客戶端命令行] 方法會將客戶端命令行按字典序排序 而 默認命令型 一直是活躍的,因此須要將默認命令行添加到最後 final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli"; try { customCommandLines.add( loadCustomCommandLine(flinkYarnSessionCLI, configuration, configurationDirectory, "y", "yarn")); } catch (NoClassDefFoundError | Exception e) { LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e); } customCommandLines.add(new DefaultCLI(configuration)); return customCommandLines; } // -------------------------------------------------------------------------------------------- // Custom command-line 客戶端命令行 // -------------------------------------------------------------------------------------------- /** * Gets the custom command-line for the arguments. * 獲取客戶端命令行的參數 * @param commandLine The input to the command-line. * @return custom command-line which is active (may only be one at a time) */ public CustomCommandLine<?> getActiveCustomCommandLine(CommandLine commandLine) { for (CustomCommandLine<?> cli : customCommandLines) { if (cli.isActive(commandLine)) { return cli; } } throw new IllegalStateException("No command-line ran."); } /** * Loads a class from the classpath that implements the CustomCommandLine interface. * 從實現了客戶端命令行接口的 類路徑加載該類 * @param className The fully-qualified class name to load. * @param params The constructor parameters */ private static CustomCommandLine<?> loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException { Class<? 
extends CustomCommandLine> customCliClass = Class.forName(className).asSubclass(CustomCommandLine.class); // construct class types from the parameters // 從參數構造類的類型 Class<?>[] types = new Class<?>[params.length]; for (int i = 0; i < params.length; i++) { // 檢查非空 Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null."); // 獲取類型 types[i] = params[i].getClass(); } // 經過反射獲取構造器 Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types); return constructor.newInstance(params); } }
package org.apache.flink.client.cli;

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

import javax.annotation.Nullable;

import java.util.Collection;

/**
 * A simple command line parser (based on Apache Commons CLI) that extracts command
 * line options.
 *
 * <p>The class declares the {@link Option}s understood by the client, groups them into
 * per-action {@link Options} sets, prints the help text and offers small parsing helpers.
 */
public class CliFrontendParser {

    // Each Option is built from (short name, long name, takes-argument flag, description).

    /** Prints the help text. */
    static final Option HELP_OPTION = new Option("h", "help", false,
            "Show the help message for the CLI Frontend or the action.");

    /** Path of the program JAR file. */
    static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");

    /** Entry point class of the program. */
    static final Option CLASS_OPTION = new Option("c", "class", true,
            "Class with the program entry point (\"main()\" method or \"getPlan()\" method). Only needed if the " +
            "JAR file does not specify the class in its manifest.");

    /** Additional classpath URL for user code. */
    static final Option CLASSPATH_OPTION = new Option("C", "classpath", true, "Adds a URL to each user code " +
            "classloader  on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be " +
            "accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple " +
            "times for specifying more than one URL. The protocol must be supported by the " +
            "{@link java.net.URLClassLoader}.");

    /** Parallelism of the program. */
    public static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
            "The parallelism with which to run the program. Optional flag to override the default value " +
            "specified in the configuration.");

    /** Suppresses logging output to standard out. */
    static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "If present, " +
            "suppress logging output to standard out.");

    /** Runs the job in detached mode. */
    public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
            "the job in detached mode");

    /** Requests a best-effort cluster shutdown when an attached CLI terminates abruptly. */
    public static final Option SHUTDOWN_IF_ATTACHED_OPTION = new Option(
        "sae", "shutdownOnAttachedExit", false,
        "If the job is submitted in attached mode, perform a best-effort cluster shutdown " +
            "when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");

    /**
     * @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN deployments
     */
    @Deprecated
    public static final Option YARN_DETACHED_OPTION = new Option("yd", "yarndetached", false, "If present, runs " +
        "the job in detached mode (deprecated; use non-YARN specific option instead)");

    /** Program arguments. */
    static final Option ARGS_OPTION = new Option("a", "arguments", true,
            "Program arguments. Arguments can also be added without -a, simply as trailing parameters.");

    /** Address of the JobManager to connect to. */
    public static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
            "Address of the JobManager (master) to which to connect. " +
            "Use this flag to connect to a different JobManager than the one specified in the configuration.");

    /** Savepoint path to restore the job from. */
    public static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
            "Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).");

    /** Allows skipping savepoint state that cannot be restored. */
    public static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION = new Option("n", "allowNonRestoredState", false,
            "Allow to skip savepoint state that cannot be restored. " +
                    "You need to allow this if you removed an operator from your " +
                    "program that was part of the program when the savepoint was triggered.");

    /** Path of a savepoint to dispose. */
    static final Option SAVEPOINT_DISPOSE_OPTION = new Option("d", "dispose", true,
            "Path of savepoint to dispose.");

    // list specific options

    /** Restricts the listing to running programs. */
    static final Option RUNNING_OPTION = new Option("r", "running", false,
            "Show only running programs and their JobIDs");

    /** Restricts the listing to scheduled programs. */
    static final Option SCHEDULED_OPTION = new Option("s", "scheduled", false,
            "Show only scheduled programs and their JobIDs");

    /** Lists all programs. */
    static final Option ALL_OPTION = new Option("a", "all", false,
            "Show all programs and their JobIDs");

    /** ZooKeeper namespace for high availability mode. */
    static final Option ZOOKEEPER_NAMESPACE_OPTION = new Option("z", "zookeeperNamespace", true,
            "Namespace to create the Zookeeper sub-paths for high availability mode");

    /** Cancels a job with a savepoint; the help text itself marks this as deprecated. */
    static final Option CANCEL_WITH_SAVEPOINT_OPTION = new Option(
            "s", "withSavepoint", true, "**DEPRECATION WARNING**: " +
            "Cancelling a job with savepoint is deprecated. Use \"stop\" instead. \n Trigger" +
            " savepoint and cancel job. The target directory is optional. If no directory is " +
            "specified, the configured default directory (" +
            CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + ") is used.");

    /** Savepoint path used when stopping a job. */
    public static final Option STOP_WITH_SAVEPOINT_PATH = new Option("p", "savepointPath", true,
            "Path to the savepoint (for example hdfs:///flink/savepoint-1537). " +
                    "If no directory is specified, the configured default will be used (\"" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "\").");

    /** Requests sending MAX_WATERMARK before the savepoint is taken on stop. */
    public static final Option STOP_AND_DRAIN = new Option("d", "drain", false,
            "Send MAX_WATERMARK before taking the savepoint and stopping the pipeline.");

    /** Python script containing the program entry point. */
    static final Option PY_OPTION = new Option("py", "python", true,
        "Python script with the program entry point. " +
            "The dependent resources can be configured with the `--pyFiles` option.");

    /** Additional python files for the job. */
    static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true,
        "Attach custom python files for job. " +
            "Comma can be used as the separator to specify multiple files. " +
            "The standard python resource file suffixes such as .py/.egg/.zip are all supported. " +
            "(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)");

    /** Python module containing the program entry point. */
    static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true,
        "Python module with the program entry point. " +
            "This option must be used in conjunction with `--pyFiles`.");

    // Marks the options as optional and sets the argument names shown in the help output.
    static {
        HELP_OPTION.setRequired(false);

        JAR_OPTION.setRequired(false);
        JAR_OPTION.setArgName("jarfile");

        CLASS_OPTION.setRequired(false);
        CLASS_OPTION.setArgName("classname");

        CLASSPATH_OPTION.setRequired(false);
        CLASSPATH_OPTION.setArgName("url");

        ADDRESS_OPTION.setRequired(false);
        ADDRESS_OPTION.setArgName("host:port");

        PARALLELISM_OPTION.setRequired(false);
        PARALLELISM_OPTION.setArgName("parallelism");

        LOGGING_OPTION.setRequired(false);
        DETACHED_OPTION.setRequired(false);
        SHUTDOWN_IF_ATTACHED_OPTION.setRequired(false);
        YARN_DETACHED_OPTION.setRequired(false);

        ARGS_OPTION.setRequired(false);
        ARGS_OPTION.setArgName("programArgs");
        // The arguments option accepts any number of values.
        ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);

        RUNNING_OPTION.setRequired(false);
        SCHEDULED_OPTION.setRequired(false);

        SAVEPOINT_PATH_OPTION.setRequired(false);
        SAVEPOINT_PATH_OPTION.setArgName("savepointPath");

        SAVEPOINT_ALLOW_NON_RESTORED_OPTION.setRequired(false);

        ZOOKEEPER_NAMESPACE_OPTION.setRequired(false);
        ZOOKEEPER_NAMESPACE_OPTION.setArgName("zookeeperNamespace");

        // The target directory of "cancel with savepoint" may be omitted.
        CANCEL_WITH_SAVEPOINT_OPTION.setRequired(false);
        CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
        CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);

        // The savepoint path of "stop" may be omitted.
        STOP_WITH_SAVEPOINT_PATH.setRequired(false);
        STOP_WITH_SAVEPOINT_PATH.setArgName("savepointPath");
        STOP_WITH_SAVEPOINT_PATH.setOptionalArg(true);

        STOP_AND_DRAIN.setRequired(false);

        PY_OPTION.setRequired(false);
        PY_OPTION.setArgName("python");

        PYFILES_OPTION.setRequired(false);
        PYFILES_OPTION.setArgName("pyFiles");

        PYMODULE_OPTION.setRequired(false);
        PYMODULE_OPTION.setArgName("pyModule");
    }

    // Declared after the static block above so that the options are fully configured
    // before this set is built.
    private static final Options RUN_OPTIONS = getRunCommandOptions();

    /** Adds the options shared by every action (help and the ignored verbose flag). */
    private static Options buildGeneralOptions(Options options) {
        options.addOption(HELP_OPTION);
        // backwards compatibility: ignore verbose flag (-v)
        options.addOption(new Option("v", "verbose", false, "This option is deprecated."));
        return options;
    }

    /** Adds every program-related option, including the deprecated ones. */
    private static Options getProgramSpecificOptions(Options options) {
        options.addOption(JAR_OPTION);
        options.addOption(CLASS_OPTION);
        options.addOption(CLASSPATH_OPTION);
        options.addOption(PARALLELISM_OPTION);
        options.addOption(ARGS_OPTION);
        options.addOption(LOGGING_OPTION);
        options.addOption(DETACHED_OPTION);
        options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
        options.addOption(YARN_DETACHED_OPTION);
        options.addOption(PY_OPTION);
        options.addOption(PYFILES_OPTION);
        options.addOption(PYMODULE_OPTION);
        return options;
    }

    /** Adds the program-related options that are shown in the help output. */
    private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(CLASS_OPTION);
        options.addOption(CLASSPATH_OPTION);
        options.addOption(PARALLELISM_OPTION);
        options.addOption(LOGGING_OPTION);
        options.addOption(DETACHED_OPTION);
        options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
        options.addOption(PY_OPTION);
        options.addOption(PYFILES_OPTION);
        options.addOption(PYMODULE_OPTION);
        return options;
    }

    /** Returns the options accepted by the "run" action. */
    public static Options getRunCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options = getProgramSpecificOptions(options);
        options.addOption(SAVEPOINT_PATH_OPTION);
        return options.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
    }

    /** Returns the options accepted by the "info" action. */
    static Options getInfoCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        return getProgramSpecificOptions(options);
    }

    /** Returns the options accepted by the "list" action. */
    static Options getListCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options.addOption(ALL_OPTION);
        options.addOption(RUNNING_OPTION);
        return options.addOption(SCHEDULED_OPTION);
    }

    /** Returns the options accepted by the "cancel" action. */
    static Options getCancelCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        return options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
    }

    /** Returns the options accepted by the "stop" action. */
    static Options getStopCommandOptions() {
        return buildGeneralOptions(new Options())
                .addOption(STOP_WITH_SAVEPOINT_PATH)
                .addOption(STOP_AND_DRAIN);
    }

    /** Returns the options accepted by the "savepoint" action. */
    static Options getSavepointCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options.addOption(SAVEPOINT_DISPOSE_OPTION);
        return options.addOption(JAR_OPTION);
    }

    // --------------------------------------------------------------------------------------------
    //  Help
    // --------------------------------------------------------------------------------------------

    /** Adds the "run" options that are shown in the help output. */
    private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
        Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
        o.addOption(SAVEPOINT_PATH_OPTION);
        return o.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
    }

    /** Adds the "info" options that are shown in the help output. */
    private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(CLASS_OPTION);
        options.addOption(PARALLELISM_OPTION);
        return options;
    }

    /** Adds the "list" options that are shown in the help output. */
    private static Options getListOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(RUNNING_OPTION);
        return options.addOption(SCHEDULED_OPTION);
    }

    /** Adds the "cancel" options that are shown in the help output. */
    private static Options getCancelOptionsWithoutDeprecatedOptions(Options options) {
        return options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
    }

    /** Adds the "stop" options that are shown in the help output. */
    private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
        return options
                .addOption(STOP_WITH_SAVEPOINT_PATH)
                .addOption(STOP_AND_DRAIN);
    }

    /** Adds the "savepoint" options that are shown in the help output. */
    private static Options getSavepointOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(SAVEPOINT_DISPOSE_OPTION);
        options.addOption(JAR_OPTION);
        return options;
    }

    /**
     * Prints the help for the client.
     *
     * @param customCommandLines The custom command lines whose options are appended
     *                           to the per-action help sections.
     */
    public static void printHelp(Collection<CustomCommandLine<?>> customCommandLines) {
        System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
        System.out.println();
        System.out.println("The following actions are available:");

        printHelpForRun(customCommandLines);
        printHelpForInfo();
        printHelpForList(customCommandLines);
        printHelpForStop(customCommandLines);
        printHelpForCancel(customCommandLines);
        printHelpForSavepoint(customCommandLines);

        System.out.println();
    }

    /** Prints the help section of the "run" action, including the custom run options. */
    public static void printHelpForRun(Collection<CustomCommandLine<?>> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"run\" compiles and runs a program.");
        System.out.println("\n Syntax: run [OPTIONS] <jar-file> <arguments>");
        formatter.setSyntaxPrefix(" \"run\" action options:");
        formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, true);

        System.out.println();
    }

    /** Prints the help section of the "info" action. */
    public static void printHelpForInfo() {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"info\" shows the optimized execution plan of the program (JSON).");
        System.out.println("\n Syntax: info [OPTIONS] <jar-file> <arguments>");
        formatter.setSyntaxPrefix(" \"info\" action options:");
        formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));

        System.out.println();
    }

    /** Prints the help section of the "list" action. */
    public static void printHelpForList(Collection<CustomCommandLine<?>> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"list\" lists running and scheduled programs.");
        System.out.println("\n Syntax: list [OPTIONS]");
        formatter.setSyntaxPrefix(" \"list\" action options:");
        formatter.printHelp(" ", getListOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    /** Prints the help section of the "stop" action. */
    public static void printHelpForStop(Collection<CustomCommandLine<?>> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"stop\" stops a running program with a savepoint (streaming jobs only).");
        System.out.println("\n Syntax: stop [OPTIONS] <Job ID>");
        formatter.setSyntaxPrefix(" \"stop\" action options:");
        formatter.printHelp(" ", getStopOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    /** Prints the help section of the "cancel" action. */
    public static void printHelpForCancel(Collection<CustomCommandLine<?>> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"cancel\" cancels a running program.");
        System.out.println("\n Syntax: cancel [OPTIONS] <Job ID>");
        formatter.setSyntaxPrefix(" \"cancel\" action options:");
        formatter.printHelp(" ", getCancelOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    /** Prints the help section of the "savepoint" action. */
    public static void printHelpForSavepoint(Collection<CustomCommandLine<?>> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"savepoint\" triggers savepoints for a running job or disposes existing ones.");
        System.out.println("\n Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]");
        formatter.setSyntaxPrefix(" \"savepoint\" action options:");
        formatter.printHelp(" ", getSavepointOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    /**
     * Prints custom cli options.
     *
     * @param customCommandLines The command lines whose options are printed
     * @param formatter The formatter to use for printing
     * @param runOptions True if the run options should be printed, False to print only general options
     */
    private static void printCustomCliOptions(
            Collection<CustomCommandLine<?>> customCommandLines,
            HelpFormatter formatter,
            boolean runOptions) {
        // prints options from all available command-line classes
        for (CustomCommandLine<?> cli: customCommandLines) {
            formatter.setSyntaxPrefix(" Options for " + cli.getId() + " mode:");
            Options customOpts = new Options();
            cli.addGeneralOptions(customOpts);
            if (runOptions) {
                cli.addRunOptions(customOpts);
            }
            formatter.printHelp(" ", customOpts);
            System.out.println();
        }
    }

    /**
     * Derives the savepoint restore settings from the parsed command line.
     *
     * @param commandLine The parsed command line.
     * @return Settings for the given savepoint path (honouring the allow-non-restored flag)
     *         if a savepoint path was passed, otherwise {@code SavepointRestoreSettings.none()}.
     */
    public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLine commandLine) {
        if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
            String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
            boolean allowNonRestoredState = commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
            return SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
        } else {
            return SavepointRestoreSettings.none();
        }
    }

    // --------------------------------------------------------------------------------------------
    //  Line Parsing
    // --------------------------------------------------------------------------------------------

    /**
     * Parses the arguments of the "run" action.
     *
     * @param args The arguments following the action name.
     * @return The parsed run options.
     * @throws CliArgsException if the arguments cannot be parsed.
     */
    public static RunOptions parseRunCommand(String[] args) throws CliArgsException {
        try {
            DefaultParser parser = new DefaultParser();
            // "true": stop parsing at the first non-option argument.
            CommandLine line = parser.parse(RUN_OPTIONS, args, true);
            return new RunOptions(line);
        }
        catch (ParseException e) {
            throw new CliArgsException(e.getMessage());
        }
    }

    /**
     * Parses the arguments against the given options.
     *
     * @param options The options to parse against.
     * @param args The arguments to parse.
     * @param stopAtNonOptions Whether parsing stops at the first non-option argument.
     * @return The parsed command line.
     * @throws CliArgsException if the arguments cannot be parsed.
     */
    public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions) throws CliArgsException {
        final DefaultParser parser = new DefaultParser();

        try {
            return parser.parse(options, args, stopAtNonOptions);
        } catch (ParseException e) {
            throw new CliArgsException(e.getMessage());
        }
    }

    /**
     * Merges the given {@link Options} into a new Options object.
     *
     * @param optionsA options to merge, can be null if none
     * @param optionsB options to merge, can be null if none
     * @return a new Options object holding the options of both arguments; the arguments
     *         themselves are not modified
     */
    public static Options mergeOptions(@Nullable Options optionsA, @Nullable Options optionsB) {
        final Options resultOptions = new Options();
        if (optionsA != null) {
            for (Option option : optionsA.getOptions()) {
                resultOptions.addOption(option);
            }
        }

        if (optionsB != null) {
            for (Option option : optionsB.getOptions()) {
                resultOptions.addOption(option);
            }
        }

        return resultOptions;
    }
}
package org.apache.flink.client.cli; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.StandaloneClusterDescriptor; import org.apache.flink.client.deployment.StandaloneClusterId; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.FlinkException; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import javax.annotation.Nullable; /** * The default CLI which is used for interaction with standalone clusters. * 默認客戶端被用於和單節點集羣交互 * 繼承了客戶端命令行抽象類, 其泛型爲單節點集羣Id類 */ public class DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterId> { public DefaultCLI(Configuration configuration) { super(configuration); } // 判斷是否活躍 @Override public boolean isActive(CommandLine commandLine) { // always active because we can try to read a JobManager address from the config // 由於咱們能從配置中讀取一個任務管理器的地址, 因此命令行一直是活躍的 return true; } @Override public String getId() { return "default"; } // 添加通用選項 @Override public void addGeneralOptions(Options baseOptions) { super.addGeneralOptions(baseOptions); } // 建立單節點集羣描述器 @Override public StandaloneClusterDescriptor createClusterDescriptor( CommandLine commandLine) throws FlinkException { // applyCommandLineOptionsToConfiguration: 將命令行選項應用到配置(添加地址選項和zookeeper名稱空間選項), 返回有效的配置文件 final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine); // 根據該文件建立單節點其羣描述器對象 return new StandaloneClusterDescriptor(effectiveConfiguration); } // 獲取集羣Id, 可爲空 @Override @Nullable public StandaloneClusterId getClusterId(CommandLine commandLine) { // 看到這個getInstance就知道StandaloneClusterId是一個單例, 點進去一看你會發現這是餓漢式的單例 return StandaloneClusterId.getInstance(); } // 獲取集羣規格 @Override public ClusterSpecification getClusterSpecification(CommandLine commandLine) { // 這裏使用了構造器模式來建立集羣規格 return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); } }
package org.apache.flink.client.cli; import org.apache.commons.cli.CommandLine; import static org.apache.flink.client.cli.CliFrontendParser.CANCEL_WITH_SAVEPOINT_OPTION; /** * Command line options for the CANCEL command. * 取消命令的命令行選項, 繼承了命令行選項 */ public class CancelOptions extends CommandLineOptions { // 參數 private final String[] args; /** * Flag indicating whether to cancel with a savepoint. * 表面是否根據保存點進行取消操做的標籤 */ private final boolean withSavepoint; /** * Optional target directory for the savepoint. Overwrites cluster * default. * 保存點的可選目標目錄, 重寫了默認集羣 */ private final String targetDirectory; public CancelOptions(CommandLine line) { super(line); this.args = line.getArgs(); this.withSavepoint = line.hasOption(CANCEL_WITH_SAVEPOINT_OPTION.getOpt()); this.targetDirectory = line.getOptionValue(CANCEL_WITH_SAVEPOINT_OPTION.getOpt()); } // 參數爲空就建立空字符串數組, 有就獲取該參數 public String[] getArgs() { return args == null ? new String[0] : args; } // 是否根據保存點進行取消操做的標籤 public boolean isWithSavepoint() { return withSavepoint; } // 獲取保存點的可選目標目錄 public String getSavepointTargetDirectory() { return targetDirectory; } }
package org.apache.flink.client.cli;

import org.apache.commons.cli.CommandLine;

import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;

/**
 * Base class for all options parsed from the command line.
 *
 * <p>Records whether the help option was passed.
 */
public abstract class CommandLineOptions {

    /** Whether the help option is present on the command line. */
    private final boolean printHelp;

    /**
     * @param line The parsed command line to read the help flag from.
     */
    protected CommandLineOptions(CommandLine line) {
        final String helpOpt = HELP_OPTION.getOpt();
        this.printHelp = line.hasOption(helpOpt);
    }

    /**
     * @return {@code true} if the help option was passed
     */
    public boolean isPrintHelp() {
        return this.printHelp;
    }
}
package org.apache.flink.client.cli;

import org.apache.commons.cli.CommandLine;

/**
 * Command line options for the INFO command.
 *
 * <p>Adds nothing on top of {@link ProgramOptions}; it exists as a distinct type for the command.
 */
public class InfoOptions extends ProgramOptions {

    /**
     * Parses the program options from the given command line.
     *
     * @param line parsed command line
     * @throws CliArgsException if the parent class rejects the arguments
     */
    public InfoOptions(CommandLine line) throws CliArgsException {
        super(line);
    }
}
import org.apache.commons.cli.CommandLine; import static org.apache.flink.client.cli.CliFrontendParser.ALL_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.RUNNING_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.SCHEDULED_OPTION; /** * Command line options for the LIST command. * 列表命令的命令行選項 */ public class ListOptions extends CommandLineOptions { // 是否展現運行的程序 private final boolean showRunning; // 是否展現已安排好的程序 private final boolean showScheduled; // 是否展現全部 private final boolean showAll; public ListOptions(CommandLine line) { super(line); this.showAll = line.hasOption(ALL_OPTION.getOpt()); this.showRunning = line.hasOption(RUNNING_OPTION.getOpt()); this.showScheduled = line.hasOption(SCHEDULED_OPTION.getOpt()); } public boolean showRunning() { return showRunning; } public boolean showScheduled() { return showScheduled; } public boolean showAll() { return showAll; } }
package org.apache.flink.client.cli; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.commons.cli.CommandLine; import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PYMODULE_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PY_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION; /** * Base class for command line options that refer to a JAR file program. * 是用於查閱jar文件命令行的命令行選項的基類。 */ public abstract class ProgramOptions extends CommandLineOptions { // jar文件路徑 private final String jarFilePath; // 進入點類 private final String entryPointClass; // 類路徑, 是URL的一個列表 private final List<URL> classpaths; // 程序參數 private final String[] programArgs; // 最終的並行量 private final int parallelism; // 是否啓用分離模式 private final boolean detachedMode; // 是否啓用關聯退出 private final boolean shutdownOnAttachedExit; // 保存點恢復設置 private final SavepointRestoreSettings savepointSettings; /** * Flag indicating whether the job is a Python job. 
* 判斷任務是否爲python任務 */ private final boolean isPython; protected ProgramOptions(CommandLine line) throws CliArgsException { super(line); String[] args = line.hasOption(ARGS_OPTION.getOpt()) ? line.getOptionValues(ARGS_OPTION.getOpt()) : line.getArgs(); this.entryPointClass = line.hasOption(CLASS_OPTION.getOpt()) ? line.getOptionValue(CLASS_OPTION.getOpt()) : null; isPython = line.hasOption(PY_OPTION.getOpt()) | line.hasOption(PYMODULE_OPTION.getOpt()) | "org.apache.flink.client.python.PythonGatewayServer".equals(entryPointClass); // If specified the option -py(--python) // 若是有標註 -py 或 --python選項 if (line.hasOption(PY_OPTION.getOpt())) { // Cannot use option -py and -pym simultaneously. // 不能同時使用 -py 和 -pym(python module) if (line.hasOption(PYMODULE_OPTION.getOpt())) { throw new CliArgsException("Cannot use option -py and -pym simultaneously."); } // The cli cmd args which will be transferred to PythonDriver will be transformed as follows: // 客戶端的cmd參數會被轉換成python驅動器, 以下所示 // CLI cmd : -py ${python.py} pyfs [optional] ${py-files} [optional] ${other args}. // PythonDriver args: py ${python.py} [optional] pyfs [optional] ${py-files} [optional] ${other args}. // -------------------------------transformed------------------------------------------------------- // e.g. -py wordcount.py(CLI cmd) -----------> py wordcount.py(PythonDriver args) // e.g. 
-py wordcount.py -pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt --output out.txt(CLI cmd) // -----> -py wordcount.py -pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt --output out.txt(PythonDriver args) // 新參數 String[] newArgs; // 參數標識 int argIndex; // 若是有python文件選項 if (line.hasOption(PYFILES_OPTION.getOpt())) { newArgs = new String[args.length + 4]; newArgs[2] = "-" + PYFILES_OPTION.getOpt(); newArgs[3] = line.getOptionValue(PYFILES_OPTION.getOpt()); argIndex = 4; } else { newArgs = new String[args.length + 2]; argIndex = 2; } newArgs[0] = "-" + PY_OPTION.getOpt(); newArgs[1] = line.getOptionValue(PY_OPTION.getOpt()); // 數組拷貝 System.arraycopy(args, 0, newArgs, argIndex, args.length); args = newArgs; } // If specified the option -pym(--pyModule) // 若是標註了pym選項 if (line.hasOption(PYMODULE_OPTION.getOpt())) { // If you specify the option -pym, you should specify the option --pyFiles simultaneously. // 若是你標註了 -pym, 你應當同時標註 --pyFiles if (!line.hasOption(PYFILES_OPTION.getOpt())) { throw new CliArgsException("-pym must be used in conjunction with `--pyFiles`"); } // The cli cmd args which will be transferred to PythonDriver will be transformed as follows: // 客戶端cmd參數會被轉換成python驅動器, 以下所示 // CLI cmd : -pym ${py-module} -pyfs ${py-files} [optional] ${other args}. // PythonDriver args: -pym ${py-module} -pyfs ${py-files} [optional] ${other args}. // e.g. 
-pym AAA.fun -pyfs AAA.zip(CLI cmd) ----> -pym AAA.fun -pyfs AAA.zip(PythonDriver args) String[] newArgs = new String[args.length + 4]; newArgs[0] = "-" + PYMODULE_OPTION.getOpt(); newArgs[1] = line.getOptionValue(PYMODULE_OPTION.getOpt()); newArgs[2] = "-" + PYFILES_OPTION.getOpt(); newArgs[3] = line.getOptionValue(PYFILES_OPTION.getOpt()); // 數組拷貝 System.arraycopy(args, 0, newArgs, 4, args.length); args = newArgs; } // 若是命令行中有jar選項 if (line.hasOption(JAR_OPTION.getOpt())) { // 獲取jar文件路徑 this.jarFilePath = line.getOptionValue(JAR_OPTION.getOpt()); // 若是不是python任務且參數存在 } else if (!isPython && args.length > 0) { jarFilePath = args[0]; args = Arrays.copyOfRange(args, 1, args.length); } else { jarFilePath = null; } this.programArgs = args; // 類路徑 List<URL> classpaths = new ArrayList<URL>(); // 若是命令行中有類路徑選項 if (line.hasOption(CLASSPATH_OPTION.getOpt())) { // 遍歷選項值數組(經過類選項獲取該String數組) for (String path : line.getOptionValues(CLASSPATH_OPTION.getOpt())) { try { classpaths.add(new URL(path)); } catch (MalformedURLException e) { throw new CliArgsException("Bad syntax for classpath: " + path); } } } this.classpaths = classpaths; // 若是有並行數選項 if (line.hasOption(PARALLELISM_OPTION.getOpt())) { // 從選項值解析到的String值 String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt()); try { // 解析獲取並行數 parallelism = Integer.parseInt(parString); // 若是獲取到的並行數小於等於0, 拋出數字格式異常 if (parallelism <= 0) { throw new NumberFormatException(); } } catch (NumberFormatException e) { throw new CliArgsException("The parallelism must be a positive number: " + parString); } } else { // 不然就使用默認並行數 parallelism = ExecutionConfig.PARALLELISM_DEFAULT; } // 若是有分離選項 或 有yarn分離選項和關聯退出選項, 就獲取分離模式 detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption( YARN_DETACHED_OPTION.getOpt()); shutdownOnAttachedExit = line.hasOption(SHUTDOWN_IF_ATTACHED_OPTION.getOpt()); // 根據命令行獲取保存點設置 this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line); } public String getJarFilePath() { 
return jarFilePath; } public String getEntryPointClassName() { return entryPointClass; } public List<URL> getClasspaths() { return classpaths; } public String[] getProgramArgs() { return programArgs; } public int getParallelism() { return parallelism; } public boolean getDetachedMode() { return detachedMode; } public boolean isShutdownOnAttachedExit() { return shutdownOnAttachedExit; } public SavepointRestoreSettings getSavepointRestoreSettings() { return savepointSettings; } /** * Indicates whether the job is a Python job. * 顯示該任務是否爲python任務 */ public boolean isPython() { return isPython; } }
package org.apache.flink.client.cli;

import org.apache.commons.cli.CommandLine;

/**
 * Command line options for the RUN command.
 *
 * <p>Adds nothing on top of {@link ProgramOptions}; it exists as a distinct type for the command.
 */
public class RunOptions extends ProgramOptions {

    /**
     * Parses the program options from the given command line.
     *
     * @param line parsed command line
     * @throws CliArgsException if the parent class rejects the arguments
     */
    public RunOptions(CommandLine line) throws CliArgsException {
        super(line);
    }
}
package org.apache.flink.client.cli; import org.apache.commons.cli.CommandLine; import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_DISPOSE_OPTION; /** * Command line options for the SAVEPOINT command. * 保存點命令的命令行選項 */ public class SavepointOptions extends CommandLineOptions { private final String[] args; // 是否釋放資源 private boolean dispose; // 釋放保存點的路徑 private String disposeSavepointPath; private String jarFile; public SavepointOptions(CommandLine line) { super(line); args = line.getArgs(); dispose = line.hasOption(SAVEPOINT_DISPOSE_OPTION.getOpt()); disposeSavepointPath = line.getOptionValue(SAVEPOINT_DISPOSE_OPTION.getOpt()); jarFile = line.getOptionValue(JAR_OPTION.getOpt()); } public String[] getArgs() { return args == null ? new String[0] : args; } public boolean isDispose() { return dispose; } public String getSavepointPath() { return disposeSavepointPath; } public String getJarFilePath() { return jarFile; } }
package org.apache.flink.client.cli; import org.apache.commons.cli.CommandLine; import static org.apache.flink.client.cli.CliFrontendParser.STOP_AND_DRAIN; import static org.apache.flink.client.cli.CliFrontendParser.STOP_WITH_SAVEPOINT_PATH; /** * Command line options for the STOP command. * 中止命令的命令行選項 */ class StopOptions extends CommandLineOptions { private final String[] args; private final boolean savepointFlag; /** * Optional target directory for the savepoint. Overwrites cluster * default. * 保存點的可選的目標目錄, 重寫默認集羣 */ private final String targetDirectory; // 是否使事件事件提早 private final boolean advanceToEndOfEventTime; StopOptions(CommandLine line) { super(line); this.args = line.getArgs(); this.savepointFlag = line.hasOption(STOP_WITH_SAVEPOINT_PATH.getOpt()); this.targetDirectory = line.getOptionValue(STOP_WITH_SAVEPOINT_PATH.getOpt()); this.advanceToEndOfEventTime = line.hasOption(STOP_AND_DRAIN.getOpt()); } String[] getArgs() { return args == null ? new String[0] : args; } boolean hasSavepointFlag() { return savepointFlag; } String getTargetDirectory() { return targetDirectory; } boolean shouldAdvanceToEndOfEventTime() { return advanceToEndOfEventTime; } }
org.apache.flink.client.deployment
package org.apache.flink.client.deployment; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.util.FlinkException; /** * A descriptor to deploy a cluster (e.g. Yarn or Mesos) and return a Client for Cluster communication. * 一個用於部署集羣並返回一個與集羣交流的客戶端 * * @param <T> Type of the cluster id */ public interface ClusterDescriptor<T> extends AutoCloseable { /** * Returns a String containing details about the cluster (NodeManagers, available memory, ...). * 返回一個含有集羣詳情的信息 * */ String getClusterDescription(); /** * Retrieves an existing Flink Cluster. * 獲取已存在的Flink集羣 * @param clusterId The unique identifier of the running cluster * @return Client for the cluster * @throws ClusterRetrieveException if the cluster client could not be retrieved */ ClusterClient<T> retrieve(T clusterId) throws ClusterRetrieveException; /** * Triggers deployment of a cluster. * 啓動集羣的部署 * @param clusterSpecification Cluster specification defining the cluster to deploy * @return Client for the cluster * @throws ClusterDeploymentException if the cluster could not be deployed */ // 部署集羣會話 ClusterClient<T> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException; /** * Deploys a per-job cluster with the given job on the cluster. 
* 根據已得的任務在集羣上部署各自的做業 * * @param clusterSpecification Initial cluster specification with which the Flink cluster is launched * 集羣規格 * @param jobGraph JobGraph with which the job cluster is started * 任務圖 * @param detached true if the cluster should be stopped after the job completion without serving the result, otherwise false * 集羣是否應當在任務完成後不提供任務執行結果就被關閉 * @return Cluster client to talk to the Flink cluster * @throws ClusterDeploymentException if the cluster could not be deployed 部署異常 */ ClusterClient<T> deployJobCluster( final ClusterSpecification clusterSpecification, final JobGraph jobGraph, final boolean detached) throws ClusterDeploymentException; /** * Terminates the cluster with the given cluster id. * 根據所得的id終止集羣 * @param clusterId identifying the cluster to shut down * @throws FlinkException if the cluster could not be terminated */ void killCluster(T clusterId) throws FlinkException; }
package org.apache.flink.client.deployment;

import org.apache.flink.util.FlinkException;

/**
 * Signals a problem that occurred while deploying a Flink cluster.
 */
public class ClusterDeploymentException extends FlinkException {

    private static final long serialVersionUID = -4327724979766139208L;

    /**
     * @param message description of the deployment problem
     */
    public ClusterDeploymentException(String message) {
        super(message);
    }

    /**
     * @param message description of the deployment problem
     * @param cause underlying failure
     */
    public ClusterDeploymentException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause underlying failure
     */
    public ClusterDeploymentException(Throwable cause) {
        super(cause);
    }
}
package org.apache.flink.client.deployment;

import org.apache.flink.util.FlinkException;

/**
 * Signals that a cluster could not be retrieved.
 */
public class ClusterRetrieveException extends FlinkException {

    private static final long serialVersionUID = 7718062507419172318L;

    /**
     * @param message description of the retrieval problem
     */
    public ClusterRetrieveException(String message) {
        super(message);
    }

    /**
     * @param message description of the retrieval problem
     * @param cause underlying failure
     */
    public ClusterRetrieveException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause underlying failure
     */
    public ClusterRetrieveException(Throwable cause) {
        super(cause);
    }
}
package org.apache.flink.client.deployment; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.TaskManagerOptions; /** * Description of the cluster to start by the {@link ClusterDescriptor}. * 對根據集羣描述器所啓動的集羣的描述 */ public final class ClusterSpecification { // master的內存(單位: MB) private final int masterMemoryMB; // 任務管理器的內存(單位: MB) private final int taskManagerMemoryMB; // 任務管理器的數量 private final int numberTaskManagers; // 每一個任務管理器的任務槽數量 private final int slotsPerTaskManager; private ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int numberTaskManagers, int slotsPerTaskManager) { this.masterMemoryMB = masterMemoryMB; this.taskManagerMemoryMB = taskManagerMemoryMB; this.numberTaskManagers = numberTaskManagers; this.slotsPerTaskManager = slotsPerTaskManager; } public int getMasterMemoryMB() { return masterMemoryMB; } public int getTaskManagerMemoryMB() { return taskManagerMemoryMB; } public int getNumberTaskManagers() { return numberTaskManagers; } public int getSlotsPerTaskManager() { return slotsPerTaskManager; } @Override public String toString() { return "ClusterSpecification{" + "masterMemoryMB=" + masterMemoryMB + ", taskManagerMemoryMB=" + taskManagerMemoryMB + ", numberTaskManagers=" + numberTaskManagers + ", slotsPerTaskManager=" + slotsPerTaskManager + '}'; } // 根據配置獲取集羣規格 public static ClusterSpecification fromConfiguration(Configuration configuration) { int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1); int jobManagerMemoryMb = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes(); int taskManagerMemoryMb = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes(); return new ClusterSpecificationBuilder() .setMasterMemoryMB(jobManagerMemoryMb) .setTaskManagerMemoryMB(taskManagerMemoryMb) .setNumberTaskManagers(1) .setSlotsPerTaskManager(slots) .createClusterSpecification(); } /** * 
Builder for the {@link ClusterSpecification} instance. * 集羣規格的構造器 */ public static class ClusterSpecificationBuilder { // 默認master內存爲768MB private int masterMemoryMB = 768; // 默認任務管理器內存爲768MB private int taskManagerMemoryMB = 768; // 默認任務管理器數量爲1 private int numberTaskManagers = 1; // 默認美國任務管理器的任務槽數量爲1 private int slotsPerTaskManager = 1; public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB) { this.masterMemoryMB = masterMemoryMB; return this; } public ClusterSpecificationBuilder setTaskManagerMemoryMB(int taskManagerMemoryMB) { this.taskManagerMemoryMB = taskManagerMemoryMB; return this; } public ClusterSpecificationBuilder setNumberTaskManagers(int numberTaskManagers) { this.numberTaskManagers = numberTaskManagers; return this; } public ClusterSpecificationBuilder setSlotsPerTaskManager(int slotsPerTaskManager) { this.slotsPerTaskManager = slotsPerTaskManager; return this; } public ClusterSpecification createClusterSpecification() { return new ClusterSpecification( masterMemoryMB, taskManagerMemoryMB, numberTaskManagers, slotsPerTaskManager); } } }
package org.apache.flink.client.deployment; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; /** * A deployment descriptor for an existing cluster. * 對已存在的集羣進行描述 */ public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> { private final Configuration config; public StandaloneClusterDescriptor(Configuration config) { this.config = Preconditions.checkNotNull(config); } // 獲取集羣描述 @Override public String getClusterDescription() { String host = config.getString(JobManagerOptions.ADDRESS, ""); int port = config.getInteger(JobManagerOptions.PORT, -1); return "Standalone cluster at " + host + ":" + port; } // 獲取集羣客戶端信息 @Override public RestClusterClient<StandaloneClusterId> retrieve(StandaloneClusterId standaloneClusterId) throws ClusterRetrieveException { try { return new RestClusterClient<>(config, standaloneClusterId); } catch (Exception e) { throw new ClusterRetrieveException("Couldn't retrieve standalone cluster", e); } } // 關閉與客戶端的會話 @Override public RestClusterClient<StandaloneClusterId> deploySessionCluster(ClusterSpecification clusterSpecification) { throw new UnsupportedOperationException("Can't deploy a standalone cluster."); } // 使集羣客戶端休息 @Override public RestClusterClient<StandaloneClusterId> deployJobCluster( ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) { throw new UnsupportedOperationException("Can't deploy a standalone per-job cluster."); } // 殺死集羣 @Override public void killCluster(StandaloneClusterId clusterId) throws FlinkException { throw new UnsupportedOperationException("Cannot terminate a standalone cluster."); } @Override public void close() throws Exception { // nothing to do } }
package org.apache.flink.client.deployment; /** * Identifier for standalone clusters. */ public class StandaloneClusterId { private static final StandaloneClusterId INSTANCE = new StandaloneClusterId(); private StandaloneClusterId() {} public static StandaloneClusterId getInstance() { return INSTANCE; } }
org.apache.flink.client.program
package org.apache.flink.client.program; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.Plan; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.costs.DefaultCostEstimator; import org.apache.flink.optimizer.plan.FlinkPlan; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.StreamingPlan; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.util.FlinkException; import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.net.URISyntaxException; import java.net.URL; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; /** * Encapsulates the functionality necessary to submit a program to a remote cluster. 
* 封裝了將一個程序提交到一個遠程集羣所必需要有的功能 * @param <T> type of the cluster id 集羣id的類型(T) */ // 實現了可自動關閉的接口(jdk1.7, Josh Bloch) public abstract class ClusterClient<T> implements AutoCloseable { // 經過日誌工廠獲取日誌類(getLogger經過反射獲取) protected final Logger log = LoggerFactory.getLogger(getClass()); /** The optimizer used in the optimization of batch programs. */ // 應用於批處理程序優化的優化器 final Optimizer compiler; /** Configuration of the client. */ // 客戶端配置 private final Configuration flinkConfig; /** * For interactive invocations, the job results are only available after the ContextEnvironment has been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment which lets us access the execution result here. * 對於互相調用, 任務執行結果只有在上下文環境已經在用戶jar中運行的狀態時才能夠獲取。 咱們將每一個客戶轉給每一個上下文環境實例來獲取執行結果。 */ // 最終任務執行結果 protected JobExecutionResult lastJobExecutionResult; /** Switch for blocking/detached job submission of the client. */ // 是否啓用分離任務提交模式的標籤, 爲false則使用阻塞式的任務提交 private boolean detachedJobSubmission = false; // ------------------------------------------------------------------------ // Construction 架構 // ------------------------------------------------------------------------ /** * Creates a instance that submits the programs to the JobManager defined in the configuration. * 建立一個提交程序到配置中已定義的任務管理器的實例 * This method will try to resolve the JobManager hostname and throw an exception if that is not possible. * 該方法會設法去決定任務管理器的主機名, 若是沒法決定,則會拋出異常 * * @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer. * flinkConfig參數: 獲取任務管理器地址並被用於配置程序優化器的配置 */ public ClusterClient(Configuration flinkConfig) { this.flinkConfig = Preconditions.checkNotNull(flinkConfig); this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig); } /** * User overridable hook to close the client, possibly closes internal services. * 用戶重寫hook來關閉客戶端, 可能關閉內部服務 * @deprecated use the {@link #close()} instead. 
This method stays for backwards compatibility. * 該方法已過期, 請使用close()方法, 仍存在該方法是爲了向後兼容性 */ public void shutdown() throws Exception { close(); } @Override public void close() throws Exception { } // ------------------------------------------------------------------------ // Access to the Program's Plan // 獲取程序的計劃 // ------------------------------------------------------------------------ // 獲取優化過的JSON格式的計劃 參數: 優化器, 打包的程序, 並行數 public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException { // 計劃的json格式dump文件生成器 PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism)); } // 獲取優化過的計劃 public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException { // 獲取上下文類加載器(其實就是獲取當前線程的上下午加載器) final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); try { // 將當前線程的上下文加載器設置爲進程的用戶碼類加載器 Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); // temporary hack to support the optimizer plan preview // 臨時的 侵入 以支持優化器計劃 預覽 OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler); // 若是並行數大於0, 就將優化器計劃環境的並行數設置爲該並行數 if (parallelism > 0) { env.setParallelism(parallelism); } return env.getOptimizedPlan(prog); } finally { // 將當前線程的上下文加載器設置爲以前獲取到的類加載器 Thread.currentThread().setContextClassLoader(contextClassLoader); } } // 獲取優化器計劃 public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException { // 經過日誌工廠類和反射獲取日誌 Logger log = LoggerFactory.getLogger(ClusterClient.class); // 若是並行數 > 0 且 計劃的默認並行數 <= 0 if (parallelism > 0 && p.getDefaultParallelism() <= 0) { // 就在日誌中打印: 將默認並行數從計劃的默認並行數改成傳入的並行數 log.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism); // 
將默認的並行數也設置爲當前傳入的並行數 p.setDefaultParallelism(parallelism); } log.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism()); // 編譯該計劃並返回優化過的計劃 return compiler.compile(p); } // ------------------------------------------------------------------------ // Program submission / execution 程序提交/執行 // ------------------------------------------------------------------------ /** * General purpose method to run a user jar from the CliFrontend in either blocking or detached mode, depending on whether {@code setDetached(true)} or {@code setDetached(false)}. * 該方法的主要目的是從客戶端前端(不管是阻塞模式仍是分離模式)運行一個用戶jar, 運行哪一種模式取決於setDetached標籤 * * @param prog the packaged program 打包的程序 * @param parallelism the parallelism to execute the contained Flink job 並行數 * @return The result of the execution 返回執行結果 * @throws ProgramMissingJobException 拋出程序無任務異常 * @throws ProgramInvocationException 拋出程序調用異常 */ // 經過打包的程序和並行度運行, 並獲取任務提交的結果 public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException { // 獲取當前線程的上下文加載器 final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); try { // 將當前線程的類加載器設置爲打包的程序的用戶碼類加載器 Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); // 打印是否使用了分離模式 log.info("Starting program (detached: {})", isDetached()); // 獲取程序所需運行的全部library, 本質上是個URL的列表集合 final List<URL> libraries = prog.getAllLibraries(); // 初始化上下文工廠 ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries, prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(), prog.getSavepointSettings()); ContextEnvironment.setAsContext(factory); try { // invoke main method 請求主方法 prog.invokeInteractiveModeForExecution(); // 最後的任務執行結果爲空, 就拋出程序沒有任務異常 if (lastJobExecutionResult == null) { throw new ProgramMissingJobException("The program didn't contain a Flink job."); } // 返回當前最後的任務的執行結果 return this.lastJobExecutionResult; } finally { 
/**個沙雕裝飾器模式, 執行unsetContext()方法實際調用的是 resetContextEnvironment方法, 將上下文工廠設置爲空, 而且 將存儲在threadLocal中的上下文工廠設置爲空。 threadlocal而是一個線程內部的存儲類,能夠在指定線程內存儲數據,數據存儲之後,只有指定線程能夠獲得存儲數據。在此先不作展開, 之後填坑 */ ContextEnvironment.unsetContext(); } } finally { // 設置當前線程的上下午類加載器爲獲取的類加載器 Thread.currentThread().setContextClassLoader(contextClassLoader); } } // 運行並返回任務執行的結果 public JobSubmissionResult run( Plan plan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, int parallelism, SavepointRestoreSettings savepointSettings) throws CompilerException, ProgramInvocationException { // 獲取優化過的計劃 OptimizedPlan optPlan = getOptimizedPlan(compiler, plan, parallelism); return run(optPlan, libraries, classpaths, classLoader, savepointSettings); } // 運行並返回任務執行的結果 public JobSubmissionResult run( FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException { // 獲取任務視圖 JobGraph job = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings); return submitJob(job, classLoader); } /** * Requests the {@link JobStatus} of the job with the given {@link JobID}. * 須要有已獲取的任務Id的任務狀態 */ // 使用CompletableFuture實現異步調用, 根據任務Id獲取任務狀態 public abstract CompletableFuture<JobStatus> getJobStatus(JobID jobId); /** * Cancels a job identified by the job id. * 根據任務id取消任務 * @param jobId the job id * @throws Exception In case an error occurred. */ public abstract void cancel(JobID jobId) throws Exception; /** * Cancels a job identified by the job id and triggers a savepoint. * 根據任務id取消了任務或者引起了一個保存點 * * @param jobId the job id * @param savepointDirectory directory the savepoint should be written to 保存點應該被寫入到的目錄 * @return path where the savepoint is located * @throws Exception In case an error occurred. 
*/ public abstract String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception; /** * Stops a program on Flink cluster whose job-manager is configured in this client's configuration. * 在Flink集羣上中止一個程序, 該程序的任務管理器被配置在此客戶端的配置中 * * Stopping works only for streaming programs. Be aware, that the program might continue to run for a while after sending the stop command, because after sources stopped to emit data all operators need to finish processing. * 只中止流式程序的工做。須要注意的是: 在發出中止的命令行以後, 該程序可能還會運行一段時間, 由於在資源被中止後須要將數據散發到全部操做器來結束進程 * * @param jobId the job ID of the streaming program to stop * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code MAX_WATERMARK} in the pipeline * advanceToEndOfEventTime(提早到最終的事件時間) 參數: 判斷該資源是否應該拒絕管道中一個最大水位線 * @param savepointDirectory directory the savepoint should be written to 保存點應當被寫入到的目錄 * @return a {@link CompletableFuture} containing the path where the savepoint is located * 返回一個包含保存點路徑CompletableFuture類 * @throws Exception * If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal failed. That might be due to an I/O problem, ie, the job-manager is unreachable. * 若是任務id無效(好比未知或者是一個批處理任務) 或者 發送中止信號失敗, 這有多是由於輸入輸出流致使的錯誤, 好比任務管理器沒法獲取。 */ public abstract String stopWithSavepoint(final JobID jobId, final boolean advanceToEndOfEventTime, @Nullable final String savepointDirectory) throws Exception; /** * Triggers a savepoint for the job identified by the job id. The savepoint will be written to the given savepoint directory, or {@link org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null. 
* 根據由任務id獲取的任務引起一個保存點, 該保存點會被寫入到一個已得的保存點目錄 或者 檢查配置中的保存點目錄 若是 保存點爲空 * * @param jobId job id * @param savepointDirectory directory the savepoint should be written to * @return path future where the savepoint is located * @throws FlinkException if no connection to the cluster could be established */ public abstract CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws FlinkException; public abstract CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException; /** * Lists the currently running and finished jobs on the cluster. * 列出集羣中當前正在運行和已完成的任務 * @return future collection of running and finished jobs * 返回運行中和完成的任務的CompletableFuture集合 * @throws Exception if no connection to the cluster could be established */ public abstract CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception; /** * Requests and returns the accumulators for the given job identifier. Accumulators can be requested while a job is running or after it has finished. The default class loader is used to deserialize the incoming accumulator results. * 請求並返回已得任務的標識的累加器。當一個任務在運行或者在它完成以後, 能夠請求累加器, 默認的類加載器被用於將將要到來的累加器結果反序列化 * @param jobID The job identifier of a job. * @return A Map containing the accumulator's name and its value. * 返回一個含有累加器名稱(K)和值(V)得Map */ public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID) throws Exception { return getAccumulators(jobID, ClassLoader.getSystemClassLoader()); } /** * Requests and returns the accumulators for the given job identifier. Accumulators can be requested while a job is running or after it has finished. * 請求並返回已得任務標識的累加器 * * @param jobID The job identifier of a job. * @param loader The class loader for deserializing the accumulator results. * @return A Map containing the accumulator's name and its value. 
*/ public abstract Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID, ClassLoader loader) throws Exception; // ------------------------------------------------------------------------ // Internal translation methods 內部轉換方法 // ------------------------------------------------------------------------ // 獲取任務視圖 參數: flink配置文件, 打包的程序, 選擇的計劃, 保存點恢復設置 public static JobGraph getJobGraph(Configuration flinkConfig, PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException { return getJobGraph(flinkConfig, optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointSettings); } // 獲取任務視圖 參數: flink配置文件, 選擇的計劃, jar文件列表, 類路徑列表, // 保存點恢復設置 public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, SavepointRestoreSettings savepointSettings) { JobGraph job; // 若是選擇的計劃是一個流式運算計劃 if (optPlan instanceof StreamingPlan) { // 將optplan強轉爲StreamingPlan再獲取任務視圖 job = ((StreamingPlan) optPlan).getJobGraph(); // 設置保存點恢復設置 job.setSavepointRestoreSettings(savepointSettings); } else { // 根據配置初始化任務視圖生成器 JobGraphGenerator gen = new JobGraphGenerator(flinkConfig); // 編譯優化過的計劃獲取任務視圖 job = gen.compileJobGraph((OptimizedPlan) optPlan); } // 遍歷jar文件URL列表 for (URL jar : jarFiles) { try { // 將該jar的路徑添加到任務視圖中(將jar轉化爲統一標識符傳入) job.addJar(new Path(jar.toURI())); } catch (URISyntaxException e) { // 拋出URL無效異常 throw new RuntimeException("URL is invalid. This should not happen.", e); } } // 設置任務的類路徑 job.setClasspaths(classpaths); return job; } // ------------------------------------------------------------------------ // Abstract methods to be implemented by the cluster specific Client // 被空氣棉的特定客戶實現的抽象方法 // ------------------------------------------------------------------------ /** * Returns an URL (as a string) to the JobManager web interface. 
* 返回一個String類的URL任務管理器web界面 */ public abstract String getWebInterfaceURL(); /** * Returns the cluster id identifying the cluster to which the client is connected. * 返回集羣id來鑑別該鏈接哪一個集羣 * * @return cluster id of the connected cluster */ public abstract T getClusterId(); /** * Set the mode of this client (detached or blocking job execution). * 設置客戶端執行模式: 分離仍是阻塞式執行 * @param isDetached If true, the client will submit programs detached via the {@code run} method */ public void setDetached(boolean isDetached) { this.detachedJobSubmission = isDetached; } /** * A flag to indicate whether this clients submits jobs detached. * 代表該客戶端是否提交了分離式執行的任務的標籤 * @return True if the Client submits detached, false otherwise */ public boolean isDetached() { return detachedJobSubmission; } /** * Return the Flink configuration object. * 返回Flink配置文件對象 * @return The Flink configuration object */ public Configuration getFlinkConfiguration() { return flinkConfig.clone(); // 這是一個deepcopy, clone新建立了一個對象 } /** * Calls the subclasses' submitJob method. It may decide to simply call one of the run methods or it may perform some custom job submission logic. * 請求了子類的提交任務方法, 它可能決定去簡化請求run方法中的一個 或者 它可能表現出一些客戶端提交任務的邏輯 * @param jobGraph The JobGraph to be submitted * @return JobSubmissionResult */ public abstract JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException; /** * Submit the given {@link JobGraph} to the cluster. * 提交已得的任務視圖到集羣 * @param jobGraph to submit * @return Future which is completed with the {@link JobSubmissionResult} */ public abstract CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph); /** * Request the {@link JobResult} for the given {@link JobID}. 
* 根據已得的任務Id請求任務結果, 任務Id不能爲空 * @param jobId for which to request the {@link JobResult} * @return Future which is completed with the {@link JobResult} */ public abstract CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId); // 終止集羣, 拋出不支持的操做異常 public void shutDownCluster() { throw new UnsupportedOperationException("The " + getClass().getSimpleName() + " does not support shutDownCluster."); } }
package org.apache.flink.client.program;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.net.URL;
import java.util.List;

/**
 * Execution Environment for remote execution with the Client.
 *
 * <p>Every call to {@link #execute(String)} is forwarded to the wrapped {@link ClusterClient}
 * together with the jars, classpaths, class loader and savepoint settings handed to the
 * constructor.
 */
public class ContextEnvironment extends ExecutionEnvironment {

    /** Client that the program plan is handed to in {@link #execute(String)}. */
    private final ClusterClient<?> client;

    /** Whether this environment was created for detached execution. */
    private final boolean detached;

    /** Jar files (as URLs) passed along with every submitted plan. */
    private final List<URL> jarFilesToAttach;

    /** Classpath entries (as URLs) passed along with every submitted plan. */
    private final List<URL> classpathsToAttach;

    /** Class loader passed to the client for the user code. */
    private final ClassLoader userCodeClassLoader;

    /** Savepoint restore settings passed to the client on submission. */
    private final SavepointRestoreSettings savepointSettings;

    /** Set to {@code true} by the first call to {@link #execute(String)}. */
    private boolean alreadyCalled;

    public ContextEnvironment(
            ClusterClient<?> remoteConnection,
            List<URL> jarFiles,
            List<URL> classpaths,
            ClassLoader userCodeClassLoader,
            SavepointRestoreSettings savepointSettings,
            boolean detached) {
        this.client = remoteConnection;
        this.jarFilesToAttach = jarFiles;
        this.classpathsToAttach = classpaths;
        this.userCodeClassLoader = userCodeClassLoader;
        this.savepointSettings = savepointSettings;
        this.detached = detached;
        this.alreadyCalled = false;
    }

    /**
     * Creates the program plan for the given job name, runs it through the client and
     * remembers the outcome as the last job execution result.
     *
     * @param jobName name used when creating the program plan
     * @return the execution result obtained from the client's submission result
     * @throws InvalidProgramException if called a second time while in detached mode
     * @throws Exception whatever plan creation or the client's {@code run} call throws
     */
    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        verifyExecuteIsCalledOnceWhenInDetachedMode();

        final Plan programPlan = createProgramPlan(jobName);
        final JobSubmissionResult submission = client.run(
            programPlan,
            jarFilesToAttach,
            classpathsToAttach,
            userCodeClassLoader,
            getParallelism(),
            savepointSettings);

        final JobExecutionResult executionResult = submission.getJobExecutionResult();
        lastJobExecutionResult = executionResult;
        return executionResult;
    }

    /**
     * Rejects a repeated {@code execute} call in detached mode and records that
     * {@code execute} has been called.
     */
    private void verifyExecuteIsCalledOnceWhenInDetachedMode() {
        final boolean repeatedDetachedCall = detached && alreadyCalled;
        if (repeatedDetachedCall) {
            throw new InvalidProgramException(
                DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE);
        }
        alreadyCalled = true;
    }

    /** Renders the environment with its parallelism, printing "default" for the default value. */
    @Override
    public String toString() {
        final int parallelism = getParallelism();
        final String shownParallelism;
        if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
            shownParallelism = "default";
        } else {
            shownParallelism = String.valueOf(parallelism);
        }
        return "Context Environment (parallelism = " + shownParallelism + ")";
    }

    public ClusterClient<?> getClient() {
        return client;
    }

    public List<URL> getJars() {
        return jarFilesToAttach;
    }

    public List<URL> getClasspaths() {
        return classpathsToAttach;
    }

    public ClassLoader getUserCodeClassLoader() {
        return userCodeClassLoader;
    }

    public SavepointRestoreSettings getSavepointRestoreSettings() {
        return savepointSettings;
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Installs the given factory by delegating to {@code initializeContextEnvironment}.
     *
     * @param factory factory to install as the context environment factory
     */
    static void setAsContext(ContextEnvironmentFactory factory) {
        initializeContextEnvironment(factory);
    }

    /** Removes the installed factory again by delegating to {@code resetContextEnvironment}. */
    static void unsetContext() {
        resetContextEnvironment();
    }
}
package org.apache.flink.client.program;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.net.URL;
import java.util.List;

/**
 * The factory that instantiates the environment to be used when running jobs that are
 * submitted through a pre-configured client connection. This happens for example when a
 * job is submitted from the command line.
 */
public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {

    private final ClusterClient<?> client;

    private final List<URL> jarFilesToAttach;

    private final List<URL> classpathsToAttach;

    private final ClassLoader userCodeClassLoader;

    /** Applied to each created environment only when it is greater than zero. */
    private final int defaultParallelism;

    private final boolean isDetached;

    private final SavepointRestoreSettings savepointSettings;

    /** Set to {@code true} by the first call to {@link #createExecutionEnvironment()}. */
    private boolean alreadyCalled;

    public ContextEnvironmentFactory(
            ClusterClient<?> client,
            List<URL> jarFilesToAttach,
            List<URL> classpathsToAttach,
            ClassLoader userCodeClassLoader,
            int defaultParallelism,
            boolean isDetached,
            SavepointRestoreSettings savepointSettings) {
        this.client = client;
        this.jarFilesToAttach = jarFilesToAttach;
        this.classpathsToAttach = classpathsToAttach;
        this.userCodeClassLoader = userCodeClassLoader;
        this.defaultParallelism = defaultParallelism;
        this.isDetached = isDetached;
        this.savepointSettings = savepointSettings;
        this.alreadyCalled = false;
    }

    /**
     * Builds a new {@link ContextEnvironment} from the values held by this factory.
     *
     * @return the new environment, with its parallelism set when a positive default was given
     * @throws InvalidProgramException if called more than once in detached mode
     */
    @Override
    public ExecutionEnvironment createExecutionEnvironment() {
        verifyCreateIsCalledOnceWhenInDetachedMode();

        final ContextEnvironment created = new ContextEnvironment(
            client,
            jarFilesToAttach,
            classpathsToAttach,
            userCodeClassLoader,
            savepointSettings,
            isDetached);

        final boolean hasExplicitParallelism = defaultParallelism > 0;
        if (hasExplicitParallelism) {
            created.setParallelism(defaultParallelism);
        }
        return created;
    }

    /** Rejects a second environment in detached mode and records that one was requested. */
    private void verifyCreateIsCalledOnceWhenInDetachedMode() {
        if (alreadyCalled && isDetached) {
            throw new InvalidProgramException("Multiple environments cannot be created in detached mode");
        }
        alreadyCalled = true;
    }
}
package org.apache.flink.client.program;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;

import java.util.Map;

/**
 * The {@link JobExecutionResult} returned by a {@link ContextEnvironment} when executing a
 * job in detached mode.
 *
 * <p>Only the job ID can be read; the runtime and accumulator accessors all throw an
 * {@link InvalidProgramException}.
 */
public final class DetachedJobExecutionResult extends JobExecutionResult {

    /** Message prefix stating that the job was submitted in detached mode. */
    static final String DETACHED_MESSAGE = "Job was submitted in detached mode. ";

    /** Message used when {@code execute} is called more than once. */
    static final String EXECUTE_TWICE_MESSAGE = "Only one call to execute is allowed. ";

    /** Hint about eager execution functions, appended by {@link #getAccumulatorResult(String)}. */
    static final String EAGER_FUNCTION_MESSAGE = "Please make sure your program doesn't call " +
        "an eager execution function [collect, print, printToErr, count]. ";

    /** Message stating that execution results are not available. */
    static final String JOB_RESULT_MESSAGE = "Results of job execution, such as accumulators," +
        " runtime, etc. are not available. ";

    /** Message shared by every accessor that cannot be served. */
    private static final String RESULTS_UNAVAILABLE_MESSAGE = DETACHED_MESSAGE + JOB_RESULT_MESSAGE;

    /**
     * Creates a result for the given job, passing {@code -1} and {@code null} as the
     * remaining arguments of the super constructor.
     *
     * @param jobID ID of the submitted job
     */
    public DetachedJobExecutionResult(final JobID jobID) {
        super(jobID, -1, null);
    }

    @Override
    public JobID getJobID() {
        return super.getJobID();
    }

    /** Always {@code false} for this detached result. */
    @Override
    public boolean isJobExecutionResult() {
        return false;
    }

    /** Returns this instance. */
    @Override
    public JobExecutionResult getJobExecutionResult() {
        return this;
    }

    /** Always throws: the net runtime is not available. */
    @Override
    public long getNetRuntime() {
        throw new InvalidProgramException(RESULTS_UNAVAILABLE_MESSAGE);
    }

    /** Always throws; the message also carries the eager-execution hint. */
    @Override
    public <T> T getAccumulatorResult(String accumulatorName) {
        throw new InvalidProgramException(RESULTS_UNAVAILABLE_MESSAGE + EAGER_FUNCTION_MESSAGE);
    }

    /** Always throws: accumulator results are not available. */
    @Override
    public Map<String, Object> getAllAccumulatorResults() {
        throw new InvalidProgramException(RESULTS_UNAVAILABLE_MESSAGE);
    }

    /** Always throws: counter results are not available. */
    @Override
    public Integer getIntCounterResult(String accumulatorName) {
        throw new InvalidProgramException(RESULTS_UNAVAILABLE_MESSAGE);
    }
}
package org.apache.flink.client.program;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * Client to interact with a {@link MiniCluster}.
 *
 * <p>Most operations are forwarded directly to the wrapped mini cluster.
 */
public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClusterId> {

    /** Cluster that all requests are forwarded to. */
    private final MiniCluster miniCluster;

    public MiniClusterClient(@Nonnull Configuration configuration, @Nonnull MiniCluster miniCluster) {
        super(configuration);
        this.miniCluster = miniCluster;
    }

    /**
     * Submits the job graph and waits on the resulting futures.
     *
     * <p>In detached mode only the submission is awaited and a
     * {@link DetachedJobExecutionResult} is returned. Otherwise the job result is requested
     * after submission, awaited, and converted with the given class loader.
     *
     * @param jobGraph job to submit
     * @param classLoader class loader used to convert the job result (attached mode only)
     * @return the result, which is also stored as the last job execution result
     * @throws ProgramInvocationException if waiting fails or the job result cannot be converted
     */
    @Override
    public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(jobGraph);

        if (isDetached()) {
            try {
                final JobSubmissionResult submission = submissionFuture.get();
                lastJobExecutionResult = new DetachedJobExecutionResult(submission.getJobID());
                return lastJobExecutionResult;
            } catch (InterruptedException | ExecutionException e) {
                // NOTE(review): presumably restores the interrupt flag for InterruptedException — confirm in ExceptionUtils.
                ExceptionUtils.checkInterrupted(e);
                throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
            }
        }

        // Attached mode: chain the result request behind the submission.
        final CompletableFuture<JobResult> resultFuture =
            submissionFuture.thenCompose(ignored -> requestJobResult(jobGraph.getJobID()));

        final JobResult finishedJob;
        try {
            finishedJob = resultFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            // NOTE(review): presumably restores the interrupt flag for InterruptedException — confirm in ExceptionUtils.
            ExceptionUtils.checkInterrupted(e);
            throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
        }

        try {
            lastJobExecutionResult = finishedJob.toJobExecutionResult(classLoader);
            return lastJobExecutionResult;
        } catch (JobExecutionException | IOException | ClassNotFoundException e) {
            throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
        }
    }

    /** Forwards the job graph to the mini cluster and returns its submission future. */
    @Override
    public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph) {
        return miniCluster.submitJob(jobGraph);
    }

    /** Asks the mini cluster for the result of the given job. */
    @Override
    public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
        return miniCluster.requestJobResult(jobId);
    }

    /** Requests cancellation from the mini cluster and waits for the returned future. */
    @Override
    public void cancel(JobID jobId) throws Exception {
        miniCluster.cancelJob(jobId).get();
    }

    /** Triggers a savepoint with the cluster's boolean flag set to {@code true} and waits for its path. */
    @Override
    public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
        final CompletableFuture<String> savepointPath = miniCluster.triggerSavepoint(jobId, savepointDirectory, true);
        return savepointPath.get();
    }

    /** Forwards the stop-with-savepoint request to the mini cluster and waits for the savepoint path. */
    @Override
    public String stopWithSavepoint(JobID jobId, boolean advanceToEndOfEventTime, @Nullable String savepointDirector) throws Exception {
        final CompletableFuture<String> savepointPath =
            miniCluster.stopWithSavepoint(jobId, savepointDirector, advanceToEndOfEventTime);
        return savepointPath.get();
    }

    /** Triggers a savepoint with the cluster's boolean flag set to {@code false}; returns the path future. */
    @Override
    public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) {
        return miniCluster.triggerSavepoint(jobId, savepointDirectory, false);
    }

    /** Forwards the dispose request for the given savepoint path to the mini cluster. */
    @Override
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
        return miniCluster.disposeSavepoint(savepointPath);
    }

    /** Returns the mini cluster's job listing future. */
    @Override
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
        return miniCluster.listJobs();
    }

    /**
     * Fetches the execution graph of the job and deserializes its accumulators.
     *
     * @param jobID job whose accumulators are requested
     * @param loader class loader used to deserialize each accumulator value
     * @return map from accumulator name to its deserialized value
     */
    @Override
    public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
        final AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobID).get();
        final Map<String, SerializedValue<OptionalFailure<Object>>> serialized = graph.getAccumulatorsSerialized();

        final Map<String, OptionalFailure<Object>> deserialized = new HashMap<>(serialized.size());
        for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> entry : serialized.entrySet()) {
            final String accumulatorName = entry.getKey();
            deserialized.put(accumulatorName, entry.getValue().deserializeValue(loader));
        }
        return deserialized;
    }

    /** Returns the mini cluster's status future for the given job. */
    @Override
    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
        return miniCluster.getJobStatus(jobId);
    }

    /** Always returns the single {@link MiniClusterId#INSTANCE}. */
    @Override
    public MiniClusterClient.MiniClusterId getClusterId() {
        return MiniClusterId.INSTANCE;
    }

    /** Returns the string form of the mini cluster's REST address. */
    @Override
    public String getWebInterfaceURL() {
        return miniCluster.getRestAddress().toString();
    }

    /** Cluster ID type with a single value. */
    enum MiniClusterId {
        INSTANCE
    }
}
package org.apache.flink.client.program;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.FlinkPlan;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

/**
 * An {@link ExecutionEnvironment} that never executes a job but only creates the optimized plan.
 */
public class OptimizerPlanEnvironment extends ExecutionEnvironment {

    /** Optimizer used to compile the program plan. */
    private final Optimizer compiler;

    /** Plan produced by {@link #execute(String)} or injected through {@link #setPlan(FlinkPlan)}. */
    private FlinkPlan optimizerPlan;

    public OptimizerPlanEnvironment(Optimizer compiler) {
        this.compiler = compiler;
    }

    // ------------------------------------------------------------------------
    //  Execution Environment methods
    // ------------------------------------------------------------------------

    /**
     * Compiles the program plan, stores it, and then aborts the program.
     *
     * @param jobName name used when creating the program plan
     * @return never returns normally
     * @throws ProgramAbortException always, once the plan has been stored
     */
    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        final Plan programPlan = createProgramPlan(jobName);
        optimizerPlan = compiler.compile(programPlan);

        // Stop here: only the plan is wanted, so the program must not continue.
        throw new ProgramAbortException();
    }

    /**
     * Runs the packaged program with this environment installed as context and returns the
     * plan that was stored when the program was aborted.
     *
     * <p>{@code System.out} and {@code System.err} are redirected into byte buffers while the
     * program runs and are restored afterwards.
     *
     * @param prog program whose plan is wanted
     * @return the stored plan
     * @throws ProgramInvocationException if the program itself throws one, if it fails without
     *         a plan having been stored, or if it finishes without being aborted
     */
    public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
        // Temporarily redirect sysout and syserr into byte arrays.
        final PrintStream previousOut = System.out;
        final PrintStream previousErr = System.err;
        final ByteArrayOutputStream capturedOut = new ByteArrayOutputStream();
        final ByteArrayOutputStream capturedErr = new ByteArrayOutputStream();
        System.setOut(new PrintStream(capturedOut));
        System.setErr(new PrintStream(capturedErr));

        setAsContext();
        try {
            prog.invokeInteractiveModeForExecution();
        } catch (ProgramInvocationException e) {
            throw e;
        } catch (Throwable t) {
            // The invocation gets aborted once the plan is available.
            if (optimizerPlan == null) {
                throw new ProgramInvocationException("The program caused an error: ", t);
            }
            return optimizerPlan;
        } finally {
            unsetAsContext();
            System.setOut(previousOut);
            System.setErr(previousErr);
        }

        // Reaching this point means the program finished without being aborted.
        final String stdoutText = capturedOut.toString();
        final String stderrText = capturedErr.toString();
        final String shownErr = stderrText.isEmpty() ? "(none)" : stderrText;
        final String shownOut = stdoutText.isEmpty() ? "(none)" : stdoutText;

        throw new ProgramInvocationException(
            "The program plan could not be fetched - the program aborted pre-maturely."
                + "\n\nSystem.err: " + shownErr
                + "\n\nSystem.out: " + shownOut);
    }

    // ------------------------------------------------------------------------

    /** Installs a factory that always hands out this environment. */
    private void setAsContext() {
        final ExecutionEnvironmentFactory selfFactory = () -> this;
        initializeContextEnvironment(selfFactory);
    }

    /** Removes the factory installed by {@link #setAsContext()}. */
    private void unsetAsContext() {
        resetContextEnvironment();
    }

    // ------------------------------------------------------------------------

    public void setPlan(FlinkPlan plan) {
        optimizerPlan = plan;
    }

    /**
     * A special exception used to abort programs when the caller is only interested
     * in the program plan, rather than in the full execution.
     */
    public static final class ProgramAbortException extends Error {
        private static final long serialVersionUID = 1L;
    }
}
package org.apache.flink.client.program; import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.client.ClientUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.util.InstantiationUtil; import javax.annotation.Nullable; import java.io.BufferedInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.URL; import java.nio.file.FileSystems; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; import java.util.List; import java.util.Random; import java.util.jar.Attributes; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.jar.Manifest; /** * This class encapsulates represents a program, packaged in a jar file. It supplies * functionality to extract nested libraries, search for the program entry point, * and extract a program plan. * 該類表明了一個被封裝的程序, 好比被打成jar包的文件。 * 它提供了從嵌套的資料中查找程序的進入點而且抽取程序計劃的功能 * */ public class PackagedProgram { /** * Property name of the entry in JAR manifest file that describes the Flink * specific entry point. * 描述了Flink 特定進入點的 jar manifest 文件中入口 的特徵名 */ public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class"; /** * Property name of the entry in JAR manifest file that describes the class with * the main method. 
* 描述了主方法類的 jar manifest 文件中入口 的特徵名 */ public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class"; // -------------------------------------------------------------------------------------------- private final URL jarFile; private final String[] args; private final Class<?> mainClass; private final List<File> extractedTempLibraries; private final List<URL> classpaths; private ClassLoader userCodeClassLoader; private SavepointRestoreSettings savepointSettings = SavepointRestoreSettings.none(); /** * Flag indicating whether the job is a Python job. * 判斷這是不是一個python任務的標籤 */ private final boolean isPython; /** * Creates an instance that wraps the plan defined in the jar file using the given * argument. * 建立一個實例來包裝 在 jar文件中 由已得參數 定義的 計劃 * * @param jarFile * The jar file which contains the plan and a Manifest which defines * the program-class * 該 jar 文件包含了計劃 和 一個 定義了程序類的 manifest 文件 * @param args * Optional. The arguments used to create the pact plan, depend on * implementation of the pact plan. See getDescription(). * 可選的參數, 該參數被用於建立打包計劃, 依賴於打包的計劃的應用. * 詳情請看獲取描述方法 * @throws ProgramInvocationException * This invocation is thrown if the Program can't be properly loaded. * Causes may be a missing / wrong class or manifest files. * 若是程序不能正確的加載, 該調用會被拋出。 可能致使其發生的緣由可能爲一個 丟失的/錯誤的 類 或 manifest文件 */ public PackagedProgram(File jarFile, String... args) throws ProgramInvocationException { this(jarFile, Collections.<URL>emptyList(), null, args); } /** * Creates an instance that wraps the plan defined in the jar file using the given * argument. * 建立了一個包裝了在jar文件中經過已得參數定義的計劃 * * @param jarFile * The jar file which contains the plan and a Manifest which defines * the program-class * 包含了計劃 和 一個 定義了 程序類的 manifest文件 * @param classpaths * Additional classpath URLs needed by the Program. * 程序所需的額外 URL 路徑 * @param args * Optional. The arguments used to create the pact plan, depend on * implementation of the pact plan. See getDescription(). * 可選的參數, 該參數被用於建立打包計劃, 依賴於打包的計劃的應用. 
詳情請看獲取描述方法 * @throws ProgramInvocationException * This invocation is thrown if the Program can't be properly loaded. * Causes may be a missing / wrong class or manifest files. * 若是程序不能正確的加載, 該調用會被拋出。 可能致使其發生的緣由可能爲一個 丟失的/錯誤的 * 類 或 manifest文件 */ public PackagedProgram(File jarFile, List<URL> classpaths, String... args) throws ProgramInvocationException { this(jarFile, classpaths, null, args); } /** * Creates an instance that wraps the plan defined in the jar file using the given * arguments. For generating the plan the class defined in the className parameter * is used. * * @param jarFile * The jar file which contains the plan. * @param entryPointClassName * Name of the class which generates the plan. Overrides the class defined * in the jar file manifest * 生成計劃的進入點名稱類, 重載了定義在jar manifest 文件中定義的類 * @param args * Optional. The arguments used to create the pact plan, depend on * implementation of the pact plan. See getDescription(). * @throws ProgramInvocationException * This invocation is thrown if the Program can't be properly loaded. Causes may be a missing / wrong class or manifest files. */ public PackagedProgram(File jarFile, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException { this(jarFile, Collections.<URL>emptyList(), entryPointClassName, args); } /** * Creates an instance that wraps the plan defined in the jar file using the given * arguments. For generating the plan the class defined in the className parameter * is used. * * @param jarFile * The jar file which contains the plan. * @param classpaths * Additional classpath URLs needed by the Program. * @param entryPointClassName * Name of the class which generates the plan. Overrides the class defined * in the jar file manifest * @param args * Optional. The arguments used to create the pact plan, depend on * implementation of the pact plan. See getDescription(). * @throws ProgramInvocationException * This invocation is thrown if the Program can't be properly loaded. 
* Causes may be a missing / wrong class or manifest files. */ public PackagedProgram(File jarFile, List<URL> classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException { // Whether the job is a Python job. 判斷是否爲python任務 isPython = entryPointClassName != null && (entryPointClassName.equals("org.apache.flink.client.python.PythonDriver") || entryPointClassName.equals("org.apache.flink.client.python.PythonGatewayServer")); URL jarFileUrl = null; if (jarFile != null) { try { jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL(); } catch (MalformedURLException e1) { throw new IllegalArgumentException("The jar file path is invalid."); } // 檢查文件路徑 checkJarFile(jarFileUrl); } else if (!isPython) { throw new IllegalArgumentException("The jar file must not be null."); } this.jarFile = jarFileUrl; this.args = args == null ? new String[0] : args; // if no entryPointClassName name was given, we try and look one up through the manifest // 若是沒有進入點類名稱, 咱們回去manifest文件中去找一個 if (entryPointClassName == null) { entryPointClassName = getEntryPointClassNameFromJar(jarFileUrl); } // now that we have an entry point, we can extract the nested jar files (if any) // 如今咱們有進入點了, 咱們key抽取任何嵌套的jar文件了 this.extractedTempLibraries = jarFileUrl == null ? Collections.emptyList() : extractContainedLibraries(jarFileUrl); this.classpaths = classpaths; this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader()); // load the entry point class 加載進入點類 this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader); if (!hasMainMethod(mainClass)) { throw new ProgramInvocationException("The given program class does not have a main(String[]) method."); } } public PackagedProgram(Class<?> entryPointClass, String... args) throws ProgramInvocationException { this.jarFile = null; this.args = args == null ? 
new String[0] : args; this.extractedTempLibraries = Collections.emptyList(); this.classpaths = Collections.emptyList(); this.userCodeClassLoader = entryPointClass.getClassLoader(); // load the entry point class this.mainClass = entryPointClass; isPython = entryPointClass.getCanonicalName().equals("org.apache.flink.client.python.PythonDriver"); if (!hasMainMethod(mainClass)) { throw new ProgramInvocationException("The given program class does not have a main(String[]) method."); } } public void setSavepointRestoreSettings(SavepointRestoreSettings savepointSettings) { this.savepointSettings = savepointSettings; } public SavepointRestoreSettings getSavepointSettings() { return savepointSettings; } public String[] getArguments() { return this.args; } public String getMainClassName() { return this.mainClass.getName(); } /** * Returns the description provided by the Program class. This * may contain a description of the plan itself and its arguments. * * @return The description of the PactProgram's input parameters. * @throws ProgramInvocationException * This invocation is thrown if the Program can't be properly loaded. Causes * may be a missing / wrong class or manifest files. */ @Nullable public String getDescription() throws ProgramInvocationException { if (ProgramDescription.class.isAssignableFrom(this.mainClass)) { ProgramDescription descr; try { descr = InstantiationUtil.instantiate( this.mainClass.asSubclass(ProgramDescription.class), ProgramDescription.class); } catch (Throwable t) { return null; } try { return descr.getDescription(); } catch (Throwable t) { throw new ProgramInvocationException("Error while getting the program description" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t); } } else { return null; } } /** * This method assumes that the context environment is prepared, or the execution * will be a local execution by default. 
*/ public void invokeInteractiveModeForExecution() throws ProgramInvocationException{ callMainMethod(mainClass, args); } /** * Returns the classpaths that are required by the program. * * @return List of {@link java.net.URL}s. */ public List<URL> getClasspaths() { return this.classpaths; } /** * Gets the {@link java.lang.ClassLoader} that must be used to load user code classes. * * @return The user code ClassLoader. */ public ClassLoader getUserCodeClassLoader() { return this.userCodeClassLoader; } /** * Returns all provided libraries needed to run the program. */ public List<URL> getAllLibraries() { List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1); if (jarFile != null) { libs.add(jarFile); } for (File tmpLib : this.extractedTempLibraries) { try { libs.add(tmpLib.getAbsoluteFile().toURI().toURL()); } catch (MalformedURLException e) { throw new RuntimeException("URL is invalid. This should not happen.", e); } } if (isPython) { String flinkOptPath = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR); final List<Path> pythonJarPath = new ArrayList<>(); try { Files.walkFileTree(FileSystems.getDefault().getPath(flinkOptPath), new SimpleFileVisitor<Path>() { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { FileVisitResult result = super.visitFile(file, attrs); if (file.getFileName().toString().startsWith("flink-python")) { pythonJarPath.add(file); } return result; } }); } catch (IOException e) { throw new RuntimeException( "Exception encountered during finding the flink-python jar. This should not happen.", e); } if (pythonJarPath.size() != 1) { throw new RuntimeException("Found " + pythonJarPath.size() + " flink-python jar."); } try { libs.add(pythonJarPath.get(0).toUri().toURL()); } catch (MalformedURLException e) { throw new RuntimeException("URL is invalid. This should not happen.", e); } } return libs; } /** * Deletes all temporary files created for contained packaged libraries. 
*/ public void deleteExtractedLibraries() { deleteExtractedLibraries(this.extractedTempLibraries); this.extractedTempLibraries.clear(); } private static boolean hasMainMethod(Class<?> entryClass) { Method mainMethod; try { mainMethod = entryClass.getMethod("main", String[].class); } catch (NoSuchMethodException e) { return false; } catch (Throwable t) { throw new RuntimeException("Could not look up the main(String[]) method from the class " + entryClass.getName() + ": " + t.getMessage(), t); } return Modifier.isStatic(mainMethod.getModifiers()) && Modifier.isPublic(mainMethod.getModifiers()); } private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException { Method mainMethod; if (!Modifier.isPublic(entryClass.getModifiers())) { throw new ProgramInvocationException("The class " + entryClass.getName() + " must be public."); } try { mainMethod = entryClass.getMethod("main", String[].class); } catch (NoSuchMethodException e) { throw new ProgramInvocationException("The class " + entryClass.getName() + " has no main(String[]) method."); } catch (Throwable t) { throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " + entryClass.getName() + ": " + t.getMessage(), t); } if (!Modifier.isStatic(mainMethod.getModifiers())) { throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-static main method."); } if (!Modifier.isPublic(mainMethod.getModifiers())) { throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-public main method."); } try { mainMethod.invoke(null, (Object) args); } catch (IllegalArgumentException e) { throw new ProgramInvocationException("Could not invoke the main method, arguments are not matching.", e); } catch (IllegalAccessException e) { throw new ProgramInvocationException("Access to the main method was denied: " + e.getMessage(), e); } catch (InvocationTargetException e) { Throwable 
exceptionInMethod = e.getTargetException(); if (exceptionInMethod instanceof Error) { throw (Error) exceptionInMethod; } else if (exceptionInMethod instanceof ProgramParametrizationException) { throw (ProgramParametrizationException) exceptionInMethod; } else if (exceptionInMethod instanceof ProgramInvocationException) { throw (ProgramInvocationException) exceptionInMethod; } else { throw new ProgramInvocationException("The main method caused an error: " + exceptionInMethod.getMessage(), exceptionInMethod); } } catch (Throwable t) { throw new ProgramInvocationException("An error occurred while invoking the program's main method: " + t.getMessage(), t); } } private static String getEntryPointClassNameFromJar(URL jarFile) throws ProgramInvocationException { JarFile jar; Manifest manifest; String className; // Open jar file try { jar = new JarFile(new File(jarFile.toURI())); } catch (URISyntaxException use) { throw new ProgramInvocationException("Invalid file path '" + jarFile.getPath() + "'", use); } catch (IOException ioex) { throw new ProgramInvocationException("Error while opening jar file '" + jarFile.getPath() + "'. " + ioex.getMessage(), ioex); } // jar file must be closed at the end try { // Read from jar manifest try { manifest = jar.getManifest(); } catch (IOException ioex) { throw new ProgramInvocationException("The Manifest in the jar file could not be accessed '" + jarFile.getPath() + "'. " + ioex.getMessage(), ioex); } if (manifest == null) { throw new ProgramInvocationException("No manifest found in jar file '" + jarFile.getPath() + "'. 
The manifest is need to point to the program's main class."); } Attributes attributes = manifest.getMainAttributes(); // check for a "program-class" entry first className = attributes.getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS); if (className != null) { return className; } // check for a main class className = attributes.getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS); if (className != null) { return className; } else { throw new ProgramInvocationException("Neither a '" + MANIFEST_ATTRIBUTE_MAIN_CLASS + "', nor a '" + MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS + "' entry was found in the jar file."); } } finally { try { jar.close(); } catch (Throwable t) { throw new ProgramInvocationException("Could not close the JAR file: " + t.getMessage(), t); } } } private static Class<?> loadMainClass(String className, ClassLoader cl) throws ProgramInvocationException { ClassLoader contextCl = null; try { contextCl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(cl); return Class.forName(className, false, cl); } catch (ClassNotFoundException e) { throw new ProgramInvocationException("The program's entry point class '" + className + "' was not found in the jar file.", e); } catch (ExceptionInInitializerError e) { throw new ProgramInvocationException("The program's entry point class '" + className + "' threw an error during initialization.", e); } catch (LinkageError e) { throw new ProgramInvocationException("The program's entry point class '" + className + "' could not be loaded due to a linkage failure.", e); } catch (Throwable t) { throw new ProgramInvocationException("The program's entry point class '" + className + "' caused an exception during initialization: " + t.getMessage(), t); } finally { if (contextCl != null) { Thread.currentThread().setContextClassLoader(contextCl); } } } /** * Takes all JAR files that are contained in this program's JAR file and extracts them * to the system's temp directory. 
* * @return The file names of the extracted temporary files. * @throws ProgramInvocationException Thrown, if the extraction process failed. */ public static List<File> extractContainedLibraries(URL jarFile) throws ProgramInvocationException { Random rnd = new Random(); JarFile jar = null; try { jar = new JarFile(new File(jarFile.toURI())); final List<JarEntry> containedJarFileEntries = new ArrayList<JarEntry>(); Enumeration<JarEntry> entries = jar.entries(); while (entries.hasMoreElements()) { JarEntry entry = entries.nextElement(); String name = entry.getName(); if (name.length() > 8 && name.startsWith("lib/") && name.endsWith(".jar")) { containedJarFileEntries.add(entry); } } if (containedJarFileEntries.isEmpty()) { return Collections.emptyList(); } else { // go over all contained jar files final List<File> extractedTempLibraries = new ArrayList<File>(containedJarFileEntries.size()); final byte[] buffer = new byte[4096]; boolean incomplete = true; try { for (int i = 0; i < containedJarFileEntries.size(); i++) { final JarEntry entry = containedJarFileEntries.get(i); String name = entry.getName(); // '/' as in case of zip, jar // java.util.zip.ZipEntry#isDirectory always looks only for '/' not for File.separator name = name.replace('/', '_'); File tempFile; try { tempFile = File.createTempFile(rnd.nextInt(Integer.MAX_VALUE) + "_", name); tempFile.deleteOnExit(); } catch (IOException e) { throw new ProgramInvocationException( "An I/O error occurred while creating temporary file to extract nested library '" + entry.getName() + "'.", e); } extractedTempLibraries.add(tempFile); // copy the temp file contents to a temporary File OutputStream out = null; InputStream in = null; try { out = new FileOutputStream(tempFile); in = new BufferedInputStream(jar.getInputStream(entry)); int numRead = 0; while ((numRead = in.read(buffer)) != -1) { out.write(buffer, 0, numRead); } } catch (IOException e) { throw new ProgramInvocationException("An I/O error occurred while extracting 
nested library '" + entry.getName() + "' to temporary file '" + tempFile.getAbsolutePath() + "'."); } finally { if (out != null) { out.close(); } if (in != null) { in.close(); } } } incomplete = false; } finally { if (incomplete) { deleteExtractedLibraries(extractedTempLibraries); } } return extractedTempLibraries; } } catch (Throwable t) { throw new ProgramInvocationException("Unknown I/O error while extracting contained jar files.", t); } finally { if (jar != null) { try { jar.close(); } catch (Throwable t) {} } } } public static void deleteExtractedLibraries(List<File> tempLibraries) { for (File f : tempLibraries) { f.delete(); } } private static void checkJarFile(URL jarfile) throws ProgramInvocationException { try { ClientUtils.checkJarFile(jarfile); } catch (IOException e) { throw new ProgramInvocationException(e.getMessage(), e); } catch (Throwable t) { throw new ProgramInvocationException("Cannot access jar file" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t); } } }
package org.apache.flink.client.program;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;

import javax.annotation.Nullable;

import java.net.URISyntaxException;
import java.net.URL;

/**
 * Utility class for {@link PackagedProgram} related operations.
 */
public class PackagedProgramUtils {

    /**
     * Creates a {@link JobGraph} with a specified {@link JobID}
     * from the given {@link PackagedProgram}.
     *
     * <p>While the plan is built, the program's user code class loader is installed as the
     * calling thread's context class loader. The previous context class loader is put back
     * before this method returns or throws, so the user code class loader does not stay
     * attached to the calling thread.
     *
     * @param packagedProgram to extract the JobGraph from
     * @param configuration to use for the optimizer and job graph generator
     * @param defaultParallelism for the JobGraph
     * @param jobID the pre-generated job id
     * @return JobGraph extracted from the PackagedProgram
     * @throws ProgramInvocationException if the JobGraph generation failed
     */
    public static JobGraph createJobGraph(
            PackagedProgram packagedProgram,
            Configuration configuration,
            int defaultParallelism,
            @Nullable JobID jobID) throws ProgramInvocationException {
        final Thread currentThread = Thread.currentThread();
        final ClassLoader previousContextClassLoader = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(packagedProgram.getUserCodeClassLoader());

        try {
            final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
            final OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment(optimizer);
            optimizerPlanEnvironment.setParallelism(defaultParallelism);

            final FlinkPlan flinkPlan = optimizerPlanEnvironment.getOptimizedPlan(packagedProgram);

            final JobGraph jobGraph;
            if (flinkPlan instanceof StreamingPlan) {
                jobGraph = ((StreamingPlan) flinkPlan).getJobGraph(jobID);
                // savepoint restore settings are only applied on the streaming branch
                jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
            } else {
                final JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(configuration);
                jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan, jobID);
            }

            for (URL url : packagedProgram.getAllLibraries()) {
                try {
                    jobGraph.addJar(new Path(url.toURI()));
                } catch (URISyntaxException e) {
                    throw new ProgramInvocationException("Invalid URL for jar file: " + url + '.', jobGraph.getJobID(), e);
                }
            }

            jobGraph.setClasspaths(packagedProgram.getClasspaths());

            return jobGraph;
        } finally {
            // Restore even on failure; otherwise a pooled thread keeps referencing user code.
            currentThread.setContextClassLoader(previousContextClassLoader);
        }
    }

    /**
     * Creates a {@link JobGraph} with a random {@link JobID}
     * from the given {@link PackagedProgram}.
     *
     * <p>Delegates to the four-argument overload with a {@code null} job id.
     *
     * @param packagedProgram to extract the JobGraph from
     * @param configuration to use for the optimizer and job graph generator
     * @param defaultParallelism for the JobGraph
     * @return JobGraph extracted from the PackagedProgram
     * @throws ProgramInvocationException if the JobGraph generation failed
     */
    public static JobGraph createJobGraph(
            PackagedProgram packagedProgram,
            Configuration configuration,
            int defaultParallelism) throws ProgramInvocationException {
        return createJobGraph(packagedProgram, configuration, defaultParallelism, null);
    }

    /** Not meant to be instantiated. */
    private PackagedProgramUtils() {}
}
package org.apache.flink.client.program;

import org.apache.flink.api.common.JobID;

/**
 * Exception used to indicate that there is an error during the invocation of a Flink program.
 */
public class ProgramInvocationException extends Exception {

    /** Serial version UID for serialization interoperability. */
    private static final long serialVersionUID = -2417524218857151612L;

    /**
     * Creates a {@code ProgramInvocationException} with the given message.
     *
     * @param message The message for the exception.
     */
    public ProgramInvocationException(String message) {
        super(message);
    }

    /**
     * Creates a {@code ProgramInvocationException} whose message is the given text followed by
     * the job id.
     *
     * @param message The additional message.
     * @param jobID ID of failed job.
     */
    public ProgramInvocationException(String message, JobID jobID) {
        super(appendJobId(message, jobID));
    }

    /**
     * Creates a {@code ProgramInvocationException} for the given exception.
     *
     * @param cause The exception that causes the program invocation to fail.
     */
    public ProgramInvocationException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates a {@code ProgramInvocationException} for the given exception with an
     * additional message.
     *
     * @param message The additional message.
     * @param cause The exception that causes the program invocation to fail.
     */
    public ProgramInvocationException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates a {@code ProgramInvocationException} for the given exception, with a message made
     * of the given text followed by the job id.
     *
     * @param message The additional message.
     * @param jobID ID of failed job.
     * @param cause The exception that causes the program invocation to fail.
     */
    public ProgramInvocationException(String message, JobID jobID, Throwable cause) {
        super(appendJobId(message, jobID), cause);
    }

    /** Builds the message text shared by the job-id carrying constructors. */
    private static String appendJobId(String message, JobID jobID) {
        return message + " (JobID: " + jobID + ")";
    }
}
package org.apache.flink.client.program;

import org.apache.flink.util.FlinkException;

/**
 * Exception used to indicate that no job was executed during the invocation of a Flink program.
 */
public class ProgramMissingJobException extends FlinkException {

    /** Serial version UID for serialization interoperability. */
    private static final long serialVersionUID = -1964276369605091101L;

    /**
     * Creates a {@code ProgramMissingJobException} with the given message.
     *
     * @param message The message for the exception.
     */
    public ProgramMissingJobException(String message) {
        super(message);
    }
}
package org.apache.flink.client.program;

import org.apache.flink.util.Preconditions;

/**
 * Exception used to indicate that there is an error in the parametrization of a Flink program.
 */
public class ProgramParametrizationException extends RuntimeException {

    /** Serial version UID for serialization interoperability. */
    private static final long serialVersionUID = 909054589029890262L;

    /**
     * Creates a {@code ProgramParametrizationException} with the given message.
     *
     * <p>The message is passed through {@code Preconditions.checkNotNull} before it reaches
     * the superclass constructor.
     *
     * @param message The program usage string.
     */
    public ProgramParametrizationException(String message) {
        super(Preconditions.checkNotNull(message));
    }
}
rest
package org.apache.flink.client.program.rest; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.DetachedJobExecutionResult; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy; import org.apache.flink.client.program.rest.retry.WaitStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rest.FileUpload; import org.apache.flink.runtime.rest.RestClient; import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; import org.apache.flink.runtime.rest.handler.async.TriggerResponse; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import 
org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders; import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo; import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters; import org.apache.flink.runtime.rest.messages.JobCancellationHeaders; import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.MessageParameters; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; import org.apache.flink.runtime.rest.messages.TriggerId; import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; import org.apache.flink.runtime.rest.messages.job.savepoints.stop.StopWithSavepointRequestBody; import org.apache.flink.runtime.rest.messages.job.savepoints.stop.StopWithSavepointTriggerHeaders; import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource; import org.apache.flink.runtime.rest.messages.queue.QueueStatus; import org.apache.flink.runtime.rest.util.RestClientException; import org.apache.flink.runtime.rest.util.RestConstants; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.CheckedSupplier; import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; import java.io.ObjectOutputStream; import java.net.MalformedURLException; import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import 
java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * A {@link ClusterClient} implementation that communicates via HTTP REST requests.
 *
 * <p>Requests are sent to the REST endpoint address published by the leader retrieval service
 * and are retried, according to the {@link RestClusterClientConfiguration}, on connection
 * problems or when the service reports itself as unavailable.
 */
public class RestClusterClient<T> extends ClusterClient<T> {

    private final RestClusterClientConfiguration restClusterClientConfiguration;

    /** Timeout for futures. */
    private final Duration timeout;

    private final RestClient restClient;

    /** Executor used for job graph serialization, URL parsing and (if created here) the REST client. */
    private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));

    /** Determines the delay between two consecutive polls of an asynchronously created resource. */
    private final WaitStrategy waitStrategy;

    private final T clusterId;

    private final ClientHighAvailabilityServices clientHAServices;

    private final LeaderRetrievalService webMonitorRetrievalService;

    private final LeaderRetriever webMonitorLeaderRetriever = new LeaderRetriever();

    /** ExecutorService to run operations that can be retried on exceptions. */
    private ScheduledExecutorService retryExecutorService;

    /**
     * Creates a client that builds its own {@link RestClient} and polls with an exponential
     * back-off between 10 ms and 2000 ms.
     *
     * @param config Flink configuration to derive the client settings from
     * @param clusterId identifier of the cluster this client talks to
     * @throws Exception if the client could not be set up
     */
    public RestClusterClient(Configuration config, T clusterId) throws Exception {
        this(
            config,
            null,
            clusterId,
            new ExponentialWaitStrategy(10L, 2000L));
    }

    /**
     * Creates a client, optionally using a pre-built {@link RestClient}.
     *
     * @param configuration Flink configuration to derive the client settings from
     * @param restClient REST client to use; if {@code null}, a new one is created
     * @param clusterId identifier of the cluster this client talks to, must not be {@code null}
     * @param waitStrategy strategy for the delay between polls, must not be {@code null}
     * @throws Exception if the client could not be set up
     */
    @VisibleForTesting
    RestClusterClient(
            Configuration configuration,
            @Nullable RestClient restClient,
            T clusterId,
            WaitStrategy waitStrategy) throws Exception {
        super(configuration);

        this.timeout = AkkaUtils.getClientTimeout(configuration);

        this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);

        if (restClient != null) {
            this.restClient = restClient;
        } else {
            this.restClient = new RestClient(restClusterClientConfiguration.getRestClientConfiguration(), executorService);
        }

        this.waitStrategy = Preconditions.checkNotNull(waitStrategy);
        this.clusterId = Preconditions.checkNotNull(clusterId);

        this.clientHAServices = HighAvailabilityServicesUtils.createClientHAService(configuration);

        this.webMonitorRetrievalService = clientHAServices.getClusterRestEndpointLeaderRetriever();
        this.retryExecutorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
        startLeaderRetrievers();
    }

    /** Starts the retrieval of the REST endpoint leader address. */
    private void startLeaderRetrievers() throws Exception {
        this.webMonitorRetrievalService.start(webMonitorLeaderRetriever);
    }

    /**
     * Shuts down the executors, the REST client and the leader retrieval / HA services.
     * Failures while stopping the individual services are logged and do not abort the shutdown.
     */
    @Override
    public void close() {
        ExecutorUtils.gracefulShutdown(restClusterClientConfiguration.getRetryDelay(), TimeUnit.MILLISECONDS, retryExecutorService);

        this.restClient.shutdown(Time.seconds(5));
        ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService);

        try {
            webMonitorRetrievalService.stop();
        } catch (Exception e) {
            log.error("An error occurred during stopping the WebMonitorRetrievalService", e);
        }

        try {
            clientHAServices.close();
        } catch (Exception e) {
            log.error("An error occurred during stopping the ClientHighAvailabilityServices", e);
        }

        try {
            super.close();
        } catch (Exception e) {
            log.error("Error while closing the Cluster Client", e);
        }
    }

    /**
     * Submits the job and blocks until the submission (detached mode) or the job execution
     * (attached mode) has finished.
     *
     * @param jobGraph job to submit
     * @param classLoader class loader used to deserialize the job execution result
     * @return the submission result; in attached mode this is the job execution result
     * @throws ProgramInvocationException if the submission failed, the result could not be
     *     retrieved, or the job itself failed
     */
    @Override
    public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        log.info("Submitting job {} (detached: {}).", jobGraph.getJobID(), isDetached());

        final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);

        if (isDetached()) {
            try {
                final JobSubmissionResult jobSubmissionResult = jobSubmissionFuture.get();

                log.warn("Job was executed in detached mode, the results will be available on completion.");

                this.lastJobExecutionResult = new DetachedJobExecutionResult(jobSubmissionResult.getJobID());
                return lastJobExecutionResult;
            } catch (Exception e) {
                // restore the interrupt flag if the wait was interrupted
                ExceptionUtils.checkInterrupted(e);
                throw new ProgramInvocationException("Could not submit job",
                    jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
            }
        } else {
            final CompletableFuture<JobResult> jobResultFuture = jobSubmissionFuture.thenCompose(
                ignored -> requestJobResult(jobGraph.getJobID()));

            final JobResult jobResult;
            try {
                jobResult = jobResultFuture.get();
            } catch (Exception e) {
                // restore the interrupt flag if the wait was interrupted
                ExceptionUtils.checkInterrupted(e);
                throw new ProgramInvocationException("Could not retrieve the execution result.",
                    jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
            }

            try {
                this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
                return lastJobExecutionResult;
            } catch (JobExecutionException e) {
                throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
            } catch (IOException | ClassNotFoundException e) {
                throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
            }
        }
    }

    /**
     * Requests the job details.
     *
     * @param jobId The job id
     * @return Job details
     */
    public CompletableFuture<JobDetailsInfo> getJobDetails(JobID jobId) {
        final JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
        final JobMessageParameters params = new JobMessageParameters();
        params.jobPathParameter.resolve(jobId);

        return sendRequest(
            detailsHeaders,
            params);
    }

    /**
     * Requests the status of the given job; derived from {@link #getJobDetails(JobID)}.
     *
     * @param jobId The job id
     * @return Future with the job status
     */
    @Override
    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
        return getJobDetails(jobId).thenApply(JobDetailsInfo::getJobStatus);
    }

    /**
     * Requests the {@link JobResult} for the given {@link JobID}. The method retries multiple
     * times to poll the {@link JobResult} before giving up.
     *
     * @param jobId specifying the job for which to retrieve the {@link JobResult}
     * @return Future which is completed with the {@link JobResult} once the job has completed or
     *     with a failure if the {@link JobResult} could not be retrieved.
     */
    @Override
    public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
        return pollResourceAsync(
            () -> {
                final JobMessageParameters messageParameters = new JobMessageParameters();
                messageParameters.jobPathParameter.resolve(jobId);
                return sendRequest(
                    JobExecutionResultHeaders.getInstance(),
                    messageParameters);
            });
    }

    /**
     * Submits the given {@link JobGraph} to the dispatcher.
     *
     * <p>The job graph is serialized into a temporary file which is uploaded together with the
     * user jars and artifacts. The temporary file is deleted once the submission has finished,
     * whether it succeeded or failed.
     *
     * @param jobGraph to submit
     * @return Future which is completed with the submission response
     */
    @Override
    public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph) {
        // we have to enable queued scheduling because slot will be allocated lazily
        jobGraph.setAllowQueuedScheduling(true);

        CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
            try {
                final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
                try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
                    objectOut.writeObject(jobGraph);
                }
                return jobGraphFile;
            } catch (IOException e) {
                throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e));
            }
        }, executorService);

        CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
            List<String> jarFileNames = new ArrayList<>(8);
            List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
            Collection<FileUpload> filesToUpload = new ArrayList<>(8);

            filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));

            for (Path jar : jobGraph.getUserJars()) {
                jarFileNames.add(jar.getName());
                filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
            }

            for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
                artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName()));
                filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
            }

            final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
                jobGraphFile.getFileName().toString(),
                jarFileNames,
                artifactFileNames);

            return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
        });

        final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(
            requestAndFileUploads -> sendRetriableRequest(
                JobSubmitHeaders.getInstance(),
                EmptyMessageParameters.getInstance(),
                requestAndFileUploads.f0,
                requestAndFileUploads.f1,
                isConnectionProblemOrServiceUnavailable())
        );

        // Clean up the temporary file after the submission finished. Submission errors are
        // mapped to null first so that the file is also removed when the submission failed.
        submissionFuture
            .exceptionally(ignored -> null)
            .thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)
            .thenAccept(jobGraphFile -> {
                try {
                    Files.delete(jobGraphFile);
                } catch (IOException e) {
                    log.warn("Could not delete temporary file {}.", jobGraphFile, e);
                }
            });

        return submissionFuture
            .thenApply(
                (JobSubmitResponseBody jobSubmitResponseBody) -> new JobSubmissionResult(jobGraph.getJobID()))
            .exceptionally(
                (Throwable throwable) -> {
                    throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", ExceptionUtils.stripCompletionException(throwable)));
                });
    }

    /**
     * Cancels the given job and waits for the acknowledgement, at most for the client timeout.
     *
     * @param jobID job to cancel
     * @throws Exception if the request failed or timed out
     */
    @Override
    public void cancel(JobID jobID) throws Exception {
        JobCancellationMessageParameters params = new JobCancellationMessageParameters();
        params.jobPathParameter.resolve(jobID);
        params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL));
        CompletableFuture<EmptyResponseBody> responseFuture = sendRequest(
            JobCancellationHeaders.getInstance(),
            params);
        responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Triggers a stop-with-savepoint for the given job and blocks until the savepoint has
     * been completed.
     *
     * @param jobId job to stop
     * @param advanceToEndOfTime flag forwarded in the {@link StopWithSavepointRequestBody}
     * @param savepointDirectory target directory of the savepoint, may be {@code null}
     * @return location of the savepoint
     * @throws Exception if the request or the savepoint failed
     */
    @Override
    public String stopWithSavepoint(
            final JobID jobId,
            final boolean advanceToEndOfTime,
            @Nullable final String savepointDirectory) throws Exception {

        final StopWithSavepointTriggerHeaders stopWithSavepointTriggerHeaders = StopWithSavepointTriggerHeaders.getInstance();

        final SavepointTriggerMessageParameters stopWithSavepointTriggerMessageParameters =
            stopWithSavepointTriggerHeaders.getUnresolvedMessageParameters();
        stopWithSavepointTriggerMessageParameters.jobID.resolve(jobId);

        final CompletableFuture<TriggerResponse> responseFuture = sendRequest(
            stopWithSavepointTriggerHeaders,
            stopWithSavepointTriggerMessageParameters,
            new StopWithSavepointRequestBody(savepointDirectory, advanceToEndOfTime));

        return responseFuture.thenCompose(savepointTriggerResponseBody -> {
            final TriggerId savepointTriggerId = savepointTriggerResponseBody.getTriggerId();
            return pollSavepointAsync(jobId, savepointTriggerId);
        }).thenApply(savepointInfo -> {
            if (savepointInfo.getFailureCause() != null) {
                throw new CompletionException(savepointInfo.getFailureCause());
            }
            return savepointInfo.getLocation();
        }).get();
    }

    /**
     * Triggers a savepoint with the cancel flag set and blocks until it has been completed.
     *
     * @param jobId job to cancel
     * @param savepointDirectory target directory of the savepoint, may be {@code null}
     * @return location of the savepoint
     * @throws Exception if the request or the savepoint failed
     */
    @Override
    public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
        return triggerSavepoint(jobId, savepointDirectory, true).get();
    }

    /**
     * Triggers a savepoint for the given job without cancelling it.
     *
     * @param jobId job to take the savepoint of
     * @param savepointDirectory target directory of the savepoint, may be {@code null}
     * @return Future with the location of the savepoint
     */
    @Override
    public CompletableFuture<String> triggerSavepoint(
            final JobID jobId,
            final @Nullable String savepointDirectory) {
        return triggerSavepoint(jobId, savepointDirectory, false);
    }

    /**
     * Sends the savepoint trigger request and polls the savepoint status until it has completed.
     * The returned future fails if the savepoint reports a failure cause.
     */
    private CompletableFuture<String> triggerSavepoint(
            final JobID jobId,
            final @Nullable String savepointDirectory,
            final boolean cancelJob) {
        final SavepointTriggerHeaders savepointTriggerHeaders = SavepointTriggerHeaders.getInstance();
        final SavepointTriggerMessageParameters savepointTriggerMessageParameters =
            savepointTriggerHeaders.getUnresolvedMessageParameters();
        savepointTriggerMessageParameters.jobID.resolve(jobId);

        final CompletableFuture<TriggerResponse> responseFuture = sendRequest(
            savepointTriggerHeaders,
            savepointTriggerMessageParameters,
            new SavepointTriggerRequestBody(savepointDirectory, cancelJob));

        return responseFuture.thenCompose(savepointTriggerResponseBody -> {
            final TriggerId savepointTriggerId = savepointTriggerResponseBody.getTriggerId();
            return pollSavepointAsync(jobId, savepointTriggerId);
        }).thenApply(savepointInfo -> {
            if (savepointInfo.getFailureCause() != null) {
                throw new CompletionException(savepointInfo.getFailureCause());
            }
            return savepointInfo.getLocation();
        });
    }

    /**
     * Requests the serialized accumulators of the given job and deserializes them with the
     * given class loader. Waits at most for the client timeout.
     *
     * @param jobID job to request the accumulators of
     * @param loader class loader used for deserialization
     * @return the deserialized accumulators
     * @throws Exception if the request or the deserialization failed
     */
    @Override
    public Map<String, OptionalFailure<Object>> getAccumulators(final JobID jobID, ClassLoader loader) throws Exception {
        final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance();
        final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters();
        accMsgParams.jobPathParameter.resolve(jobID);
        accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));

        CompletableFuture<JobAccumulatorsInfo> responseFuture = sendRequest(
            accumulatorsHeaders,
            accMsgParams);

        Map<String, OptionalFailure<Object>> result = Collections.emptyMap();

        try {
            result = responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> {
                try {
                    return AccumulatorHelper.deserializeAccumulators(
                        accumulatorsInfo.getSerializedUserAccumulators(),
                        loader);
                } catch (Exception e) {
                    throw new CompletionException(
                        new FlinkException(
                            String.format("Deserialization of accumulators for job %s failed.", jobID),
                            e));
                }
            }).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException ee) {
            ExceptionUtils.rethrowException(ExceptionUtils.stripExecutionException(ee));
        }

        return result;
    }

    /** Polls the status of the savepoint identified by the trigger id until it has completed. */
    private CompletableFuture<SavepointInfo> pollSavepointAsync(
            final JobID jobId,
            final TriggerId triggerID) {
        return pollResourceAsync(() -> {
            final SavepointStatusHeaders savepointStatusHeaders = SavepointStatusHeaders.getInstance();
            final SavepointStatusMessageParameters savepointStatusMessageParameters =
                savepointStatusHeaders.getUnresolvedMessageParameters();
            savepointStatusMessageParameters.jobIdPathParameter.resolve(jobId);
            savepointStatusMessageParameters.triggerIdPathParameter.resolve(triggerID);
            return sendRequest(
                savepointStatusHeaders,
                savepointStatusMessageParameters);
        });
    }

    /**
     * Lists the jobs known to the cluster.
     *
     * @return Future with one {@link JobStatusMessage} per job of the jobs overview
     */
    @Override
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
        return sendRequest(JobsOverviewHeaders.getInstance())
            .thenApply(
                (multipleJobsDetails) -> multipleJobsDetails
                    .getJobs()
                    .stream()
                    .map(detail -> new JobStatusMessage(
                        detail.getJobId(),
                        detail.getJobName(),
                        detail.getStatus(),
                        detail.getStartTime()))
                    .collect(Collectors.toList()));
    }

    @Override
    public T getClusterId() {
        return clusterId;
    }

    /**
     * Triggers the disposal of the given savepoint and polls until the operation has finished.
     *
     * @param savepointPath path of the savepoint to dispose
     * @return Future which completes with an acknowledgement, or fails with the failure cause
     *     reported for the disposal
     */
    @Override
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
        final SavepointDisposalRequest savepointDisposalRequest = new SavepointDisposalRequest(savepointPath);

        final CompletableFuture<TriggerResponse> savepointDisposalTriggerFuture = sendRequest(
            SavepointDisposalTriggerHeaders.getInstance(),
            savepointDisposalRequest);

        final CompletableFuture<AsynchronousOperationInfo> savepointDisposalFuture = savepointDisposalTriggerFuture.thenCompose(
            (TriggerResponse triggerResponse) -> {
                final TriggerId triggerId = triggerResponse.getTriggerId();
                final SavepointDisposalStatusHeaders savepointDisposalStatusHeaders = SavepointDisposalStatusHeaders.getInstance();
                final SavepointDisposalStatusMessageParameters savepointDisposalStatusMessageParameters = savepointDisposalStatusHeaders.getUnresolvedMessageParameters();
                savepointDisposalStatusMessageParameters.triggerIdPathParameter.resolve(triggerId);

                return pollResourceAsync(
                    () -> sendRequest(
                        savepointDisposalStatusHeaders,
                        savepointDisposalStatusMessageParameters));
            });

        return savepointDisposalFuture.thenApply(
            (AsynchronousOperationInfo asynchronousOperationInfo) -> {
                if (asynchronousOperationInfo.getFailureCause() == null) {
                    return Acknowledge.get();
                } else {
                    throw new CompletionException(asynchronousOperationInfo.getFailureCause());
                }
            });
    }

    /**
     * Sends the shutdown request and waits for the response. A failure is only logged; an
     * interruption restores the interrupt flag of the calling thread.
     */
    @Override
    public void shutDownCluster() {
        try {
            sendRequest(ShutdownHeaders.getInstance()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            log.error("Error while shutting down cluster", e);
        }
    }

    /**
     * Creates a {@code CompletableFuture} that polls a {@code AsynchronouslyCreatedResource} until
     * its {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} becomes
     * {@link QueueStatus.Id#COMPLETED COMPLETED}. The future completes with the result of
     * {@link AsynchronouslyCreatedResource#resource()}.
     *
     * @param resourceFutureSupplier The operation which polls for the
     *                               {@code AsynchronouslyCreatedResource}.
     * @param <R>                    The type of the resource.
     * @param <A>                    The type of the {@code AsynchronouslyCreatedResource}.
     * @return A {@code CompletableFuture} delivering the resource.
     */
    private <R, A extends AsynchronouslyCreatedResource<R>> CompletableFuture<R> pollResourceAsync(
            final Supplier<CompletableFuture<A>> resourceFutureSupplier) {
        return pollResourceAsync(resourceFutureSupplier, new CompletableFuture<>(), 0);
    }

    /**
     * Performs one poll and, if the resource is not yet completed, schedules the next poll on
     * the retry executor after the delay given by the wait strategy.
     *
     * @param resourceFutureSupplier The operation which polls for the resource.
     * @param resultFuture           The future to complete with the resource or a failure.
     * @param attempt                The number of polls performed so far.
     * @return {@code resultFuture}
     */
    private <R, A extends AsynchronouslyCreatedResource<R>> CompletableFuture<R> pollResourceAsync(
            final Supplier<CompletableFuture<A>> resourceFutureSupplier,
            final CompletableFuture<R> resultFuture,
            final long attempt) {

        resourceFutureSupplier.get().whenComplete((asynchronouslyCreatedResource, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally(throwable);
                return;
            }

            try {
                if (asynchronouslyCreatedResource.queueStatus().getId() == QueueStatus.Id.COMPLETED) {
                    resultFuture.complete(asynchronouslyCreatedResource.resource());
                } else {
                    retryExecutorService.schedule(() -> {
                        pollResourceAsync(resourceFutureSupplier, resultFuture, attempt + 1);
                    }, waitStrategy.sleepTime(attempt), TimeUnit.MILLISECONDS);
                }
            } catch (RuntimeException e) {
                // An exception thrown here (e.g. a rejected task after the retry executor was
                // shut down) would otherwise be lost and leave resultFuture incomplete forever.
                resultFuture.completeExceptionally(e);
            }
        });

        return resultFuture;
    }

    // ======================================
    // Legacy stuff we actually implement
    // ======================================

    /**
     * Returns the base URL of the REST endpoint, or {@code "Unknown address."} if it could
     * not be retrieved.
     */
    @Override
    public String getWebInterfaceURL() {
        try {
            return getWebMonitorBaseUrl().get().toString();
        } catch (InterruptedException | ExecutionException e) {
            ExceptionUtils.checkInterrupted(e);

            log.warn("Could not retrieve the web interface URL for the cluster.", e);
            return "Unknown address.";
        }
    }

    //-------------------------------------------------------------------------
    // RestClient Helper
    //-------------------------------------------------------------------------

    /** Sends a request with an empty request body. */
    private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P>
            sendRequest(M messageHeaders, U messageParameters) {
        return sendRequest(messageHeaders, messageParameters, EmptyRequestBody.getInstance());
    }

    /** Sends a request with empty message parameters. */
    private <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
            sendRequest(M messageHeaders, R request) {
        return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), request);
    }

    /** Sends a request with empty message parameters and an empty request body. */
    @VisibleForTesting
    <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P>
            sendRequest(M messageHeaders) {
        return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
    }

    /** Sends a request which is retried on connection problems or when the service is unavailable. */
    @VisibleForTesting
    public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
            sendRequest(M messageHeaders, U messageParameters, R request) {
        return sendRetriableRequest(
            messageHeaders, messageParameters, request, isConnectionProblemOrServiceUnavailable());
    }

    /** Sends a request without file uploads which is retried if the given predicate matches the failure. */
    private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
            sendRetriableRequest(M messageHeaders, U messageParameters, R request, Predicate<Throwable> retryPredicate) {
        return sendRetriableRequest(messageHeaders, messageParameters, request, Collections.emptyList(), retryPredicate);
    }

    /**
     * Resolves the current REST endpoint address and sends the request (including the given
     * file uploads) to it. The whole operation is retried if the predicate matches the failure.
     */
    private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
            sendRetriableRequest(M messageHeaders, U messageParameters, R request, Collection<FileUpload> filesToUpload, Predicate<Throwable> retryPredicate) {
        return retry(() -> getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> {
            try {
                return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request, filesToUpload);
            } catch (IOException e) {
                throw new CompletionException(e);
            }
        }), retryPredicate);
    }

    /** Runs the operation with the configured maximum number of retry attempts and retry delay. */
    private <C> CompletableFuture<C> retry(
            CheckedSupplier<CompletableFuture<C>> operation,
            Predicate<Throwable> retryPredicate) {
        return FutureUtils.retryWithDelay(
            CheckedSupplier.unchecked(operation),
            restClusterClientConfiguration.getRetryMaxAttempts(),
            Time.milliseconds(restClusterClientConfiguration.getRetryDelay()),
            retryPredicate,
            new ScheduledExecutorServiceAdapter(retryExecutorService));
    }

    private static Predicate<Throwable> isConnectionProblemOrServiceUnavailable() {
        return isConnectionProblemException().or(isServiceUnavailable());
    }

    /** Matches failures whose cause chain contains a connect, timeout or general I/O exception. */
    private static Predicate<Throwable> isConnectionProblemException() {
        return (throwable) ->
            ExceptionUtils.findThrowable(throwable, java.net.ConnectException.class).isPresent() ||
                ExceptionUtils.findThrowable(throwable, java.net.SocketTimeoutException.class).isPresent() ||
                ExceptionUtils.findThrowable(throwable, ConnectTimeoutException.class).isPresent() ||
                ExceptionUtils.findThrowable(throwable, IOException.class).isPresent();
    }

    /** Matches REST client failures which carry the HTTP status "service unavailable". */
    private static Predicate<Throwable> isServiceUnavailable() {
        return httpExceptionCodePredicate(code -> code == HttpResponseStatus.SERVICE_UNAVAILABLE.code());
    }

    /** Matches failures containing a {@link RestClientException} whose HTTP status code passes the given predicate. */
    private static Predicate<Throwable> httpExceptionCodePredicate(Predicate<Integer> statusCodePredicate) {
        return (throwable) -> ExceptionUtils.findThrowable(throwable, RestClientException.class)
            .map(restClientException -> {
                final int code = restClientException.getHttpResponseStatus().code();
                return statusCodePredicate.test(code);
            })
            .orElse(false);
    }

    /**
     * Returns the base URL of the current REST endpoint leader. The returned future fails if
     * no leader was retrieved within the configured await-leader timeout or if the leader
     * address is not a valid URL.
     */
    @VisibleForTesting
    CompletableFuture<URL> getWebMonitorBaseUrl() {
        return FutureUtils.orTimeout(
                webMonitorLeaderRetriever.getLeaderFuture(),
                restClusterClientConfiguration.getAwaitLeaderTimeout(),
                TimeUnit.MILLISECONDS)
            .thenApplyAsync(leaderAddressSessionId -> {
                final String url = leaderAddressSessionId.f0;
                try {
                    return new URL(url);
                } catch (MalformedURLException e) {
                    throw new IllegalArgumentException("Could not parse URL from " + url, e);
                }
            }, executorService);
    }
}
package org.apache.flink.client.program.rest; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.rest.RestClientConfiguration; import org.apache.flink.util.ConfigurationException; import org.apache.flink.util.Preconditions; import static org.apache.flink.util.Preconditions.checkArgument; /** * A configuration object for {@link RestClusterClient}s. */ public final class RestClusterClientConfiguration { private final RestClientConfiguration restClientConfiguration; private final long awaitLeaderTimeout; private final int retryMaxAttempts; private final long retryDelay; private RestClusterClientConfiguration( final RestClientConfiguration endpointConfiguration, final long awaitLeaderTimeout, final int retryMaxAttempts, final long retryDelay) { checkArgument(awaitLeaderTimeout >= 0, "awaitLeaderTimeout must be equal to or greater than 0"); checkArgument(retryMaxAttempts >= 0, "retryMaxAttempts must be equal to or greater than 0"); checkArgument(retryDelay >= 0, "retryDelay must be equal to or greater than 0"); this.restClientConfiguration = Preconditions.checkNotNull(endpointConfiguration); this.awaitLeaderTimeout = awaitLeaderTimeout; this.retryMaxAttempts = retryMaxAttempts; this.retryDelay = retryDelay; } public RestClientConfiguration getRestClientConfiguration() { return restClientConfiguration; } /** * @see RestOptions#AWAIT_LEADER_TIMEOUT */ public long getAwaitLeaderTimeout() { return awaitLeaderTimeout; } /** * @see RestOptions#RETRY_MAX_ATTEMPTS */ public int getRetryMaxAttempts() { return retryMaxAttempts; } /** * @see RestOptions#RETRY_DELAY */ public long getRetryDelay() { return retryDelay; } public static RestClusterClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException { RestClientConfiguration restClientConfiguration = RestClientConfiguration.fromConfiguration(config); final long awaitLeaderTimeout = 
config.getLong(RestOptions.AWAIT_LEADER_TIMEOUT); final int retryMaxAttempts = config.getInteger(RestOptions.RETRY_MAX_ATTEMPTS); final long retryDelay = config.getLong(RestOptions.RETRY_DELAY); return new RestClusterClientConfiguration(restClientConfiguration, awaitLeaderTimeout, retryMaxAttempts, retryDelay); } }
retry 重試
package org.apache.flink.client.program.rest.retry;

/**
 * Operations that are polling for a result to arrive require a waiting time
 * between consecutive polls. A {@code WaitStrategy} determines this waiting
 * time.
 */
@FunctionalInterface
public interface WaitStrategy {

    /**
     * Returns the time to wait until the next attempt. Attempts start at {@code 0}.
     *
     * @param attempt The number of the last attempt.
     * @return Waiting time in ms.
     */
    long sleepTime(long attempt);
}
package org.apache.flink.client.program.rest.retry; import static org.apache.flink.util.Preconditions.checkArgument; /** * {@link WaitStrategy} with exponentially increasing sleep time. * 等待機制: 指數級的增加 睡眠時間 */ public class ExponentialWaitStrategy implements WaitStrategy { private final long initialWait; private final long maxWait; public ExponentialWaitStrategy(final long initialWait, final long maxWait) { checkArgument(initialWait > 0, "initialWait must be positive, was %s", initialWait); checkArgument(maxWait > 0, "maxWait must be positive, was %s", maxWait); checkArgument(initialWait <= maxWait, "initialWait must be lower than or equal to maxWait", maxWait); this.initialWait = initialWait; this.maxWait = maxWait; } @Override public long sleepTime(final long attempt) { checkArgument(attempt >= 0, "attempt must not be negative (%s)", attempt); final long exponentialSleepTime = initialWait * Math.round(Math.pow(2, attempt)); return exponentialSleepTime >= 0 && exponentialSleepTime < maxWait ? exponentialSleepTime : maxWait; } }
flink-connectors
flink-container
flink-contrib
flink-core
flink-dist
flink-docs
flink-end-to-end-tests
flink-examples
flink-filesystems
flink-formats
flink-fs-tests
flink-java
flink-jepsen
flink-libraries
flink-mesos
flink-metrics
flink-ml-parent
flink-optimizer
flink-python
flink-queryable-state
flink-quickstart
flink-runtime
flink-runtime-web
flink-scala
flink-scala-shell
flink-shaded-curator
flink-state-backends
flink-streaming-java
flink-streaming-scala
flink-table
flink-tests
flink-test-utils-parent
flink-walkthroughs
flink-yarn
flink-yarn-tests