簡介:惟品會 Flink 的容器化實踐應用,Flink SQL 平臺化建設,以及在實時數倉和實驗平臺上的應用案例。
轉自dbaplus社羣公衆號
做者:王康,惟品會數據平臺高級開發工程師git
GitHub 地址
https://github.com/apache/flink
歡迎你們給 Flink 點贊送 star~github
自 2017 年起,爲保障內部業務在平時和大促期間的平穩運行,惟品會就開始基於 Kubernetes 深刻打造高性能、穩定、可靠、易用的實時計算平臺,如今的平臺支持 Flink、Spark、Storm 等主流框架。數據庫
本文將分爲五個方面,分享惟品會 Flink 的容器化實踐應用以及產品化經驗:apache
在集羣規模方面,咱們有 2000+ 的物理機,主要部署 Kubernetes 異地雙活的集羣,利用 Kubernetes 的 namespaces,labels 和 taints 等實現業務隔離以及初步的計算負載隔離。json
Flink 任務數、Flink SQL 任務數、Storm 任務數、Spark 任務數,這些線上實時應用加起來有 1000 多個。目前咱們主要支持 Flink SQL 這一塊,由於 SQL 化是一個趨勢,因此咱們要支持 SQL 任務的上線平臺。api
咱們從下往上進行解析實時計算平臺的總體架構:緩存
其實是用 deployment 的模式運行 Kubernetes 上,平臺雖然支持 yarn 調度,可是 yarn 調度與批任務共享資源,因此主流任務仍是運行在 Kubernetes 上的。而且,yarn 調度這一層主要是離線部署的一套 yarn 集羣。在 2017 年的時候,咱們自研了 Flink on Kubernetes 的一套方案,由於底層調度分了兩層,因此在大促資源緊張的時候,實時跟離線就能夠作一個資源的借調。架構
主要用來支持公司內部基於 Kafka 的實時數據 vms,基於 binlog 的 vdp 數據和原生 Kafka 做爲消息總線,狀態存儲在 HDFS 上,數據主要存入 Redis、MySQL、HBase、Kudu、HDFS、ClickHouse 等。併發
主要是 Flink、Storm、Spark,目前主推的是 Flink,每一個框架會都會支持幾個版本的鏡像以知足不一樣的業務需求。app
主要提供做業配置、調度、版本管理、容器監控、job 監控、告警、日誌等功能,提供多租戶的資源管理(quota,label 管理)以及 Kafka 監控。資源配置也分爲大促日和日常日,大促的資源和日常的資源是不同的,資源的權限管控也是不同的。在 Flink 1.11 版本以前,平臺自建元數據管理系統爲 Flink SQL 管理 schema;從 1.11 版本開始,則是經過 Hive metastore 與公司元數據管理系統融合。
主要是支持實時大屏、推薦、實驗平臺、實時監控和實時數據清洗的一些場景。
上面是實時平臺 Flink 容器化的架構圖。Flink 容器化實際上是基於 Standalone 模式部署的。
咱們的部署模式共有 Client、Job Manager、Task Manager 三個角色,每個角色都會有一個 Deployment 來控制。
用戶經過平臺上傳任務 jar 包、配置等,存儲於 HDFS 上。同時由平臺維護的配置、依賴等也存儲在 HDFS 上,當 pod 啓動時,就會進行拉取等初始化操做。
Client 中主進程是一個由 go 開發的 agent,當 Client 啓動時,會首先檢查集羣狀態,當集羣準備好後,從 HDFS 上拉取 jar 包,再向這個集羣提交任務。Client 的主要任務是作容錯,它主要功能還有監控任務狀態,作 savepoint 等操做。
經過部署在每臺物理機上的 smart-agent 採集容器的指標寫入 m3,以及經過 Flink 暴漏的接口將 metrics 寫入 prometheus,結合 grafana 展現。一樣經過部署在每臺物理機上的 vfilebeat 採集掛載出來的相關日誌寫入 es,在 dragonfly 能夠實現日誌檢索。
1)Flink 平臺化
在實踐過程當中,必定要結合具體場景和易用性,再去考慮作平臺化工做。
2)Flink 穩定性
在咱們應用部署以及運行過程當中,異常是不可避免的,這時候平臺就須要作一些保證任務在出現異常情況後,依舊保持穩定性的一些策略。
pod 的健康和可用:
由 livenessProbe 和 readinessProbe 檢測,同時指定 pod 的重啓策略,Kubernetes 自己能夠作一個 pod 的拉起。
Flink 任務產生異常時:
在 Client 中會定時監控 Flink 狀態,同時將最新的 checkpoint 地址更新到本身的緩存中,並彙報到平臺,而後固化到 MySQL 中。當 Flink 沒法再重啓時,由 Client 從新從最新的成功 checkpoint 提交任務。這是它的第二層保障。
這一層將 checkpoint 固化到 MySQL 中後,就再也不使用 Flink HA 機制了,少了 zk 的組件依賴。
機房容災:
Kafka 監控是任務監控裏很是重要的一個環節,總體的流程以下:
平臺提供監控 Kafka 堆積,用戶在界面上,能夠配置本身的 Kafka 監控,告知在怎樣的集羣,以及用戶消費 message 等配置信息。能夠從 MySQL 中將用戶 Kafka 監控配置提取後,再經過 jmx 監控 Kafka,這樣的信息採集以後,寫入下游 Kafka,再經過另外一個 Flink 任務實時監控告警,同時將這些數據同步寫入 ck 裏面,從而反饋給咱們的用戶(這裏也能夠不用 ck,用 Prometheus 去作監控也是能夠的,但 ck 會更加適合),最後再用 Grafana 組件去展現給用戶。
有了前面 Flink 的容器化方案以後,就要開始 Flink SQL 平臺化建設了。你們都知道,這樣流式的 api 開發起來,仍是有必定的成本的。 Flink 確定是比 Storm 快的,也相對比較穩定、容易一些,可是對於一些用戶,特別是 Java 開發的一些同窗來講,作這個是有必定門檻的。
Kubernetes 的 Flink 容器化實現之後,方便了 Flink api 應用的發佈,可是對於 Flink SQL 的任務仍然不夠便利。因而平臺提供了更加方便的在線編輯發佈、SQL 管理等一棧式開發平臺。
平臺的 Flink SQL 方案如上圖所示,任務發佈系統與元數據管理系統是徹底解耦的。
1)Flink SQL 任務發佈平臺化
在實踐過程當中,須要考慮易用性,作平臺化工做,主操做界面以下圖所示:
下圖是一個用戶界面配置的例子:
下圖是一個集羣配置的範例:
2)元數據管理
平臺在 1.11 以前經過構建本身的元數據管理系統 UDM,MySQL 存儲 Kafka,Redis 等 schema,經過自定義 catalog 打通 Flink 與 UDM,從而實現元數據管理。
在 1.11 以後,Flink 集成 Hive 逐漸完善,平臺重構了 Flink SQL 框架,並經過部署一個 SQL-gateway service 服務,中間調用本身維護的 SQL-Client jar 包,從而與離線元數據打通,實現了實時離線元數據的統一,爲以後的流批一體打好了基礎。
在元數據管理系統建立的 Flink 表操做界面以下圖所示:建立 Flink 表的元數據,持久化到 Hive 裏,Flink SQL 啓動時從 Hive 裏讀取對應表的 table schema 信息。
平臺對於官方原生支持或者不支持的 connector 進行整合和開發,鏡像和 connector,format 等相關依賴進行解耦,能夠快捷的進行更新與迭代。
1)Flink SQL 相關實踐
Flink SQL 主要分爲如下三層:
connector 層
runtime 層
平臺層
2)拓撲圖執行計劃修改
針對現階段 SQL 生成的 stream graph 並行度沒法修改等問題,平臺提供可修改的拓撲預覽修改相關參數。平臺會將解析後的 FlinkSQL 的 excution plan json 提供給用戶,利用 uid 保證算子的惟一性,修改每一個算子的並行度,chain 策略等,也爲用戶解決反壓問題提供方法。例如針對 ClickHouse sink 小併發大批次的場景,咱們支持修改 ClickHouse sink 並行度,source 並行度 = 72,sink 並行度 = 24,提升 ClickHouse sink tps。
3)維表關聯 keyBy 優化 cache
針對維表關聯的狀況,爲了下降 IO 請求次數,下降維表數據庫讀壓力,從而下降延遲,提升吞吐,有如下三種措施:
下面是維表關聯 KeyBy 優化 cache 的圖:
在優化以前的時候,維表關聯 LookupJoin 算子和正常算子 chain 在一塊兒,優化之間維表關聯 Lookup Join 算子和正常算子不 chain 在一塊兒,將join key 做爲 hash 策略的 key。
採用這種方式優化後,例如原來的 3000W 數據量維表,10 個 TM 節點,每一個節點都要緩存 3000W 的數據,總共須要緩存 3 億的量。而通過 keyBy 優化以後,每一個 TM 節點只須要緩存 3000W/10 = 300W 的數據量,總共緩存的數據量只有 3000W,這很是大程度減小了緩存數據量。
4)維表關聯延遲 join
維表關聯中,有不少業務場景,在維表數據新增數據以前,主流數據已經發生 join 操做,會出現關聯不上的狀況。所以,爲了保證數據的正確,將關聯不上的數據進行緩存,進行延遲 join。
最簡單的作法是,在維表關聯的 function 裏設置重試次數和重試間隔,這個方法會增大整個流的延遲,但主流 qps 不高的狀況下,能夠解決問題。
增長延遲 join 的算子,當 join 維表未關聯時,先緩存起來,根據設置重試次數和重試間隔從而進行延遲的 join。
1)實時數據入倉
實時數倉主要分爲三個過程:
使用 Flink SQL 作流式數據入倉是很是方便的,並且 1.12 版本已經支持了小文件的自動合併,解決了大數據層一個很是廣泛的痛點。
咱們自定義分區提交策略,當前分區 ready 時候會調一下實時平臺的分區提交 api,在離線調度定時調度經過這個 api 檢查分區是否 ready。
採用 Flink SQL 統一入倉方案之後,咱們可得到如下成果:
2)實時指標計算
以往指標計算一般採用 Storm 方式,這個方式須要經過 api 定製化開發,採用這樣 Flink 方案之後,咱們能夠得到了如下成果:
3)實時離線一體化ETL數據集成
具體的流程以下圖所示:
Flink SQL 在最近的版本中持續強化了維表 join 的能力,不只能夠實時關聯數據庫中的維表數據,還能關聯 Hive 和 Kafka 中的維表數據,能靈活知足不一樣工做負載和時效性的需求。
基於 Flink 強大的流式 ETL 的能力,咱們能夠統一在實時層作數據接入和數據轉換,而後將明細層的數據迴流到離線數倉中。
咱們經過將 presto 內部使用的 HyperLogLog(後面簡稱 HLL)實現引入到 Spark UDAF 函數裏,打通 HLL 對象在 Spark SQL 與 presto 引擎之間的互通。如 Spark SQL 經過 prepare 函數生成的 HLL 對象,不只能夠在 Spark SQL 裏 merge 查詢並且能夠在 presto 裏進行 merge 查詢。
具體流程以下:
UV 近似計算示例:
惟品會實驗平臺是經過配置多維度分析和下鑽分析,提供海量數據的 A/B-test 實驗效果分析的一體化平臺。一個實驗是由一股流量(好比用戶請求)和在這股流量上進行的相對對比實驗的修改組成。實驗平臺對於海量數據查詢有着低延遲、低響應、超大規模數據(百億級)的需求。
總體數據架構以下:
業務數據流以下:
咱們的實驗平臺有一個很重要的 ES 場景,咱們上線一個應用場景後,若是我想看效果如何,包括上線產生的曝光、點擊、加購、收藏是怎樣的。咱們須要把每個數據的明細,好比說分流的一些數據,根據場景分區,寫到 ck 裏面去。
咱們經過 Flink SQL Redis connector,支持 Redis 的 sink 、source 維表關聯等操做,能夠很方便地讀寫 Redis,實現維表關聯,維表關聯內可配置 cache ,極大提升應用的 TPS。經過 Flink SQL 實現實時數據流的 pipeline,最終將大寬表 sink 到 CK 裏,並按照某個字段粒度作 murmurHash3\_64 存儲,保證相同用戶的數據都存在同一 shard 節點組內,從而使得 ck 大表之間的 join 變成 local 本地表之間的 join,減小數據 shuffle 操做,提高 join 查詢效率。
Flink SQL 對於 Hive 用戶來講,使用起來仍是有一點不同的地方。無論是 Hive,仍是 Spark SQL,都是批量處理的一個場景。
因此當前咱們的 Flink SQL 調試起來仍有不少不方便的地方,對於作離線 Hive 的用戶來講還有必定的使用門檻,例如手動配置 Kafka 監控、任務的壓測調優。因此如何能讓用戶的使用門檻降至最低,讓用戶只須要懂 SQL 或者懂業務,把 Flink SQL 裏面的概念對用戶屏蔽掉,簡化用戶的使用流程,是一個比較大的挑戰。
未來咱們考慮作一些智能監控,告訴用戶當前任務存在的問題,不須要用戶去學習太多的東西,儘量自動化並給用戶一些優化建議。
一方面,咱們作數據湖主要是爲了解決咱們 binlog 實時更新的場景,目前咱們的 VDP binlog 消息流,經過 Flink SQL 寫入到 Hive ods 層,以加速 ods 層數據源的準備時間,可是會產生大量重複消息去重合並。咱們會考慮 Flink + 數據湖的 cdc 入倉方案來作增量入倉。
另外一方面咱們但願經過數據湖,來替代咱們 Kudu,咱們這邊一部分重要的業務在用 Kudu。雖然 Kudu 沒有大量的使用,但鑑於 Kudu 的運維比通常的數據庫運維複雜得多、比較小衆,而且像訂單打寬以後的 Kafka 消息流、以及聚合結果都須要很是強的實時 upsert 能力,因此咱們就開始調研 CDC+數據湖這種解決方案,用這種方案的增量 upsert 能力來替換 kudu 增量 upsert 場景。
Q1:vdp connector 是 MySQL binlog 讀取嗎?和 canal是一種工具嗎?
A1 :vdp 是公司 binlog 同步的一個組件,將 binlog 解析以後發送到 Kafka。是基於 canal 二次開發的。咱們定義了一個 cdc format 能夠對接公司的 vdp Kafka 數據源,與 Canal CDC format 有點相似。目前沒有開源,使咱們公司用的 binlog 的一個同步方案。
Q2 : uv 數據輸出到 HBase,銷售數據輸出到 kudu,輸出到了不一樣的數據源,主要是由於什麼採起的這種策略?
A2 :kudu 的應用場景沒有 HBase 這麼普遍。uv 實時寫入的 TPS 比較高,HBase 比較適合單條查詢的場景,寫入 HBase 高吞吐 + 低延遲,小範圍查詢延遲低;kudu 的話具有一些 OLAP 的特性,能夠存訂單類明細,列存加速,結合 Spark、presto 等作 OLAP 分析。
Q3 : 請問一下,大家怎麼解決的 ClickHouse 的數據更新問題?好比數據指標更新。
A3 : ck 的更新是異步 merge,只能在同一 shard 同一節點同一分區內異步 merge,是弱一致性。對於指標更新場景不太建議使用 ck。若是在 ck 裏有更新強需求的場景,能夠嘗試 AggregatingMergeTree 解決方案,用 insert 替換 update,作字段級的 merge。
Q4:binlog 寫入怎麼保證數據的去重和一致性?
A4 : binlog 目前尚未寫入 ck 的場景,這個方案看起來不太成熟。不建議這麼作,能夠用採用 CDC + 數據湖的解決方案。
Q5 : 若是 ck 各個節點寫入不均衡,怎麼去監控,怎麼解決?怎麼樣看數據傾斜呢?
A5 :能夠經過 ck 的 system.parts 本地表監控每臺機器每一個表每一個分區的寫入數據量以及 size,來查看數據分區,從而定位到某個表某臺機器某個分區。
Q6 : 大家在實時平臺是如何作任務監控或者健康檢查的?又是如何在出錯後自動恢復的?如今用的是 yarn-application 模式嗎?存在一個 yarn application 對應多個 Flink job 的狀況嗎?
A6 : 對於 Flink 1.12+ 版本,支持了 PrometheusReporter 方式暴露一些 Flink metrics 指標,好比算子的 watermark、checkpoint 相關的指標如 size、耗時、失敗次數等關鍵指標,而後採集、存儲起來作任務監控告警。
Flink 原生的 restart 策略和 failover 機制,做爲第一層的保證。
在 Client 中會定時監控 Flink 狀態,同時將最新的 checkpoint 地址更新到本身的緩存中,並彙報到平臺,固化到 MySQL 中。當 Flink 沒法再重啓時,由 Client 從新從最新的成功 checkpoint 提交任務。做爲第二層保證。這一層將 checkpoint 固化到 MySQL 中後,就再也不使用 Flink HA 機制了,少了 zk 的組件依賴。
當前兩層沒法重啓時或集羣出現異常時,由平臺自動從固化到 MySQL 中的最新 chekcpoint 從新拉起一個集羣,提交任務,做爲第三層保證。
咱們支持 yarn-per-job 模式,主要基於 Flink on Kubernetes 模式部署 standalone 集羣。
Q7 : 目前大家大數據平臺上全部的組件都是容器化的仍是混合的?
A7 :目前咱們實時這一塊的組件 Flink、Spark 、Storm、Presto 等計算框架實現了容器化,詳情可看上文 1.2 平臺架構。
Q8 :kudu 不是在 Kubernetes 上跑的吧?
A8 :kudu 不是在 Kubernetes 上運行,這個目前尚未特別成熟的方案。而且 kudu 是基於 cloudera manager 運維的,沒有上 Kubernetes 的必要。
Q9 : Flink 實時數倉維度表存到 ck 中,再去查詢 ck,這樣的方案能夠嗎?
A9:這是能夠的,是能夠值得嘗試的。事實表與維度表數據均可以存,能夠按照某個字段作哈希(好比 user\_id),從而實現 local join 的效果。
本文內容由阿里雲實名註冊用戶自發貢獻,版權歸原做者全部,阿里雲開發者社區不擁有其著做權,亦不承擔相應法律責任。具體規則請查看《阿里雲開發者社區用戶服務協議》和《阿里雲開發者社區知識產權保護指引》。若是您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將馬上刪除涉嫌侵權內容。