簡介: 數據湖的架構中,CDC 數據實時讀寫的方案和原理
本文由李勁鬆、胡爭分享,社區志願者楊偉海、李培殿整理。主要介紹在數據湖的架構中,CDC 數據實時讀寫的方案和原理。文章主要分爲 4 個部份內容:數據庫
咱們先看一下今天的 topic 須要設計的是什麼?輸入是一個 CDC 或者 upsert 的數據,輸出是 Database 或者是用於大數據 OLAP 分析的存儲。json
咱們常見的輸入主要有兩種數據,第一種數據是數據庫的 CDC 數據,不斷的產生 changeLog;另外一種場景是流計算產生的 upsert 數據,在最新的 Flink 1.12 版本已經支持了 upsert 數據。緩存
1.1 離線 HBase 集羣分析 CDC 數據架構
咱們一般想到的第一個方案,就是把 CDC upsert 的數據經過 Flink 進行一些處理以後,實時的寫到 HBase 當中。HBase 是一個在線的、能提供在線點查能力的一種數據庫,具備很是高的實時性,對寫入操做是很是友好的,也能夠支持一些小範圍的查詢,並且集羣可擴展。併發
這種方案其實跟普通的點查實時鏈路是同一套,那麼用 HBase 來作大數據的 OLAP 的查詢分析有什麼問題呢?app
首先,HBase 是一個面向點查設計的一種數據庫,是一種在線服務,它的行存的索引不適合分析任務。典型的數倉設計確定是要列存的,這樣壓縮效率和查詢效率纔會高。第二,HBase 的集羣維護成本比較高。最後,HBase 的數據是 HFile,不方便與大數據裏數倉當中典型的 Parquet、Avro、Orc 等結合。高併發
1.2 Apache Kudu 維護 CDC 數據集佈局
針對 HBase 分析能力比較弱的狀況,社區前幾年出現了一個新的項目,這就是 Apache Kudu 項目。Kudu 項目擁有 HBase 的點查能力的同時,採用列存,這樣列存加速很是適合 OLAP 分析。性能
這種方案會有什麼問題呢?測試
首先 Kudu 是比較小衆的、獨立的集羣,維護成本也比較高,跟 HDFS、S三、OSS 比較割裂。其次因爲 Kudu 在設計上保留了點查能力,因此它的批量掃描性能不如 parquet,另外 Kudu 對於 delete 的支持也比較弱,最後它也不支持增量拉取。
1.3 直接導入 CDC 到 Hive 分析
第三種方案,也是你們在數倉中比較經常使用的方案,就是把 MySQL 的數據寫到 Hive,流程是:維護一個全量的分區,而後天天作一個增量的分區,最後把增量分區寫好以後進行一次 Merge ,寫入一個新的分區,流程上這樣是走得通的。Hive 以前的全量分區是不受增量的影響的,只有當增量 Merge 成功以後,分區纔可查,纔是一個全新的數據。這種純列存的 append 的數據對於分析是很是友好的。
這種方案會有什麼問題呢?
增量數據和全量數據的 Merge 是有延時的,數據不是實時寫入的,典型的是一天進行一次 Merge,這就是 T+1 的數據了。因此,時效性不好,不支持實時 upsert。每次 Merge 都須要把全部數據所有重讀重寫一遍,效率比較差、比較浪費資源。
1.4 Spark + Delta 分析 CDC 數據
針對這個問題,Spark + Delta 在分析 CDC 數據的時候提供了 MERGE INTO 的語法。這並不只僅是對 Hive 數倉的語法簡化,Spark + Delta 做爲新型數據湖的架構(例如 Iceberg、Hudi),它對數據的管理不是分區,而是文件,所以 Delta 優化 MERGE INTO 語法,僅掃描和重寫發生變化的文件便可,所以高效不少。
咱們評估一下這個方案,他的優勢是僅依賴 Spark + Delta 架構簡潔、沒有在線服務、列存,分析速度很是快。優化以後的 MERGE INTO 語法速度也夠快。
這個方案,業務上是一個 Copy On Write 的一個方案,它只須要 copy 少許的文件,可讓延遲作的相對低。理論上,在更新的數據跟現有的存量沒有很大重疊的話,能夠把天級別的延遲作到小時級別的延遲,性能也是能夠跟得上的。
這個方案在 Hive 倉庫處理 upsert 數據的路上已經前進了一小步了。但小時級別的延遲畢竟不如實時更有效,所以這個方案最大的缺點在 Copy On Write 的 Merge 有必定的開銷,延遲不能作的過低。
第一部分大概現有的方案就是這麼多,同時還須要再強調一下,upsert 之因此如此重要,是由於在數據湖的方案中,upsert 是實現數據庫準實時、實時入湖的一個關鍵技術點。
2.1 Flink 對 CDC 數據消費的支持
第一,Flink 原生支持 CDC 數據消費。在前文 Spark + Delta 的方案中,MARGE INTO 的語法,用戶須要感知 CDC 的屬性概念,而後寫到 merge 的語法上來。可是 Flink 是原生支持 CDC 數據的。用戶只要聲明一個 Debezium 或者其餘 CDC 的 format,Flink 上面的 SQL 是不須要感知任何 CDC 或者 upsert 的屬性的。Flink 中內置了 hidden column 來標識它 CDC 的類型數據,因此對用戶而言比較簡潔。
以下圖示例,在 CDC 的處理當中,Flink 在只用聲明一個 MySQL Binlog 的 DDL 語句,後面的 select 都不用感知 CDC 屬性。
2.2 Flink 對 Change Log Stream 的支持
下圖介紹的是 Flink 原生支持 Change Log Stream,Flink 在接入一個 Change Log Stream 以後,拓撲是不用關心 Change Log flag 的 SQL。拓撲徹底是按照本身業務邏輯來定義,而且一直到最後寫入 Iceberg,中間不用感知 Change Log 的 flag。
2.3 Flink + Iceberg CDC 導入方案評估
最後,Flink + Iceberg 的 CDC 導入方案的優勢是什麼?
對比以前的方案,Copy On Write 跟 Merge On Read 都有適用的場景,側重點不一樣。Copy On Write 在更新部分文件的場景中,當只須要重寫其中的一部分文件時是很高效的,產生的數據是純 append 的全量數據集,在用於數據分析的時候也是最快的,這是 Copy On Write 的優點。
另一個是 Merge On Read,即將數據連同 CDC flag 直接 append 到 Iceberg 當中,在 merge 的時候,把這些增量的數據按照必定的組織格式、必定高效的計算方式與全量的上一次數據進行一次 merge。這樣的好處是支持近實時的導入和實時數據讀取;這套計算方案的 Flink SQL 原生支持 CDC 的攝入,不須要額外的業務字段設計。
Iceberg 是統一的數據湖存儲,支持多樣化的計算模型,也支持各類引擎(包括 Spark、Presto、hive)來進行分析;產生的 file 都是純列存的,對於後面的分析是很是快的;Iceberg 做爲數據湖基於 snapshot 的設計,支持增量讀取;Iceberg 架構足夠簡潔,沒有在線服務節點,純 table format 的,這給了上游平臺方足夠的能力來定製本身的邏輯和服務化。
3.1 批量更新場景和 CDC 寫入場景
首先咱們來了解一下在整個數據湖裏面批量更新的兩個場景。
另外是 CDC 寫入的場景,對於對 Flink 來講,通常經常使用的有兩種場景,第一種場景是上游的 Binlog 可以很快速的寫到 data lake 中,而後供不一樣的分析引擎作分析使用; 第二種場景是使用 Flink 作一些聚合操做,輸出的流是 upsert 類型的數據流,也須要可以實時的寫到數據湖或者是下游系統中去作分析。以下圖示例中 CDC 寫入場景中的 SQL 語句,咱們使用單條 SQL 更新一行數據,這種計算模式是一種流式增量的導入,並且屬於高頻的更新。
3.2 Apache Iceberg 設計 CDC 寫入方案須要考慮的問題
接下來咱們看下 iceberg 對於 CDC 寫入這種場景在方案設計時須要考慮哪些問題。
3.3 Apache Iceberg Basic
在介紹具體的方案細節以前,咱們先了解一下 Iceberg 在文件系統中的佈局,整體來說 Iceberg 分爲兩部分數據,第一部分是數據文件,以下圖中的 parquet 文件,每一個數據文件對應一個校驗文件(.crc文件)。第二部分是表元數據文件(Metadata 文件),包含 Snapshot 文件(snap-_.avro)、Manifest 文件(_.avro)、TableMetadata 文件(*.json)等。
下圖展現了在 iceberg 中 snapshot、manifest 及 partition 中的文件的對應關係。下圖中包含了三個 partition,第一個 partition 中有兩個文件 f一、f3,第二個 partition 有兩個文件f四、f5,第三個 partition 有一個文件f2。對於每一次寫入都會生成一個 manifest 文件,該文件記錄本次寫入的文件與 partition 的對應關係。再向上層有 snapshot 的概念,snapshot 可以幫助快速訪問到整張表的全量數據,snapshot 記錄多個 manifest,如第二個 snapshot 包含 manifest2 和 manifest3。
3.4 INSERT、UPDATE、DELETE 寫入
在瞭解了基本的概念,下面介紹 iceberg 中 insert、update、delete 操做的設計。
下圖示例的 SQL 中展現的表包含兩個字段即 id、data,兩個字段都是 int 類型。在一個 transaction 中咱們進行了圖示中的數據流操做,首先插入了(1,2)一條記錄,接下來將這條記錄更新爲(1,3),在 iceberg 中 update 操做將會拆爲 delete 和 insert 兩個操做。
這麼作的緣由是考慮到 iceberg 做爲流批統一的存儲層,將 update 操做拆解爲 delete 和 insert 操做能夠保證流批場景作更新時讀取路徑的統一,如在批量刪除的場景下以 Hive 爲例,Hive 會將待刪除的行的文件 offset 寫入到 delta 文件中,而後作一次 merge on read,由於這樣會比較快,在 merge 時經過 position 將原文件和 delta 進行映射,將會很快獲得全部未刪除的記錄。
接下來又插入記錄(3,5),刪除了記錄(1,3),插入記錄(2,5),最終查詢是咱們獲得記錄(3,5)(2,5)。
上面操做看上去很是簡單,但在實現中是存在一些語義上的問題。以下圖中,在一個 transaction 中首先執行插入記錄(1,2)的操做,該操做會在 data file1 文件中寫入 INSERT(1,2),而後執行刪除記錄(1,2)操做,該操做會在 equalify delete file1 中寫入 DELETE(1,2),接着又執行插入記錄(1,2)操做,該操做會在 data file1 文件中再寫入INSERT(1,2),而後執行查詢操做。
在正常狀況下查詢結果應該返回記錄 INSERT(1,2),但在實現中,DELETE(1,2)操做沒法得知刪除的是 data file1 文件中的哪一行,所以兩行 INSERT(1,2)記錄都將被刪除。
那麼如何來解決這個問題呢,社區當前的方式是採用了 Mixed position-delete and equality-delete。Equality-delete 即經過指定一列或多列來進行刪除操做,position-delete 是根據文件路徑和行號來進行刪除操做,經過將這兩種方法結合起來以保證刪除操做的正確性。
以下圖咱們在第一個 transaction 中插入了三行記錄,即 INSERT(1,2)、INSERT(1,3)、INSERT(1,4),而後執行 commit 操做進行提交。接下來咱們開啓一個新的 transaction 並執行插入一行數據(1,5),因爲是新的 transaction,所以新建了一個 data file2 並寫入 INSERT(1,5)記錄,接下來執行刪除記錄(1,5),實際寫入 delete 時是:
在 position delete file1 文件寫入(file2, 0),表示刪除 data file2 中第 0 行的記錄,這是爲了解決同一個 transaction 內同一行數據反覆插入刪除的語義的問題。
在 equality delete file1 文件中寫入 DELETE (1,5),之因此寫入這個 delete 是爲了確保本次 txn 以前寫入的 (1,5) 能被正確刪除。
而後執行刪除(1,4)操做,因爲(1,4)在當前 transaction 中不曾插入過,所以該操做會使用 equality-delete 操做,即在 equality delete file1 中寫入(1,4)記錄。在上述流程中能夠看出在當前方案中存在 data file、position delete file、equality delete file 三類文件。
在瞭解了寫入流程後,如何來讀取呢。以下圖所示,對於 position delete file 中的記錄(file2, 0)只需和當前 transaction 的 data file 進行 join 操做,對於 equality delete file 記錄(1,4)和以前的 transaction 中的 data file 進行 join 操做。最終獲得記錄 INSERT(1,3)、INSERT(1,2)保證了流程的正確性。
3.5 Manifest 文件的設計
上面介紹了 insert、update 及 delete,但在設計 task 的執行計劃時咱們對 manifest 進行了一些設計,目的是經過 manifest 可以快速到找到 data file,並按照數據大小進行分割,保證每一個 task 處理的數據儘量的均勻分佈。
以下圖示例,包含四個 transaction,前兩個 transaction 是 INSERT 操做,對應 M一、M2,第三個 transaction 是 DELETE 操做,對應 M3,第四個 transaction 是 UPDATE 操做,包含兩個 manifest 文件即 data manifest 和 delete manifest。
對於爲何要對 manifest 文件拆分爲 data manifest 和 delete manifest 呢,本質上是爲了快速爲每一個 data file 找到對應的 delete file 列表。能夠看下圖示例,當咱們在 partition-2 作讀取時,須要將 deletefile-4 與datafile-二、datafile-3 作一個 join 操做,一樣也須要將 deletefile-5 與 datafile-二、datafile-3 作一個 join 操做。
以 datafile-3 爲例,deletefile 列表包含 deletefile-4 和 deletefile-5 兩個文件,如何快速找到對應的 deletefIle 列表呢,咱們能夠根據上層的 manifest 來進行查詢,當咱們將 manifest 文件拆分爲 data manifest 和 delete manifest 後,能夠將 M2(data manifest)與 M三、M4(delete manifest)先進行一次 join 操做,這樣即可以快速的獲得 data file 所對應的 delete file 列表。
3.6 文件級別的併發
另外一個問題是咱們須要保證足夠高的併發讀取,在 iceberg 中這點作得很是出色。在 iceberg 中能夠作到文件級別的併發讀取,甚至文件中更細粒度的分段的併發讀取,好比文件有 256MB,能夠分爲兩個 128MB 進行併發讀取。這裏舉例說明,假設 insert 文件跟 delete 文件在兩個 Bucket 中的佈局方式以下圖所示。
咱們經過 manifest 對比發現,datafile-2 的 delete file 列表只有 deletefile-4,這樣能夠將這兩個文件做爲一個單獨的 task(圖示中Task-2)進行執行,其餘的文件也是相似,這樣能夠保證每一個 task 數據較爲均衡的進行 merge 操做。
對於這個方案咱們作了簡單的總結,以下圖所示。首先這個方案的優勢能夠知足正確性,而且能夠實現高吞吐寫入和併發高效的讀取,另外能夠實現 snapshot 級別的增量的拉取。
當前該方案仍是比較粗糙,下面也有一些能夠優化的點。
3.7 增量文件集的 Transaction 提交
前面介紹了文件的寫入,下圖咱們介紹如何按照 iceberg 的語義進行寫入而且供用戶讀取。主要分爲數據和 metastore 兩部分,首先會有 IcebergStreamWriter 進行數據的寫入,但此時寫入數據的元數據信息並無寫入到 metastore,所以對外不可見。第二個算子是 IcebergFileCommitter,該算子會將數據文件進行收集, 最終經過 commit transaction 來完成寫入。
在 Iceberg 中並無其餘任何其餘第三方服務的依賴,而 Hudi 在某些方面作了一些 service 的抽象,如將 metastore 抽象爲獨立的 Timeline,這可能會依賴一些獨立的索引甚至是其餘的外部服務來完成。
下面是咱們將來的一些規劃,首先是 Iceberg 內核的一些優化,包括方案中涉及到的全鏈路穩定性測試及性能的優化, 並提供一些 CDC 增量拉取的相關 Table API 接口。
在 Flink 集成上,會實現 CDC 數據的自動和手動合併數據文件的能力,並提供 Flink 增量拉取 CDC 數據的能力。
在其餘生態集成上,咱們會對 Spark、Presto 等引擎進行集成,並藉助 Alluxio 加速數據查詢。
做者:阿里雲實時計算Flink
原文連接本文爲阿里雲原創內容,未經容許不得轉載