Flink集羣部署

1部署方式linux

通常來說有三種方式:session

  • Localapp

  • Standalonessh

  • Flink On Yarn/Mesos/K8s… ide


2Standalone部署
工具

上一節咱們講了單機模式如何部署啓動,這節咱們基於CentOS 7虛擬機搭建一個3個節點的集羣:oop

角色分配: ui


Master: 192.168.246.134
Slave: 192.168.246.135
Slave: 192.168.246.136

192.168.246.134 jobmanager
192.168.246.135 taskmanager
192.168.246.136 taskmanager

假設三臺機器都存在:this

用戶root 密碼爲123 spa


192.168.246.134 master
192.168.246.135 slave1
192.168.246.136 slave2

三臺機器首先要作ssh免登,具體方法很簡單,能夠百度。

下載一個包到本地,這裏我選擇了1.7.2版本+Hadoop2.8+Scala2.11版本,而後分發到三臺機器上:


scp flink-1.7.2-bin-hadoop28-scala_2.11.tgz root@192.168.246.13X:~
scp jdk-8u11-linux-x64.tar.gz root@192.168.246.13X:~
注意:X表明四、五、6,分發到3臺機器

修改解壓後目錄屬主:
Chown -R root:root flink/
Chown -R root:root jdk8/

export JAVA_HOME=/root/jdk8
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

分別修改master和slave的flink-conf.yaml文件 


Vim flink/conf/flink-conf.yaml

#
#配置master節點ip
jobmanager.rpc.address: 192.168.1.100

#
#配置slave節點可用內存,單位MB
taskmanager.heap.mb: 25600

#
#配置每一個節點的可用slot,1 核CPU對應 1 slot
##the number of available CPUs per machine 
taskmanager.numberOfTaskSlots: 30

#
#默認並行度 1 slot資源
parallelism.default: 1

修改slave節點配置文件slaves:
192.168.246.135
192.168.246.136

啓動集羣: 


##在master節點上執行此腳本,就能夠啓動集羣,前提要保證master節點到slaver節點能夠免密登陸,
##由於它的啓動過程是:先在master節點啓動jobmanager進程,而後ssh到各slaver節點啓動taskmanager進程
./bin/start-cluster.sh
中止集羣:
./bin/stop-cluster.sh


3Flink on yarn集羣部署

名詞解釋:指事物的結構形態、運轉模型和人們觀念的根本性轉變過程。

Yarn的簡介:

圖片

ResourceManager

ResourceManager 負責整個集羣的資源管理和分配,是一個全局的資源管理系統。 NodeManager 以心跳的方式向 ResourceManager 彙報資源使用狀況(目前主要是 CPU 和內存的使用狀況)。RM 只接受 NM 的資源回報信息,對於具體的資源處理則交給 NM 本身處理。

NodeManager

NodeManager 是每一個節點上的資源和任務管理器,它是管理這臺機器的代理,負責該節點程序的運行,以及該節點資源的管理和監控。YARN 集羣每一個節點都運行一個NodeManager。

NodeManager 定時向 ResourceManager 彙報本節點資源(CPU、內存)的使用狀況和Container 的運行狀態。當 ResourceManager 宕機時 NodeManager 自動鏈接 RM 備用節點。

NodeManager 接收並處理來自 ApplicationMaster 的 Container 啓動、中止等各類請求。

ApplicationMaster

負責與 RM 調度器協商以獲取資源(用 Container 表示)。

將獲得的任務進一步分配給內部的任務(資源的二次分配)。

與 NM 通訊以啓動/中止任務。

監控全部任務運行狀態,並在任務運行失敗時從新爲任務申請資源以重啓任務 


Flink on yarn 集羣啓動步驟 

  • 步驟1 用戶向YARN中提交應用程序,其中包括ApplicationMaster程序、啓動ApplicationMaster的命令、用戶程序等。

  • 步驟2 ResourceManager爲該應用程序分配第一個Container,並與對應的Node-Manager通訊,要求它在這個Container中啓動應用程序的ApplicationMaster。

  • 步驟3 ApplicationMaster首先向ResourceManager註冊,這樣用戶能夠直接經過ResourceManager查看應用程序的運行狀態,而後它將爲各個任務申請資源,並監控它的運行狀態,直到運行結束,即重複步驟4~7。

  • 步驟4 ApplicationMaster採用輪詢的方式經過RPC協議向ResourceManager申請和領取資源。

  • 步驟5 一旦ApplicationMaster申請到資源後,便與對應的NodeManager通訊,要求它啓動任務。

  • 步驟6 NodeManager爲任務設置好運行環境(包括環境變量、JAR包、二進制程序等)後,將任務啓動命令寫到一個腳本中,並經過運行該腳本啓動任務。

  • 步驟7 各個任務經過某個RPC協議向ApplicationMaster彙報本身的狀態和進度,以讓ApplicationMaster隨時掌握各個任務的運行狀態,從而能夠在任務失敗時從新啓動任務。 在應用程序運行過程當中,用戶可隨時經過RPC向ApplicationMaster查詢應用程序的當前運行狀態。

  • 步驟8 應用程序運行完成後,ApplicationMaster向ResourceManager註銷並關閉本身。


on yarn 集羣部署 :

設置Hadoop環境變量:


[root@hadoop2 flink-1.7.2]# vi /etc/profile
export HADOOP_CONF_DIR=這裏是你本身的hadoop路徑

bin/yarn-session.sh -h 查看使用方法:


bin/yarn-session.sh -h

Usage:
Required
-n,--container <arg> 爲YARN分配容器的數量 (=Number of Task Managers)
Optional
-D <property=value> 動態屬性 
-d,--detached 以分離模式運行做業
-h,--help Yarn session幫助.
-id,--applicationId <arg> 鏈接到一個正在運行的YARN session
-j,--jar <arg> Flink jar文件的路徑
-jm,--jobManagerMemory <arg> JobManager的內存大小,driver-memory [in MB]
-m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-n,--container <arg> TaskManager的數量,至關於executor的數量
-nm,--name <arg> 設置YARN應用自定義名稱 
-q,--query 顯示可用的YARN資源 (memory, cores)
-qu,--queue <arg> 指定YARN隊列
-s,--slots <arg> 每一個JobManager的core的數量,executor-cores。建議將slot的數量設置每臺機器的處理器數量
-st,--streaming 在流模式下啓動Flink
-t,--ship <arg> 在指定目錄中傳送文件(t for transfer)
-tm,--taskManagerMemory <arg> 每一個TaskManager的內存大小,executor-memory [in MB]
-yd,--yarndetached 若是存在,則以分離模式運行做業 (deprecated; use non-YARN specific option instead)
-z,--zookeeperNamespace <arg> 爲高可用性模式建立Zookeeper子路徑的命名空間

在啓動的是能夠指定TaskManager的個數以及內存(默認是1G),也能夠指定JobManager的內存,可是JobManager的個數只能是一個

咱們開啓動一個YARN session:


./bin/yarn-session.sh -n 4 -tm 8192 -s 8

上面命令啓動了4個TaskManager,每一個TaskManager內存爲8G且佔用了8個核(是每一個TaskManager,默認是1個核)。在啓動YARN session的時候會加載conf/flink-config.yaml配置文件,咱們能夠根據本身的需求去修改裏面的相關參數.

YARN session啓動以後就可使用bin/flink來啓動提交做業:

例如:


./bin/flink run -c com.demo.wangzhiwu.WordCount $DEMO_DIR/target/flink-demo-1.0.SNAPSHOT.jar --port 9000

flink run的用法以下:

用法: run [OPTIONS] <jar-file> <arguments>

"run" 操做參數:


-c,--class <classname> 若是沒有在jar包中指定入口類,則須要在這裏經過這個參數指定

-m,--jobmanager <host:port> 指定須要鏈接的jobmanager(主節點)地址

使用這個參數能夠指定一個不一樣於配置文件中的jobmanager

-p,--parallelism <parallelism> 指定程序的並行度。能夠覆蓋配置文件中的默認值。

使用run 命令向yarn集羣提交一個job。客戶端能夠肯定jobmanager的地址。固然,你也能夠經過-m參數指定jobmanager。jobmanager的地址在yarn控制檯上能夠看到。

值得注意的是:

上面的YARN session是在Hadoop YARN環境下啓動一個Flink cluster集羣,裏面的資源是能夠共享給其餘的Flink做業。咱們還能夠在YARN上啓動一個Flink做業。這裏咱們仍是使用./bin/flink,可是不須要事先啓動YARN session:


./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar \

--input hdfs://user/hadoop/input.txt \

--output hdfs://user/hadoop/output.txt

上面的命令一樣會啓動一個相似於YARN session啓動的頁面。其中的-yn是指TaskManager的個數,必需要指定。 

後臺運行 yarn session

若是你不但願flink yarn client一直運行,也能夠啓動一個後臺運行的yarn session。使用這個參數:-d 或者 --detached

在這種狀況下,flink yarn client將會只提交任務到集羣而後關閉本身。注意:在這種狀況下,沒法使用flink中止yarn session。

必須使用yarn工具來中止yarn session


yarn application -kill <applicationId>

flink on yarn的故障恢復

flink 的 yarn 客戶端經過下面的配置參數來控制容器的故障恢復。這些參數能夠經過conf/flink-conf.yaml 或者在啓動yarn session的時候經過-D參數來指定。


yarn.reallocate-failed:這個參數控制了flink是否應該從新分配失敗的taskmanager容器。默認是true

yarn.maximum-failed-containersapplicationMaster能夠接受的容器最大失敗次數,達到這個參數,就會認爲yarn session失敗。默認這個次數和初始化請求的taskmanager數量相等(-n 參數指定的)。

yarn.application-attemptsapplicationMaster重試的次數。若是這個值被設置爲1(默認就是1),當application master失敗的時候,yarn session也會失敗。設置一個比較大的值的話,yarn會嘗試重啓applicationMaster

日誌文件查看

在某種狀況下,flink yarn session 部署失敗是因爲它自身的緣由,用戶必須依賴於yarn的日誌來進行分析。最有用的就是yarn log aggregation 。啓動它,用戶必須在yarn-site.xml文件中設置yarn.log-aggregation-enable 屬性爲true。一旦啓用了,用戶能夠經過下面的命令來查看一個失敗的yarn session的全部詳細日誌。


yarn logs -applicationId <application ID>
相關文章
相關標籤/搜索