做者:黃良輝sql
本文講述 Flink 在 Shopee 新加坡數據組(Shopee Singapore Data Team)的應用實踐,主要內容包括:數據庫
- 實時數倉建設背景
- Flink 在實時數據數倉建設中結合 Druid、Hive 的應用場景
- 實時任務監控
- Streaming SQL 平臺化
- Streaming Job 管理
- 將來規劃優化方向
Shopee 是東南亞與臺灣領航電商平臺,覆蓋新加坡、馬來西亞、菲律賓、臺灣、印度尼西亞、泰國及越南七大市場,同時在中國深圳、上海和香港設立跨境業務辦公室。緩存
其中包括訂單商品、物流,支付,數字產品等各方面的業務。爲了支持這些互聯網化產品,應對越來的越多的業務挑戰,因而咱們進行了數據倉庫的設計和架構建設。架構
當前隨着業務發展,數據規模的膨脹和商務智能團隊對實時需求的不斷增加,業務挑戰愈來愈大:app
業務維度
而言,業務需求愈來愈複雜,有須要明細數據查詢,又有實時各類維度聚合報表,實時標籤培訓和查詢需求。同時大量業務共享了一些業務邏輯,形成大量業務耦合度高,重複開發。平臺架構
而言,當前任務愈來愈多,管理調度,資源管理,數據質量異常監控等也愈來愈重要。實時化也越來急迫,目前大量業務仍是離線任務形式,致使凌晨服務負載壓力巨大,同時基於 T+1(天、小時級)架構業務沒法知足精細化、實時化運營須要。技術實現
而言,如今實時業務大量採用 Spark Structured Streaming 實現,嚴重依賴 HBase 作 Stateful 需求,開發複雜;在異常故障事故,Task 失敗,缺少 Exactly Once 特性支持,數據易丟失、重複。爲了解決上述問題,因而開始了 Flink 實時數倉的探索。異步
爲了支持這些互聯網化產品不斷增加的的數據和複雜的業務,Shopee 構建以下圖數據倉庫架構,從下到上層來看:ide
數據收集層
,這一層負責實時數據,包括 Binlog、Service Log, Tracking Service Log,通過 Real-time Ingestion 團隊數據將會被收集到 Kafka 、Hbase 中。Auto-Ingestion 團隊負責數據庫數離線平常收集到 HDFS。存儲層
,這層主要是 Kafka 保存實時消息,加上 HDFS 保存 Hive 數據存儲等,HBase 保存維度數據。存儲層
上面是 Spark, Flink 計算引擎, Presto SQL 查詢引擎
。調度管理層
,各類資源管理,任務管理,任務調度,管理各類 Spark,Flink 任務。資源管理層
上一層是 OLAP 數據存儲層
,Druid 用於存儲時間序列數據,Phoenix(HBase)存儲聚合報表數據、維度表數據、標籤數據,Elastic Search 存儲須要多維度字段索引的數據如廣告數據、用戶畫像等。應用層
,數據報表,數據業務服務,用戶畫像等。
目前在 Shopee Data Team 主要從數據分庫 Binlog 以及 Tracking Service 同步到 Kafka 集羣中,經過 Flink/Spark 計算,包含實時訂單商品銷量優惠活動分析,訂單物流分析、產品用戶標新、用戶印象行爲分析,電商活動遊戲運營分析等。最後的結果存到 Druid、 HBase、 HDFS 等,後面接入一些數據應用產品。目前已經有很多核心做業從 Spark Structured Streaming 遷移到 Flink Streaming 實現。函數
在實時訂單銷量分析產品中,經過 Flink 處理訂單流,將處理後的明細數據實時注入Druid,達到公司實時運營活動分析等用途。
咱們使用 T-1(天)的 Lambda 架構來實時和歷史訂單數據產品分析,Flink 只處理實時今天的訂單數據,每日會定時將昨日的數據經過離線任務索引到 Druid 中覆蓋修正實時數據的微小偏差。總體的 Flink 實時處理流程以下圖,從上中下看共三條流水線:工具
第一條流水線,經過 Kafka 接入 訂單 Binlog 事件。性能
KeyBy
進入ProcessWindowFunction
,由於上游數據是 Binlog 會有重複訂單事件,因此會經過 ValueState 來對訂單進行去重。Side Output
進入另外一個 Slow Kafka Topic,以便處理異常訂單。第二條流水線比較複雜,經過多個實時任務將各分表 Slave Binlog
同步到 Hbase Phoenix 表,以便作成實時訂單流的維度表。目前遇到比較多問題仍是常常 Binlog 延遲等問題,以及數據熱點問題。
第三條流基本與第一條相似,相似消息隊列中的 dead message 異常處理。由於大量維度表依賴,不能保證 Phoenix 都在訂單被處理前就被同步到 Phoenix 表,好比新訂單商品,新用戶,新店鋪,新分類,新商品等。因此咱們引入一條實時 backfill 處理流將會對第一條主流,處理失敗的訂單重複處理,直到全部字段都關聯成功纔會進入下游 Druid。
另外爲了不一些過時消息進入死循環,一樣有個事件過濾窗口,保證只保留今日的訂單事件在流水線中被處理。不一樣的是,由於須要區分付款訂單和未付款訂單事件類型(可能一個訂單有兩個狀態事件,當用戶下單時,會有一個下單事件,當用戶完成支付會有一個支付完成事件),因此將訂單是否被處理狀態放在enrichment以後標記重複成功。
由於上游數據源是 Binlog,因此隨着訂單狀態的更新,會有大量的訂單重複事件。
經過使用 Flink State 功能保存在內存中(FsSateBackend),以 ValueState 來標記訂單是否被處理,經過設置 TTL,保證訂單狀態保存24小時過時,如今活動高峯期大概2G State,平均每一個TaskManager大約100M State。Checkpoint interval 設置爲10秒一次,HDFS 負載並不高。同時由於流使用了窗口和自定義 Trigger,致使 State 須要緩衝少許窗口數據。窗口的使用將會在 Enrihcment 流程
優化部分詳細說明。
在 Enrichment 步驟, 業務邏輯複雜,存在大量 IO,咱們作了大量改進優化。
Local RLU Memeory Cache
層,減小 Hbase 的訪問量,加速關聯;對 HBase Row Key Salt Bucket 避免訂單商品表訪問熱點問題。Interval Join
來作,可是因爲一個訂單有多條訂單商品信息,加上上游是 Binlog 事件,以及其餘維度表數據延遲問題,致使業務邏輯複雜,並且計算產出數據保存在 Druid 只能支持增量更新。因此選擇了使用 HBase 存儲來關聯訂單商品信息,附加慢消息處理流來解決數據延遲問題。目前將 Checkpoint 設置爲 exactly once
模式,並開啓了Kafka exactly once
生產者模式,經過 Two Phase Commit
功能保證數據的一致性,避免 task 失敗,job 重啓時致使數據丟失。
監控方面,經過監控 Upstream Kafka Topic,以及 HBase 表寫入更新狀態,結合下游 Druid 數據延遲監控,作到 end-to-end 的 lag 指標監控。經過 Flink Metric Report 彙報 Hbase 訪問性能指標,緩存大小,延遲訂單數量等來對 Flink job 具體步驟性能分析。
在訂單物流實時分析業務,接入 Binlog event 實現支持點更新的物流分析,使用 Flink Retract Stream
功能來支持每當訂單和物流有最新狀態變化事件就觸發下游數據更新。
經過 Interval Join
訂單流和物流流,並使用 Rocksdb State 與 Incremental Checkpoint 來維護最近七天的狀態數據,從 Hbase 來增長用戶維度信息等,維度字段 enrihcment 經過 Local LRU Memory Cache 層來優化查詢,最後定時從 Hbase 導出到 HDFS。
如今將 Flink 任務產生的訂單物流事件保存 HBase 來支持記錄級別的點更新,每小時從 HBase 導出到 HDFS 結果,經過 Presto 接入來作實時分析。HBase 導出到HDFS,經過對 Hbase Row Key Salt Bucket 避免熱點問題,優化減少 Region Size(默認10G)來減小導出時間。可是數據如今延遲仍是比較嚴重,在一個半小時左右,並且鏈路繁瑣。未來考慮加入 Apache Hudi
組建接入 Presto,將延遲降到半小時內。
目前 Shopee 有大量的實時需求經過 SQL 實現,應用場景主要是應用層實時彙總數據報表、維度表更新等。業務經過 SDK 和一站式網站管理兩種方式實現。一是以 SDK 形式提供支持,用戶能夠經過引入 JAR 依賴進行二次項目開發。二是製做了相關網站,經過以任務形式,用戶建立任務編輯保存 SQL 來實現業務需求,目前支持以下:
下面是部分任務組織UI化形式:
當前平臺只支持 Spark SQL 實現 Stream SQL,使用 Hive 存儲元數據,經過關聯維度表 JOIN Apache Phoenix 等外部表和外部服務實現 enrichment 等功能。經過對比 Flink SQL 與 Spark SQL,發現 Spark SQL 很多缺點:
Spark SQL 支持仍是有不少侷限性,目前正在作 Flink SQL 需求導入評估階段,並計劃在 Stream SQL Platform 接入 Flink SQL 的支持。來知足公司愈來愈複雜用戶畫像標籤標註和簡單實時業務 SQL 化,減小業務開發成本。同時須要引入更好的 UDF 管理方式,集成元數據服務簡化開發。
Shopee Data Team 擁有大量的實時任務是經過 Jar 包發佈的,目前在 Job 管理上經過網站頁面化,來減小 Job 維護成本。目前支持環境管理,任務管理,任務應用配置管理,和任務監控報警。
目前能夠配置 Flink / Spark Bin 路徑來支持不一樣的 Flink/Spark 版本,來支持 Flink 升級帶來的多版本問題,並支持一些顏色高亮來區分不一樣環境。
如今支持實時任務的環境檢索,狀態檢索,名字檢索等。支持重啓,禁用,配置任務參數等。任務支持從 checkpoint/savepoint 恢復,中止任務自動保存 savepoint,從 kafka timestamp 啓動。
同時實時任務也支持配置內存,CPU 等 Flink Job 運行參數、JAR 依賴配置等。目前支持預覽,編輯更新等,經過 Jekins CICD 集成與人工干預結果,來完成 Job 的部署升級。
任務應用配置是使用 HOCON 配置格式支持,目前支持共享配置集成,並經過配置名約定將 Checkpoint 路徑自動綁定到配置中。網站支持預覽模式,編輯模式,配置高亮等,未來會集成配置版本回滾等功能。
對於任務監控方面,如今支持任務異常處理報警。異常處理支持自動掛起失敗的任務,並從上次最新 checkpoint 恢復;經過 Flink REST API 檢測 Flink Job 狀態,來避免 Flink Job 異常形成的假活狀態。出現任務重啓,異常狀況會經過郵件等方式給任務負責人發報警,將來打算在網站集成 Grafana/Promethus 等監控工具來完成任務監控自動化等。
整體而言,Flink 在 Shopee 從 2019 年末開始調研,到項目落地不到半年時間,已經完成業務大量需求導入評估,對 Exactly Once,Kafka Exactly Once Semantics,Two Phase Commit,Interval Join,Rocksdb/FS State 一系列的功能進行了驗證。在將來規劃上:
做者簡介:
黃良輝,2019 年加入 Shopee,在 Shoppe Data Team 負責實時數據業務和數據產品開發。