數倉大法好!跨境電商 Shopee 的實時數倉之路

做者:黃良輝sql

本文講述 Flink 在 Shopee 新加坡數據組(Shopee Singapore Data Team)的應用實踐,主要內容包括:數據庫

  • 實時數倉建設背景
  • Flink 在實時數據數倉建設中結合 Druid、Hive 的應用場景
  • 實時任務監控
  • Streaming SQL 平臺化
  • Streaming Job 管理
  • 將來規劃優化方向

建設背景

Shopee 是東南亞與臺灣領航電商平臺,覆蓋新加坡、馬來西亞、菲律賓、臺灣、印度尼西亞、泰國及越南七大市場,同時在中國深圳、上海和香港設立跨境業務辦公室。緩存

  • Shopee在2020年第一季的總訂單量高達4.298億,同比增加111.2%。
  • 根據App Annie, Shopee在 2020年第一季強勢躋身全球購物類 App下載量前三名。
  • 同時斬獲東南亞及臺灣市場購物類 App 年度總下載量、平均月活數、安卓使用總時長三項冠軍,並領跑東南亞兩大頭部市場,拿下印尼及越南年度購物類 App 下月活量雙冠王。

其中包括訂單商品、物流,支付,數字產品等各方面的業務。爲了支持這些互聯網化產品,應對越來的越多的業務挑戰,因而咱們進行了數據倉庫的設計和架構建設。架構

數據倉庫挑戰

當前隨着業務發展,數據規模的膨脹和商務智能團隊對實時需求的不斷增加,業務挑戰愈來愈大:app

  • 業務維度而言,業務需求愈來愈複雜,有須要明細數據查詢,又有實時各類維度聚合報表,實時標籤培訓和查詢需求。同時大量業務共享了一些業務邏輯,形成大量業務耦合度高,重複開發。
  • 平臺架構而言,當前任務愈來愈多,管理調度,資源管理,數據質量異常監控等也愈來愈重要。實時化也越來急迫,目前大量業務仍是離線任務形式,致使凌晨服務負載壓力巨大,同時基於 T+1(天、小時級)架構業務沒法知足精細化、實時化運營須要。
  • 技術實現而言,如今實時業務大量採用 Spark Structured Streaming 實現,嚴重依賴 HBase 作 Stateful 需求,開發複雜;在異常故障事故,Task 失敗,缺少 Exactly Once 特性支持,數據易丟失、重複。

爲了解決上述問題,因而開始了 Flink 實時數倉的探索。異步

數據倉庫架構

爲了支持這些互聯網化產品不斷增加的的數據和複雜的業務,Shopee 構建以下圖數據倉庫架構,從下到上層來看:ide

data_arch.png

  • 最底層是數據收集層,這一層負責實時數據,包括 Binlog、Service Log, Tracking Service Log,通過 Real-time Ingestion 團隊數據將會被收集到 Kafka 、Hbase 中。Auto-Ingestion 團隊負責數據庫數離線平常收集到 HDFS。
  • 而後是存儲層,這層主要是 Kafka 保存實時消息,加上 HDFS 保存 Hive 數據存儲等,HBase 保存維度數據。
  • 存儲層上面是 Spark, Flink 計算引擎, Presto SQL 查詢引擎
  • 而後是調度管理層,各類資源管理,任務管理,任務調度,管理各類 Spark,Flink 任務。
  • 資源管理層上一層是 OLAP 數據存儲層,Druid 用於存儲時間序列數據,Phoenix(HBase)存儲聚合報表數據、維度表數據、標籤數據,Elastic Search 存儲須要多維度字段索引的數據如廣告數據、用戶畫像等。
  • 最上層是應用層,數據報表,數據業務服務,用戶畫像等。

Flink 實時數據數倉實踐

pipeline.png

目前在 Shopee Data Team 主要從數據分庫 Binlog 以及 Tracking Service 同步到 Kafka 集羣中,經過 Flink/Spark 計算,包含實時訂單商品銷量優惠活動分析,訂單物流分析、產品用戶標新、用戶印象行爲分析,電商活動遊戲運營分析等。最後的結果存到 Druid、 HBase、 HDFS 等,後面接入一些數據應用產品。目前已經有很多核心做業從 Spark Structured Streaming 遷移到 Flink Streaming 實現。函數

Flink 與 Druid 結合的實時數倉應用

在實時訂單銷量分析產品中,經過 Flink 處理訂單流,將處理後的明細數據實時注入Druid,達到公司實時運營活動分析等用途。
咱們使用 T-1(天)的 Lambda 架構來實時和歷史訂單數據產品分析,Flink 只處理實時今天的訂單數據,每日會定時將昨日的數據經過離線任務索引到 Druid 中覆蓋修正實時數據的微小偏差。總體的 Flink 實時處理流程以下圖,從上中下看共三條流水線:工具

order_item_streaming.png

第一條流水線,經過 Kafka 接入 訂單 Binlog 事件。性能

  • 首先,解析反序列化訂單事件,經過訂單時間過濾無效訂單,只保留今日訂單。經過訂單主鍵 KeyBy 進入ProcessWindowFunction,由於上游數據是 Binlog 會有重複訂單事件,因此會經過 ValueState 來對訂單進行去重。
  • 而後,經過查詢 HBase (Phoenix 表)進行Enrichment 維度字段,從 Phoenix 表中獲取訂單商品信息,分類,用戶信息等。
  • 最後,經過判斷是否全部字段成功關聯,若是全部字段都關聯成功將會把消息打入下游kafka,並實時注入到 Druid;若是有字段關聯失敗將會把訂單事件經過 Side Output 進入另外一個 Slow Kafka Topic,以便處理異常訂單。

第二條流水線比較複雜,經過多個實時任務將各分表 Slave Binlog 同步到 Hbase Phoenix 表,以便作成實時訂單流的維度表。目前遇到比較多問題仍是常常 Binlog 延遲等問題,以及數據熱點問題。

第三條流基本與第一條相似,相似消息隊列中的 dead message 異常處理。由於大量維度表依賴,不能保證 Phoenix 都在訂單被處理前就被同步到 Phoenix 表,好比新訂單商品,新用戶,新店鋪,新分類,新商品等。因此咱們引入一條實時 backfill 處理流將會對第一條主流,處理失敗的訂單重複處理,直到全部字段都關聯成功纔會進入下游 Druid。

另外爲了不一些過時消息進入死循環,一樣有個事件過濾窗口,保證只保留今日的訂單事件在流水線中被處理。不一樣的是,由於須要區分付款訂單和未付款訂單事件類型(可能一個訂單有兩個狀態事件,當用戶下單時,會有一個下單事件,當用戶完成支付會有一個支付完成事件),因此將訂單是否被處理狀態放在enrichment以後標記重複成功。

訂單事件狀態維護

由於上游數據源是 Binlog,因此隨着訂單狀態的更新,會有大量的訂單重複事件。

經過使用 Flink State 功能保存在內存中(FsSateBackend),以 ValueState 來標記訂單是否被處理,經過設置 TTL,保證訂單狀態保存24小時過時,如今活動高峯期大概2G State,平均每一個TaskManager大約100M State。Checkpoint interval 設置爲10秒一次,HDFS 負載並不高。同時由於流使用了窗口和自定義 Trigger,致使 State 須要緩衝少許窗口數據。窗口的使用將會在 Enrihcment 流程優化部分詳細說明。

Enrichment 流程優化

在 Enrichment 步驟, 業務邏輯複雜,存在大量 IO,咱們作了大量改進優化。

  • 首先,從 HBase 表關聯字段,經過增長 Local RLU Memeory Cache 層,減小 Hbase 的訪問量,加速關聯;對 HBase Row Key Salt Bucket 避免訂單商品表訪問熱點問題。
  • 第二,HBase 表直接訪問層(Service)經過 Google Guice 管理依賴方便配置管理,內存 Cache 關聯等。
  • 第三,因爲商品表和訂單商品同步到 HBase 有必定延遲,致使大量的訂單事件進入 Slow Kafka topic,因此經過設置窗口和自定義 Trigger 保證訂單數量到必定數量或者窗口超時才觸發窗口數據的處理,優化後能保證98%的訂單在主流被成功處理。
  • 最後,在訂單關聯訂單商品時,考慮過使用 Interval Join 來作,可是因爲一個訂單有多條訂單商品信息,加上上游是 Binlog 事件,以及其餘維度表數據延遲問題,致使業務邏輯複雜,並且計算產出數據保存在 Druid 只能支持增量更新。因此選擇了使用 HBase 存儲來關聯訂單商品信息,附加慢消息處理流來解決數據延遲問題。

數據質量保障和監控

目前將 Checkpoint 設置爲 exactly once 模式,並開啓了Kafka exactly once 生產者模式,經過 Two Phase Commit 功能保證數據的一致性,避免 task 失敗,job 重啓時致使數據丟失。
監控方面,經過監控 Upstream Kafka Topic,以及 HBase 表寫入更新狀態,結合下游 Druid 數據延遲監控,作到 end-to-end 的 lag 指標監控。經過 Flink Metric Report 彙報 Hbase 訪問性能指標,緩存大小,延遲訂單數量等來對 Flink job 具體步驟性能分析。

Flink 與 Hive 結合的實時數倉應用

在訂單物流實時分析業務,接入 Binlog event 實現支持點更新的物流分析,使用 Flink Retract Stream 功能來支持每當訂單和物流有最新狀態變化事件就觸發下游數據更新。

經過 Interval Join 訂單流和物流流,並使用 Rocksdb State 與 Incremental Checkpoint 來維護最近七天的狀態數據,從 Hbase 來增長用戶維度信息等,維度字段 enrihcment 經過 Local LRU Memory Cache 層來優化查詢,最後定時從 Hbase 導出到 HDFS。

order_logistic_streaming.png

如今將 Flink 任務產生的訂單物流事件保存 HBase 來支持記錄級別的點更新,每小時從 HBase 導出到 HDFS 結果,經過 Presto 接入來作實時分析。HBase 導出到HDFS,經過對 Hbase Row Key Salt Bucket 避免熱點問題,優化減少 Region Size(默認10G)來減小導出時間。可是數據如今延遲仍是比較嚴重,在一個半小時左右,並且鏈路繁瑣。未來考慮加入 Apache Hudi 組建接入 Presto,將延遲降到半小時內。

Streaming SQL 應用與管理

目前 Shopee 有大量的實時需求經過 SQL 實現,應用場景主要是應用層實時彙總數據報表、維度表更新等。業務經過 SDK 和一站式網站管理兩種方式實現。一是以 SDK 形式提供支持,用戶能夠經過引入 JAR 依賴進行二次項目開發。二是製做了相關網站,經過以任務形式,用戶建立任務編輯保存 SQL 來實現業務需求,目前支持以下:

  • 任務列表、分組管理,支持重啓,中止,禁用任務功能。
  • 任務支持 crontab 規則定時執行調度模式和 Streaming 模式。
  • JAR 資源管理,任務自定義 JAR 引用,以便重複使用 UDF 等。
  • 通用 SQL 資源管理,任務引入共享 SQL 文件,避免重複 SQL 邏輯、重複定義 View 以及環境配置等。
  • 用戶分組權限管理。
  • 集成 Garafna 作任務延遲報警。

下面是部分任務組織UI化形式:

stream_sql_platform.png

當前平臺只支持 Spark SQL 實現 Stream SQL,使用 Hive 存儲元數據,經過關聯維度表 JOIN Apache Phoenix 等外部表和外部服務實現 enrichment 等功能。經過對比 Flink SQL 與 Spark SQL,發現 Spark SQL 很多缺點:

  • Spark SQL 窗口函數種類少,沒有 Flink 的支持靈活,致使大量聚合任務沒法經過平臺 SQL 化。
  • Spark Stateful 狀態控制差,沒有 Flink Rocksdb State 增量狀態支持。
  • Spark 關聯維度表時,之前在每次 micro-batch 中都須要加載全量維度表,如今已經改成 GET 方式,Lookup 性能方面已經有提高很多,但仍是沒有像 Flink 異步 Lookup 那樣的異步功能,提升性能。
  • 沒有 Flink Snapshot 和 Two Phase Commit 功能的支持,致使任務重啓,失敗恢復會出現數據不一致,失去準確性。

Spark SQL 支持仍是有不少侷限性,目前正在作 Flink SQL 需求導入評估階段,並計劃在 Stream SQL Platform 接入 Flink SQL 的支持。來知足公司愈來愈複雜用戶畫像標籤標註和簡單實時業務 SQL 化,減小業務開發成本。同時須要引入更好的 UDF 管理方式,集成元數據服務簡化開發。

Streaming Job 管理

Shopee Data Team 擁有大量的實時任務是經過 Jar 包發佈的,目前在 Job 管理上經過網站頁面化,來減小 Job 維護成本。目前支持環境管理,任務管理,任務應用配置管理,和任務監控報警。

環境管理

目前能夠配置 Flink / Spark Bin 路徑來支持不一樣的 Flink/Spark 版本,來支持 Flink 升級帶來的多版本問題,並支持一些顏色高亮來區分不一樣環境。

env.png

任務管理

如今支持實時任務的環境檢索,狀態檢索,名字檢索等。支持重啓,禁用,配置任務參數等。任務支持從 checkpoint/savepoint 恢復,中止任務自動保存 savepoint,從 kafka timestamp 啓動。

flink_job_admin.png

任務配置管理

同時實時任務也支持配置內存,CPU 等 Flink Job 運行參數、JAR 依賴配置等。目前支持預覽,編輯更新等,經過 Jekins CICD 集成與人工干預結果,來完成 Job 的部署升級。

flink_job_config.png

任務應用配置管理

任務應用配置是使用 HOCON 配置格式支持,目前支持共享配置集成,並經過配置名約定將 Checkpoint 路徑自動綁定到配置中。網站支持預覽模式,編輯模式,配置高亮等,未來會集成配置版本回滾等功能。

flink_app_config.png

任務監控報警

對於任務監控方面,如今支持任務異常處理報警。異常處理支持自動掛起失敗的任務,並從上次最新 checkpoint 恢復;經過 Flink REST API 檢測 Flink Job 狀態,來避免 Flink Job 異常形成的假活狀態。出現任務重啓,異常狀況會經過郵件等方式給任務負責人發報警,將來打算在網站集成 Grafana/Promethus 等監控工具來完成任務監控自動化等。

將來規劃

整體而言,Flink 在 Shopee 從 2019 年末開始調研,到項目落地不到半年時間,已經完成業務大量需求導入評估,對 Exactly Once,Kafka Exactly Once Semantics,Two Phase Commit,Interval Join,Rocksdb/FS State 一系列的功能進行了驗證。在將來規劃上:

  • 首先,會嘗試更多的實時任務 Flink SQL 化,進一步實現流批統一;
  • 其次,會對目前大量 Spark structured Streaming Job 遷移到 Flink 實現,並對新業務進行 Flink 探索。
  • 在 Streaming SQL Platform 也會加入 Flink SQL 支持,來解決當前平臺遇到一些性能瓶頸和業務支持侷限性。

做者簡介:

黃良輝,2019 年加入 Shopee,在 Shoppe Data Team 負責實時數據業務和數據產品開發。

相關文章
相關標籤/搜索