傳統數倉的組織架構是針對離線數據的OLAP(聯機事務分析)需求設計的,經常使用的導入數據方式爲採用sqoop或spark定時做業逐批將業務庫數據導入數倉。隨着數據分析對實時性要求的不斷提升,按小時、甚至分鐘級的數據同步愈來愈廣泛。由此展開了基於spark/flink流處理機制的(準)實時同步系統的開發。java
然而實時同步數倉從一開始就面臨以下幾個挑戰:node
Hudi是針對以上問題的解決方案之一。如下是對Hudi的簡單介紹,主要內容翻譯自官網。sql
Hudi內部按照操做時刻(instant)對錶的全部操做維護了一條時間線,由此能夠提供表在某一時刻的視圖,還可以高效的提取出延後到達的數據。每個時刻包含:apache
commit:提交,將批次的數據原子性的寫入表;bootstrap
clean: 清除,後臺做業,不斷清除不須要的舊得版本的數據;架構
delta_commit:delta 提交是將批次記錄原子性的寫入MergeOnRead表中,數據寫入的目的地是delta日誌文件;app
compacttion:壓縮,後臺做業,將不一樣結構的數據,例如記錄更新操做的行式存儲的日誌文件合併到列式存儲的文件中。壓縮自己是一個特殊的commit操做;異步
rollback:回滾,一些不成功時,刪除全部部分寫入的文件;ide
savepoint:保存點,標誌某些文件組爲「保存的「,這樣cleaner就不會刪除這些文件;oop
requested 某個操做被安排執行,但還沒有初始化
inflight 某個操做正在執行
completed 某一個操做在時間線上已經完成
Hudi保證按照時間線執行的操做按照時刻時間具備原子性及時間線一致性。
Hudi表存在在DFS系統的 base path(用戶寫入Hudi時自定義) 目錄下,在該目錄下被分紅不一樣的分區。每個分區以 partition path 做爲惟一的標識,組織形式與Hive相同。
每個分區內,文件經過惟一的 FileId 文件id 劃分到 FileGroup 文件組。每個FileGroup包含多個 FileSlice 文件切片,每個切片包含一個由commit或compaction操做造成的base file 基礎文件(parquet文件),以及包含對基礎文件進行inserts/update操做的log files 日誌文件(log文件)。Hudi採用了MVCC設計,compaction操做會將日誌文件和對應的基礎文件合併成新的文件切片,clean操做則刪除無效的或老版本的文件。
Hudi經過映射Hoodie鍵(記錄鍵+ 分區路徑)到文件id,提供了高效的upsert操做。當第一個版本的記錄寫入文件時,這個記錄鍵值和文件的映射關係就不會發生任何改變。換言之,映射的文件組始終包含一組記錄的全部版本。
Hudi表類型定義了數據是如何被索引、分佈到DFS系統,以及以上基本屬性和時間線事件如何施加在這個組織上。查詢類型定義了底層數據如何暴露給查詢。
表類型 | 支持的查詢類型 |
---|---|
Copy On Write寫時複製 | 快照查詢 + 增量查詢 |
Merge On Read讀時合併 | 快照查詢 + 增量查詢 + 讀取優化 |
Copy On Write:僅採用列式存儲文件(parquet)存儲文件。更新數據時,在寫入的同時同步合併文件,僅僅修改文件的版次並重寫。
Merge On Read:採用列式存儲文件(parquet)+行式存儲文件(avro)存儲數據。更新數據時,新數據被寫入delta文件並隨後以異步或同步的方式合併成新版本的列式存儲文件。
取捨 | CopyOnWrite | MergeOnRead |
---|---|---|
數據延遲 | 高 | 低 |
Update cost (I/O)更新操做開銷(I/O) | 高(重寫整個parquet) | 低(追加到delta記錄) |
Parquet文件大小 | 小(高更新(I/O)開銷) | 大(低更新開銷) |
寫入頻率 | 高 | 低(取決於合併策略) |
取捨 | 快照 | 讀取優化 |
---|---|---|
數據延遲 | 低 | 高 |
查詢延遲 | 高(合併基礎/列式存儲文件 + 行式存儲delta / 日誌 文件) | 低(原有的基礎/列式存儲文件查詢性能) |
如下是整合spark結構化流+hudi的示意代碼,因爲Hudi OutputFormat目前只支持在spark rdd對象中調用,所以寫入HDFS操做採用了spark structured streaming的forEachBatch算子。具體說明見註釋。
package pers.machi.sparkhudi import org.apache.log4j.Logger import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.{DataFrame, Row, SaveMode} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} object SparkHudi { val logger = Logger.getLogger(SparkHudi.getClass) def main(args: Array[String]): Unit = { val spark = SparkSession .builder .appName("SparkHudi") //.master("local[*]") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.default.parallelism", 9) .config("spark.sql.shuffle.partitions", 9) .enableHiveSupport() .getOrCreate() // 添加監聽器,每一批次處理完成,將該批次的相關信息,如起始offset,抓取記錄數量,處理時間打印到控制檯 spark.streams.addListener(new StreamingQueryListener() { override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { println("Query started: " + queryStarted.id) } override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { println("Query terminated: " + queryTerminated.id) } override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { println("Query made progress: " + queryProgress.progress) } }) // 定義kafka流 val dataStreamReader = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", "testTopic") .option("startingOffsets", "latest") .option("maxOffsetsPerTrigger", 100000) .option("failOnDataLoss", false) // 加載流數據,這裏由於只是測試使用,直接讀取kafka消息而不作其餘處理,是spark結構化流會自動生成每一套消息對應的kafka元數據,如消息所在主題,分區,消息對應offset等。 val df = dataStreamReader.load() .selectExpr( "topic as kafka_topic" "CAST(partition AS STRING) kafka_partition", "cast(timestamp as String) kafka_timestamp", "CAST(offset AS STRING) kafka_offset", "CAST(key AS STRING) kafka_key", "CAST(value AS STRING) kafka_value", "current_timestamp() current_time", ) .selectExpr( "kafka_topic" "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset", "kafka_offset", "kafka_timestamp", "kafka_key", "kafka_value", "substr(current_time,1,10) partition_date") // 建立並啓動query val query = df .writeStream .queryName("demo"). .foreachBatch { (batchDF: DataFrame, _: Long) => { batchDF.persist() println(LocalDateTime.now() + "start writing cow table") batchDF.write.format("org.apache.hudi") .option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE") .option(PRECOMBINE_FIELD_OPT_KEY, "kafka_timestamp") // 以kafka分區和偏移量做爲組合主鍵 .option(RECORDKEY_FIELD_OPT_KEY, "kafka_partition_offset") // 以當前日期做爲分區 .option(PARTITIONPATH_FIELD_OPT_KEY, "partition_date") .option(TABLE_NAME, "copy_on_write_table") .option(HIVE_STYLE_PARTITIONING_OPT_KEY, true) .mode(SaveMode.Append) .save("/tmp/sparkHudi/COPY_ON_WRITE") println(LocalDateTime.now() + "start writing mor table") batchDF.write.format("org.apache.hudi") .option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ") .option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE") .option(PRECOMBINE_FIELD_OPT_KEY, "kafka_timestamp") .option(RECORDKEY_FIELD_OPT_KEY, "kafka_partition_offset") .option(PARTITIONPATH_FIELD_OPT_KEY, "partition_date") .option(TABLE_NAME, "merge_on_read_table") .option(HIVE_STYLE_PARTITIONING_OPT_KEY, true) .mode(SaveMode.Append) .save("/tmp/sparkHudi/MERGE_ON_READ") println(LocalDateTime.now() + "finish") batchDF.unpersist() } } .option("checkpointLocation", "/tmp/sparkHudi/checkpoint/") .start() query.awaitTermination() } }
受限於測試條件,此次測試沒有考慮update操做,而僅僅是測試hudi對追加新數據的性能。
數據程序一共運行5天,期間未發生報錯致使程序退出。
kafka天天讀取數據約1500萬條,被消費的topic共有9個分區。
幾點說明以下
1 是否有數據丟失及重複
因爲每條記錄的分區+偏移量具備惟一性,經過檢查同一分區下是否有偏移量重複及不連續的狀況,能夠判定數據不存丟失及重複消費的狀況。
2 最小可支持的單日寫入數據條數
數據寫入效率,對於cow及mor表,不存在更新操做時,寫入速率接近。這本次測試中,spark每秒處理約170條記錄。單日可處理1500萬條記錄。
3 cow和mor表文件大小對比
每十分鐘讀取兩種表同一分區小文件大小,單位M。結果以下圖,mor表文件大小增長較大,佔用磁盤資源較多。不存在更新操做時,儘量使用cow表。