Flink(二)CentOS7.5搭建Flink1.6.1分佈式集羣

一. Flink的下載

安裝包下載地址:http://flink.apache.org/downloads.html  ,選擇對應Hadoop的Flink版本下載html

[admin@node21 software]$ wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.6.1/flink-1.6.1-bin-hadoop27-scala_2.11.tgz
[admin@node21 software]$ ll
-rw-rw-r-- 1 admin admin 301867081 Sep 15 15:47 flink-1.6.1-bin-hadoop27-scala_2.11.tgz

Flink 有三種部署模式,分別是 Local、Standalone Cluster 和 Yarn Cluster。java

二. Local模式

對於 Local 模式來講,JobManager 和 TaskManager 會公用一個 JVM 來完成 Workload。若是要驗證一個簡單的應用,Local 模式是最方便的。實際應用中大多使用 Standalone 或者 Yarn Cluster,而local模式只是將安裝包解壓啓動(./bin/start-local.sh)便可,在這裏不在演示。node

三. Standalone 模式

快速入門教程地址:https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.htmlweb

1.  軟件要求

  • Java 1.8.x或更高版本,
  • ssh(必須運行sshd才能使用管理遠程組件的Flink腳本)

集羣部署規劃sql

節點名稱  master worker zookeeper
node21  master   zookeeper
node22  master  worker zookeeper
node23    worker zookeeper

2. 解壓

[admin@node21 software]$ tar zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz -C /opt/module/
[admin@node21 software]$ cd /opt/module/
[admin@node21 module]$ ll
drwxr-xr-x 8 admin admin 125 Sep 15 04:47 flink-1.6.1

3. 修改配置文件

[admin@node21 conf]$ ls
flink-conf.yaml       log4j-console.properties  log4j-yarn-session.properties  logback.xml       masters  sql-client-defaults.yaml
log4j-cli.properties  log4j.properties          logback-console.xml            logback-yarn.xml  slaves   zoo.cfg

修改flink/conf/masters,slaves,flink-conf.yamlapache

[admin@node21 conf]$ sudo vi masters
node21:8081
[admin@node21 conf]$ sudo vi slaves
node22
node23
[admin@node21 conf]$ sudo vi flink-conf.yaml 
taskmanager.numberOfTaskSlots:2
jobmanager.rpc.address: node21

可選配置:api

  • 每一個JobManager(jobmanager.heap.mb的可用內存量
  • 每一個TaskManager(taskmanager.heap.mb的可用內存量
  • 每臺機器的可用CPU數量(taskmanager.numberOfTaskSlots),
  • 集羣中的CPU總數(parallelism.default)和
  • 臨時目錄(taskmanager.tmp.dirs

4. 拷貝安裝包到各節點

[admin@node21 module]$ scp -r flink-1.6.1/ admin@node22:`pwd`
[admin@node21 module]$ scp -r flink-1.6.1/ admin@node23:`pwd`

5. 配置環境變量

配置全部節點Flink的環境變量bash

[admin@node21 flink-1.6.1]$ sudo vi /etc/profile
export FLINK_HOME=/opt/module/flink-1.6.1
export PATH=$PATH:$FLINK_HOME/bin
[admin@node21 flink-1.6.1]$ source /etc/profile

6. 啓動flink

[admin@node21 flink-1.6.1]$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node21.
Starting taskexecutor daemon on host node22.
Starting taskexecutor daemon on host node23.

jps查看進程服務器

7.  WebUI查看

http://node21:8081session

8. Flink 的 HA

首先,咱們須要知道 Flink 有兩種部署的模式,分別是 Standalone 以及 Yarn Cluster 模式。對於 Standalone 來講,Flink 必須依賴於 Zookeeper 來實現 JobManager 的 HA(Zookeeper 已經成爲了大部分開源框架 HA 必不可少的模塊)。在 Zookeeper 的幫助下,一個 Standalone 的 Flink 集羣會同時有多個活着的 JobManager,其中只有一個處於工做狀態,其餘處於 Standby 狀態。當工做中的 JobManager 失去鏈接後(如宕機或 Crash),Zookeeper 會從 Standby 中選舉新的 JobManager 來接管 Flink 集羣。

對於 Yarn Cluaster 模式來講,Flink 就要依靠 Yarn 自己來對 JobManager 作 HA 了。其實這裏徹底是 Yarn 的機制。對於 Yarn Cluster 模式來講,JobManager 和 TaskManager 都是被 Yarn 啓動在 Yarn 的 Container 中。此時的 JobManager,其實應該稱之爲 Flink Application Master。也就說它的故障恢復,就徹底依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 同樣)。因爲徹底依賴了 Yarn,所以不一樣版本的 Yarn 可能會有細微的差別。這裏再也不作深究。

1) 修改配置文件

修改flink-conf.yaml,HA模式下,jobmanager不須要指定,在master file中配置,由zookeeper選出leader與standby。

#jobmanager.rpc.address: node21
high-availability:zookeeper                             #指定高可用模式(必須)
high-availability.zookeeper.quorum:node21:2181,node22:2181,node23:2181  #ZooKeeper仲裁是ZooKeeper服務器的複製組,它提供分佈式協調服務(必須)
high-availability.storageDir:hdfs:///flink/ha/       #JobManager元數據保存在文件系統storageDir中,只有指向此狀態的指針存儲在ZooKeeper中(必須)
high-availability.zookeeper.path.root:/flink         #根ZooKeeper節點,在該節點下放置全部集羣節點(推薦) 
high-availability.cluster-id:/flinkCluster           #自定義集羣(推薦)
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints

修改conf/zoo.cfg

server.1=node21:2888:3888
server.2=node22:2888:3888
server.3=node23:2888:3888

修改conf/masters

node21:8081
node22:8081

修改slaves

node22
node23

同步配置文件conf到各節點

2) 啓動HA

先啓動zookeeper集羣各節點(測試環境中也能夠用Flink自帶的start-zookeeper-quorum.sh),啓動dfs ,再啓動flink

[admin@node21 flink-1.6.1]$ start-cluster.sh 

WebUI查看,這是會自動產生一個主Master,以下

3) 驗證HA

手動殺死node22上的master,此時,node21上的備用master轉爲主mater。

4)手動將JobManager / TaskManager實例添加到羣集

您可使用bin/jobmanager.shbin/taskmanager.sh腳本將JobManager和TaskManager實例添加到正在運行的集羣中

添加JobManager

bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all

添加TaskManager

bin/taskmanager.sh start|start-foreground|stop|stop-all

[admin@node22 flink-1.6.1]$ jobmanager.sh start node22

新添加的爲從master。

9. 運行測試任務

[admin@node21 flink-1.6.1]$ flink run -m node21:8081 ./examples/batch/WordCount.jar --input /opt/wcinput/wc.txt --output /opt/wcoutput/
[admin@node21 flink-1.6.1]$ flink run -m node21:8081 ./examples/batch/WordCount.jar --input hdfs:///user/admin/input/wc.txt --output hdfs:///user/admin/output2

四. Yarn Cluster模式

1. 引入

在一個企業中,爲了最大化的利用集羣資源,通常都會在一個集羣中同時運行多種類型的 Workload。所以 Flink 也支持在 Yarn 上面運行。首先,讓咱們經過下圖瞭解下 Yarn 和 Flink 的關係。

在圖中能夠看出,Flink 與 Yarn 的關係與 MapReduce 和 Yarn 的關係是同樣的。Flink 經過 Yarn 的接口實現了本身的 App Master。當在 Yarn 中部署了 Flink,Yarn 就會用本身的 Container 來啓動 Flink 的 JobManager(也就是 App Master)和 TaskManager。

啓動新的Flink YARN會話時,客戶端首先檢查所請求的資源(容器和內存)是否可用。以後,它將包含Flink和配置的jar上傳到HDFS(步驟1)。

客戶端的下一步是請求(步驟2)YARN容器以啓動ApplicationMaster(步驟3)。因爲客戶端將配置和jar文件註冊爲容器的資源,所以在該特定機器上運行的YARN的NodeManager將負責準備容器(例如,下載文件)。完成後,將啓動ApplicationMaster(AM)。

JobManager和AM在同一容器中運行。一旦它們成功啓動,AM就知道JobManager(它本身的主機)的地址。它正在爲TaskManagers生成一個新的Flink配置文件(以便它們能夠鏈接到JobManager)。該文件也上傳到HDFS。此外,AM容器還提供Flink的Web界面。YARN代碼分配的全部端口都是臨時端口這容許用戶並行執行多個Flink YARN會話。

以後,AM開始爲Flink的TaskManagers分配容器,這將從HDFS下載jar文件和修改後的配置。完成這些步驟後,便可創建Flink並準備接受做業。

2. 修改環境變量

export  HADOOP_CONF_DIR= /opt/module/hadoop-2.7.6/etc/hadoop

3. 部署啓動 

[admin@node21 flink-1.6.1]$ yarn-session.sh -d -s 2 -tm 800 -n 2
-n : TaskManager的數量,至關於executor的數量
-s : 每一個JobManager的core的數量,executor-cores。建議將slot的數量設置每臺機器的處理器數量
-tm : 每一個TaskManager的內存大小,executor-memory
-jm : JobManager的內存大小,driver-memory

上面的命令的意思是,同時向Yarn申請3個container,其中 2 個 Container 啓動 TaskManager(-n 2),每一個 TaskManager 擁有兩個 Task Slot(-s 2),而且向每一個 TaskManager 的 Container 申請 800M 的內存,以及一個ApplicationMaster(Job Manager)。

Flink部署到Yarn Cluster後,會顯示Job Manager的鏈接細節信息。

Flink on Yarn會覆蓋下面幾個參數,若是不但願改變配置文件中的參數,能夠動態的經過-D選項指定,如 -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368

jobmanager.rpc.address:由於JobManager會常常分配到不一樣的機器上

taskmanager.tmp.dirs:使用Yarn提供的tmp目錄

parallelism.default:若是有指定slot個數的狀況下

yarn-session.sh會掛起進程,因此能夠經過在終端使用CTRL+C或輸入stop中止yarn-session。

若是不但願Flink Yarn client長期運行,Flink提供了一種detached YARN session,啓動時候加上參數-d或—detached

在上面的命令成功後,咱們就能夠在 Yarn Application 頁面看到 Flink 的紀錄。以下圖。

若是在虛擬機中測試,可能會遇到錯誤。這裏須要注意內存的大小,Flink 向 Yarn 會申請多個 Container,可是 Yarn 的配置可能限制了 Container 所能申請的內存大小,甚至 Yarn 自己所管理的內存就很小。這樣極可能沒法正常啓動 TaskManager,尤爲當指定多個 TaskManager 的時候。所以,在啓動 Flink 以後,須要去 Flink 的頁面中檢查下 Flink 的狀態。這裏能夠從 RM 的頁面中,直接跳轉(點擊 Tracking UI)。這時候 Flink 的頁面如圖

yarn-session.sh啓動命令參數以下:

[admin@node21 flink-1.6.1]$ yarn-session.sh --help
Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified i
n the configuration.     -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -st,--streaming                 Start Flink in streaming mode
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

4. 提交任務

以後,咱們能夠經過這種方式提交咱們的任務

[admin@node21 flink-1.6.1]$ ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar --input /opt/wcinput/wc.txt --output /opt/wcoutput/

以上命令在參數前加上y前綴,-yn表示TaskManager個數。

在這個模式下,一樣可使用-m yarn-cluster提交一個"運行後即焚"的detached yarn(-yd)做業到yarn cluster。

5. 中止yarn cluster

yarn application -kill application_1539058959130_0001

6. Yarn模式的HA

應用最大嘗試次數(yarn-site.xml),您必須配置爲嘗試應用的最大數量的設置yarn-site.xml,當前YARN版本的默認值爲2(表示容許單個JobManager失敗)。

<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>The maximum number of application master execution attempts</description>
</property>

示例:高度可用的YARN會話

  1. 配置HA模式和zookeeper法定人數conf/flink-conf.yaml

    high-availability: zookeeper
    high-availability.zookeeper.quorum: node21:2181,node22:2181,node23:2181
    high-availability.storageDir: hdfs:///flink/recovery
    high-availability.zookeeper.path.root: /flink
    yarn.application-attempts: 10
  2. 配置ZooKeeper的服務器conf/zoo.cfg(目前它只是能夠運行每臺機器的單一的ZooKeeper服務器):

    server.1=node21:2888:3888
    server.2=node22:2888:3888
    server.3=node23:2888:3888
  3. 啓動ZooKeeper仲裁

    $ bin / start-zookeeper-quorum.sh
  4. 啓動HA羣集

    $ bin / yarn-session.sh -n 2

五.錯誤異常

1.身份認證失敗

[root@node21 flink-1.6.1]# flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Starting execution of program

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: b7a99ac5db242290413dbebe32ba52b0)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

經過查看日誌,發現有以下報錯

2018-10-20 02:32:19,668 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed

解決法案:添加定時任務認證kerberos

相關文章
相關標籤/搜索