本文整理自 2019 年 4 月 13 日在深圳舉行的 Flink Meetup 會議,分享嘉賓張俊,目前擔任 OPPO 大數據平臺研發負責人,也是 Apache Flink contributor。本文主要內容以下:sql
一.OPPO 實時數倉的演進思路數據庫
1.1.OPPO 業務與數據規模編程
你們都知道 OPPO 是作智能手機的,但並不知道 OPPO 與互聯網以及大數據有什麼關係,下圖概要介紹了 OPPO 的業務與數據狀況:json
OPPO 做爲手機廠商,基於 Android 定製了本身的 ColorOS 系統,當前日活躍用戶超過 2 億。圍繞 ColorOS,OPPO 構建了不少互聯網應用,好比應用商店、瀏覽器、信息流等。在運營這些互聯網應用的過程當中,OPPO 積累了大量的數據,上圖右邊是總體數據規模的演進:從 2012 年開始每一年都是 2~3 倍的增加速度,截至目前總數據量已經超過 100PB,日增數據量超過 200TB。
要支撐這麼大的一個數據量,OPPO 研發出一整套的數據系統與服務,並逐漸造成了本身的數據中臺體系。瀏覽器
1.2.OPPO 數據中臺緩存
今年你們都在談數據中臺,OPPO 是如何理解數據中臺的呢?咱們把它分紅了 4 個層次:架構
以上就是 OPPO 數據中臺的整個體系,而數據倉庫在其中處於很是基礎與核心的位置。app
1.3. 構建 OPPO 離線數倉框架
過往 二、3 年,咱們的重點聚焦在離線數倉的構建。上圖大體描述了整個構建過程:首先,數據來源基本是手機、日誌文件以及 DB 數據庫,咱們基於 Apache NiFi 打造了高可用、高吞吐的接入系統,將數據統一落入 HDFS,造成原始層;緊接着,基於 Hive 的小時級 ETL 與天級彙總 Hive 任務,分別負責計算生成明細層與彙總層;最後,應用層是基於 OPPO 內部研發的數據產品,主要是報表分析、用戶畫像以及接口服務。此外,中間的明細層還支持基於 Presto 的即席查詢與自助提數。
伴隨着離線數倉的逐步完善,業務對實時數倉的訴求也愈發強烈。編輯器
1.4. 數倉實時化的訴求
對於數倉實時化的訴求,你們一般都是從業務視角來看,但其實站在平臺的角度,實時化也能帶來切實的好處。首先,從業務側來看,報表、標籤、接口等都會有實時的應用場景,分別參見上圖左邊的幾個案例;其次,對平臺側來講,咱們能夠從三個案例來看:第一,OPPO 大量的批量任務都是從 0 點開始啓動,都是經過 T+1 的方式去作數據處理,這會致使計算負載集中爆發,對集羣的壓力很大;第二,標籤導入也屬於一種 T+1 批量任務,每次全量導入都會耗費很長的時間;第三,數據質量的監控也必須是 T+1 的,致使沒辦法及時發現數據的一些問題。
既然業務側和平臺側都有實時化的這個訴求,那 OPPO 是如何來構建本身的實時數倉呢?
1.5. 離線到實時的平滑遷移
不管是一個平臺仍是一個系統,都離不開上下兩個層次的構成:上層是 API,是面向用戶的編程抽象與接口;下層是 Runtime,是面向內核的執行引擎。咱們但願從離線到實時的遷移是平滑的,是什麼意思呢?從 API 這層來看,數倉的抽象是 Table、編程接口是 SQL+UDF,離線數倉時代用戶已經習慣了這樣的 API,遷移到實時數倉後最好也能保持一致。而從 Runtime 這層來看,計算引擎從 Hive 演進到了 Flink,存儲引擎從 HDFS 演進到了 Kafka。
基於以上的思路,只須要把以前提到的離線數倉 pipeline 改造下,就獲得了實時數倉 pipeline。
1.6. 構建 OPPO 實時數倉
從上圖能夠看到,整個 pipeline 與離線數倉基本類似,只是把 Hive 替換爲 Flink,把 HDFS 替換爲 Kafka。從整體流程來看,基本模型是不變的,仍是由原始層、明細層、彙總層、應用層的級聯計算來構成。
所以,這裏的核心問題是如何基於 Flink 構建出這個 pipeline,下面就介紹下咱們基於 Flink SQL 所作的一些工做。
二. 基於 Flink SQL 的擴展工做
2.1.Why Flink SQL
首先,爲何要用 Flink SQL? 下圖展現了 Flink 框架的基本結構,最下面是 Runtime,這個執行引擎咱們認爲最核心的優點是四個:第一,低延遲,高吞吐;第二,端到端的 Exactly-once;第三,可容錯的狀態管理;第四,Window & Event time 的支持。基於 Runtime 抽象出 3 個層次的 API,SQL 處於最上層。
Flink SQL API 有哪些優點呢?咱們也從四個方面去看:第一,支持 ANSI SQL 的標準;第二,支持豐富的數據類型與內置函數,包括常見的算術運算與統計聚合;第三,可自定義 Source/Sink,基於此能夠靈活地擴展上下游;第四,批流統一,一樣的 SQL,既能夠跑離線也能夠跑實時。
那麼,基於 Flink SQL API 如何編程呢?下面是一個簡單的演示:
首先是定義與註冊輸入 / 輸出表,這裏建立了 2 張 Kakfa 的表,指定 kafka 版本是什麼、對應哪一個 topic;接下來是註冊 UDF,篇幅緣由這裏沒有列出 UDF 的定義;最後是纔是執行真正的 SQL。能夠看到,爲了執行 SQL,須要作這麼多的編碼工做,這並非咱們但願暴露給用戶的接口。
2.2. 基於 WEB 的開發 IDE
前面提到過,數倉的抽象是 Table,編程接口是 SQL+UDF。對於用戶來講,平臺提供的編程界面應該是相似上圖的那種,有用過 HUE 作交互查詢的應該很熟悉。左邊的菜單是 Table 列表,右邊是 SQL 編輯器,能夠在上面直接寫 SQL,而後提交執行。要實現這樣一種交互方式,Flink SQL 默認是沒法實現的,中間存在 gap,總結下來就 2 點:第一,元數據的管理,怎麼去建立庫表,怎麼去上傳 UDF,使得以後在 SQL 中可直接引用;第二,SQL 做業的管理,怎麼去編譯 SQL,怎麼去提交做業。
在技術調研過程當中,咱們發現了 Uber 在 2017 年開源的 AthenaX 框架。
2.3.AthenaX:基於 REST 的 SQL 管理器
AthenaX 能夠看做是一個基於 REST 的 SQL 管理器,它是怎麼實現 SQL 做業與元數據管理的呢?
對於 SQL 做業提交,AthenaX 中有一個 Job 的抽象,封裝了要執行的 SQL 以及做業資源等信息。全部的 Job 由一個 JobStore 來託管,它按期跟 YARN 當中處於 Running 狀態的 App 作一個匹配。若是不一致,就會向 YARN 提交對應的 Job。
對於元數據管理,核心的問題是如何將外部建立的庫表注入 Flink,使得 SQL 中能夠識別到。實際上,Flink 自己就預留了與外部元數據對接的能力,分別提供了 ExternalCatalog 和 ExternalCatalogTable 這兩個抽象。AthenaX 在此基礎上再封裝出一個 TableCatalog,在接口層面作了必定的擴展。在提交 SQL 做業的階段,AthenaX 會自動將 TableCatalog 註冊到 Flink,再調用 Flink SQL 的接口將 SQL 編譯爲 Flink 的可執行單元 JobGraph,並最終提交到 YARN 生成新的 App。
AthenaX 雖然定義好了 TableCatalog 接口,但並無提供可直接使用的實現。那麼,咱們怎麼來實現,以便對接到咱們已有的元數據系統呢?
2.4.Flink SQL 註冊庫表的過程
首先,咱們得搞清楚 Flink SQL 內部是如何註冊庫表的。整個過程涉及到三個基本的抽象:TableDescriptor、TableFactory 以及 TableEnvironment。
TableDescriptor 顧名思義,是對錶的描述,它由三個子描述符構成:第一是 Connector,描述數據的來源,好比 Kafka、ES 等;第二是 Format,描述數據的格式,好比 csv、json、avro 等;第三是 Schema,描述每一個字段的名稱與類型。TableDescriptor 有兩個基本的實現——ConnectTableDescriptor 用於描述內部表,也就是編程方式建立的表;ExternalCatalogTable 用於描述外部表。
有了 TableDescriptor,接下來須要 TableFactory 根據描述信息來實例化 Table。不一樣的描述信息須要不一樣的 TableFactory 來處理,Flink 如何找到匹配的 TableFactory 實現呢?實際上,爲了保證框架的可擴展性,Flink 採用了 Java SPI 機制來加載全部聲明過的 TableFactory,經過遍歷的方式去尋找哪一個 TableFactory 是匹配該 TableDescriptor 的。TableDescriptor 在傳遞給 TableFactory 前,被轉換成一個 map,全部的描述信息都用 key-value 形式來表達。TableFactory 定義了兩個用於過濾匹配的方法——一個是 requiredContext(),用於檢測某些特定 key 的 value 是否匹配,好比 connector.type 是否爲 kakfa;另外一個是 supportedProperties(),用於檢測 key 是否能識別,若是出現不識別的 key,說明沒法匹配。
匹配到了正確的 TableFactory,接下來就是建立真正的 Table,而後將其經過 TableEnvironment 註冊。最終註冊成功的 Table,才能在 SQL 中引用。
2.5.Flink SQL 對接外部數據源
搞清楚了 Flink SQL 註冊庫表的過程,給咱們帶來這樣一個思路:若是外部元數據建立的表也能被轉換成 TableFactory 可識別的 map,那麼就能被無縫地註冊到 TableEnvironment。基於這個思路,咱們實現了 Flink SQL 與已有元數據中心的對接,大體過程參見下圖:
經過元數據中心建立的表,都會將元數據信息存儲到 MySQL,咱們用一張表來記錄 Table 的基本信息,而後另外三張表分別記錄 Connector、Format、Schema 轉換成 key-value 後的描述信息。之因此拆開成三張表,是爲了可以能獨立的更新這三種描述信息。接下來是定製實現的 ExternalCatalog,可以讀取 MySQL 這四張表,並轉換成 map 結構。
2.6. 實時表 - 維表關聯
到目前爲止,咱們的平臺已經具有了元數據管理與 SQL 做業管理的能力,可是要真正開放給用戶使用,還有一點基本特性存在缺失。經過咱們去構建數倉,星型模型是沒法避免的。這裏有一個比較簡單的案例:中間的事實表記錄了廣告點擊流,周邊是關於用戶、廣告、產品、渠道的維度表。
假定咱們有一個 SQL 分析,須要將點擊流表與用戶維表進行關聯,這個目前在 Flink SQL 中應該怎麼來實現?咱們有兩種實現方式,一個基於 UDF,一個基於 SQL 轉換,下面分別展開來說一下。
2.7. 基於 UDF 的維表關聯
首先是基於 UDF 的實現,須要用戶將原始 SQL 改寫爲帶 UDF 調用的 SQL,這裏是 userDimFunc,上圖右邊是它的代碼實現。UserDimFunc 繼承了 Flink SQL 抽象的 TableFunction,它是其中一種 UDF 類型,能夠將任意一行數據轉換成一行或多行數據。爲了實現維表關聯,在 UDF 初始化時須要從 MySQL 全量加載維表的數據,緩存在內存 cache 中。後續對每行數據的處理,TableFunction 會調用 eval() 方法,在 eval() 中根據 user_id 去查找 cache,從而實現關聯。固然,這裏是假定維表數據比較小,若是數據量很大,不適合全量的加載與緩存,這裏不作展開了。
基於 UDF 的實現,對用戶和平臺來講都不太友好:用戶須要寫奇怪的 SQL 語句,好比圖中的 LATERAL TABLE;平臺須要爲每一個關聯場景定製特定的 UDF,維護成本過高。有沒有更好的方式呢?下面咱們來看看基於 SQL 轉換的實現。
2.8. 基於 SQL 轉換的維表關聯
咱們但願解決基於 UDF 實現所帶來的問題,用戶不須要改寫原始 SQL,平臺不須要開發不少 UDF。有一種思路是,是否能夠在 SQL 交給 Flink 編譯以前,加一層 SQL 的解析與改寫,自動實現維表的關聯?通過必定的技術調研與 POC,咱們發現是行得通的,因此稱之爲基於 SQL 轉換的實現。下面將該思路展開解釋下。
首先,增長的 SQL 解析是爲了識別 SQL 中是否存在預先定義的維度表,好比上圖中的 user_dim。一旦識別到維表,將觸發 SQL 改寫的流程,將紅框標註的 join 語句改寫成新的 Table,這個 Table 怎麼獲得呢?咱們知道,流計算領域近年來發展出「流表二象性」的理念,Flink 也是該理念的踐行者。這意味着,在 Flink 中 Stream 與 Table 之間是能夠相互轉換的。咱們把 ad_clicks 對應的 Table 轉換成 Stream,再調用 flatmap 造成另外一個 Stream,最後再轉換回 Table,就獲得了 ad_clicks_user。最後的問題是,flatmap 是如何實現維表關聯的?
Flink 中對於 Stream 的 flatmap 操做,其實是執行一個 RichFlatmapFunciton,每來一行數據就調用其 flatmap() 方法作轉換。那麼,咱們能夠定製一個 RichFlatmapFunction,來實現維表數據的加載、緩存、查找以及關聯,功能與基於 UDF 的 TableFunction 實現相似。
既然 RichFlatmapFunciton 的實現邏輯與 TableFunction 類似,那爲何相比基於 UDF 的方式,這種實現能更加通用呢?核心的點在於多了一層 SQL 解析,能夠將維表的信息獲取出來(好比維表名、關聯字段、select 字段等),再封裝成 JoinContext 傳遞給 RichFlatmapFunciton,使得的表達能力就具有通用性了。
二.構建實時數倉的應用案例
下面分享幾個典型的應用案例,都是在咱們的平臺上用 Flink SQL 來實現的。
3.1. 實時 ETL 拆分
這裏是一個典型的實時 ETL 鏈路,從大表中拆分出各業務對應的小表:
OPPO 的最大數據來源是手機端埋點,從手機 APP 過來的數據有一個特色,全部的數據是經過統一的幾個通道上報過來。由於不可能每一次業務有新的埋點,都要去升級客戶端,去增長新的通道。好比咱們有個 sdk_log 通道,全部 APP 應用的埋點都往這個通道上報數據,致使這個通道對應的原始層表巨大,一天幾十個 TB。但實際上,每一個業務只關心它自身的那部分數據,這就要求咱們在原始層進行 ETL 拆分。
這個 SQL 邏輯比較簡單,無非是根據某些業務字段作篩選,插入到不一樣的業務表中去。它的特色是,多行 SQL 最終合併成一個 SQL 提交給 Flink 執行。你們擔憂的是,包含了 4 個 SQL,會不會對同一份數據重複讀取 4 次?其實,在 Flink 編譯 SQL 的階段是會作一些優化的,由於最終指向的是同一個 kafka topic,因此只會讀取 1 次數據。
另外,一樣的 Flink SQL,咱們同時用於離線與實時數倉的 ETL 拆分,分別落入 HDFS 與 Kafka。Flink 中自己支持寫入 HDFS 的 Sink,好比 RollingFileSink。
3.2. 實時指標統計
這裏是一個典型的計算信息流 CTR 的這個案例,分別計算必定時間段內的曝光與點擊次數,相除獲得點擊率導入 Mysql,而後經過咱們內部的報表系統來可視化。這個 SQL 的特色是它用到了窗口 (Tumbling Window) 以及子查詢。
3.3. 實時標籤導入
這裏是一個實時標籤導入的案例,手機端實時感知到當前用戶的經緯度,轉換成具體 POI 後導入 ES,最終在標籤系統上作用戶定向。
這個 SQL 的特色是用了 AggregateFunction,在 5 分鐘的窗口內,咱們只關心用戶最新一次上報的經緯度。AggregateFunction 是一種 UDF 類型,一般是用於聚合指標的統計,好比計算 sum 或者 average。在這個示例中,因爲咱們只關心最新的經緯度,因此每次都替換老的數據便可。
四. 將來工做的思考和展望
最後,給你們分享一下關於將來工做,咱們的一些思考與規劃,還不是太成熟,拋出來和你們探討一下。
4.1. 端到端的實時流處理
什麼是端到端?一端是採集到的原始數據,另外一端是報表 / 標籤 / 接口這些對數據的呈現與應用,鏈接兩端的是中間實時流。當前咱們基於 SQL 的實時流處理,源表是 Kafka,目標表也是 Kafka,統一通過 Kafka 後再導入到 Druid/ES/HBase。這樣設計的目的是提升總體流程的穩定性與可用性:首先,kafka 做爲下游系統的緩衝,能夠避免下游系統的異常影響實時流的計算(一個系統保持穩定,比起多個系統同時穩定,機率上更高點);其次,kafka 到 kafka 的實時流,exactly-once 語義是比較成熟的,一致性上有保證。
而後,上述的端到端實際上是由割裂的三個步驟來完成的,每一步可能須要由不一樣角色人去負責處理:數據處理須要數據開發人員,數據導入須要引擎開發人員,數據資產化須要產品開發人員。
咱們的平臺可否把端到端給自動化起來,只須要一次 SQL 提交就能打通處理、導入、資產化這三步?在這個思路下,數據開發中看到的再也不是 Kafka Table,而應該是面向場景的展現表 / 標籤表 / 接口表。好比對於展現表,建立表的時候只要指定維度、指標等字段,平臺會將實時流結果數據從 Kafka 自動導入 Druid,再在報表系統自動導入 Druid 數據源,甚至自動生成報表模板。
4.2. 實時流的血緣分析
關於血緣分析,作過離線數倉的朋友都很清楚它的重要性,它在數據治理中都起着不可或缺的關鍵做用。對於實時數倉來講也莫不如此。咱們但願構建端到端的血緣關係,從採集系統的接入通道開始,到中間流經的實時表與實時做業,再到消費數據的產品,都能很清晰地展示出來。基於血緣關係的分析,咱們才能評估數據的應用價值,覈算數據的計算成本。
4.3. 離線 - 實時數倉一體化
最後提一個方向是離線實時數倉的一體化。咱們認爲短時間內,實時數倉沒法替代離線數倉,二者並存是新常態。在離線數倉時代,咱們積累的工具體系,如何去適配實時數倉,如何實現離線與實時數倉的一體化管理?理論上來說,它們的數據來源是一致的,上層抽象也都是 Table 與 SQL,但本質上也有不一樣的點,好比時間粒度以及計算模式。對於數據工具與產品來講,須要作哪些改造來實現徹底的一體化,這也是咱們在探索和思考的。
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。