千呼萬喚始出來,在 Kubernetes 如火如荼的今天,Flink 社區終於在 1.10 版本提供了對 Kubernetes 的原生支持,也就是Native Kubernetes Integration。不過還只是Beta版本,預計會在 1.11 版本里面提供完整的支持。html
咱們知道,在 Flink 1.9 以及以前的版本里面,若是要在 Kubernetes 上運行 Flink 任務是須要事先指定好須要的 TaskManager(TM) 的個數以及CPU和內存的。這樣的問題是:大多數狀況下,你在任務啓動前根本沒法精確的預估這個任務須要多少個TM。若是指定的TM多了,會致使資源浪費;若是指定的TM個數少了,會致使任務調度不起來。本質緣由是在 Kubernetes 上運行的 Flink 任務並無直接向 Kubernetes 集羣去申請資源。前端
Flink 在 1.10 版本完成了Active Kubernetes Integration
的第一階段,支持了 session clusters。後續的第二階段會提供更完整的支持,如支持 per-job 任務提交,以及基於原生 Kubernetes API 的高可用,支持更多的 Kubernetes 參數如 toleration, label 和 node selector 等。Active Kubernetes Integration
中的Active
意味着 Flink 的 ResourceManager (KubernetesResourceManager) 能夠直接和 Kubernetes 通訊,按需申請新的 Pod,相似於 Flink 中對 Yarn 和 Mesos 的集成所作的那樣。在多租戶環境中,用戶能夠利用 Kubernetes 裏面的 namespace 作資源隔離啓動不一樣的 Flink 集羣。固然,Kubernetes 集羣中的用戶賬號和賦權是須要提早準備好的。java
工做原理以下(段首的序號對應圖中箭頭所示的數字):node
前面兩步完成後,整個 Flink session cluster 就啓動好了,能夠接受提交任務請求了。apache
Flink 的文檔上對如何使用已經寫的比較詳細了,不過剛開始總會踩到一些坑。若是對 Kubernetes 不熟,可能會花點時間。api
(1) 首先得有個 Kubernetes 集羣,會有個~/.kube/config
文件。嘗試執行 kubectl get nodes 看下集羣是否正常。session
若是沒有這個~/.kube/config
文件,會報錯:app
2020-02-17 22:27:17,253 WARN io.fabric8.kubernetes.client.Config - Error reading service account token from: [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring. 2020-02-17 22:27:17,437 ERROR org.apache.flink.kubernetes.cli.KubernetesSessionCli - Error while running the Flink session. io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [Service] with name: [flink-cluster-81832d75-662e-40fd-8564-cd5a902b243c] in namespace: [default] failed. at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64) at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72) at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:231) at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:164) at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getService(Fabric8FlinkKubeClient.java:334) at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getInternalService(Fabric8FlinkKubeClient.java:246) at org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:104) at org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:185) at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:185) Caused by: java.net.UnknownHostException: kubernetes.default.svc: nodename nor servname provided, or not known
(2) 提早建立好用戶和賦權(RBAC)jvm
kubectl create serviceaccount flink kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink
若是沒有建立用戶,使用默認的用戶去提交,會報錯:ide
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.10.0.1/api/v1/namespaces/default/pods?labelSelector=app%3Dkaibo-test%2Ccomponent%3Dtaskmanager%2Ctype%3Dflink-native-kubernetes. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods is forbidden: User "system:serviceaccount:default:default" cannot list resource "pods" in API group "" in the namespace "default".
(3) 這一步是可選的。默認狀況下, JobManager 和 TaskManager 只會將 log 寫到各自 pod 的 /opt/flink/log 。若是想經過 kubectl logs 看到日誌,須要將 log 輸出到控制檯。要作以下修改 FLINK_HOME/conf 目錄下的 log4j.properties 文件。
log4j.rootLogger=INFO, file, console # Log all infos to the console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
而後啓動 session cluster 的命令行須要帶上參數:
-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
(4) 終於能夠開始啓動 session cluster了。以下命令是啓動一個每一個 TaskManager 是4G內存,2個CPU,4個slot 的 session cluster。
bin/kubernetes-session.sh -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" -Dkubernetes.cluster-id=kaibo-test -Dtaskmanager.memory.process.size=4096m -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=4
更多的參數詳見文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#kubernetes
使用kubectl logs kaibo-test-6f7dffcbcf-c2p7g -f
就能看到日誌了。
若是出現大量的這種日誌(目前遇到是雲廠商的LoadBalance liveness探測致使):
2020-02-17 14:58:56,323 WARN org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Unhandled exception java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:377) at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)
能夠暫時在 log4j.properties 裏面配置上:
log4j.logger.org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint=ERROR, file
這個日誌太多會致使 WebUI 上打開 jobmanger log 是空白,由於文件太大了前端沒法顯示。
若是前面第(1)和第(2)步沒有作,會出現各類異常,經過 kubectl logs 就能很方便的看到日誌了。
Session cluster 啓動後能夠經過 kubectl get pods,svc 來看是否正常。
經過端口轉發來查看 Web UI:
kubectl port-forward service/kaibo-test 8081
打開http://127.0.0.1:8001就能看到 Flink 的 WebUI 了。
(5) 提交任務
./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=kaibo-test examples/streaming/TopSpeedWindowing.jar
咱們從 Flink WebUI 頁面上能夠看到,剛開始啓動時,UI上顯示 Total/Available Task Slots 爲0, Task Managers 也是0。隨着任務的提交,資源會動態增長。任務中止後,資源就會釋放掉。
在提交任務後,經過 kubectl get pods 可以看到 Flink 爲 TaskManager 分配了新的 Pod。
(6) 中止 session cluster
echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=kaibo-test -Dexecution.attached=true
也能夠手工刪除資源:
kubectl delete service/<ClusterID>
能夠看到,Flink 1.10 版本對和 Kubernetes 的集成作了很好的嘗試。期待社區後續的 1.11 版本能對 per-job 提供支持,以及和 Kubernetes 的深度集成,例如基於原生 Kubernetes API 的高可用
查看更多:https://yqh.aliyun.com/detail..._content=g_1000105249
上雲就看雲棲號:更多雲資訊,上雲案例,最佳實踐,產品入門,訪問:https://yqh.aliyun.com/