Apache Hudihtml
Apache Hudi 在基於 HDFS/S3 數據存儲之上,提供了兩種流原語:git
通常來講,咱們會將大量數據存儲到HDFS/S3,新數據增量寫入,而舊數據鮮有改動,特別是在通過數據清洗,放入數據倉庫的場景。並且在數據倉庫如 hive中,對於update的支持很是有限,計算昂貴。另外一方面,如果有僅對某段時間內新增數據進行分析的場景,則hive、presto、hbase等也未提供原生方式,而是須要根據時間戳進行過濾分析。sql
在此需求下,Hudi能夠提供這兩種需求的實現。第一個是對record級別的更新,另外一個是僅對增量數據的查詢。且Hudi提供了對Hive、presto、Spark的支持,能夠直接使用這些組件對Hudi管理的數據進行查詢。shell
存儲類型apache
咱們看一下 Hudi 的兩種存儲類型:app
視圖異步
在瞭解這兩種存儲類型後,咱們再看一下Hudi支持的存儲數據的視圖(也就是查詢模式):ide
在以上3種視圖中,「讀優化視圖」與「增量視圖」都可在「寫時複製」與「讀時合併」的存儲類型下使用。而「實時視圖「僅能在」讀時合併「模式下使用。性能
存儲類型優化 |
支持的視圖 |
寫時複製 |
讀優化 + 增量 |
讀時合併 |
讀優化 + 增量 + 近實時 |
時間軸
最後介紹一下 Hudi 的核心 —— 時間軸。Hudi 會維護一個時間軸,在每次執行操做時(如寫入、刪除、合併等),均會帶有一個時間戳。經過時間軸,能夠實如今僅查詢某個時間點以後成功提交的數據,或是僅查詢某個時間點以前的數據。這樣能夠避免掃描更大的時間範圍,並不是常高效地只消費更改過的文件(例如在某個時間點提交了更改操做後,僅query某個時間點以前的數據,則仍能夠query修改前的數據)。
使用案例
下面咱們嘗試使用Hudi API 進行讀寫。
寫入數據
首先準備數據集,部分條目爲:
1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794
875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7
1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10
26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681
1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605
…
啓動spark-shell,並指定hudi jar包:
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
加載指定包:
import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.hive.MultiPartKeysValueExtractor
指定建立的Hudi表名與路徑:
val tableName = "hudi_table" val basePath = "s3://xxxx/xxx"
構造 DataFrame:
val lineRDD = sc.textFile("features.txt").map(_.split("\\|")).filter(_.length > 6) case class Record(id:Int, name:String, c_class:String, state:String, latitude:Float, longitude:String, elevation:Int) val RecordRDD = lineRDD.map(x=>Record(x(0).toInt, x(1), x(2), x(3), x(4).toFloat, x(5), x(6).toInt)) val featureDF=RecordRDD.toDF
插入數據到 Hudi(以及Hive):
featureDF.write.format("org.apache.hudi"). option(RECORDKEY_FIELD_OPT_KEY, "c_class"). option(PARTITIONPATH_FIELD_OPT_KEY, "state"). option(PRECOMBINE_FIELD_OPT_KEY, "id"). option(TABLE_NAME, tableName). option(HIVE_SYNC_ENABLED_OPT_KEY, "true"). option(HIVE_TABLE_OPT_KEY, "hivehudi"). option(HIVE_PARTITION_FIELDS_OPT_KEY, "state"). option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName). mode(Overwrite). save(basePath);
咱們能夠看到目錄結構相似於 Hive:
hudi/hudi_table/AR/44bfae35-056b-4bcd-8970-5f1271c3845d-0_18-215-89206_20191121100011.parquet
hudi/hudi_table/CA/2a591ee9-afa4-48d9-bd16-63376a1b8e06-0_38-215-89226_20191121100011.parquet
hudi/hudi_table/CT/911510f9-0655-405f-afad-be9c15429e81-0_46-215-89234_20191121100011.parquet
…
表名爲hudi_table,分區鍵爲 state,真正存儲數據的文件爲parquet。
查詢數據
首先載入數據格式:
val toViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*")
咱們在上面插入數據的時候,同時建立了 Hive 表,因此有如下兩種方式作查詢:
spark.sql("select name from hivehudi where c_class='Summit'").show()
+--------------------+
| name|
+--------------------+
| High Knob|
| White Rock Mountain|
| Open Mine Hill|
…
2. 使用臨時表:
roViewDF.registerTempTable("hudi_ro_table")
spark.sql("select id,name from hudi_ro_table where c_class='Stream'").show()
+-------+--------------------+
| id| name|
+-------+--------------------+
| 539931| Tiger Point Gully|
| 871801| Dry Brook|
| 847407| McClusky Creek|
| 637687| Shaw Drain|
| 749747| Duncan Creek|
|1502779| Brushy Lick|
…
更新數據
首先咱們看一條數據:
spark.sql("select id,name from hudi_ro_table where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
而後更新此數據(更新的數據存儲在一個新的源文件中):
val updateRDD = sc.textFile("update.txt").map(_.split("\\|")).filter(_.length>6) val updateDF = updateRDD.map(x=>Record(x(0).toInt, x(1), x(2), x(3), x(4).toFloat, x(5), x(6).toInt)).toDF updateDF.write.format("org.apache.hudi"). option(RECORDKEY_FIELD_OPT_KEY, "c_class"). option(PARTITIONPATH_FIELD_OPT_KEY, "state"). option(PRECOMBINE_FIELD_OPT_KEY, "id"). option(TABLE_NAME, tableName). option(HIVE_SYNC_ENABLED_OPT_KEY, "true"). option(HIVE_TABLE_OPT_KEY, "hivehudi"). option(HIVE_PARTITION_FIELDS_OPT_KEY, "state"). option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName). mode(Append). save(basePath);
能夠看到咱們這裏使用的模式由Overwrite 改成了 Append,也就是追加的模式,其他的基本不變。咱們首先分別看一下 hive 表與 hudi 表中的數據變化。
Hive 表中:
spark.sql("select id,name from hivehudi where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
|539931| Tiger-update|
+------+-----------------+
Hudi 表中:
val appViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*") appViewDF.registerTempTable("hudi_update_table") spark.sql("select id,name from hudi_update_table where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
|539931| Tiger-update|
+------+-----------------+
能夠看到都可以查到更新後的數據。
對數據執行 select * 加上過濾條件:
能夠看到表中有2個比較有意思的字段,分別爲:_hoodie_commit_time, _hoodie_commit_seqno
上文咱們提到過 Hudi 有一個核心爲時間軸,每次執行一個commit時,都會生成一個時間戳。這裏 _hoodie_commit_time 即記錄了commit 的時間戳。進一步的,Hudi 即是基於此實現了增量查詢。
下面咱們嘗試一下增量查詢:
// 獲取 commit 時間戳 val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_update_table order by commitTime").map(k => k.getString(0)).take(3) // 設置起始時間戳爲上次時間戳 val beginTime = commits(commits.length - 2) // 增量查詢 val incViewDF = spark. read. format("org.apache.hudi"). option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath);
incViewDF.registerTempTable("hudi_incr_table") spark.sql("select * from hudi_incr_table where c_class='Stream' and id=539931").show()
這裏咱們使用增量查詢的選項 VIEW_TYPE_INCREMENTAL_OPT_VAL,以及設置了時間戳的起始時間。查詢結果爲:
能夠看到查詢到的數據僅爲上次commit 後的數據。
固然,咱們也能夠指定時間段內的數據進行查詢,指定下面選項便可:
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
option(END_INSTANTTIME_OPT_KEY, endTime).
Hudi CLI
最後咱們看下一下 Hudi CLI
// 啓動 hudi cli:
/usr/lib/hudi/cli/bin/hudi-cli.sh
// 鏈接hudi 數據表
connect --path s3://xxxx/hudi/hudi_table
接下來咱們能夠查看提交過的 commit:
甚至回滾 commit:
commit rollback --commit 20191122073858
回滾後再次對 hive 表執行查詢:
spark.sql("select id,name from hivehudi where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
+------+-----------------+
能夠看到以前更新的數據已經被刪除。
在 Hudi Cli 下,咱們也能夠建立表(create)、列出commit時文件級別的信息(commit showfiles)等。更多 Hudi cli 的用法,能夠在 Hudi Cli 下輸入 help 獲取更多信息。
References:
Apache Hudi 官方介紹:https://hudi.apache.org/index.html
Apache Hudi Quick Start:https://hudi.apache.org/quickstart.html
Apache Hudi CLI: https://hudi.apache.org/admin_guide.html
原文出處:https://www.cnblogs.com/zackstang/p/11912994.html