Flink 1.3 Guide, Part 4: Command-Line Interface

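Flink provides a command-line interface (CLI) for running programs packaged as JAR files and for controlling their execution. The CLI is part of every Flink installation, whether it is a local single-node setup or a distributed deployment. The CLI launcher is the flink script in $FLINK_HOME/bin. By default it connects to the running Flink master (JobManager) that was started from the same installation directory.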

A prerequisite for using the CLI is that the JobManager has been started, or that a Flink YARN environment is available. The JobManager can be started with:

$FLINK_HOME/bin/start-local.sh
or
$FLINK_HOME/bin/start-cluster.sh

1. Examples

(1) Run the example program without arguments:

./bin/flink run ./examples/batch/WordCount.jar

(2) Run the example program with input and output file arguments:

./bin/flink run ./examples/batch/WordCount.jar --input file:///home/xiaosi/a.txt --output file:///home/xiaosi/result.txt

(3) Run the example program with input and output file arguments and a parallelism of 16:

./bin/flink run -p 16 ./examples/batch/WordCount.jar --input file:///home/xiaosi/a.txt --output file:///home/xiaosi/result.txt

(4) Run the example program with Flink's logging output to standard out suppressed:

./bin/flink run -q ./examples/batch/WordCount.jar

(5) Run the example program in detached mode:

./bin/flink run -d ./examples/batch/WordCount.jar

(6) Run the example program on a specific JobManager:

./bin/flink run -m myJMHost:6123 ./examples/batch/WordCount.jar --input file:///home/xiaosi/a.txt --output file:///home/xiaosi/result.txt

(7) Run the example program with an explicit entry-point class (the class containing the main method):

./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount ./examples/batch/WordCount.jar --input file:///home/xiaosi/a.txt --output file:///home/xiaosi/result.txt

(8) Run the example program on a per-job YARN cluster with 2 TaskManagers:

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar --input hdfs:///xiaosi/a.txt --output hdfs:///xiaosi/result.txt

(9) Print the optimized execution plan of the WordCount example program as JSON:

./bin/flink info ./examples/batch/WordCount.jar --input file:///home/xiaosi/a.txt --output file:///home/xiaosi/result.txt

(10) List scheduled and running jobs (including their job IDs):

./bin/flink list

(11) List scheduled jobs (including their job IDs):

./bin/flink list -s

(12) List running jobs (including their job IDs):

./bin/flink list -r

(13) List the jobs running in a Flink YARN session:

./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r

(14) Cancel a job:

./bin/flink cancel <jobID>

(15) Cancel a job with a savepoint:

./bin/flink cancel -s [targetDirectory] <jobID>

(16) Stop a job (streaming jobs only):

./bin/flink stop <jobID>
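The cancel, stop and savepoint commands all take a job ID, so scripts usually need to capture it. The sketch below defines two hypothetical helpers:

- submit_detached runs a JAR with -d and prints its job ID.
- job_ids_by_name filters the output of flink list -r by job name.

The sketch assumes only that the CLI prints the 32-digit hexadecimal JobID somewhere in its output. For the listing, it also assumes the ID appears on the same line as the job name. The exact wording of these messages may differ between Flink versions. FLINK is a hypothetical variable holding the path to the CLI script.

```shell
#!/usr/bin/env bash
# Hypothetical variable: path to the Flink CLI script.
FLINK="${FLINK:-./bin/flink}"

# Submit a JAR in detached mode (-d) and print its job ID.
# Assumption: the client output contains the 32-hex-digit JobID.
submit_detached() {
  local output id
  output="$("$FLINK" run -d "$@")" || return 1
  # Take the first 32-character lowercase hex token from the output.
  id="$(printf '%s\n' "$output" | grep -oE '[0-9a-f]{32}' | head -n 1)"
  [ -n "$id" ] || return 1
  printf '%s\n' "$id"
}

# Print the IDs of running jobs whose listing line contains the given name.
# Extra arguments (e.g. -m yarn-cluster -yid <yarnApplicationID>) are passed
# through to "flink list".
# Assumption: each job line contains both the JobID and the job name.
job_ids_by_name() {
  local name="$1"
  shift
  "$FLINK" list -r "$@" | grep -F -- "$name" | grep -oE '[0-9a-f]{32}'
}
```

For example, job_id=$(submit_detached ./examples/batch/WordCount.jar) stores the ID for a later ./bin/flink list or ./bin/flink cancel "$job_id". Matching the hex pattern rather than the exact message text keeps the sketch tolerant of small changes in the client output.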

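Note:

Cancelling and stopping a job differ as follows:
When a job is cancelled, its operators immediately receive a cancel() call so that they terminate as quickly as possible. If an operator does not stop after the cancel call, Flink starts interrupting its thread periodically until it stops.
Stopping a job is a more graceful way to end a running streaming job. Stop is only available for jobs whose sources implement the `StoppableFunction` interface. When the user requests to stop a job, all sources receive a stop() call. The job keeps running until all sources have shut down properly, which allows it to finish processing all in-flight data.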
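Stop only works for streaming jobs whose sources implement StoppableFunction. A shutdown script may therefore want to try stop first and fall back to cancel. The helper below is hypothetical. It is a sketch that assumes flink stop exits with a non-zero status when the job cannot be stopped.

```shell
#!/usr/bin/env bash
# Hypothetical variable: path to the Flink CLI script.
FLINK="${FLINK:-./bin/flink}"

# Try the graceful "stop" first; if it fails (e.g. the sources do not
# implement StoppableFunction), fall back to "cancel".
# Assumption: "flink stop" returns a non-zero exit status on failure.
stop_or_cancel() {
  local job_id="$1"
  if "$FLINK" stop "$job_id"; then
    echo "stopped"
  else
    "$FLINK" cancel "$job_id" && echo "cancelled"
  fi
}
```

The fallback is a design choice. A cancelled job does not wait for in-flight data to drain, so drop the else branch if you would rather have the script fail loudly than cancel.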

2. Savepoints

Savepoints are controlled via the command-line client:

2.1 Triggering a savepoint

./bin/flink savepoint <jobID> [savepointDirectory]

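This returns the path of the created savepoint. You need this path to restore or dispose of the savepoint.

The savepointDirectory argument is optional when triggering a savepoint. If you do not specify it here, you must configure a default savepoint directory for your Flink installation (see Savepoints).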
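Later steps (restoring and disposing) need the savepoint path, so scripts usually capture it from the command output. The following sketch assumes the client reports the result on a line of the form "Savepoint completed. Path: <savepointPath>". Check the actual output of your Flink version. The function name trigger_savepoint and the FLINK variable are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical variable: path to the Flink CLI script.
FLINK="${FLINK:-./bin/flink}"

# Trigger a savepoint and print the path reported by the CLI.
# Assumption: the output contains "Savepoint completed. Path: <savepointPath>".
trigger_savepoint() {
  local job_id="$1" target_dir="${2:-}"
  local output path
  local args=("$job_id")
  # Without a directory, the configured default savepoint directory is used.
  if [ -n "$target_dir" ]; then
    args+=("$target_dir")
  fi
  output="$("$FLINK" savepoint "${args[@]}")" || return 1
  # Keep only the text after "Path:" on the first matching line.
  path="$(printf '%s\n' "$output" | sed -n 's/.*Path: *//p' | head -n 1)"
  [ -n "$path" ] || return 1
  printf '%s\n' "$path"
}
```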

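2.2 Cancelling a job with a savepoint

You can atomically trigger a savepoint and cancel a job:

./bin/flink cancel -s [savepointDirectory] <jobID>

If no savepoint directory is specified, you must configure a default savepoint directory for your Flink installation (see Savepoints). The job is cancelled only if the savepoint succeeds.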
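The optional directory goes between -s and the job ID. Here is a sketch of a hypothetical wrapper, cancel_with_savepoint, that omits the directory when none is given. In that case the configured default directory (state.savepoints.dir) is used.

```shell
#!/usr/bin/env bash
# Hypothetical variable: path to the Flink CLI script.
FLINK="${FLINK:-./bin/flink}"

# Run "flink cancel -s [targetDirectory] <jobID>".
# The job ID always comes last; the directory is added only if given.
cancel_with_savepoint() {
  local job_id="$1" target_dir="${2:-}"
  local args=(cancel -s)
  if [ -n "$target_dir" ]; then
    args+=("$target_dir")
  fi
  args+=("$job_id")
  "$FLINK" "${args[@]}"
}
```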

2.3 Restoring from a savepoint

./bin/flink run -s <savepointPath> ...

The run command submits the job with a savepoint flag, which lets the program restore its state from the savepoint. The savepoint path is the one returned by the savepoint trigger command.

By default, Flink tries to match all savepoint state to the job being submitted. To allow skipping savepoint state that cannot be restored with the new job, set the allowNonRestoredState flag. You need to allow this if you removed an operator that was part of the program when the savepoint was triggered and you still want to use the savepoint.

./bin/flink run -s <savepointPath> -n ...

This is useful if your program dropped an operator that was part of the savepoint.
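Here is a sketch of a hypothetical wrapper, resume_from_savepoint, that resubmits a JAR from a savepoint and adds -n only when asked to. ALLOW_NON_RESTORED is a made-up environment variable used for illustration. Options must come before the JAR file (run [OPTIONS] <jar-file> <arguments>), so -s and -n are placed first.

```shell
#!/usr/bin/env bash
# Hypothetical variable: path to the Flink CLI script.
FLINK="${FLINK:-./bin/flink}"

# Resubmit a program from a savepoint.
# Hypothetical switch: ALLOW_NON_RESTORED=1 adds -n (--allowNonRestoredState).
resume_from_savepoint() {
  local savepoint_path="$1"
  shift
  local args=(run -s "$savepoint_path")
  if [ "${ALLOW_NON_RESTORED:-0}" = 1 ]; then
    args+=(-n)
  fi
  # Remaining arguments: the JAR file followed by the program's own arguments.
  "$FLINK" "${args[@]}" "$@"
}
```

For example, ALLOW_NON_RESTORED=1 resume_from_savepoint <savepointPath> ./examples/batch/WordCount.jar runs ./bin/flink run -s <savepointPath> -n ./examples/batch/WordCount.jar.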

2.4 Disposing of a savepoint

./bin/flink savepoint -d <savepointPath>

Disposing of a savepoint also requires its path, which is the one returned by the savepoint trigger command.

3. Usage

The usage of the Flink command-line interface is shown below:

xiaosi@yoona:~/qunar/company/opt/flink-1.3.2$ ./bin/flink
./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available:

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
     -c,--class <classname>                         Class with the program entry
                                                    point ("main" method or
                                                    "getPlan()" method. Only
                                                    needed if the JAR file does
                                                    not specify the class in its
                                                    manifest.
     -C,--classpath <url>                           Adds a URL to each user code
                                                    classloader  on all nodes in
                                                    the cluster. The paths must
                                                    specify a protocol (e.g.
                                                    file://) and be accessible
                                                    on all nodes (e.g. by means
                                                    of a NFS share). You can use
                                                    this option multiple times
                                                    for specifying more than one
                                                    URL. The protocol must be
                                                    supported by the {@link
                                                    java.net.URLClassLoader}.
     -d,--detached                                  If present, runs the job in
                                                    detached mode
     -m,--jobmanager <host:port>                    Address of the JobManager
                                                    (master) to which to
                                                    connect. Use this flag to
                                                    connect to a different
                                                    JobManager than the one
                                                    specified in the
                                                    configuration.
     -n,--allowNonRestoredState                     Allow to skip savepoint
                                                    state that cannot be
                                                    restored. You need to allow
                                                    this if you removed an
                                                    operator from your program
                                                    that was part of the program
                                                    when the savepoint was
                                                    triggered.
     -p,--parallelism <parallelism>                 The parallelism with which
                                                    to run the program. Optional
                                                    flag to override the default
                                                    value specified in the
                                                    configuration.
     -q,--sysoutLogging                             If present, suppress logging
                                                    output to standard out.
     -s,--fromSavepoint <savepointPath>             Path to a savepoint to
                                                    restore the job from (for
                                                    example
                                                    hdfs:///flink/savepoint-1537
                                                    ).
     -z,--zookeeperNamespace <zookeeperNamespace>   Namespace to create the
                                                    Zookeeper sub-paths for high
                                                    availability mode
  Options for yarn-cluster mode:
     -yD <arg>                            Dynamic properties
     -yd,--yarndetached                   Start detached
     -yid,--yarnapplicationId <arg>       Attach to running YARN session
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in
                                          MB]
     -yn,--yarncontainer <arg>            Number of YARN container to allocate
                                          (=Number of Task Managers)
     -ynm,--yarnname <arg>                Set a custom name for the application
                                          on YARN
     -yq,--yarnquery                      Display available YARN resources
                                          (memory, cores)
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     -yst,--yarnstreaming                 Start Flink in streaming mode
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container [in
                                          MB]
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                          sub-paths for high availability mode

  Options for yarn mode:
     -ya,--yarnattached                   Start attached
     -yD <arg>                            Dynamic properties
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in
                                          MB]
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                          sub-paths for high availability mode



Action "info" shows the optimized execution plan of the program (JSON).

  Syntax: info [OPTIONS] <jar-file> <arguments>
  "info" action options:
     -c,--class <classname>           Class with the program entry point ("main"
                                      method or "getPlan()" method. Only needed
                                      if the JAR file does not specify the class
                                      in its manifest.
     -p,--parallelism <parallelism>   The parallelism with which to run the
                                      program. Optional flag to override the
                                      default value specified in the
                                      configuration.
  Options for yarn-cluster mode:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session

  Options for yarn mode:




Action "list" lists running and scheduled programs.

  Syntax: list [OPTIONS]
  "list" action options:
     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
                                   to connect. Use this flag to connect to a
                                   different JobManager than the one specified
                                   in the configuration.
     -r,--running                  Show only running programs and their JobIDs
     -s,--scheduled                Show only scheduled programs and their JobIDs
  Options for yarn-cluster mode:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session

  Options for yarn mode:




Action "stop" stops a running program (streaming jobs only).

  Syntax: stop [OPTIONS] <Job ID>
  "stop" action options:
     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
                                   to connect. Use this flag to connect to a
                                   different JobManager than the one specified
                                   in the configuration.
  Options for yarn-cluster mode:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session

  Options for yarn mode:




Action "cancel" cancels a running program.

  Syntax: cancel [OPTIONS] <Job ID>
  "cancel" action options:
     -m,--jobmanager <host:port>            Address of the JobManager (master)
                                            to which to connect. Use this flag
                                            to connect to a different JobManager
                                            than the one specified in the
                                            configuration.
     -s,--withSavepoint <targetDirectory>   Trigger savepoint and cancel job.
                                            The target directory is optional. If
                                            no directory is specified, the
                                            configured default directory
                                            (state.savepoints.dir) is used.
  Options for yarn-cluster mode:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session

  Options for yarn mode:




Action "savepoint" triggers savepoints for a running job or disposes existing ones.

  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
  "savepoint" action options:
     -d,--dispose <arg>            Path of savepoint to dispose.
     -j,--jarfile <jarfile>        Flink program JAR file.
     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
                                   to connect. Use this flag to connect to a
                                   different JobManager than the one specified
                                   in the configuration.
  Options for yarn-cluster mode:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session

  Options for yarn mode:

  Please specify an action.

Original: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/cli.html