大數據雲原生系列| 微信 Flink on Kubernetes 實戰總結

前言

架構轉型,擁抱雲原生服務生態java

當前微信內部的大數據計算平臺是基於自研的 Yard 資源調度系統來建設,Yard 的設計初衷除了提供在線服務資源隔離外,另外一方面是爲了提升在線服務機器的總體資源利用率,其核心策略是在機器空閒時能在上面跑一些大數據離線任務。可是對接業界各類大數據計算框架(例如 Hadoop MapReduce、Spark、Flink 等)都須要專門定製化開發,迭代維護很是不靈活,難以跟上開源社區發展的步伐。爲此,咱們開始轉向使用Kubernetes,並基於騰訊雲的 TKE 容器平臺逐步搭建咱們的大數據計算平臺。mysql

考慮到咱們 Yard 平臺上 Flink 做業還不是特別多,歷史包袱相對較少,因此咱們首先開始 Flink on Kubernetes實戰之路。git

微信 Flink 實時計算平臺總體概況

微信 Flink 做業數據流轉圖

下圖是咱們大多數業務的 Flink 做業實時計算數據流轉圖,數據經採集上報到消息隊列 Pulsar,用戶的 Flink 做業消費 Pulsar 計算(必要時也會訪問其餘外部存儲,如Redis、FeatureKV等),計算結果能夠落地到多種存儲系統,例如對於報表類業務,計算結果寫入 mysql/pg;對於實時樣本特徵拼接做業,計算結果寫入 hdfs,爲下游模型訓練不斷提供樣本;對於一些中間結果,則寫入Pulsar,以便對接下游 Flink 做業。github

下面詳細闡述上圖中 Flink 做業是如何提交部署的。web

集羣及 Flink 做業部署

Flink on TKE 半托管服務,極致的Flink雲原生使用體驗sql

Flink on TKE 半托管服務提供了Flink集羣部署、日誌、監控、存儲等一站式的服務,用戶能夠將其餘在線業務與Flink運行在同一個集羣中,從而最大程度提升資源資源使用率,達到統一資源、統一技術棧、統一運維等能力。docker

咱們基於騰訊雲的 TKE 容器平臺構建 Flink Kubernetes 計算集羣。根據已有的 Flink 做業運營行狀況,咱們發現絕大多數 Flink 做業主要是耗費內存,而CPU利用率廣泛較低,在機型選擇上咱們推薦選擇內存型機器。apache

對於 Flink 做業的提交部署,Flink on Kubernetes 有多種部署模式(詳細介紹請參考TKE團隊出品的文章:Flink on kubernetes 部署模式分析),Flink 開源社區前後推出了基於 Standalone 的 Kubernetes 聲明式部署以及 Kubernetes Native 部署方式,基於 Standalone 的 Kubernetes 聲明式部署步驟繁瑣且不易管理,因此不考慮,另外社區的 Flink on Kubernetes Native 部署方式是從1.12起正式推出,功能還不夠完善,而且還沒有被大規模生產驗證,咱們在這以前其實已經開始調研部署,通過一番比較後,咱們使用的是TKE容器團隊提供的Flink on TKE半托管服務(基於Kubernetes Operator),其提交部署流程大體以下圖所示。json

img

經過 Flink Operator,客戶端就能夠經過一個簡單的聲明式 API 提交部署 Flink 做業,各組件的生命週期統一由 Operator 控制,例如:api

apiVersion: flinkoperator.Kubernetes.io/v1beta1
kind: FlinkCluster
metadata:
  name: flink-hello-world
spec:
  image:
    name: flink:1.11.3
  jobManager:
    resources:
      limits:
        memory: "1024Mi"
        cpu: "200m"
  taskManager:
    replicas: 2
    resources:
      limits:
        memory: "2024Mi"
        cpu: "200m"
  job:
    jarFile: /opt/flink/examples/streaming/helloword.jar
    className: org.apache.flink.streaming.examples.wordcount.WordCount
    args: ["--input", "/opt/flink/README.txt"]
    parallelism: 2
  flinkProperties:
    taskmanager.numberOfTaskSlots: "2"

Flink Operator 提交流程大體以下圖所示,首先會啓動一個 Flink Standalone Session Cluster,而後拉起一個 Job Pod 運行用戶代碼,向 Standalone Session Cluster 提交 Job,提交完成後會不斷去跟蹤 Job 的運行狀態。因此運行過程當中會有三類 Pod,即 JobManager、TaskManager、Job Pod。

來源: https://github.com/lyft/flink...

使用 Flink Operator 部署 Flink 做業的好處不言而喻,客戶端不須要像 Flink on Kubernetes Native 部署方式那樣須要 kubeconfig,能夠直接經過 http 接口訪問 API Server。雖然 Flink on Kubernetes Native 部署能夠作到按需自動申請 TM,可是實際上咱們的應用場景基本都是單 Job 的流計算,用戶事先規劃好資源也可接受,並且基於 Flink Operator,咱們能夠作批調度,即 Gang Schedule,能夠避免資源有限的狀況下做業之間互相等待資源 hold 住的狀況(例如大做業先提交,部分 TaskManager 長時間處於資源等待狀態,小做業後提交,小做業申請不到資源也 hold 在那裏傻等)。

自動下載用戶上傳資源

做業與 Flink 內核動態分離,提升靈活性

經過上述的聲明式 API 方式提交部署,咱們能夠看到用戶 jar 包須要事先打到 image 裏,做爲平臺提供方,固然不可能讓每一個用戶本身去打 docker image,有些用戶甚至都不知道怎麼用 docker,因此咱們應該對用戶屏蔽 docker image,用戶只須要上傳 jar 包等資源便可。Flink Operator 提供了 initContainer 選項,藉助它咱們能夠實現自動下載用戶上傳資源,可是爲了簡單,咱們直接修改 docker entrypoint 啓動腳本,先下載用戶上傳的資源,再啓動 Flink 相關進程,用戶上傳的資源經過環境變量聲明。例如:

apiVersion: flinkoperator.Kubernetes.io/v1beta1
kind: FlinkCluster
metadata:
  name: flink-hello-world
spec:
  image:
    name: flink:1.11.3
  envVars:
    - name: FLINK_USER_JAR
      value: hdfs://xxx/path/to/helloword.jar
    - name: FLINK_USER_DEPENDENCIES
      value: hdfs://xxx/path/to/config.json,hdfs://xxx/path/to/vocab.txt
  ...

用戶上傳的依賴能夠是任意文件,跟 Flink on Yarn 的方式不一樣,咱們不用經過 submit 來分發依賴,而是在容器 docker entrypoint 啓動腳本中直接下載到工做目錄,以便用戶能夠在代碼裏以相對路徑的方式(例如 ./config.json)訪問到,若是依賴文件是 jar,則須要將其附加到 classpath 中,爲了避免修改 flink 的腳本,咱們將 jar 附加到環境變量 HADOOP_CLASSPATH上,最後 Flink 相關進程啓動的時候會被加到 Java 的 classpath 中。

對於用戶主類所在的 jar(即環境變量FLINK_USER_JAR),只須要在 Job Pod 的 Container 中下載,若是一樣下載到當前目錄,那麼它也會被附加到classpath中,在提交的時候可能會出現以下類加載連接錯誤,這是由於 Java 啓動的時候加載了一遍,在執行用戶main函數的時候 Flink 又會去加載一遍,因此咱們將主 jar 包下載到一個專門固定目錄,例如/opt/workspace/main/,那麼提交時經過spec.job.jarFile

參數指定到 /opt/workspace/main/xxx.jar 便可。

java.lang.LinkageError: loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "org/apache/pulsar/client/api/Authentication"
    at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_152]
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[?:1.8.0_152]
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[?:1.8.0_152]
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) ~[?:1.8.0_152]
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) ~[?:1.8.0_152]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) ~[?:1.8.0_152]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) ~[?:1.8.0_152]

總的來講,每類 pod 的啓動流程以下圖所示:

img

與微信後臺服務打通

雲原生架構下的資源類型 Demonsets,簡化架構轉型複雜度

用戶的 Flink 做業常常須要在運行過程當中與微信的後臺服務進行交互,在傳統的裸機上訪問微信的後臺服務須要機器部署 Agent 及路由配置,對於 Kubernetes 集羣,在咱們基礎架構中心的同事支持下,微信後臺基礎 Agent 以 DeamonSet 方式打包到部署到每一個節點上,咱們在起 Flink 相關 Container 的時候,帶上 HostIPC 選項並掛載路由配置路徑,就能夠像使用裸機同樣訪問微信的後臺服務。

此外,由於部分 Agent 的 unix sock 文件在母機 /tmp 下,咱們須要在容器裏掛載目錄 /tmp,然而 Flink 運行過程當中 shuffle、web 以及一些臨時文件(例如解壓出來的so等)默認都是放到 /tmp 目錄下,這就會致使做業即便失敗也會殘留一些垃圾到母機上,久而久之,/tmp 目錄勢必會被撐爆,因此咱們在啓動 Java 進程時設置參數 -Djava.io.tmpdir=/opt/workspace/tmp,將 Java 的默認臨時目錄改到容器內的路徑,這樣做業失敗,容器銷燬不至於殘留垃圾。

屬性配置、日誌及監控

日誌與監控,提高可觀測性

從上面的聲明式 yaml 配置能夠看到,提交 Flink 做業時是經過flinkProperties 選項來指定 Flink 屬性參數,事實上 Flink Operator 會將flinkProperties指定的屬性參數以 ConfigMap 形式部署,會覆蓋 image 中的 flink/conf 目錄,因此咱們不能將系統默認屬性配置放到 flink image 中,爲此,咱們在客戶端維護一份 Flink 系統默認配置,在提交的時候會合並用戶填的屬性配置,填充到 flinkProperties 選項中,能夠方便咱們靈活調整 Flink 系統默認配置。

默認狀況下,Flink on Kubernetes部署的做業,其在 Docker Container 中運行的進程都是前臺運行的,使用 log4j-console.properties配置,日誌會直接打到控制檯,這樣就會致使 Flink UI 沒法展現 log,只能去查看 Pod 日誌,此外用戶經過 System.out.println 打的日誌也會混在 log4j 的日誌中,不易區分查看。因此咱們從新定義了 log4j-console.properties,將 log4j 日誌打到FLINK_LOG_DIR 目錄下的文件中,並按大小滾動,爲了能在 Flink UI 上也能看到用戶 stdout 的輸出,在進程啓動命令flink-console.sh 最後加上 2>\&1 | tee ${FLINK_LOG_PREFIX}.out,能夠把控制檯輸出的日誌旁路一份到日誌目錄的文件中。最後 Flink UI 展現的日誌以下圖所示:

對於歷史失敗做業,咱們在Kubernetes上也部署了一個 Flink History Server,能夠靈活地擴縮容,今後不再用擔憂半夜做業掛了自動重啓沒法追溯緣由了。

對於資源及做業的監控,TKE 提供了免費的雲原生 Prometheus 服務 TPS,能夠一鍵部署並關聯咱們的 TKE 集羣,然而咱們在早期已經採用主流的 Prometheus + Grafana 組合部署了監控平臺,這裏就沒有使用TPS。當前咱們有集羣資源、應用組(Namespace)資源、做業資源利用狀況的監控,大體以下圖所示。後面咱們會再將每一個做業 Flink Metric 推到 Prometheus,便於監控做業級別的反壓、gc、operator 流量等信息。

數據應用平臺對接

基於上述基礎的 Flink-on-Kubernetes 能力,就能夠將 Flink 對接到咱們的各類數據應用平臺上。以下圖所示,咱們已經支持用戶使用多樣化的方式使用 Flink,用戶能夠在機器學習平臺拖拽節點或者註冊定製化節點以 Jar 包或 PyFlink 的方式使用,另外也能夠在SQL分析平臺上寫 Flink SQL。

對於 Jar、PyFlink 的方式使用就不詳細展開,對於 Flink SQL 的支持,咱們目前是結合咱們自身的元數據體系,利用 Flink 已有的 SQL 功能。當前實時數倉被業界普遍提起,咱們知道傳統的離線數倉,如 Hive,無外乎是在 HDFS 上套了一層 Schema,那麼實時數倉也相似,數據源一般是 Kafka、Pulsar 這類消息隊列系統,在這之上套一層 Schema 將實時數據管理起來,就能夠稱之爲實時數倉了。咱們基於SQL分析平臺的元數據管理體系,構建 Flink SQL 能力,用戶能夠在SQL分析平臺上註冊/管理庫表元數據,爲了架構簡單,咱們並無去實現本身的 Flink Catalog(元數據操做直接在 SQL分析平臺上完成,無需實現 create、drop 等 API),而是採用以下圖所示的流程來提交 SQL。

用戶在SQL分析平臺上註冊庫表元數據(能夠精細受權管控),而後編輯 SQL 提交,首先SQL分析平臺會作語法校驗、權限及合法性校驗,沒問題後,將 SQL 涉及到的元數據加密打包,連同聲明式配置 Yaml 提交給統一調度平臺,在統一調度平臺上咱們開發了一個 FlinkSQL 類型的做業,本質上就是一個常規的 Flink Jar 做業,即 FlinkSQLDriver ,用於接受 SQL 及其附屬的參數,FlinkSQLDriver 被提交後,解析傳過來的配置,組裝完整的 SQL 語句(包括 DDL、DML),而後調用 tableEnvironment.executeSql逐條執行,因此本質上是將庫表臨時註冊到 default catalog 中。

小結

本文從總體上介紹了微信 Flink-on-Kubernetes實戰經驗以及 Flink 數據應用平臺的概況,一方面咱們提供最基礎的 Flink 計算平臺能力,藉助Kubernetes有效管控集羣,另外一方面咱們在已有的數據通道及元數據平臺上構建實時數倉,提供 Flink SQL 能力,進一步下降用戶使用門檻,對於 Flink SQL 的支持目前還比較初級和原始,後面咱們將結合業務使用狀況探索更多深層次的優化。

【騰訊雲原生】雲說新品、雲研新術、雲遊新活、雲賞資訊,掃碼關注同名公衆號,及時獲取更多幹貨!!
相關文章
相關標籤/搜索