Flink Deployment and Job Submission (Flink Standalone)

Flink Deployment Preparation and Building from Source

Official documentation:

Prerequisites

The machine used to build the source should ideally meet the following specs:

  • CPU > 4 cores
  • Memory > 8 GB
  • Note: the machine I use here has 4 cores and 8 GB; with too little memory the build step can OOM
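If the build does hit an OOM, one knob worth trying before upgrading the machine is the Maven JVM heap. A minimal sketch; the 2g/512m values are illustrative guesses, not ones tuned in this article:

```shell
# Hypothetical heap settings for the Maven JVM; adjust to your machine.
export MAVEN_OPTS="-Xmx2g -XX:MaxMetaspaceSize=512m"
echo "$MAVEN_OPTS"
```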

Before deploying Flink, a JDK must be installed first. Either version 8 or 11 works; I chose JDK 11:

[root@flink01 ~]# java -version
java version "11.0.8" 2020-07-14 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
[root@flink01 ~]#

Since we are installing Flink by building it from source, Maven also needs to be installed in advance:

[root@flink01 /usr/local/src]# mvn --version
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/maven
Java version: 11.0.8, vendor: Oracle Corporation, runtime: /usr/local/jdk/11
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.el7.x86_64", arch: "amd64", family: "unix"
[root@flink01 /usr/local/src]#

Flink's web-dashboard project depends on NodeJS at build time, so it needs to be installed beforehand as well:

[root@flink01 ~]# node -v
v12.18.4
[root@flink01 ~]#

Building that project depends on Angular's CLI tool, which can be installed with the following command:

[root@flink01 ~]# npm install -g --registry=https://registry.npm.taobao.org @angular/cli
[root@flink01 ~]# ng --version

     _                      _                 ____ _     ___
    / \   _ __   __ _ _   _| | __ _ _ __     / ___| |   |_ _|
   / △ \ | '_ \ / _` | | | | |/ _` | '__|   | |   | |    | |
  / ___ \| | | | (_| | |_| | | (_| | |      | |___| |___ | |
 /_/   \_\_| |_|\__, |\__,_|_|\__,_|_|       \____|_____|___|
                |___/

Angular CLI: 10.1.3
Node: 12.18.4
OS: linux x64

Angular: 
... 
Ivy Workspace: 

Package                      Version
------------------------------------------------------
@angular-devkit/architect    0.1001.3 (cli-only)
@angular-devkit/core         10.1.3 (cli-only)
@angular-devkit/schematics   10.1.3 (cli-only)
@schematics/angular          10.1.3 (cli-only)
@schematics/update           0.1001.3 (cli-only)

[root@flink01 ~]#

Then configure the following two repositories in Maven's settings file; the cloudera repository is used to download the CDH distribution's Hadoop dependencies:

<mirrors>
    <!-- Aliyun mirror for Maven Central -->
    <mirror>
      <id>nexus-aliyun</id>
      <mirrorOf>central</mirrorOf>
      <name>Nexus aliyun</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </mirror>
  </mirrors>

...

  <profiles>
    <!-- The cloudera repository, configured via a profile -->
    <profile>
      <id>cloudera-profile</id>
      <repositories>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
          <releases>
            <enabled>true</enabled>
          </releases>
          <snapshots>
            <enabled>false</enabled>
          </snapshots>
        </repository>
      </repositories>
    </profile>
  </profiles>

  <!-- activate the profile -->
  <activeProfiles>
    <activeProfile>cloudera-profile</activeProfile>
  </activeProfiles>

Building the Source

Flink download page:

Install tools the build may need:

[root@flink01 ~]# yum install -y cmake3 git gcc-c++ ncurses-devel perl-Data-Dumper boost boost-doc boost-devel bzip2 openssl-devel libtirpc-devel.x86_64

Open the download page, copy the URL of the Flink source tarball, then fetch it on the Linux machine with wget:

[root@flink01 ~]# cd /usr/local/src
[root@flink01 /usr/local/src]# wget https://github.com/apache/flink/archive/release-1.11.2.tar.gz

Extract the downloaded source package:

[root@flink01 /usr/local/src]# tar -zxvf flink-release-1.11.2.tar.gz
[root@flink01 /usr/local/src]# cd flink-release-1.11.2

The web-dashboard under the flink-runtime-web module uses NodeJS, and the build downloads a number of npm dependencies. The default npm registry is nearly unusable from inside China, so switch to the Taobao registry by editing the pom.xml:

[root@flink01 /usr/local/src/flink-release-1.11.2]# vim flink-runtime-web/pom.xml

In the npm install section, change the content of the arguments tag from:

<arguments>ci --cache-max=0 --no-save</arguments>

改成:

<arguments>install --registry=https://registry.npm.taobao.org --cache-max=0 --no-save</arguments>
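For what it's worth, this edit can also be scripted with sed instead of vim. The snippet below demonstrates the substitution on a sample line in /tmp; on the real tree you would run the same sed against flink-runtime-web/pom.xml:

```shell
# Demonstrate the arguments-tag rewrite on a scratch file.
echo '<arguments>ci --cache-max=0 --no-save</arguments>' > /tmp/npm-args.xml
sed -i 's|ci --cache-max=0|install --registry=https://registry.npm.taobao.org --cache-max=0|' /tmp/npm-args.xml
cat /tmp/npm-args.xml
```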

Now the source can be built with Maven:

[root@flink01 /usr/local/src/flink-release-1.11.2]# mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.15.1 -Dfast

In my case, though, the build failed on the flink-runtime-web module with the following error:

[ERROR] Node.js version v10.9.0 detected.
[ERROR] The Angular CLI requires a minimum Node.js version of either v10.13 or v12.0.
[ERROR] 
[ERROR] Please update your Node.js version or visit https://nodejs.org/ for additional instructions.

The cause is obvious: the NodeJS version is too old. flink-runtime-web/pom.xml pins NodeJS v10.9.0 instead of using the one we installed, so open that file, find the tag below, and raise the version number; I used v10.13.0:

<nodeVersion>v10.13.0</nodeVersion>

Then rebuild:

[root@flink01 /usr/local/src/flink-release-1.11.2]# mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.15.1 -Dfast

The second build may print the error below, but it keeps going and ultimately succeeds, so it can be ignored:

[ERROR] Browserslist: caniuse-lite is outdated. Please run next command `npm update`

On success, the build prints output like this:

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for flink 1.11.2:
[INFO] 
[INFO] force-shading ...................................... SUCCESS [  0.721 s]
[INFO] flink .............................................. SUCCESS [  0.581 s]
[INFO] flink-annotations .................................. SUCCESS [  0.627 s]
[INFO] flink-test-utils-parent ............................ SUCCESS [  0.033 s]
[INFO] flink-test-utils-junit ............................. SUCCESS [  0.646 s]
[INFO] flink-metrics ...................................... SUCCESS [  0.032 s]
[INFO] flink-metrics-core ................................. SUCCESS [  0.360 s]
[INFO] flink-core ......................................... SUCCESS [  7.062 s]
[INFO] flink-java ......................................... SUCCESS [  1.520 s]
[INFO] flink-queryable-state .............................. SUCCESS [  0.025 s]
[INFO] flink-queryable-state-client-java .................. SUCCESS [  0.303 s]
[INFO] flink-filesystems .................................. SUCCESS [  0.023 s]
[INFO] flink-hadoop-fs .................................... SUCCESS [  1.031 s]
[INFO] flink-runtime ...................................... SUCCESS [ 24.936 s]
[INFO] flink-scala ........................................ SUCCESS [ 25.682 s]
[INFO] flink-mapr-fs ...................................... SUCCESS [  0.457 s]
[INFO] flink-filesystems :: flink-fs-hadoop-shaded ........ SUCCESS [  2.114 s]
[INFO] flink-s3-fs-base ................................... SUCCESS [  0.424 s]
[INFO] flink-s3-fs-hadoop ................................. SUCCESS [  3.012 s]
[INFO] flink-s3-fs-presto ................................. SUCCESS [  4.794 s]
[INFO] flink-swift-fs-hadoop .............................. SUCCESS [ 12.921 s]
[INFO] flink-oss-fs-hadoop ................................ SUCCESS [  3.700 s]
[INFO] flink-azure-fs-hadoop .............................. SUCCESS [ 15.227 s]
[INFO] flink-optimizer .................................... SUCCESS [  1.171 s]
[INFO] flink-streaming-java ............................... SUCCESS [  4.635 s]
[INFO] flink-clients ...................................... SUCCESS [  0.939 s]
[INFO] flink-test-utils ................................... SUCCESS [  0.634 s]
[INFO] flink-runtime-web .................................. SUCCESS [ 48.675 s]
[INFO] flink-examples ..................................... SUCCESS [  0.043 s]
[INFO] flink-examples-batch ............................... SUCCESS [  9.319 s]
[INFO] flink-connectors ................................... SUCCESS [  0.035 s]
[INFO] flink-hadoop-compatibility ......................... SUCCESS [  5.029 s]
[INFO] flink-state-backends ............................... SUCCESS [  0.018 s]
[INFO] flink-statebackend-rocksdb ......................... SUCCESS [  0.628 s]
[INFO] flink-tests ........................................ SUCCESS [ 22.051 s]
[INFO] flink-streaming-scala .............................. SUCCESS [ 23.293 s]
[INFO] flink-hcatalog ..................................... SUCCESS [  5.332 s]
[INFO] flink-table ........................................ SUCCESS [  0.019 s]
[INFO] flink-table-common ................................. SUCCESS [  1.505 s]
[INFO] flink-table-api-java ............................... SUCCESS [  0.820 s]
[INFO] flink-table-api-java-bridge ........................ SUCCESS [  0.393 s]
[INFO] flink-table-api-scala .............................. SUCCESS [ 10.990 s]
[INFO] flink-table-api-scala-bridge ....................... SUCCESS [  9.643 s]
[INFO] flink-sql-parser ................................... SUCCESS [ 17.153 s]
[INFO] flink-libraries .................................... SUCCESS [  0.018 s]
[INFO] flink-cep .......................................... SUCCESS [  1.447 s]
[INFO] flink-table-planner ................................ SUCCESS [01:12 min]
[INFO] flink-sql-parser-hive .............................. SUCCESS [  1.524 s]
[INFO] flink-table-runtime-blink .......................... SUCCESS [  2.073 s]
[INFO] flink-table-planner-blink .......................... SUCCESS [01:30 min]
[INFO] flink-metrics-jmx .................................. SUCCESS [  0.262 s]
[INFO] flink-formats ...................................... SUCCESS [  0.020 s]
[INFO] flink-json ......................................... SUCCESS [  0.500 s]
[INFO] flink-connector-kafka-base ......................... SUCCESS [  0.983 s]
[INFO] flink-avro ......................................... SUCCESS [  1.600 s]
[INFO] flink-csv .......................................... SUCCESS [  0.520 s]
[INFO] flink-connector-kafka-0.10 ......................... SUCCESS [  0.753 s]
[INFO] flink-connector-kafka-0.11 ......................... SUCCESS [  0.652 s]
[INFO] flink-connector-elasticsearch-base ................. SUCCESS [  0.807 s]
[INFO] flink-connector-elasticsearch5 ..................... SUCCESS [  8.900 s]
[INFO] flink-connector-elasticsearch6 ..................... SUCCESS [  0.691 s]
[INFO] flink-connector-elasticsearch7 ..................... SUCCESS [  0.702 s]
[INFO] flink-connector-hbase .............................. SUCCESS [  1.758 s]
[INFO] flink-hadoop-bulk .................................. SUCCESS [  0.576 s]
[INFO] flink-orc .......................................... SUCCESS [  0.828 s]
[INFO] flink-orc-nohive ................................... SUCCESS [  0.445 s]
[INFO] flink-parquet ...................................... SUCCESS [  0.992 s]
[INFO] flink-connector-hive ............................... SUCCESS [  2.614 s]
[INFO] flink-connector-jdbc ............................... SUCCESS [  0.857 s]
[INFO] flink-connector-rabbitmq ........................... SUCCESS [  0.256 s]
[INFO] flink-connector-twitter ............................ SUCCESS [  1.220 s]
[INFO] flink-connector-nifi ............................... SUCCESS [  0.309 s]
[INFO] flink-connector-cassandra .......................... SUCCESS [  2.280 s]
[INFO] flink-connector-filesystem ......................... SUCCESS [  0.742 s]
[INFO] flink-connector-kafka .............................. SUCCESS [  0.773 s]
[INFO] flink-connector-gcp-pubsub ......................... SUCCESS [ 50.078 s]
[INFO] flink-connector-kinesis ............................ SUCCESS [  5.358 s]
[INFO] flink-sql-connector-elasticsearch7 ................. SUCCESS [  4.625 s]
[INFO] flink-connector-base ............................... SUCCESS [  0.302 s]
[INFO] flink-sql-connector-elasticsearch6 ................. SUCCESS [  3.658 s]
[INFO] flink-sql-connector-kafka-0.10 ..................... SUCCESS [  0.236 s]
[INFO] flink-sql-connector-kafka-0.11 ..................... SUCCESS [  0.299 s]
[INFO] flink-sql-connector-kafka .......................... SUCCESS [  0.603 s]
[INFO] flink-sql-connector-hive-1.2.2 ..................... SUCCESS [  2.527 s]
[INFO] flink-sql-connector-hive-2.2.0 ..................... SUCCESS [  3.090 s]
[INFO] flink-sql-connector-hive-2.3.6 ..................... SUCCESS [  2.966 s]
[INFO] flink-sql-connector-hive-3.1.2 ..................... SUCCESS [  3.828 s]
[INFO] flink-avro-confluent-registry ...................... SUCCESS [ 24.666 s]
[INFO] flink-sequence-file ................................ SUCCESS [  0.397 s]
[INFO] flink-compress ..................................... SUCCESS [  0.393 s]
[INFO] flink-sql-orc ...................................... SUCCESS [  0.196 s]
[INFO] flink-sql-parquet .................................. SUCCESS [  0.352 s]
[INFO] flink-examples-streaming ........................... SUCCESS [ 21.793 s]
[INFO] flink-examples-table ............................... SUCCESS [  6.387 s]
[INFO] flink-examples-build-helper ........................ SUCCESS [  0.041 s]
[INFO] flink-examples-streaming-twitter ................... SUCCESS [  0.332 s]
[INFO] flink-examples-streaming-state-machine ............. SUCCESS [  0.319 s]
[INFO] flink-examples-streaming-gcp-pubsub ................ SUCCESS [  7.588 s]
[INFO] flink-container .................................... SUCCESS [  0.216 s]
[INFO] flink-queryable-state-runtime ...................... SUCCESS [  0.430 s]
[INFO] flink-mesos ........................................ SUCCESS [ 22.759 s]
[INFO] flink-kubernetes ................................... SUCCESS [01:55 min]
[INFO] flink-yarn ......................................... SUCCESS [  1.131 s]
[INFO] flink-gelly ........................................ SUCCESS [  1.344 s]
[INFO] flink-gelly-scala .................................. SUCCESS [ 13.956 s]
[INFO] flink-gelly-examples ............................... SUCCESS [ 11.946 s]
[INFO] flink-external-resources ........................... SUCCESS [  0.017 s]
[INFO] flink-external-resource-gpu ........................ SUCCESS [  0.154 s]
[INFO] flink-metrics-dropwizard ........................... SUCCESS [  5.900 s]
[INFO] flink-metrics-graphite ............................. SUCCESS [  3.591 s]
[INFO] flink-metrics-influxdb ............................. SUCCESS [01:53 min]
[INFO] flink-metrics-prometheus ........................... SUCCESS [ 44.165 s]
[INFO] flink-metrics-statsd ............................... SUCCESS [  0.156 s]
[INFO] flink-metrics-datadog .............................. SUCCESS [  0.158 s]
[INFO] flink-metrics-slf4j ................................ SUCCESS [  0.151 s]
[INFO] flink-cep-scala .................................... SUCCESS [  8.664 s]
[INFO] flink-table-uber ................................... SUCCESS [  3.683 s]
[INFO] flink-table-uber-blink ............................. SUCCESS [  4.093 s]
[INFO] flink-python ....................................... SUCCESS [01:53 min]
[INFO] flink-sql-client ................................... SUCCESS [  8.511 s]
[INFO] flink-state-processor-api .......................... SUCCESS [  0.590 s]
[INFO] flink-ml-parent .................................... SUCCESS [  0.018 s]
[INFO] flink-ml-api ....................................... SUCCESS [  0.159 s]
[INFO] flink-ml-lib ....................................... SUCCESS [  8.357 s]
[INFO] flink-ml-uber ...................................... SUCCESS [  0.076 s]
[INFO] flink-scala-shell .................................. SUCCESS [  9.027 s]
[INFO] flink-dist ......................................... SUCCESS [01:08 min]
[INFO] flink-yarn-tests ................................... SUCCESS [ 11.079 s]
[INFO] flink-end-to-end-tests ............................. SUCCESS [ 37.058 s]
[INFO] flink-cli-test ..................................... SUCCESS [  0.164 s]
[INFO] flink-parent-child-classloading-test-program ....... SUCCESS [  0.141 s]
[INFO] flink-parent-child-classloading-test-lib-package ... SUCCESS [  0.089 s]
[INFO] flink-dataset-allround-test ........................ SUCCESS [  0.140 s]
[INFO] flink-dataset-fine-grained-recovery-test ........... SUCCESS [  0.148 s]
[INFO] flink-datastream-allround-test ..................... SUCCESS [  0.745 s]
[INFO] flink-batch-sql-test ............................... SUCCESS [  0.142 s]
[INFO] flink-stream-sql-test .............................. SUCCESS [  0.148 s]
[INFO] flink-bucketing-sink-test .......................... SUCCESS [  0.315 s]
[INFO] flink-distributed-cache-via-blob ................... SUCCESS [  0.139 s]
[INFO] flink-high-parallelism-iterations-test ............. SUCCESS [  4.416 s]
[INFO] flink-stream-stateful-job-upgrade-test ............. SUCCESS [  0.513 s]
[INFO] flink-queryable-state-test ......................... SUCCESS [  0.981 s]
[INFO] flink-local-recovery-and-allocation-test ........... SUCCESS [  0.133 s]
[INFO] flink-elasticsearch5-test .......................... SUCCESS [  3.092 s]
[INFO] flink-elasticsearch6-test .......................... SUCCESS [  1.650 s]
[INFO] flink-quickstart ................................... SUCCESS [  0.263 s]
[INFO] flink-quickstart-java .............................. SUCCESS [ 16.713 s]
[INFO] flink-quickstart-scala ............................. SUCCESS [  0.057 s]
[INFO] flink-quickstart-test .............................. SUCCESS [  0.315 s]
[INFO] flink-confluent-schema-registry .................... SUCCESS [  1.014 s]
[INFO] flink-stream-state-ttl-test ........................ SUCCESS [  2.333 s]
[INFO] flink-sql-client-test .............................. SUCCESS [01:01 min]
[INFO] flink-streaming-file-sink-test ..................... SUCCESS [  0.130 s]
[INFO] flink-state-evolution-test ......................... SUCCESS [  0.527 s]
[INFO] flink-rocksdb-state-memory-control-test ............ SUCCESS [  0.495 s]
[INFO] flink-end-to-end-tests-common ...................... SUCCESS [  0.527 s]
[INFO] flink-metrics-availability-test .................... SUCCESS [  0.136 s]
[INFO] flink-metrics-reporter-prometheus-test ............. SUCCESS [  0.156 s]
[INFO] flink-heavy-deployment-stress-test ................. SUCCESS [  4.367 s]
[INFO] flink-connector-gcp-pubsub-emulator-tests .......... SUCCESS [02:09 min]
[INFO] flink-streaming-kafka-test-base .................... SUCCESS [  0.193 s]
[INFO] flink-streaming-kafka-test ......................... SUCCESS [  4.041 s]
[INFO] flink-streaming-kafka011-test ...................... SUCCESS [  3.555 s]
[INFO] flink-streaming-kafka010-test ...................... SUCCESS [  3.540 s]
[INFO] flink-plugins-test ................................. SUCCESS [  0.033 s]
[INFO] dummy-fs ........................................... SUCCESS [  0.084 s]
[INFO] another-dummy-fs ................................... SUCCESS [  0.074 s]
[INFO] flink-tpch-test .................................... SUCCESS [  5.635 s]
[INFO] flink-streaming-kinesis-test ....................... SUCCESS [  6.854 s]
[INFO] flink-elasticsearch7-test .......................... SUCCESS [  1.939 s]
[INFO] flink-end-to-end-tests-common-kafka ................ SUCCESS [  0.539 s]
[INFO] flink-tpcds-test ................................... SUCCESS [  0.345 s]
[INFO] flink-netty-shuffle-memory-control-test ............ SUCCESS [  0.144 s]
[INFO] flink-python-test .................................. SUCCESS [  3.675 s]
[INFO] flink-statebackend-heap-spillable .................. SUCCESS [  0.352 s]
[INFO] flink-contrib ...................................... SUCCESS [  0.019 s]
[INFO] flink-connector-wikiedits .......................... SUCCESS [  4.279 s]
[INFO] flink-fs-tests ..................................... SUCCESS [  0.509 s]
[INFO] flink-docs ......................................... SUCCESS [  5.049 s]
[INFO] flink-walkthroughs ................................. SUCCESS [  0.021 s]
[INFO] flink-walkthrough-common ........................... SUCCESS [  0.196 s]
[INFO] flink-walkthrough-datastream-java .................. SUCCESS [  0.053 s]
[INFO] flink-walkthrough-datastream-scala ................. SUCCESS [  0.050 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  24:47 min
[INFO] Finished at: 2020-09-29T01:20:10+08:00
[INFO] ------------------------------------------------------------------------

A distribution directory is generated as well, with the following layout:

[root@flink01 /usr/local/src/flink-release-1.11.2]# ls flink-dist/target/flink-1.11.2-bin/flink-1.11.2/
bin  conf  examples  lib  LICENSE  log  opt  plugins  README.txt
[root@flink01 /usr/local/src/flink-release-1.11.2]#

Single-Node Deployment and a Test Job

Single-node deployment

First configure hosts, mapping the hostname to the local IP:

[root@flink01 ~]# vim /etc/hosts
192.168.243.148   flink01

Single-node deployment of Flink is very simple: just copy out the directory generated by the build:

[root@flink01 /usr/local/src/flink-release-1.11.2]# cp -r flink-dist/target/flink-1.11.2-bin/flink-1.11.2/ /usr/local/flink

Then Flink can be started with:

[root@flink01 /usr/local/flink]# ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host flink01.
Starting taskexecutor daemon on host flink01.
[root@flink01 /usr/local/flink]# jps  # these Java processes appear if startup succeeded
2755 Jps
2389 StandaloneSessionClusterEntrypoint
2733 TaskManagerRunner
[root@flink01 /usr/local/flink]#

The matching stop command is:

$ ./bin/stop-cluster.sh

Log files are under the log directory; if startup fails, check them to troubleshoot:

[root@flink01 /usr/local/flink]# ls log/
flink-root-standalonesession-0-flink01.log  flink-root-standalonesession-0-flink01.out  flink-root-taskexecutor-0-flink01.log  flink-root-taskexecutor-0-flink01.out
[root@flink01 /usr/local/flink]#

Open the machine's IP on port 8081 in a browser to reach Flink's web dashboard:
(screenshot: Flink web dashboard)

The side menu offers the following entries:
(screenshot: dashboard side menu)

  • Overview: overall cluster summary
  • Running Jobs: jobs currently running
  • Completed Jobs: jobs that have finished
  • TaskManager: system information for the TaskManagers
  • JobManager: JobManager configuration and logs
  • Submit New Job: submit jobs from this page
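Everything the dashboard shows is also served as JSON by Flink's monitoring REST API on the same port. A few standard endpoints, composed here against this article's host and port (run the printed curl commands against a live cluster):

```shell
FLINK_UI=http://flink01:8081   # this article's JobManager host and web port
for ep in overview jobs taskmanagers; do
  echo "curl $FLINK_UI/$ep"
done
```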

Flink's overall architecture looks like this:
(figure: Flink architecture)

Flink consists of two main components, the JobManager and the TaskManager, and follows the master-slave design principle: the JobManager is the master node and the TaskManagers are the worker (slave) nodes, of which several can be deployed. The Flink Program is the application we write against the Flink framework, i.e. the work the TaskManagers actually execute; it is submitted to the cluster through a Client.

The Client

The Client submits jobs to the cluster: it establishes an Akka connection to the JobManager, submits the job, and then interacts with the JobManager to obtain the job's execution status.

Jobs can be submitted through the CLI, through the Flink Web UI (the Submit New Job page in the menu), or programmatically, by building an ExecutionEnvironment that specifies the JobManager's RPC host and port.

JobManager

The JobManager handles scheduling and resource management for the entire Flink cluster. It receives submitted applications from clients, assigns them TaskSlots according to TaskSlot usage across the cluster's TaskManagers, and instructs the TaskManagers to start the application.

The JobManager acts as the cluster's master node; the cluster has exactly one active JobManager, responsible for cluster-wide job and resource management.

The JobManager and the TaskManagers communicate via the Actor System, through which the JobManager collects task execution status and relays it to the client.

During execution the JobManager also triggers checkpoints: each TaskManager performs a checkpoint when it receives the trigger command, and all checkpoint coordination is done inside the Flink JobManager.

When a job completes, Flink reports the execution results back to the client and releases the TaskManager resources for the next submission.

TaskManager

The TaskManagers are the cluster's slave nodes, responsible for executing the actual tasks and for requesting and managing the resources those tasks need on each node.

The client compiles and packages the Flink application and submits it to the JobManager; based on the resource situation of the TaskManagers registered with it, the JobManager assigns the tasks to TaskManagers with free resources and starts them.

A TaskManager receives the tasks to deploy from the JobManager, starts them using its slot resources, establishes the network connections for data ingestion, receives the data, and begins processing. Data exchange between TaskManagers happens as data streams.

Note that Flink runs tasks as multiple threads, which differs markedly from MapReduce's multi-JVM model and lets Flink use CPUs far more efficiently. Tasks share system resources via TaskSlots, and each TaskManager manages its resources effectively as a pool of TaskSlots.


Submitting a test job

With Flink deployed and its basic components understood, we can submit one of Flink's bundled examples to verify that the cluster runs jobs correctly. The examples are located here:

[root@flink01 /usr/local/flink]# ls examples/
batch  gelly  python  streaming  table
[root@flink01 /usr/local/flink]# ls examples/streaming/
IncrementalLearning.jar  Iteration.jar  SessionWindowing.jar  SocketWindowWordCount.jar  StateMachineExample.jar  TopSpeedWindowing.jar  Twitter.jar  WindowJoin.jar  WordCount.jar
[root@flink01 /usr/local/flink]#

I use examples/streaming/SocketWindowWordCount.jar for the test. It reads a socket stream, splits it into words on a delimiter, and counts word frequencies. To simulate a socket stream we need the netcat tool, installed as follows:

$ yum install -y nc

Use nc to listen on port 9999; we will write data into this socket in a moment:

$ nc -lk 9999

Then submit the example job to Flink:

[root@flink01 /usr/local/flink]# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999
Job has been submitted with JobID c90a28408eae654a143745903cbaa3eb

Once submitted, a running job appears in the web UI:
(screenshot: Running Jobs page)

Click it to see the details:
(screenshot: job detail page)

Write some data into the socket opened by nc:

[root@flink01 ~]# nc -lk 9999
a b c a a b b d d c
hello world
flink spark spark flink

The word counts appear in the following file:

[root@flink01 /usr/local/flink]# cat log/flink-root-taskexecutor-0-flink01.out
a : 3
spark : 2
flink : 2
world : 1
hello : 1
d : 2
c : 2
b : 3
[root@flink01 /usr/local/flink]#
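As a sanity check, the same counts can be reproduced from the three input lines with an ordinary shell pipeline:

```shell
# Split on spaces, then count occurrences of each word.
printf '%s\n' 'a b c a a b b d d c' 'hello world' 'flink spark spark flink' \
  | tr ' ' '\n' | sort | uniq -c | sort -rn
```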

That completes the test. How do we stop the job now? Rather than hitting Ctrl+C, go to the web UI and click "Cancel Job" to stop it:
(screenshot: Cancel Job button)
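The job can also be cancelled from the command line: `./bin/flink list` prints running jobs with their JobIDs, and `./bin/flink cancel <JobID>` stops one (both are standard Flink CLI commands). A sketch that pulls the JobID out of the submission message shown earlier:

```shell
# On a live cluster you would run:
#   ./bin/flink list                                      # show running jobs
#   ./bin/flink cancel c90a28408eae654a143745903cbaa3eb   # cancel by JobID
# The JobID is the last token of the submission message:
msg='Job has been submitted with JobID c90a28408eae654a143745903cbaa3eb'
jobid=${msg##* }
echo "$jobid"   # -> c90a28408eae654a143745903cbaa3eb
```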


Flink Standalone Cluster Deployment

Official documentation:

The previous section demonstrated single-node deployment, but production environments usually need a distributed setup, and Flink offers Standalone mode for that: an independent cluster. The topology of Flink Standalone mode:
(figure: Flink Standalone topology)

To demonstrate a distributed Standalone deployment, at least two machines are needed, so I add a machine with hostname flink02. The machines are now:

IP               Hostname  Role
192.168.243.148  flink01   master (JobManager) / worker (TaskManager)
192.168.243.150  flink02   worker (TaskManager)

  • Tips: the new flink02 also needs a Java runtime

System configuration (all nodes)

Configure hosts, mapping hostnames to IPs so all nodes can reach each other by hostname:

$ vim /etc/hosts
192.168.243.148   flink01
192.168.243.150   flink02

Disable the firewall:

$ systemctl stop firewalld && systemctl disable firewalld

Set up passwordless SSH between all nodes:

[root@flink01 ~]# ssh-keygen -t rsa      # generate a key pair
[root@flink01 ~]# ssh-copy-id flink01    # append our public key to our own authorized_keys
[root@flink01 ~]# ssh-copy-id flink02    # append our public key to flink02's authorized_keys
  • Repeat the same steps on flink02; not shown again here

Then test the passwordless login; as shown, logging in to flink02 requires no password:

[root@flink01 ~]# ssh flink02
Last login: Tue Sep 29 14:22:20 2020 from 192.168.243.1
[root@flink02 ~]#

Configure the master node

On flink01, change a few settings in the configuration file:

[root@flink01 /usr/local/flink]# vim conf/flink-conf.yaml
jobmanager.rpc.address: flink01
jobmanager.memory.process.size: 1024m
taskmanager.memory.process.size: 2048m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1
io.tmp.dirs: /usr/local/flink/tmp_data

Create the temp directory:

[root@flink01 /usr/local/flink]# mkdir tmp_data

A quick rundown of these parameters:

  • jobmanager.rpc.address: IP address or hostname of the master node
  • jobmanager.memory.process.size: total memory available to the JobManager process
  • taskmanager.memory.process.size: total memory available to each TaskManager process
  • taskmanager.numberOfTaskSlots: number of task slots per TaskManager, commonly set to the machine's CPU core count
  • parallelism.default: the default parallelism used for jobs that do not specify one
  • io.tmp.dirs: directory for the TaskManagers' temporary data
  • For more on the configuration parameters, see the official documentation: Configuration
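A quick capacity check from these values: total task slots = taskmanager.numberOfTaskSlots × number of TaskManagers, and a job's parallelism cannot exceed the available slots. For the two-node cluster built in this section (2 slots per TaskManager, flink01 + flink02):

```shell
SLOTS_PER_TM=2   # taskmanager.numberOfTaskSlots above
NUM_TM=2         # flink01 + flink02
echo "total task slots: $((SLOTS_PER_TM * NUM_TM))"   # -> total task slots: 4
```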

The worker nodes' IPs or hostnames also need to be configured:

[root@flink01 /usr/local/flink]# vim conf/workers
flink01
flink02

Restart the services:

[root@flink01 /usr/local/flink]# ./bin/stop-cluster.sh
[root@flink01 /usr/local/flink]# ./bin/start-cluster.sh

Configure the worker node

Copy the flink directory over to flink02 by running the following on flink02:

[root@flink02 ~]# scp -r flink01:/usr/local/flink /usr/local/flink

Create the temp directory:

[root@flink02 ~]# cd /usr/local/flink/
[root@flink02 /usr/local/flink]# mkdir tmp_data

Start the TaskManager service:

[root@flink02 /usr/local/flink]# ./bin/taskmanager.sh start
Starting taskexecutor daemon on host flink02.
[root@flink02 /usr/local/flink]# jps
4757 Jps
4701 TaskManagerRunner
[root@flink02 /usr/local/flink]#

The dashboard now shows a TaskManager count of 2:
(screenshot: dashboard overview)

Both nodes are also visible on the "Task Manager" page:
(screenshot: Task Manager page)

Adding more TaskManager nodes works exactly the same way and is very simple. Next, let's check that a job submitted to the cluster runs correctly. First open a socket with nc and write some data:

[root@flink01 ~]# nc -lk 9999
a b c a a b b d d c
hello world
flink spark spark flink

Then submit the job:

[root@flink01 /usr/local/flink]# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999
Job has been submitted with JobID 641d5e7e0bd572ba4114ea5e69b8644c

The word counts in the file below show that the job runs fine on Flink in Standalone mode:

[root@flink01 /usr/local/flink]# cat log/flink-root-taskexecutor-1-flink01.out
a : 3
spark : 2
flink : 2
world : 1
hello : 1
d : 2
c : 2
b : 3
[root@flink01 /usr/local/flink]#