這一節咱們將介紹使用DeltaStreamer工具從外部源甚至其餘Hudi數據集攝取新更改的方法,
以及經過使用Hudi數據源的upserts加快大型Spark做業的方法。
對於此類數據集,咱們可使用各類查詢引擎查詢它們。html
在此以前,瞭解Hudi數據源及delta streamer工具提供的三種不一樣的寫操做以及如何最佳利用它們可能會有所幫助。
這些操做能夠在針對數據集發出的每一個提交/增量提交中進行選擇/更改。java
HoodieDeltaStreamer
實用工具 (hudi-utilities-bundle中的一部分) 提供了從DFS或Kafka等不一樣來源進行攝取的方式,並具備如下功能。git
命令行選項更詳細地描述了這些功能:github
[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` --help Usage: <main class> [options] Options: --commit-on-errors Commit even when some records failed to be written Default: false --enable-hive-sync Enable syncing to hive Default: false --filter-dupes Should duplicate records from source be dropped/filtered outbefore insert/bulk-insert Default: false --help, -h --hudi-conf Any configuration that can be set in the properties file (using the CLI parameter "--propsFilePath") can also be passed command line using this parameter Default: [] --op Takes one of these values : UPSERT (default), INSERT (use when input is purely new data/inserts to gain speed) Default: UPSERT Possible Values: [UPSERT, INSERT, BULK_INSERT] --payload-class subclass of HoodieRecordPayload, that works off a GenericRecord. Implement your own, if you want to do something other than overwriting existing value Default: org.apache.hudi.OverwriteWithLatestAvroPayload --props path to properties file on localfs or dfs, with configurations for Hudi client, schema provider, key generator and data source. For Hudi client props, sane defaults are used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, referto individual classes, for supported properties. Default: file:///Users/vinoth/bin/hoodie/src/test/resources/delta-streamer-config/dfs-source.properties --schemaprovider-class subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach schemas to input & target table data, built in options: FilebasedSchemaProvider Default: org.apache.hudi.utilities.schema.FilebasedSchemaProvider --source-class Subclass of org.apache.hudi.utilities.sources to read data. Built-in options: org.apache.hudi.utilities.sources.{JsonDFSSource (default), AvroDFSSource, JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource} Default: org.apache.hudi.utilities.sources.JsonDFSSource --source-limit Maximum amount of data to read from source. Default: No limit For e.g: DFSSource => max bytes to read, KafkaSource => max events to read Default: 9223372036854775807 --source-ordering-field Field within source record to decide how to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record Default: ts --spark-master spark master to use. Default: local[2] * --target-base-path base path for the target Hudi dataset. (Will be created if did not exist first time around. If exists, expected to be a Hudi dataset) * --target-table name of the target table in Hive --transformer-class subclass of org.apache.hudi.utilities.transform.Transformer. UDF to transform raw source dataset to a target dataset (conforming to target schema) before writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which allows a SQL query template to be passed as a transformation function)
該工具採用層次結構組成的屬性文件,並具備可插拔的接口,用於提取數據、生成密鑰和提供模式。
從Kafka和DFS攝取數據的示例配置在這裏:hudi-utilities/src/test/resources/delta-streamer-config
。算法
例如:當您讓Confluent Kafka、Schema註冊表啓動並運行後,能夠用這個命令產生一些測試數據
(impressions.avro,
由schema-registry代碼庫提供)sql
[confluent-5.0.0]$ bin/ksql-datagen schema=../impressions.avro format=avro topic=impressions key=impressionid
而後用以下命令攝取這些數據。數據庫
[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \ --props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \ --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \ --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \ --source-ordering-field impresssiontime \ --target-base-path file:///tmp/hudi-deltastreamer-op --target-table uber.impressions \ --op BULK_INSERT
在某些狀況下,您可能須要預先將現有數據集遷移到Hudi。 請參考遷移指南。apache
hudi-spark
模塊提供了DataSource API,能夠將任何數據幀寫入(也能夠讀取)到Hudi數據集中。
如下是在指定須要使用的字段名稱的以後,如何插入更新數據幀的方法,這些字段包括
recordKey => _row_key
、partitionPath => partition
和precombineKey => timestamp
json
inputDF.write() .format("org.apache.hudi") .options(clientOpts) // 能夠傳入任何Hudi客戶端參數 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp") .option(HoodieWriteConfig.TABLE_NAME, tableName) .mode(SaveMode.Append) .save(basePath);
上面的兩個工具都支持將數據集的最新模式同步到Hive Metastore,以便查詢新的列和分區。
若是須要從命令行或在獨立的JVM中運行它,Hudi提供了一個HiveSyncTool
,
在構建了hudi-hive模塊以後,能夠按如下方式調用它。ide
cd hudi-hive ./run_sync_tool.sh [hudi-hive]$ ./run_sync_tool.sh --help Usage: <main class> [options] Options: * --base-path Basepath of Hudi dataset to sync * --database name of the target database in Hive --help, -h Default: false * --jdbc-url Hive jdbc connect url * --pass Hive password * --table name of the target table in Hive * --user Hive username
經過容許用戶指定不一樣的數據記錄負載實現,Hudi支持對存儲在Hudi數據集中的數據執行兩種類型的刪除。
org.apache.hudi.EmptyHoodieRecordPayload
類,它就是實現了這一功能。deleteDF // 僅包含要刪除的記錄的數據幀 .write().format("org.apache.hudi") .option(...) // 根據設置須要添加HUDI參數,例如記錄鍵、分區路徑和其餘參數 // 指定record_key,partition_key,precombine_fieldkey和常規參數 .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
Hudi還對存儲在Hudi數據集中的數據執行幾個關鍵的存儲管理功能。在DFS上存儲數據的關鍵方面是管理文件大小和數量以及回收存儲空間。
例如,HDFS在處理小文件上性能不好,這會對Name Node的內存及RPC施加很大的壓力,並可能破壞整個集羣的穩定性。
一般,查詢引擎可在較大的列文件上提供更好的性能,由於它們能夠有效地攤銷得到列統計信息等的成本。
即便在某些雲數據存儲上,列出具備大量小文件的目錄也經常比較慢。
如下是一些有效管理Hudi數據集存儲的方法。