Flink
是目前最熱門的分佈式流/批處理框架,而Kubernetes
是目前最熱門的資源管理和調度平臺。Flink
支持在Kubernetes
上採用Session模式或Application模式部署做業。基於實踐經驗,本文主要討論在Kubernetes上部署Flink做業須要注意的地方。html
環境:java
測試雖基於flink-1.11.2,參考1.12的文檔也無妨:node
k8s上運行flink任務有兩種模式,session模式和application模式(早期還有一種per-job模式,但已廢棄)github
session模式下,在k8s上會部署一個運行jobmanager的pod(以及包括deployment/rs/service/configmap等資源)。後續經過提交命令提交的任務,都由這個jobmanager的pod負責向k8s申請taskmanager的pod。這種模式與standalone有些相似,惟一不一樣的是,standalone模式須要事先部署好master和worker。docker
application模式下,每一個做業會在k8s上部署一套jm和tm,這跟yarn模式是相似的。apache
基於上述原理,不論是哪一種模式都須要pod有權限建立和管理k8s資源,所以須要考慮RBAC。api
正如文檔所說,須要事先爲default這個servicename設置對應的role,實踐中咱們部署以下配置,參考kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services:網絡
apiVersion: rbac.authorization.k8s.io/v1beta1 kind: ClusterRoleBinding metadata: name: fabric8-rbac subjects: - kind: ServiceAccount # Reference to upper's `metadata.name` name: default # Reference to upper's `metadata.namespace` namespace: default roleRef: kind: ClusterRole name: cluster-admin apiGroup: rbac.authorization.k8s.io
最開始遺漏上述步驟浪費了大量的調試時間。session
另外,提交任務最好在k8s的節點上進行,由於以下緣由
KubeConfig, which has access to list, create, delete pods and services, configurable via ~/.kube/config. You can verify permissions by running kubectl auth can-i <list|create|edit|delete> pods.
k8s上須要事先安裝好CoreDNS
flink1.11的客戶端對log配置處理並很差,這形成調試和排錯困難,因此建議上來先處理一下客戶端的配置:
在flink-conf.yaml
增長以下配置:
kubernetes.container-start-command-template: %java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args% kubernetes.container.image.pull-policy: Always
kubernetes.container-start-command-template
的做用是生成jobmanager pod時的啓動命令。這裏去掉提交命令中最後的的%redirect%
。默認%redirect%
會將標準輸出和標準錯誤重定向到文件。重定向會致使,若是pod出錯掛掉的話,沒法經過kubectl logs命令查看日誌將logback-console.xml
和log4j-console.properties
重命名爲logback.xml
和log4j.properties
。這將使得日誌打印到stdout和stderr,不然日誌將打印到文件。
1.12修復了這個兩個問題,沒必要修改命令行模板,也沒必要重命名日誌文件 FLINK-15792
首先經過以下命令提交jobmanager:
$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
若是不成功的的話,建議經過kubectl logs命令看下問題。
接下來提交做業:
$ ./bin/flink run --target kubernetes-session -Dkubernetes.cluster-id=my-first-flink-cluster ./examples/streaming/TopSpeedWindowing.jar
可能會報錯:
Caused by: java.util.concurrent.CompletionException: [org.apache.flink.shaded.netty4.io](http://org.apache.flink.shaded.netty4.io/).netty.channel.AbstractChannel$AnnotatedConnectException: 拒絕鏈接: /192.168.21.73:8081 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ... 19 more
顯然是客戶端嘗試將jobgraph
提交給容器中的jobmanager
時沒法鏈接到jobmanager。這裏想到首先要檢查一下jobmanager是否正常,進入jm容器,日誌沒有什麼報錯,應該沒問題。容器啓動的是8081的端口,在外面是沒法訪問的。這是k8s環境的「通病」:複雜網絡環境。
有幾種方案:
經過kubectl port-forward進行本地端口轉發,例如:
kubectl port-forward my-first-flink-cluster-6446bbb6f6-4nnm5 8081:8081 --address 0.0.0.0
jobmanager會啓動一個rest service資源,默認採用LoadBalancer類型,咱們能夠在集羣中安裝一個LoadBalancer,下面介紹瞭如何安裝metallb
metallb自己也是pod運行的,按照官網安裝沒什麼問題:
kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v0.9.5/manifests/namespace.yaml kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v0.9.5/manifests/metallb.yaml首次安裝需運行
kubectl create secret generic -n metallb-system memberlist --from-literal=secretkey="$(openssl rand -base64 128)"採用layer2的方式配置,並且地址池的地址段與node是同一個地址段,這樣不須要配置其餘東西:
config.yaml apiVersion: v1 kind: ConfigMap metadata: namespace: metallb-system name: config data: config: | address-pools: - name: default protocol: layer2 addresses: - 192.168.21.210-192.168.21.215
配置loadbalancer之後:
能夠看到xxx-rest的svc,其中的EXTERNAL-IP
原先一直是<pending>,如今從地址段中分配了一個地址了。這個地址沒法ping,可是能夠訪問:
解決了服務外部訪問的問題,就能正常運行測試做業了。
session模式總結:
Session mode雖然看似簡單,可是對於掃清環境障礙起到相當重要的做用。上面提到Application mode與yarn實際上是比較相似的,是一種更接近生產的部署模式。
首先須要將打包好的應用程序jar包打入鏡像:
FROM flink:1.11.2-scala_2.11 RUN mkdir -p $FLINK_HOME/usrlib COPY jax-flink-entry-2.0-SNAPSHOT.jar $FLINK_HOME/usrlib/jax-flink-entry-2.0-SNAPSHOT.jar COPY kafka-clients-2.2.0.jar $FLINK_HOME/lib/kafka-clients-2.2.0.jar
以上面的Dockerfile爲例,把咱們的應用程序包放到$FLINK_HOME/usrlib
(這是個特殊的目錄,默認Flink在運行的時候會從這個目錄加載用戶的jar包)。同時,咱們把依賴包放到$FLINK_HOME/lib
下。
構建鏡像並推送到內部的鏡像倉庫:
docker build -t xxxxx:5000/jax-flink:lastest . docker push xxxx:5000/jax-flink:lastest
以Application mode提交做業
./bin/flink run-application \ --target kubernetes-application \ -Dkubernetes.cluster-id=jax-application-cluster \ -Dkubernetes.container.image=xxxx:5000/jax-flink:lastest \ -c com.eoi.jax.flink_entry.FlinkMainEntry \ local:///opt/flink/usrlib/jax-flink-entry-2.0-SNAPSHOT.jar ...
做業會啓動獨立的jobmanager和taskmanager。Applicatoin mode的特色是做業的構建(生成jobgraph的過程)不在客戶端完成,而是在jobmanager上完成,這一點與spark的driver是相似的。
一些提交命令參數的做用:
$internal.application.program-args
。這將最終最爲用戶main函數的參數[]String$internal.application.main
pipeline.classpaths
(必須是合法的URL)。可是,pipeline.classpaths中的URL不會被加到運行用戶main函數的類加載器中,這意味着-C指定的依賴包沒法被用戶代碼使用。筆者已經向Flink提交了相關的issue和PR,已經被肯定爲BUG。FLINK-21289containerized.taskmanager.env
和containerized.master.env
測試下來是生效的,能夠生成容器的env