1. 引入
在類Hadoop系統上支持ACID有了更大的吸引力,其中Databricks的Delta Lake和Uber開源的Hudi也成爲了主要貢獻者和競爭對手。二者都經過在「parquet」文件格式中提供不一樣的抽象以解決主要問題;很難選擇一個比另外一個更好。此博客將使用一個很是基本的示例來了解這些工具的工做原理,並讓讀者來比較二者的優缺點。html
咱們將使用與本系列下一篇文章中相反的方法,後面咱們將討論Hadoop上Data Lake的重要性,以及爲何會出現對諸如Delta/Hudi之類的系統的需求,以及數據工程師在過去如何爲Lakes孤立地構建易錯的ACID系統。web
2. 初始化
2.1 環境
源數據庫:AWS RDS MySQL
CDC工具:AWS DMS
Hudi:AWS EMR 5.29.0
Delta:Databricks運行時6.1
對象/文件存儲:AWS S3sql
上面的工具集主要用於演示;也可使用如下工具替代shell
源數據庫:任何傳統/基於雲的RDBMS
CDC工具:Attunity,Oracle Golden Gate,Debezium,Fivetran,自定義Binlog解析器
Hudi:開源/企業Hadoop上的Apache Hudi
Delta:開源/企業Hadoop上的Delta Lake
對象/文件存儲:ADLS / HDFS數據庫
2.2 數據準備步驟
create database demo;
use demo;
create table hudi_delta_test
(
pk_id integer,
name varchar(255),
value integer,
updated_at timestamp default now() on update now(),
created_at timestamp default now(),
constraint pk primary key(pk_id)
);
insert into hudi_delta_test(pk_id,name,value) values(1,’apple’,10);
insert into hudi_delta_test(pk_id,name,value) values(2,’samsung’,20);
insert into hudi_delta_test(pk_id,name,value) values(3,’dell’,30);
insert into hudi_delta_test(pk_id,name,value) values(4,’motorola’,40);
如今使用DMS將數據加載到S3中的某個位置,並使用文件夾名稱full_load來標識該位置。爲了更貼合標題,咱們將跳過DMS的設置和配置。加載到S3後以下圖所示。
apache
insert into hudi_delta_test(pk_id,name,value) values(4,’motorola’,40);
update hudi_delta_test set value = 201 where pk_id=2;
delete from hudi_delta_test where pk_id=3;
繼續略過DMS階段,將CDC數據按如下方式加載到S3,以下圖所示微信
注意:DMS將填充一個名爲「 Op」的附加字段,表示「操做」,Op取值I/U/D,分別對應插入、更新和刪除。如下圖所示顯示了CDC數據的內容。app
df = spark.read.parquet('s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test')
df.show()
完成了數據準備後正式開始比對。DMS將持續將CDC事件傳送到S3(供Hudi和Delta Lake使用),此S3爲數據源。兩種工具的最終狀態都旨在得到一致的統一視圖,如上圖MySQL所示。
框架
3. 使用Apache HUDI
Hudi有兩種方式處理UPSERTS [1]工具
寫時複製(CoW):數據以列格式(Parquet)存儲,而且在更新時會建立文件的新版本。此存儲類型最適合於讀繁重的工做負載,由於數據集的最新版本始終在有效的列格式文件中可用。
讀時合併(MoR):數據以列(Parquet)和基於行(Avro)的格式存儲;更新記錄到基於行的「增量文件」中,並在之後進行壓縮,以建立列文件的新版本。此存儲類型最適合於寫繁重的工做負載,由於新提交會以增量文件的形式快速寫入,可是讀取數據集須要合併列文件與增量文件。
3.1 啓動Spark Shell
使用如下命令打開Spark Shell並進行相關導入.
spark-shell — conf 「spark.serializer=org.apache.spark.serializer.KryoSerializer」 — conf 「spark.sql.hive.convertMetastoreParquet=false」 — jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
3.2 使用CoW
val inputDataPath = 「s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test」
val hudiTableName = 「hudi_cow」
val hudiTablePath = 「s3://development-dl/demo/hudi-delta-demo/hudi_cow」
val hudiOptions = Map[String,String]
(
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> 「pk_id」,
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> 「created_at」,
HoodieWriteConfig.TABLE_NAME -> hudiTableName,
DataSourceWriteOptions.OPERATION_OPT_KEY ->
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> 「COPY_ON_WRITE」,
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> 「updated_at」,
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> 「true」,
DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> 「created_at」,
DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> 「false」,
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)
val temp = spark.read.format(「parquet」).load(inputDataPath)
val fullDF = temp.withColumn(「Op」,lit(‘I’))
fullDF.write.format(「org.apache.hudi」).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)
因爲在Hudi選項中使用了Hive自動同步配置,所以會在Hive中建立一個名爲「 hudi_cow」的表。該表使用具備Hoodie格式的Parquet SerDe建立,表結構以下圖所示。
val updateDF = spark.read.parquet(「s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test」)
updateDF.write.format(「org.apache.hudi」).options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)
進行更新操做,表「hudi_cow」將有最新的更新數據,以下圖所示
如CoW定義中所述,當咱們以hudi格式將updateDF寫入同一S3位置時,更新的數據在寫時被複制,而且快照和增量數據使用同一張表。
3.3 使用MoR
val inputDataPath = 「s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test」
val hudiTableName = 「hudi_mor」
val hudiTablePath = 「s3://development-dl/demo/hudi-delta-demo/hudi_mor」
val hudiOptions = Map[String,String]
(
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> 「pk_id」,
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> 「created_at」,
HoodieWriteConfig.TABLE_NAME -> hudiTableName,
DataSourceWriteOptions.OPERATION_OPT_KEY ->
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> 「MERGE_ON_READ」,
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> 「updated_at」,
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> 「true」,
DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> 「created_at」,
DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> 「false」,
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)
val temp = spark.read.format(「parquet」).load(inputDataPath)
val fullDF = temp.withColumn(「Op」,lit(‘I’))
fullDF.write.format(「org.apache.hudi」).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)
仍是開啓了Hive自動同步,將在Hive中建立兩張名爲「hudi_mor」和「 hudi_mor_rt」的表。hudi_mor是通過讀優化的表,具備快照數據,而hudi_mor_rt將具備增量和實時合併數據。數據將會以頻繁的壓縮間隔被壓縮,並提供給hudi_mor。hudi_mor_rt利用Avro格式存儲增量數據。正如MoR定義所示,經過hudi_mor_rt讀取數據時將即時合併。這對於高更新源表頗有用,同時還提供一致且非最新的讀優化表。
注意:「 hudi_mor」和「 hudi_mor_rt」都指向相同的S3存儲桶,只是定義了不一樣的存儲格式。
能夠看到加載後兩表內容相同,內容以下所示
val updateDF = spark.read.parquet(「s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test」)
updateDF.write.format(「org.apache.hudi」).options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)
表hudi_mor在很短期內就具備相同的內容(由於演示中的數據很小,而且很快會被壓縮),只要merge成功,表hudi_mor_rt就會有最新數據。
如今看看這些Hudi格式表的S3日誌的變化。底層存儲格式爲parquet,同時經過日誌方式管理ACID。一般生成如下類型的文件:
hoodie_partition_metadata:這是一個小文件,包含有關給定分區中partitionDepth和最後一次commitTime的信息
hoodie.properties:存儲表名稱、存儲類型信息
commit和clean:文件統計信息和有關正在寫入的新文件的信息,以及諸如numWrites,numDeletes,numUpdateWrites,numInserts和一些其餘相關審計字段之類的信息,存儲在這些文件中。這些文件在每次提交後生成
以上3個文件對於CoW和MoR類型的表都是通用的。另外對於MoR表,額外有爲UPSERTED分區建立的avro格式的日誌文件。以下所示的第一個log文件是CoW表中不存在的日誌文件。
4. 使用Delta Lake
使用下面的代碼片斷,咱們以parquet格式讀取完整的數據,並以delta格式將其寫入不一樣的位置
from pyspark.sql.functions import *
inputDataPath = "s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test"
deltaTablePath = "s3://development-dl/demo/hudi-delta-demo/delta_table"
fullDF = spark.read.format("parquet").load(inputDataPath)
fullDF = fullDF.withColumn("Op",lit('I'))
fullDF.write.format("delta").mode("overwrite").save(deltaTablePath)
在Databricks Notebook的SQL界面中使用如下命令能夠建立一個Hive外表,「using delta」關鍵字會包含基礎SERDE和FILE格式的定義。
%sql
create table delta_table
using delta
location 's3://development-dl/demo/hudi-delta-demo/delta_table'
該表的DDL以下所示。
%sql
show create table delta_table
表會包含與完整加載文件相同的全部記錄。
%sql
select * from delta_table
使用如下命令讀取CDC數據並在Hive中註冊爲臨時視圖
updateDF = spark.read.parquet("s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test")
updateDF.createOrReplaceTempView("temp")
MERGE命令:下面是執行UPSERT的MERGE SQL,它做爲SQL很方便地被執行,也能夠在spark.sql()方法調用中執行
%sql
MERGE INTO delta_table target
USING
(SELECT Op,latest_changes.pk_id,name,value,updated_at,created_at
FROM temp latest_changes
INNER JOIN (
SELECT pk_id, max(updated_at) AS MaxDate
FROM temp
GROUP BY pk_id
) cm ON latest_changes.pk_id = cm.pk_id AND latest_changes.updated_at = cm.MaxDate) as source
ON source.pk_id == target.pk_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
MERGE以後,Hive中delta_table的內容也更新了。
%sql
select * from delta_table
與Hudi同樣,Delta Lake基本文件存儲格式也是「parquet」。Delta提供帶有日誌和版本控制的ACID功能。接着看看S3在裝載和CDC合併後的變化。
增量日誌包含JSON格式的日誌,文件中包含每次提交後的schema和最新文件的信息。
在CDC合併的狀況下,因爲能夠插入/更新或刪除多條記錄。初始parquet文件的內容分爲多個較小的parquet文件,這些較小的文件會被重寫。若是對錶進行了分區,則僅與更新的分區相對應的CDC數據將受到影響。初始parquet文件仍存在於該文件夾中,但已重新的日誌文件中刪除。若是咱們在此表上運行VACUUM,則能夠物理刪除該文件。也可使用OPTIMIZE命令[6]來串聯這些較小的文件。
Delta日誌附加了另外一個JSON格式的日誌文件,該文件存儲schema和指向最新文件的文件指針。
5. 總結
上述兩個示例中都按原樣保留了刪除的記錄,並經過Op ='D'標識刪除,這是故意而爲以顯示DMS的功能,下面的參考資料顯示瞭如何將這種軟刪除轉換爲硬刪除。
但願這是一個有用的比較,有助於作出合理的選擇,選擇合適的數據湖框架。
參考資料
https://aws.amazon.com/blogs/aws/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/
https://databricks.com/blog/2019/07/15/migrating-transactional-data-to-a-delta-lake-using-aws-dms.html
https://hudi.apache.org/
https://docs.delta.io/
https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html
https://docs.databricks.com/delta/optimizations/index.html
阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,按期推送精彩案例,技術專家直播,問答區近萬人Spark技術同窗在線提問答疑,只爲營造純粹的Spark氛圍,歡迎釘釘掃碼加入!
對開源大數據和感興趣的同窗能夠加小編微信(下圖二維碼,備註「進羣」)進入技術交流微信羣。Apache
Spark技術交流社區公衆號,微信掃一掃關注
本文分享自微信公衆號 - Delta Lake技術圈(deltalake-emr2020)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。