Flink 1.10 Native Kubernetes 原理與實踐

千呼萬喚始出來,在 Kubernetes 如火如荼的今天,Flink 社區終於在 1.10 版本提供了對 Kubernetes 的原生支持,也就是Native Kubernetes Integration。不過還只是Beta版本,預計會在 1.11 版本里面提供完整的支持。html

咱們知道,在 Flink 1.9 以及以前的版本里面,若是要在 Kubernetes 上運行 Flink 任務是須要事先指定好須要的 TaskManager(TM) 的個數以及CPU和內存的。這樣的問題是:大多數狀況下,你在任務啓動前根本沒法精確的預估這個任務須要多少個TM。若是指定的TM多了,會致使資源浪費;若是指定的TM個數少了,會致使任務調度不起來。本質緣由是在 Kubernetes 上運行的 Flink 任務並無直接向 Kubernetes 集羣去申請資源。前端

Flink 在 1.10 版本完成了Active Kubernetes Integration的第一階段,支持了 session clusters。後續的第二階段會提供更完整的支持,如支持 per-job 任務提交,以及基於原生 Kubernetes API 的高可用,支持更多的 Kubernetes 參數如 toleration, label 和 node selector 等。Active Kubernetes Integration中的Active意味着 Flink 的 ResourceManager (KubernetesResourceManager) 能夠直接和 Kubernetes 通訊,按需申請新的 Pod,相似於 Flink 中對 Yarn 和 Mesos 的集成所作的那樣。在多租戶環境中,用戶能夠利用 Kubernetes 裏面的 namespace 作資源隔離啓動不一樣的 Flink 集羣。固然,Kubernetes 集羣中的用戶賬號和賦權是須要提早準備好的。java

原理

flink_1.10_nativek8s.png

工做原理以下(段首的序號對應圖中箭頭所示的數字):node

  1. Flink 客戶端首先鏈接 Kubernetes API Server,提交 Flink 集羣的資源描述文件,包括 configmap,job manager service,job manager deployment 和Owner Reference
  2. Kubernetes Master 就會根據這些資源描述文件去建立對應的 Kubernetes 實體。以咱們最關心的 job manager deployment 爲例,Kubernetes 集羣中的某個節點收到請求後,Kubelet 進程會從中央倉庫下載 Flink 鏡像,準備和掛載 volume,而後執行啓動命令。在 flink master 的 pod 啓動後,Dispacher 和 KubernetesResourceManager 也都啓動了。

前面兩步完成後,整個 Flink session cluster 就啓動好了,能夠接受提交任務請求了。apache

  1. 用戶能夠經過 Flink 命令行即 flink client 往這個 session cluster 提交任務。此時 job graph 會在 flink client 端生成,而後和用戶 jar 包一塊兒經過 RestClinet 上傳。
  2. 一旦 job 提交成功,JobSubmitHandler 收到請求就會提交 job 給 Dispatcher。接着就會生成一個 job master。
  3. JobMaster 向 KubernetesResourceManager 請求 slots。
  4. KubernetesResourceManager 從 Kubernetes 集羣分配 TaskManager。每一個TaskManager都是具備惟一表示的 Pod。KubernetesResourceManager 會爲 TaskManager 生成一份新的配置文件,裏面有 Flink Master 的 service name 做爲地址。這樣在 Flink Master failover以後,TaskManager 仍然能夠從新連上。
  5. Kubernetes 集羣分配一個新的 Pod 後,在上面啓動 TaskManager。
  6. TaskManager 啓動後註冊到 SlotManager。
  7. SlotManager 向 TaskManager 請求 slots。
  8. TaskManager 提供 slots 給 JobMaster。而後任務就會被分配到這個 slots 上運行。

實踐

Flink 的文檔上對如何使用已經寫的比較詳細了,不過剛開始總會踩到一些坑。若是對 Kubernetes 不熟,可能會花點時間。api

(1) 首先得有個 Kubernetes 集羣,會有個~/.kube/config文件。嘗試執行 kubectl get nodes 看下集羣是否正常。session

若是沒有這個~/.kube/config文件,會報錯:app

2020-02-17 22:27:17,253 WARN  io.fabric8.kubernetes.client.Config                           - Error reading service account token from: [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
2020-02-17 22:27:17,437 ERROR org.apache.flink.kubernetes.cli.KubernetesSessionCli          - Error while running the Flink session.
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [Service]  with name: [flink-cluster-81832d75-662e-40fd-8564-cd5a902b243c]  in namespace: [default]  failed.
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:231)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:164)
    at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getService(Fabric8FlinkKubeClient.java:334)
    at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getInternalService(Fabric8FlinkKubeClient.java:246)
    at org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:104)
    at org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:185)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:185)
Caused by: java.net.UnknownHostException: kubernetes.default.svc: nodename nor servname provided, or not known

(2) 提早建立好用戶和賦權(RBAC)jvm

kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink

若是沒有建立用戶,使用默認的用戶去提交,會報錯:ide

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.10.0.1/api/v1/namespaces/default/pods?labelSelector=app%3Dkaibo-test%2Ccomponent%3Dtaskmanager%2Ctype%3Dflink-native-kubernetes. 

Message: Forbidden!Configured service account doesn't have access. 
Service account may have been revoked. pods is forbidden: 
User "system:serviceaccount:default:default" cannot list resource "pods" in API group "" in the namespace "default".

(3) 這一步是可選的。默認狀況下, JobManager 和 TaskManager 只會將 log 寫到各自 pod 的 /opt/flink/log 。若是想經過 kubectl logs 看到日誌,須要將 log 輸出到控制檯。要作以下修改 FLINK_HOME/conf 目錄下的 log4j.properties 文件。

log4j.rootLogger=INFO, file, console

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

而後啓動 session cluster 的命令行須要帶上參數:

-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"

(4) 終於能夠開始啓動 session cluster了。以下命令是啓動一個每一個 TaskManager 是4G內存,2個CPU,4個slot 的 session cluster。

bin/kubernetes-session.sh -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" -Dkubernetes.cluster-id=kaibo-test -Dtaskmanager.memory.process.size=4096m -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=4

更多的參數詳見文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#kubernetes

使用kubectl logs kaibo-test-6f7dffcbcf-c2p7g -f就能看到日誌了。

若是出現大量的這種日誌(目前遇到是雲廠商的LoadBalance liveness探測致使):

2020-02-17 14:58:56,323 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Unhandled exception
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:377)
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)

能夠暫時在 log4j.properties 裏面配置上:

log4j.logger.org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint=ERROR, file

這個日誌太多會致使 WebUI 上打開 jobmanger log 是空白,由於文件太大了前端沒法顯示。

若是前面第(1)和第(2)步沒有作,會出現各類異常,經過 kubectl logs 就能很方便的看到日誌了。

Session cluster 啓動後能夠經過 kubectl get pods,svc 來看是否正常。

經過端口轉發來查看 Web UI:

kubectl port-forward service/kaibo-test 8081

打開http://127.0.0.1:8001就能看到 Flink 的 WebUI 了。

(5) 提交任務

./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=kaibo-test examples/streaming/TopSpeedWindowing.jar

咱們從 Flink WebUI 頁面上能夠看到,剛開始啓動時,UI上顯示 Total/Available Task Slots 爲0, Task Managers 也是0。隨着任務的提交,資源會動態增長。任務中止後,資源就會釋放掉。

在提交任務後,經過 kubectl get pods 可以看到 Flink 爲 TaskManager 分配了新的 Pod。

pods.png

(6) 中止 session cluster

echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=kaibo-test -Dexecution.attached=true

也能夠手工刪除資源:

kubectl delete service/<ClusterID>

總結

能夠看到,Flink 1.10 版本對和 Kubernetes 的集成作了很好的嘗試。期待社區後續的 1.11 版本能對 per-job 提供支持,以及和 Kubernetes 的深度集成,例如基於原生 Kubernetes API 的高可用

查看更多:https://yqh.aliyun.com/detail..._content=g_1000105249

上雲就看雲棲號:更多雲資訊,上雲案例,最佳實踐,產品入門,訪問:https://yqh.aliyun.com/

相關文章
相關標籤/搜索