Apache Hudi 介紹與應用

Apache Hudihtml

Apache Hudi 在基於 HDFS/S3 數據存儲之上,提供了兩種流原語:git

  1. 插入更新
  2. 增量拉取

通常來講,咱們會將大量數據存儲到HDFS/S3,新數據增量寫入,而舊數據鮮有改動,特別是在通過數據清洗,放入數據倉庫的場景。並且在數據倉庫如 hive中,對於update的支持很是有限,計算昂貴。另外一方面,如果有僅對某段時間內新增數據進行分析的場景,則hive、presto、hbase等也未提供原生方式,而是須要根據時間戳進行過濾分析。sql

在此需求下,Hudi能夠提供這兩種需求的實現。第一個是對record級別的更新,另外一個是僅對增量數據的查詢。且Hudi提供了對Hive、presto、Spark的支持,能夠直接使用這些組件對Hudi管理的數據進行查詢。shell

 

存儲類型apache

咱們看一下 Hudi 的兩種存儲類型:app

  1. 寫時複製(copy on write):僅使用列式文件(parquet)存儲數據。在寫入/更新數據時,直接同步合併原文件,生成新版本的基文件(須要重寫整個列數據文件,即便只有一個字節的新數據被提交)。此存儲類型下,寫入數據很是昂貴,而讀取的成本沒有增長,因此適合頻繁讀的工做負載,由於數據集的最新版本在列式文件中始終可用,以進行高效的查詢。
  2. 讀時合併(merge on read):使用列式(parquet)與行式(avro)文件組合,進行數據存儲。在更新記錄時,更新到增量文件中(avro),而後進行異步(或同步)的compaction,建立列式文件(parquet)的新版本。此存儲類型適合頻繁寫的工做負載,由於新記錄是以appending 的模式寫入增量文件中。可是在讀取數據集時,須要將增量文件與舊文件進行合併,生成列式文件。

 

視圖異步

在瞭解這兩種存儲類型後,咱們再看一下Hudi支持的存儲數據的視圖(也就是查詢模式):ide

  1. 讀優化視圖(Read Optimized view):直接query 基文件(數據集的最新快照),也就是列式文件(如parquet)。相較於非Hudi列式數據集,有相同的列式查詢性能
  2. 增量視圖(Incremental View):僅query新寫入數據集的文件,也就是指定一個commit/compaction,query此以後的新數據。
  3. 實時視圖(Real-time View):query最新基文件與增量文件。此視圖經過將最新的基文件(parquet)與增量文件(avro)進行動態合併,而後進行query。能夠提供近實時的數據(會有幾分鐘的延遲)

在以上3種視圖中,「讀優化視圖」與「增量視圖」都可在「寫時複製」與「讀時合併」的存儲類型下使用。而「實時視圖「僅能在」讀時合併「模式下使用。性能

存儲類型優化

支持的視圖

寫時複製

讀優化 + 增量

讀時合併

讀優化 + 增量 + 近實時

 

時間軸

最後介紹一下 Hudi 的核心 —— 時間軸。Hudi 會維護一個時間軸,在每次執行操做時(如寫入、刪除、合併等),均會帶有一個時間戳。經過時間軸,能夠實如今僅查詢某個時間點以後成功提交的數據,或是僅查詢某個時間點以前的數據。這樣能夠避免掃描更大的時間範圍,並不是常高效地只消費更改過的文件(例如在某個時間點提交了更改操做後,僅query某個時間點以前的數據,則仍能夠query修改前的數據)。

 

使用案例

下面咱們嘗試使用Hudi API 進行讀寫。

 

寫入數據

首先準備數據集,部分條目爲:

1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794

875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7

1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10

26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681

1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605

 

啓動spark-shell,並指定hudi jar包:

spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

 

加載指定包:

import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.hive.MultiPartKeysValueExtractor

 

指定建立的Hudi表名與路徑:

val tableName = "hudi_table"
val basePath = "s3://xxxx/xxx"

 

構造 DataFrame:

val lineRDD = sc.textFile("features.txt").map(_.split("\\|")).filter(_.length > 6)
case class Record(id:Int, name:String, c_class:String, state:String, latitude:Float, longitude:String, elevation:Int)
val RecordRDD = lineRDD.map(x=>Record(x(0).toInt, x(1), x(2), x(3), x(4).toFloat, x(5), x(6).toInt))
val featureDF=RecordRDD.toDF

 

插入數據到 Hudi(以及Hive):

featureDF.write.format("org.apache.hudi").
  option(RECORDKEY_FIELD_OPT_KEY, "c_class").
  option(PARTITIONPATH_FIELD_OPT_KEY, "state").
  option(PRECOMBINE_FIELD_OPT_KEY, "id").
  option(TABLE_NAME, tableName).
  option(HIVE_SYNC_ENABLED_OPT_KEY, "true").
  option(HIVE_TABLE_OPT_KEY, "hivehudi").
  option(HIVE_PARTITION_FIELDS_OPT_KEY, "state").
  option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName).
  mode(Overwrite).
  save(basePath);

 

咱們能夠看到目錄結構相似於 Hive:

hudi/hudi_table/AR/44bfae35-056b-4bcd-8970-5f1271c3845d-0_18-215-89206_20191121100011.parquet

hudi/hudi_table/CA/2a591ee9-afa4-48d9-bd16-63376a1b8e06-0_38-215-89226_20191121100011.parquet

hudi/hudi_table/CT/911510f9-0655-405f-afad-be9c15429e81-0_46-215-89234_20191121100011.parquet

表名爲hudi_table,分區鍵爲 state,真正存儲數據的文件爲parquet。

 

查詢數據

首先載入數據格式:

val toViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*")

 

咱們在上面插入數據的時候,同時建立了 Hive 表,因此有如下兩種方式作查詢:

  1. 直接查詢 Hive 表:

  spark.sql("select name from hivehudi where c_class='Summit'").show()

  +--------------------+

  |                name|

  +--------------------+

  |           High Knob|

  | White Rock Mountain|

  |      Open Mine Hill|

       …

      

       2. 使用臨時表:

  roViewDF.registerTempTable("hudi_ro_table")

  spark.sql("select id,name from hudi_ro_table where c_class='Stream'").show()

  +-------+--------------------+

  |     id|                name|

  +-------+--------------------+

  | 539931|   Tiger Point Gully|

  | 871801|           Dry Brook|

  | 847407|      McClusky Creek|

  | 637687|          Shaw Drain|

  | 749747|        Duncan Creek|

  |1502779|         Brushy Lick|

  …

 

更新數據

首先咱們看一條數據:

spark.sql("select id,name from hudi_ro_table where c_class='Stream' and id=539931").show()

+------+-----------------+

|    id|             name|

+------+-----------------+

|539931|Tiger Point Gully|

 

而後更新此數據(更新的數據存儲在一個新的源文件中):

val updateRDD = sc.textFile("update.txt").map(_.split("\\|")).filter(_.length>6)
val updateDF = updateRDD.map(x=>Record(x(0).toInt, x(1), x(2), x(3), x(4).toFloat, x(5), x(6).toInt)).toDF
updateDF.write.format("org.apache.hudi").
  option(RECORDKEY_FIELD_OPT_KEY, "c_class").
  option(PARTITIONPATH_FIELD_OPT_KEY, "state").
  option(PRECOMBINE_FIELD_OPT_KEY, "id").
  option(TABLE_NAME, tableName).
  option(HIVE_SYNC_ENABLED_OPT_KEY, "true").
  option(HIVE_TABLE_OPT_KEY, "hivehudi").
  option(HIVE_PARTITION_FIELDS_OPT_KEY, "state").
  option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName).
  mode(Append).
  save(basePath);

 

能夠看到咱們這裏使用的模式由Overwrite 改成了 Append,也就是追加的模式,其他的基本不變。咱們首先分別看一下 hive 表與 hudi 表中的數據變化。

 

Hive 表中:

spark.sql("select id,name from hivehudi where c_class='Stream' and id=539931").show()

+------+-----------------+

|    id|             name|

+------+-----------------+

|539931|Tiger Point Gully|

|539931|     Tiger-update|

+------+-----------------+

 

Hudi 表中:

val appViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*")
appViewDF.registerTempTable("hudi_update_table")
spark.sql("select id,name from hudi_update_table where c_class='Stream' and id=539931").show()

+------+-----------------+

|    id|             name|

+------+-----------------+

|539931|Tiger Point Gully|

|539931|     Tiger-update|

+------+-----------------+

 

能夠看到都可以查到更新後的數據。

對數據執行 select * 加上過濾條件:

 

 

能夠看到表中有2個比較有意思的字段,分別爲:_hoodie_commit_time, _hoodie_commit_seqno

上文咱們提到過 Hudi 有一個核心爲時間軸,每次執行一個commit時,都會生成一個時間戳。這裏 _hoodie_commit_time 即記錄了commit 的時間戳。進一步的,Hudi 即是基於此實現了增量查詢。

下面咱們嘗試一下增量查詢:

// 獲取 commit 時間戳
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_update_table order by commitTime").map(k => k.getString(0)).take(3)

// 設置起始時間戳爲上次時間戳
val beginTime = commits(commits.length - 2)

// 增量查詢
val incViewDF = spark.
  read.
  format("org.apache.hudi").
  option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath);
incViewDF.registerTempTable("hudi_incr_table") spark.sql("select * from hudi_incr_table where c_class='Stream' and id=539931").show()

 

這裏咱們使用增量查詢的選項 VIEW_TYPE_INCREMENTAL_OPT_VAL,以及設置了時間戳的起始時間。查詢結果爲:

 

能夠看到查詢到的數據僅爲上次commit 後的數據。

 

固然,咱們也能夠指定時間段內的數據進行查詢,指定下面選項便可:

    option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
    option(END_INSTANTTIME_OPT_KEY, endTime).

 

Hudi CLI

最後咱們看下一下 Hudi CLI

// 啓動 hudi cli:

/usr/lib/hudi/cli/bin/hudi-cli.sh

// 鏈接hudi 數據表

connect --path s3://xxxx/hudi/hudi_table

 

接下來咱們能夠查看提交過的 commit:

 

 

 

甚至回滾 commit:

commit rollback --commit 20191122073858

 

回滾後再次對 hive 表執行查詢:

spark.sql("select id,name from hivehudi where c_class='Stream' and id=539931").show()

+------+-----------------+

|    id|             name|

+------+-----------------+

|539931|Tiger Point Gully|

+------+-----------------+

能夠看到以前更新的數據已經被刪除。

 

在 Hudi  Cli 下,咱們也能夠建立表(create)、列出commit時文件級別的信息(commit showfiles)等。更多 Hudi cli 的用法,能夠在 Hudi Cli 下輸入 help 獲取更多信息。

 

References:

Apache Hudi 官方介紹:https://hudi.apache.org/index.html

Apache Hudi Quick Start:https://hudi.apache.org/quickstart.html

Apache Hudi CLI: https://hudi.apache.org/admin_guide.html

原文出處:https://www.cnblogs.com/zackstang/p/11912994.html

相關文章
相關標籤/搜索