在Kubernetes上運行Flink

Flink是目前最熱門的分佈式流/批處理框架,而Kubernetes是目前最熱門的資源管理和調度平臺。Flink支持在Kubernetes上採用Session模式或Application模式部署做業。基於實踐經驗,本文主要討論在Kubernetes上部署Flink做業須要注意的地方。html

環境:java

  • k8s: 1.15
  • flink-client:flink-1.11.2

測試雖基於flink-1.11.2,參考1.12的文檔也無妨:node

native_kubernetesgit

k8s上運行flink任務有兩種模式,session模式和application模式(早期還有一種per-job模式,但已廢棄)github

session模式下,在k8s上會部署一個運行jobmanager的pod(以及包括deployment/rs/service/configmap等資源)。後續經過提交命令提交的任務,都由這個jobmanager的pod負責向k8s申請taskmanager的pod。這種模式與standalone有些相似,惟一不一樣的是,standalone模式須要事先部署好master和worker。docker

application模式下,每一個做業會在k8s上部署一套jm和tm,這跟yarn模式是相似的。apache

準備

RBCA

基於上述原理,不論是哪一種模式都須要pod有權限建立和管理k8s資源,所以須要考慮RBAC。api

正如文檔所說,須要事先爲default這個servicename設置對應的role,實踐中咱們部署以下配置,參考kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services網絡

apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: fabric8-rbac
subjects:
  - kind: ServiceAccount
    # Reference to upper's `metadata.name`
    name: default
    # Reference to upper's `metadata.namespace`
    namespace: default
roleRef:
  kind: ClusterRole
  name: cluster-admin
  apiGroup: rbac.authorization.k8s.io

最開始遺漏上述步驟浪費了大量的調試時間。session

在k8s節點上提交

另外,提交任務最好在k8s的節點上進行,由於以下緣由

KubeConfig, which has access to list, create, delete pods and services, configurable via ~/.kube/config. You can verify permissions by running kubectl auth can-i <list|create|edit|delete> pods.

CoreDNS

k8s上須要事先安裝好CoreDNS

處理日誌配置

flink1.11的客戶端對log配置處理並很差,這形成調試和排錯困難,因此建議上來先處理一下客戶端的配置:

flink-conf.yaml增長以下配置:

kubernetes.container-start-command-template: %java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%
kubernetes.container.image.pull-policy: Always
  • kubernetes.container-start-command-template的做用是生成jobmanager pod時的啓動命令。這裏去掉提交命令中最後的的%redirect%。默認%redirect%會將標準輸出和標準錯誤重定向到文件。重定向會致使,若是pod出錯掛掉的話,沒法經過kubectl logs命令查看日誌
  • 鏡像始終拉取。在內網環境下,流量不是問題,始終拉取鏡像,方便後面修改基礎鏡像後,能及時拉取

logback-console.xmllog4j-console.properties重命名爲logback.xmllog4j.properties。這將使得日誌打印到stdout和stderr,不然日誌將打印到文件。

1.12修復了這個兩個問題,沒必要修改命令行模板,也沒必要重命名日誌文件 FLINK-15792

Session mode

首先經過以下命令提交jobmanager:

$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster

若是不成功的的話,建議經過kubectl logs命令看下問題。

接下來提交做業:

$ ./bin/flink run --target kubernetes-session -Dkubernetes.cluster-id=my-first-flink-cluster ./examples/streaming/TopSpeedWindowing.jar

可能會報錯:

Caused by: java.util.concurrent.CompletionException: [org.apache.flink.shaded.netty4.io](http://org.apache.flink.shaded.netty4.io/).netty.channel.AbstractChannel$AnnotatedConnectException: 拒絕鏈接: /192.168.21.73:8081
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 19 more

顯然是客戶端嘗試將jobgraph提交給容器中的jobmanager時沒法鏈接到jobmanager。這裏想到首先要檢查一下jobmanager是否正常,進入jm容器,日誌沒有什麼報錯,應該沒問題。容器啓動的是8081的端口,在外面是沒法訪問的。這是k8s環境的「通病」:複雜網絡環境。

有幾種方案:

  1. 經過kubectl port-forward進行本地端口轉發,例如:

    kubectl port-forward my-first-flink-cluster-6446bbb6f6-4nnm5 8081:8081 --address 0.0.0.0
  2. jobmanager會啓動一個rest service資源,默認採用LoadBalancer類型,咱們能夠在集羣中安裝一個LoadBalancer,下面介紹瞭如何安裝metallb

    metallb自己也是pod運行的,按照官網安裝沒什麼問題:

    kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v0.9.5/manifests/namespace.yaml
    kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v0.9.5/manifests/metallb.yaml

    首次安裝需運行

    kubectl create secret generic -n metallb-system memberlist --from-literal=secretkey="$(openssl rand -base64 128)"

    採用layer2的方式配置,並且地址池的地址段與node是同一個地址段,這樣不須要配置其餘東西:

    config.yaml
    apiVersion: v1
    kind: ConfigMap
    metadata:
      namespace: metallb-system
      name: config
    data:
      config: |
        address-pools:
        - name: default
          protocol: layer2
          addresses:
          - 192.168.21.210-192.168.21.215

配置loadbalancer之後:

能夠看到xxx-rest的svc,其中的EXTERNAL-IP原先一直是<pending>,如今從地址段中分配了一個地址了。這個地址沒法ping,可是能夠訪問:

image.png

解決了服務外部訪問的問題,就能正常運行測試做業了。

session模式總結:

  1. session模式下,在k8s上提交一個jobmanager的pod,要保證服務帳號有管理pod的權限,不然沒法運行
  2. 客戶端提交的時候要注意打通網絡,能夠用port-forward來作
  3. 客戶端提交終端會阻塞,但ctrl-C也不要緊
  4. jobmanager會爲做業向k8s申請生成taskmanager容器運行做業

Application mode

Session mode雖然看似簡單,可是對於掃清環境障礙起到相當重要的做用。上面提到Application mode與yarn實際上是比較相似的,是一種更接近生產的部署模式。

首先須要將打包好的應用程序jar包打入鏡像:

FROM flink:1.11.2-scala_2.11
 
RUN mkdir -p $FLINK_HOME/usrlib
COPY jax-flink-entry-2.0-SNAPSHOT.jar $FLINK_HOME/usrlib/jax-flink-entry-2.0-SNAPSHOT.jar
COPY kafka-clients-2.2.0.jar $FLINK_HOME/lib/kafka-clients-2.2.0.jar

以上面的Dockerfile爲例,把咱們的應用程序包放到$FLINK_HOME/usrlib(這是個特殊的目錄,默認Flink在運行的時候會從這個目錄加載用戶的jar包)。同時,咱們把依賴包放到$FLINK_HOME/lib下。

構建鏡像並推送到內部的鏡像倉庫:

docker build -t xxxxx:5000/jax-flink:lastest .
docker push xxxx:5000/jax-flink:lastest

以Application mode提交做業

./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=jax-application-cluster \
-Dkubernetes.container.image=xxxx:5000/jax-flink:lastest \
-c com.eoi.jax.flink_entry.FlinkMainEntry  \
local:///opt/flink/usrlib/jax-flink-entry-2.0-SNAPSHOT.jar ...

做業會啓動獨立的jobmanager和taskmanager。Applicatoin mode的特色是做業的構建(生成jobgraph的過程)不在客戶端完成,而是在jobmanager上完成,這一點與spark的driver是相似的。

image.png

一些提交命令參數的做用:

  • 應用自身的參數:會在flink-conf.yaml中生成:$internal.application.program-args。這將最終最爲用戶main函數的參數[]String
  • -class:會在flink-conf.yaml中生成$internal.application.main
  • -C: 會在flink-conf.yarml中生成pipeline.classpaths(必須是合法的URL)。可是,pipeline.classpaths中的URL不會被加到運行用戶main函數的類加載器中,這意味着-C指定的依賴包沒法被用戶代碼使用。筆者已經向Flink提交了相關的issue和PR,已經被肯定爲BUG。FLINK-21289
  • containerized.taskmanager.envcontainerized.master.env測試下來是生效的,能夠生成容器的env
相關文章
相關標籤/搜索