用Flink取代Spark Streaming!知乎實時數倉架構演進

做者 | 知乎數據工程團隊前端

「數據智能」 (Data Intelligence) 有一個必須且基礎的環節,就是數據倉庫的建設,同時,數據倉庫也是公司數據發展到必定規模後必然會提供的一種基礎服務。從智能商業的角度來說,數據的結果表明了用戶的反饋,獲取結果的及時性就顯得尤其重要,快速的獲取數據反饋可以幫助公司更快的作出決策,更好的進行產品迭代,實時數倉在這一過程當中起到了不可替代的做用。算法

本文主要講述知乎的實時數倉實踐以及架構的演進,這包括如下幾個方面:sql

  • 實時數倉 1.0 版本,主題:ETL 邏輯實時化,技術方案:Spark Streaming。
  • 實時數倉 2.0 版本,主題:數據分層,指標計算實時化,技術方案:Flink Streaming。
  • 實時數倉將來展望:Streaming SQL 平臺化,元信息管理系統化,結果驗收自動化。

實時數倉 1.0 版本

1.0 版本的實時數倉主要是對流量數據作實時 ETL,並不計算實時指標,也未創建起實時數倉體系,實時場景比較單一,對實時數據流的處理主要是爲了提高數據平臺的服務能力。實時數據的處理向上依賴數據的收集,向下關係到數據的查詢和可視化,下圖是實時數倉 1.0 版本的總體數據架構圖。緩存

第一部分是數據採集,由三端 SDK 採集數據並經過 Log Collector Server 發送到 Kafka。第二部分是數據 ETL,主要完成對原始數據的清洗和加工並分實時和離線導入 Druid。第三部分是數據可視化,由 Druid 負責計算指標並經過 Web Server 配合前端完成數據可視化。安全

其中第1、三部分的相關內容請分別參考:知乎客戶端埋點流程、模型和平臺技術,Druid 與知乎數據分析平臺,此處咱們詳細介紹第二部分。因爲實時數據流的穩定性不如離線數據流,當實時流出現問題後須要離線數據重刷歷史數據,所以實時處理部分咱們採用了 lambda 架構。微信

Lambda 架構有高容錯、低延時和可擴展的特色,爲了實現這一設計,咱們將 ETL 工做分爲兩部分:Streaming ETL 和 Batch ETL。網絡

Streaming ETL

這一部分我會介紹實時計算框架的選擇、數據正確性的保證、以及 Streaming 中一些通用的 ETL 邏輯,最後還會介紹 Spark Streaming 在實時 ETL 中的穩定性實踐。數據結構

計算框架選擇

在 2016 年年初,業界用的比較多的實時計算框架有 Storm 和 Spark Streaming。Storm 是純流式框架,Spark Streaming 用 Micro Batch 模擬流式計算,前者比後者更實時,後者比前者吞吐量大且生態系統更完善,考慮到知乎的日誌量以及初期對實時性的要求,咱們選擇了 Spark Streaming 做爲實時數據的處理框架。架構

數據正確性保證

Spark Streaming 的端到端 Exactly-once 須要下游支持冪等、上游支持流量重放,這裏咱們在 Spark Streaming 這一層作到了 At-least-once,正常狀況下數據不重很多,但在程序重啓時可能會重發部分數據,爲了實現全局的 Exactly-once,咱們在下游作了去重邏輯,關於如何去重後面我會講到。框架

通用 ETL 邏輯

ETL 邏輯和埋點的數據結構息息相關,咱們全部的埋點共用同一套 Proto Buffer Schema,大體以下所示。

message LogEntry {
 optional BaseInfo base = 1;
 optional DetailInfo detail = 2;
 optional ExtraInfo extra = 3;
}

BaseInfo:日誌中最基本的信息,包括用戶信息、客戶端信息、時間信息、網絡信息等日誌發送時的必要信息。DetailInfo:日誌中的視圖信息,包括當前視圖、上一個視圖等用於定位用戶所在位置的信息。ExtraInfo:日誌中與特定業務相關的額外信息。

針對上述三種信息咱們將 ETL 邏輯分爲通用和非通用兩類,通用邏輯和各個業務相關,主要應用於 Base 和 Detail 信息,非通用邏輯則是由需求方針對某次需求提出,主要應用於 Extra 信息。這裏咱們列舉 3 個通用邏輯進行介紹,這包括:動態配置 Streaming、UTM 參數解析、新老用戶識別。

動態配置 Streaming

因爲 Streaming 任務須要 7 * 24 小時運行,但有些業務邏輯,好比:存在一個元數據信息中心,當這個元數據發生變化時,須要將這種變化映射到數據流上方便下游使用數據,這種變化可能須要中止 Streaming 任務以更新業務邏輯,但元數據變化的頻率很是高,且在元數據變化後如何及時通知程序的維護者也很難。動態配置 Streaming 爲咱們提供了一個解決方案,該方案以下圖所示。

咱們能夠把常常變化的元數據做爲 Streaming Broadcast 變量,該變量扮演的角色相似於只讀緩存,同時針對該變量可設置 TTL,緩存過時後 Executor 節點會從新向 Driver 請求最新的變量。經過這種機制能夠很是天然的將元數據的變化映射到數據流上,無需重啓任務也無需通知程序的維護者。

UTM 參數解析

UTM 的全稱是 Urchin Tracking Module,是用於追蹤網站流量來源的利器,關於 UTM 背景知識介紹能夠參考網上其餘內容,這裏再也不贅述。下圖是咱們解析 UTM 信息的完整邏輯。

流量數據經過 UTM 參數解析後,咱們能夠很容易知足如下需求:

  • 查看各搜索引擎導流狀況以及這些流量來自於哪些熱門搜索詞。
  • 市場部某次活動帶來的流量大小,如:頁面瀏覽數、獨立訪問用戶數等。
  • 從站內分享出去的連接在各分享平臺(如:微信、微博)被瀏覽的狀況。

新老用戶識別

對於互聯網公司而言,增加是一個永恆的話題,實時拿到新增用戶量,對於增加運營十分重要。例如:一次投放 n 個渠道,若是能拿到每一個渠道的實時新增用戶數,就能夠快速判斷出那些渠道更有價值。咱們用下圖來表達 Streaming ETL 中是如何識別新老用戶的。

判斷一個用戶是否是新用戶,最簡單的辦法就是維護一個歷史用戶池,對每條日誌判斷該用戶是否存在於用戶池中。因爲日誌量巨大,爲了避免影響 Streaming 任務的處理速度,咱們設計了兩層緩存:Thread Local Cache 和 Redis Cache,同時用 HBase 作持久化存儲以保存歷史用戶。訪問速度:本地內存 > 遠端內存 > 遠端磁盤,對於咱們這個任務來講,只有 1% 左右的請求會打到 HBase,日誌高峯期 26w/s,徹底不會影響任務的實時性。固然本地緩存 LruCache 的容量大小和 Redis 的性能也是影響實時性的兩個因素。

Streaming ETL 除了上述幾個通用場景外,還有一些其餘邏輯,這些邏輯的存在有的是爲了知足下游更方便的使用數據的需求,有的是對某些錯誤埋點的修復,總之 Streaming ETL 在整個實時數倉中處於指標計算的上游,有着不可替代的做用。

Spark Streaming 在實時數倉 1.0 中的穩定性實踐

  • 1.Spark Streaming 消費 Kafka 數據推薦使用 Direct 模式。咱們早期使用的是 High Level 或者叫 Receiver 模式並使用了 checkpoint 功能,這種方式在更新程序邏輯時須要刪除 checkpoint 不然新的程序邏輯就沒法生效。另外,因爲使用了 checkpoint 功能,Streaming 任務會保持和 Hdfs 通訊,可能會由於 NameNode 的抖動致使 Streaming 任務抖動。所以,推薦使用 Direct 模式,關於這種模式和 Receiver 模式的詳細對比,能夠參考官方文檔。
  • 2.保證 Spark Streaming 任務的資源穩定。以 Yarn 爲例,運行 Streaming 任務的隊列可以分配到的最小資源小於了任務所須要的資源,任務會出現頻繁丟失 Executor 的狀況,這會致使 Streaming 任務變慢,由於丟失的 Executor 所對應的數據須要從新計算,同時還須要從新分配 Executor。
  • 3.Spark Streaming 消費 Kafka 時須要作數據流限速。默認狀況下 Spark Streaming 以儘量大的速度讀取消息隊列,當 Streaming 任務掛了好久以後再次被啓動時,因爲拉取的數據量過大可能會致使上游的 Kafka 集羣 IO 被打爆進而出現 Kafka 集羣長時間阻塞。可使用 Streaming Conf 參數作限速,限定每秒拉取的最大速度。
  • 4.Spark Streaming 任務失敗後須要自動拉起。長時間運行發現,Spark Streaming 並不能 7 * 24h 穩定運行,咱們用 Supervisor 管理 Driver 進程,當任務掛掉後 Driver 進程將不復存在,此時 Supervisor 將從新拉起 Streaming 任務。

Batch ETL

接下來要介紹的是 Lambda 架構的第二個部分:Batch ETL,此部分咱們須要解決數據落地、離線 ETL、數據批量導入 Druid 等問題。針對數據落地咱們自研了 map reduce 任務 Batch Loader,針對數據修復咱們自研了離線任務 Repair ETL,離線修復邏輯和實時邏輯共用一套 ETL Lib,針對批量導入 ProtoParquet 數據到 Druid,咱們擴展了 Druid 的導入插件。

Repair ETL

數據架構圖中有兩個 Kafka,第一個 Kafka 存放的是原始日誌,第二個 Kafka 存放的是實時 ETL 後的日誌,咱們將兩個 Kafka 的數據所有落地,這樣作的目的是爲了保證數據鏈路的穩定性。由於實時 ETL 中有大量的業務邏輯,未知需求的邏輯也許會給整個流量數據帶來安全隱患,而上游的 Log Collect Server 不存在任何業務邏輯只負責收發日誌,相比之下第一個 Kafka 的數據要安全和穩定的多。Repair ETL 並非常常啓用,只有當實時 ETL 丟失數據或者出現邏輯錯誤時,纔會啓用該程序用於修復日誌。

Batch Load 2 HDFS

前面已經介紹過,咱們全部的埋點共用同一套 Proto Buffer Schema,數據傳輸格式所有爲二進制。咱們自研了落地 Kafka PB 數據到 Hdfs 的 Map Reduce 任務 BatchLoader,該任務除了落地數據外,還負責對數據去重。在 Streaming ETL 階段咱們作到了 At-least-once,經過此處的 BatchLoader 去重咱們實現了全局 Exactly-once。BatchLoader 除了支持落地數據、對數據去重外,還支持多目錄分區(p_date/p_hour/p_plaform/p_logtype)、數據回放、自依賴管理(早期沒有統一的調度器)等。截止到目前,BatchLoader 落地了 40+ 的 Kakfa Topic 數據。

Batch Load 2 Druid

採用 Tranquility 實時導入 Druid,這種方式強制須要一個時間窗口,當上遊數據延遲超過窗值後會丟棄窗口以外的數據,這種狀況會致使實時報表出現指標錯誤。爲了修復這種錯誤,咱們經過 Druid 發起一個離線 Map Reduce 任務按期重導上一個時間段的數據。經過這裏的 Batch 導入和前面的實時導入,實現了實時數倉的 Lambda 架構。

實時數倉 1.0 的幾個不足之處

到目前爲止咱們已經介紹完 Lambda 架構實時數倉的幾個模塊,1.0 版本的實時數倉有如下幾個不足: 
全部的流量數據存放在同一個 Kafka Topic 中,若是下游每一個業務線都要消費,這會致使全量數據被消費屢次,Kafka 出流量過高沒法知足該需求。
全部的指標計算所有由 Druid 承擔,Druid 同時兼顧實時數據源和離線數據源的查詢,隨着數據量的暴漲 Druid 穩定性急劇降低,這致使各個業務的核心報表不能穩定產出。
因爲每一個業務使用同一個流量數據源配置報表,致使查詢效率低下,同時沒法對業務作數據隔離和成本計算。

實時數倉 2.0 版本

隨着數據量的暴漲,Druid 中的流量數據源常常查詢超時同時各業務消費實時數據的需求也開始增多,若是繼續沿用實時數倉 1.0 架構,須要付出大量的額外成本。因而,在實時數倉 1.0 的基礎上,咱們創建起了實時數倉 2.0,梳理出了新的架構設計並開始着手創建實時數倉體系,新的架構以下圖所示。

原始層

實時數倉 1.0 咱們只對流量數據作 ETL 處理,在 2.0 版本中咱們加入了對業務庫的變動日誌 Binlog 的處理,Binlog 日誌在原始層爲庫級別或者 Mysql 實例級別,即:一個庫或者實例的變動日誌存放在同一個 Kafka Topic 中。同時隨着公司業務的發展不斷有新 App 產生,在原始層不只採集「知乎」日誌,像知乎極速版以及內部孵化項目的埋點數據也須要採集,不一樣 App 的埋點數據仍然使用同一套 PB Schema。

明細層

明細層是咱們的 ETL 層,這一層數據是由原始層通過 Streaming ETL 後獲得。其中對 Binlog 日誌的處理主要是完成庫或者實例日誌到表日誌的拆分,對流量日誌主要是作一些通用 ETL 處理,因爲咱們使用的是同一套 PB 結構,對不一樣 App 數據處理的邏輯代碼能夠徹底複用,這大大下降了咱們的開發成本。

彙總層之明細彙總

明細彙總層是由明細層經過 ETL 獲得,主要以寬表形式存在。業務明細匯老是由業務事實明細表和維度表 Join 獲得,流量明細匯老是由流量日誌按業務線拆分和流量維度 Join 獲得。流量按業務拆分後能夠知足各業務實時消費的需求,咱們在流量拆分這一塊作到了自動化,下圖演示了流量數據自動切分的過程。

Streaming Proxy 是流量分發模塊,它消費上游 ETL 後的全量數據並按期讀取埋點元信息,經過將流量數據與元信息數據進行「Join」完成按業務進行流量拆分的邏輯,同時也會對切分後的流量按業務作 ETL 處理。只要埋點元信息中新增一個埋點,那麼這個埋點對應的數據就會自動切分到該業務的 Kafka 中,最終業務 Kafka 中的數據是獨屬於當前業務的且已經被通用 ETL 和業務 ETL 處理過,這大大下降了各個業務使用數據的成本。

彙總層之指標彙總

指標彙總層是由明細層或者明細彙總層經過聚合計算獲得,這一層產出了絕大部分的實時數倉指標,這也是與實時數倉 1.0 最大的區別。知乎是一個生產內容的平臺,對業務指標的彙總咱們能夠從內容角度和用戶角度進行彙總,從內容角度咱們能夠實時統計內容(內容能夠是答案、問題、文章、視頻、想法)的被點贊數、被關注數、被收藏數等指標,從用戶角度我能夠實時統計用戶的粉絲數、回答數、提問數等指標。對流量指標的彙總咱們分爲各業務指標彙總和全局指標彙總。對各業務指標彙總,咱們能夠實時統計首頁、搜索、視頻、想法等業務的卡片曝光數、卡片點擊數、CTR 等,對全局指標彙總咱們主要以實時會話爲主,實時統計一個會話內的 PV 數、卡片曝光數、點擊數、瀏覽深度、會話時長等指標。

指標彙總層的存儲選型

不一樣於明細層和明細彙總層,指標彙總層須要將實時計算好的指標存儲起來以供應用層使用。咱們根據不一樣的場景選用了 HBase 和 Redis 做爲實時指標的存儲引擎。Redis 的場景主要是知足帶 Update 操做且 OPS 較高的需求,例如:實時統計全站全部內容(問題、答案、文章等)的累計 PV 數,因爲瀏覽內容產生大量的 PV 日誌,可能高達幾萬或者幾十萬每秒,須要對每一條內容的 PV 進行實時累加,這種場景下選用 Redis 更爲合適。HBase 的場景主要是知足高頻 Append 操做、低頻隨機讀取且指標列較多的需求,例如:每分鐘統計一次全部內容的被點贊數、被關注數、被收藏數等指標,將每分鐘聚合後的結果行 Append 到 HBase 並不會帶來性能和存儲量的問題,但這種狀況下 Redis 在存儲量上可能會出現瓶頸。

指標計算打通指標系統和可視化系統

指標口徑管理依賴指標系統,指標可視化依賴可視化系統,咱們經過下圖的需求開發過程來說解如何將三者聯繫起來。

  • 1.需求方整理好需求文檔後向數倉工程師提出需求並約會議評審需求,需求文檔中必須包含指標的計算口徑和指標對應的維度。
  • 2.數倉工程師根據需求文檔對需求進行評審,評審不經過則返回需求方進一步整理需求並從新提需。
  • 3.在需求評審經過後,數倉工程師開始排期開發
  • 首先在可視化系統中建立一個數據源,這個數據源是後期配置實時報表的數據源,建立數據源也即在 HBase 中建立一張 HBase 表。
  • 針對該數據源建立指標列,建立指標列也即在 HBase 列族中建立列,建立指標列的同時會將該指標信息錄入指標管理系統。
  • 針對該數據源綁定維表,這個維表是後期配置多維報表時選用維度值要用的,若是要綁定的維表已經存在,則直接綁定,不然須要導入維表。
  • 一個完整的數據源建立後,數倉工程師才能開發實時應用程序,經過應用程序將多維指標實時寫入已建立的數據源中。
  • 4.需求方根據已建立的數據源直接配置實時報表。

應用層

應用層主要是使用匯總層數據以知足業務需求。應用層主要分三塊:1. 經過直接讀取指標彙總數據作實時可視化,知足固化的實時報表需求,這部分由實時大盤服務承擔;2. 推薦算法等業務直接消費明細彙總數據作實時推薦;3. 經過 Tranquility 程序實時攝入明細彙總數據到 Druid,知足實時多維即席分析需求。

實時數倉 2.0 中的技術實現

相比實時數倉 1.0 以 Spark Streaming 做爲主要實現技術,在實時數倉 2.0 中,咱們將 Flink 做爲指標彙總層的主要計算框架。Flink 相比 Spark Streaming 有更明顯的優點,主要體如今:低延遲、Exactly-once 語義支持、Streaming SQL 支持、狀態管理、豐富的時間類型和窗口計算、CEP 支持等。
咱們在實時數倉 2.0 中主要以 Flink 的 Streaming SQL 做爲實現方案。使用 Streaming SQL 有如下優勢:易於平臺化、開發效率高、維度成本低等。目前 Streaming SQL 使用起來也有一些缺陷:1. 語法和 Hive SQL 有必定區別,初使用時須要適應;2.UDF 不如 Hive 豐富,寫 UDF 的頻率高於 Hive。

實時數倉 2.0 取得的進展

  • 1.在明細彙總層經過流量切分知足了各個業務實時消費日誌的需求。目前完成流量切分的業務達到 14+,因爲各業務消費的是切分後的流量,Kafka 出流量降低了一個數量級。
  • 2.各業務核心實時報表能夠穩定產出。因爲核心報表的計算直接由數倉負責,可視化系統直接讀取實時結果,保證了實時報表的穩定性,目前多個業務擁有實時大盤,實時報表達 40+。
  • 3.提高了即席查詢的穩定性。核心報表的指標計算轉移到數倉,Druid 只負責即席查詢,多維分析類的需求獲得了知足。
  • 4.成本計算需求獲得瞭解決。因爲各業務擁有了獨立的數據源且各核心大盤由不一樣的實時程序負責,能夠方便的統計各業務使用的存儲資源和計算資源。

實時數倉將來展望

從實時數倉 1.0 到 2.0,無論是數據架構仍是技術方案,咱們在深度和廣度上都有了更多的積累。隨着公司業務的快速發展以及新技術的誕生,實時數倉也會不斷的迭代優化。短時間可預見的咱們會從如下方面進一步提高實時數倉的服務能力:

  • 1.Streaming SQL 平臺化。目前 Streaming SQL 任務是以代碼開發 maven 打包的方式提交任務,開發成本高,後期隨着 Streaming SQL 平臺的上線,實時數倉的開發方式也會由 Jar 包轉變爲 SQL 文件。
  • 2.實時數據元信息管理系統化。對數倉元信息的管理能夠大幅度下降使用數據的成本,離線數倉的元信息管理已經基本完善,實時數倉的元信息管理纔剛剛開始。
  • 3.實時數倉結果驗收自動化。對實時結果的驗收只能藉助與離線數據指標對比的方式,以 Hive 和 Kafka 數據源爲例,分別執行 Hive SQL 和 Flink SQL,統計結果並對比是否一致實現實時結果驗收的自動化。

做者簡介

數據工程團隊是知乎技術中臺的核心團隊之一,該團隊主要由數據平臺、基礎平臺、數據倉庫、AB Testing 四個子團隊的 31 位優秀工程師組成。



本文做者:Ververica

閱讀原文

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索