Unlocking New Ways to Delete Records in Apache Hudi

1. Introduction

Before version 0.5.1, a user who wanted to delete a record could use the Spark DataSource and set DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY to EmptyHoodieRecordPayload.class.getName to delete the specified records. The newly released 0.5.1 version of Hudi no longer requires that configuration and instead offers three ways to delete records: the Hudi API, the Spark DataSource, and DeltaStreamer. Each is described below.

2. Steps

2.1 Using the Hudi API

If your application already embeds a HoodieWriteClient, you can delete records directly with the following HoodieWriteClient API:

/**
 * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied commitTime. {@link HoodieKey}s will be
 * deduped and non-existent keys will be removed before deleting.
 *
 * @param keys {@link List} of {@link HoodieKey}s to be deleted
 * @param commitTime Commit time handle
 * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
 */
public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String commitTime);
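As the Javadoc notes, the supplied keys are deduplicated before the delete is issued. The following plain-Scala sketch illustrates only that dedup semantics; it uses a hypothetical simplified HoodieKey case class so it runs without Spark or Hudi on the classpath (the real class lives in org.apache.hudi.common.model):

```scala
// Hypothetical stand-in for Hudi's HoodieKey, for illustration only.
case class HoodieKey(recordKey: String, partitionPath: String)

// A delete request may contain duplicate keys.
val keys = List(
  HoodieKey("uuid-1", "americas/brazil/sao_paulo"),
  HoodieKey("uuid-2", "americas/brazil/sao_paulo"),
  HoodieKey("uuid-1", "americas/brazil/sao_paulo") // duplicate
)

// delete() dedupes keys first, so only the distinct keys are processed.
val dedupedKeys = keys.distinct
```

In the real API you would wrap the keys in a JavaRDD and pass them to HoodieWriteClient.delete together with a commit time.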

2.2 Using the DataSource

This section shows how to use the DataSource API to delete records from the sample dataset, the same one used in the quick start guide.

1. Launch spark-shell

bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

2. Import the necessary classes

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_cow_table"
val basePath = "file:///tmp/hudi_cow_table"
val dataGen = new DataGenerator

3. Insert data

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

4. Query data

val roViewDF = spark.
  read.
  format("org.apache.hudi").
  load(basePath + "/*/*/*/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select count(*) from hudi_ro_table").show() // should return 10 (number of records inserted above)
val riderValue = spark.sql("select distinct rider from hudi_ro_table").show()
// copy the value displayed to be used in the next step

5. Prepare the dataset to delete

First, prepare the records to be deleted via a query:

val df = spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'")

6. Delete data

val deletes = dataGen.generateDeletes(df.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

7. Verify

Reload the table and verify that the records have been deleted:

val roViewDFAfterDelete = spark.
  read.
  format("org.apache.hudi").
  load(basePath + "/*/*/*/*")
roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table")
spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'").show() // should not return any rows

2.3 Using DeltaStreamer

Deleting with HoodieDeltaStreamer works the same way as an upsert: it relies on a special boolean field named "_hoodie_is_deleted" in each record.

  • If the field is set to false or is absent for a record, it is treated as a regular upsert.

  • Otherwise (if the value is set to true), it is treated as a deleted record.

This means the schema of your data source must be changed to add this field, and every incoming record should set it; we will try to relax this requirement in a future release.
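The flag semantics above can be sketched in plain Scala. This is an illustration only: records are modeled here as simple maps, whereas DeltaStreamer actually operates on Avro records.

```scala
// Returns true only when "_hoodie_is_deleted" is present and true;
// a false value or a missing field means a regular upsert.
def isDelete(record: Map[String, Any]): Boolean =
  record.get("_hoodie_is_deleted") match {
    case Some(true) => true
    case _          => false
  }

val upsertRec = Map("uuid" -> "u1", "_hoodie_is_deleted" -> false)
val deleteRec = Map("uuid" -> "u2", "_hoodie_is_deleted" -> true)
val legacyRec = Map("uuid" -> "u3") // field absent: treated as upsert
```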

For example, suppose the original source schema is as follows:

{
  "type": "record",
  "name": "example_tbl",
  "fields": [{
    "name": "uuid",
    "type": "string"
  }, {
    "name": "ts",
    "type": "string"
  }, {
    "name": "partitionPath",
    "type": "string"
  }, {
    "name": "rank",
    "type": "long"
  }]
}

Then, to use DeltaStreamer's delete capability, the schema must be changed as follows:

{
  "type": "record",
  "name": "example_tbl",
  "fields": [{
    "name": "uuid",
    "type": "string"
  }, {
    "name": "ts",
    "type": "string"
  }, {
    "name": "partitionPath",
    "type": "string"
  }, {
    "name": "rank",
    "type": "long"
  }, {
    "name": "_hoodie_is_deleted",
    "type": "boolean",
    "default": false
  }]
}

A sample incoming upsert record:

{"ts": 0.0, "uuid": "69cdb048-c93e-4532-adf9-f61ce6afe605", "rank": 1034, "partitionpath": "americas/brazil/sao_paulo", "_hoodie_is_deleted" : false}

A sample incoming delete record:

{"ts": 0.0, "uuid": "19tdb048-c93e-4532-adf9-f61ce6afe10", "rank": 1045, "partitionpath": "americas/brazil/sao_paulo", "_hoodie_is_deleted" : true}

Only this one-time schema change is required. DeltaStreamer will handle the upserts and deletes in each batch, and each batch can contain a mix of upserts and deletes; no additional steps or changes are needed afterwards.
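Putting it together, a single batch containing both kinds of records simply splits on the flag. A plain-Scala sketch, using a hypothetical Rec type and short IDs for brevity:

```scala
// Hypothetical record type carrying only the fields relevant here.
case class Rec(uuid: String, hoodieIsDeleted: Boolean)

// One batch may freely mix upserts and deletes.
val batch = List(
  Rec("u1", hoodieIsDeleted = false), // upsert
  Rec("u2", hoodieIsDeleted = true),  // delete
  Rec("u3", hoodieIsDeleted = false)  // upsert
)

// DeltaStreamer routes each record by the flag within the same run.
val (toDelete, toUpsert) = batch.partition(_.hoodieIsDeleted)
```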

3. Summary

Hudi 0.5.1-incubating introduces three additional ways to delete records; users can adopt any of the approaches above to delete records.
