寫入Apache Hudi數據集

這一節咱們將介紹使用DeltaStreamer工具從外部源甚至其餘Hudi數據集攝取新更改的方法,
以及經過使用Hudi數據源的upserts加快大型Spark做業的方法。
對於此類數據集,咱們可使用各類查詢引擎查詢它們。html

寫操做

在此以前,瞭解Hudi數據源及delta streamer工具提供的三種不一樣的寫操做以及如何最佳利用它們可能會有所幫助。
這些操做能夠在針對數據集發出的每一個提交/增量提交中進行選擇/更改。java

  • UPSERT(插入更新) :這是默認操做,在該操做中,經過查找索引,首先將輸入記錄標記爲插入或更新。
    在運行啓發式方法以肯定如何最好地將這些記錄放到存儲上,如優化文件大小之類後,這些記錄最終會被寫入。
    對於諸如數據庫更改捕獲之類的用例,建議該操做,由於輸入幾乎確定包含更新。
  • INSERT(插入) :就使用啓發式方法肯定文件大小而言,此操做與插入更新(UPSERT)很是類似,但此操做徹底跳過了索引查找步驟。
    所以,對於日誌重複數據刪除等用例(結合下面提到的過濾重複項的選項),它能夠比插入更新快得多。
    插入也適用於這種用例,這種狀況數據集能夠容許重複項,但只須要Hudi的事務寫/增量提取/存儲管理功能。
  • BULK_INSERT(批插入) :插入更新和插入操做都將輸入記錄保存在內存中,以加快存儲優化啓發式計算的速度(以及其它未說起的方面)。
    因此對Hudi數據集進行初始加載/引導時這兩種操做會很低效。批量插入提供與插入相同的語義,但同時實現了基於排序的數據寫入算法,
    該算法能夠很好地擴展數百TB的初始負載。可是,相比於插入和插入更新能保證文件大小,批插入在調整文件大小上只能盡力而爲。

DeltaStreamer

HoodieDeltaStreamer實用工具 (hudi-utilities-bundle中的一部分) 提供了從DFS或Kafka等不一樣來源進行攝取的方式,並具備如下功能。git

  • 從Kafka單次攝取新事件,從Sqoop、HiveIncrementalPuller輸出或DFS文件夾中的多個文件
    增量導入
  • 支持json、avro或自定義記錄類型的傳入數據
  • 管理檢查點,回滾和恢復
  • 利用DFS或Confluent schema註冊表的Avro模式。
  • 支持自定義轉換操做

命令行選項更詳細地描述了這些功能:github

[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` --help
Usage: <main class> [options]
  Options:
    --commit-on-errors
        Commit even when some records failed to be written
      Default: false
    --enable-hive-sync
          Enable syncing to hive
       Default: false
    --filter-dupes
          Should duplicate records from source be dropped/filtered outbefore 
          insert/bulk-insert 
      Default: false
    --help, -h
    --hudi-conf
          Any configuration that can be set in the properties file (using the CLI 
          parameter "--propsFilePath") can also be passed command line using this 
          parameter 
          Default: []
    --op
      Takes one of these values : UPSERT (default), INSERT (use when input is
      purely new data/inserts to gain speed)
      Default: UPSERT
      Possible Values: [UPSERT, INSERT, BULK_INSERT]
    --payload-class
      subclass of HoodieRecordPayload, that works off a GenericRecord.
      Implement your own, if you want to do something other than overwriting
      existing value
      Default: org.apache.hudi.OverwriteWithLatestAvroPayload
    --props
      path to properties file on localfs or dfs, with configurations for
      Hudi client, schema provider, key generator and data source. For
      Hudi client props, sane defaults are used, but recommend use to
      provide basic things like metrics endpoints, hive configs etc. For
      sources, referto individual classes, for supported properties.
      Default: file:///Users/vinoth/bin/hoodie/src/test/resources/delta-streamer-config/dfs-source.properties
    --schemaprovider-class
      subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach
      schemas to input & target table data, built in options:
      FilebasedSchemaProvider
      Default: org.apache.hudi.utilities.schema.FilebasedSchemaProvider
    --source-class
      Subclass of org.apache.hudi.utilities.sources to read data. Built-in
      options: org.apache.hudi.utilities.sources.{JsonDFSSource (default),
      AvroDFSSource, JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}
      Default: org.apache.hudi.utilities.sources.JsonDFSSource
    --source-limit
      Maximum amount of data to read from source. Default: No limit For e.g:
      DFSSource => max bytes to read, KafkaSource => max events to read
      Default: 9223372036854775807
    --source-ordering-field
      Field within source record to decide how to break ties between records
      with same key in input data. Default: 'ts' holding unix timestamp of
      record
      Default: ts
    --spark-master
      spark master to use.
      Default: local[2]
  * --target-base-path
      base path for the target Hudi dataset. (Will be created if did not
      exist first time around. If exists, expected to be a Hudi dataset)
  * --target-table
      name of the target table in Hive
    --transformer-class
      subclass of org.apache.hudi.utilities.transform.Transformer. UDF to
      transform raw source dataset to a target dataset (conforming to target
      schema) before writing. Default : Not set. E:g -
      org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which
      allows a SQL query template to be passed as a transformation function)

該工具採用層次結構組成的屬性文件,並具備可插拔的接口,用於提取數據、生成密鑰和提供模式。
從Kafka和DFS攝取數據的示例配置在這裏:hudi-utilities/src/test/resources/delta-streamer-config算法

例如:當您讓Confluent Kafka、Schema註冊表啓動並運行後,能夠用這個命令產生一些測試數據
impressions.avro
由schema-registry代碼庫提供)sql

[confluent-5.0.0]$ bin/ksql-datagen schema=../impressions.avro format=avro topic=impressions key=impressionid

而後用以下命令攝取這些數據。數據庫

[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
  --props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --source-ordering-field impresssiontime \
  --target-base-path file:///tmp/hudi-deltastreamer-op --target-table uber.impressions \
  --op BULK_INSERT

在某些狀況下,您可能須要預先將現有數據集遷移到Hudi。 請參考遷移指南apache

Datasource Writer

hudi-spark模塊提供了DataSource API,能夠將任何數據幀寫入(也能夠讀取)到Hudi數據集中。
如下是在指定須要使用的字段名稱的以後,如何插入更新數據幀的方法,這些字段包括
recordKey => _row_keypartitionPath => partitionprecombineKey => timestampjson

inputDF.write()
       .format("org.apache.hudi")
       .options(clientOpts) // 能夠傳入任何Hudi客戶端參數
       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
       .option(HoodieWriteConfig.TABLE_NAME, tableName)
       .mode(SaveMode.Append)
       .save(basePath);

與Hive同步

上面的兩個工具都支持將數據集的最新模式同步到Hive Metastore,以便查詢新的列和分區。
若是須要從命令行或在獨立的JVM中運行它,Hudi提供了一個HiveSyncTool
在構建了hudi-hive模塊以後,能夠按如下方式調用它。ide

cd hudi-hive
./run_sync_tool.sh
 [hudi-hive]$ ./run_sync_tool.sh --help
Usage: <main class> [options]
  Options:
  * --base-path
       Basepath of Hudi dataset to sync
  * --database
       name of the target database in Hive
    --help, -h
       Default: false
  * --jdbc-url
       Hive jdbc connect url
  * --pass
       Hive password
  * --table
       name of the target table in Hive
  * --user
       Hive username

刪除數據

經過容許用戶指定不一樣的數據記錄負載實現,Hudi支持對存儲在Hudi數據集中的數據執行兩種類型的刪除。

  • Soft Deletes(軟刪除) :使用軟刪除時,用戶但願保留鍵,但僅使全部其餘字段的值都爲空。
    經過確保適當的字段在數據集模式中能夠爲空,並在將這些字段設置爲null以後直接向數據集插入更新這些記錄,便可輕鬆實現這一點。
  • Hard Deletes(硬刪除) :這種更強形式的刪除是從數據集中完全刪除記錄在存儲上的任何痕跡。
    這能夠經過觸發一個帶有自定義負載實現的插入更新來實現,這種實現可使用老是返回Optional.Empty做爲組合值的DataSource或DeltaStreamer。
    Hudi附帶了一個內置的org.apache.hudi.EmptyHoodieRecordPayload類,它就是實現了這一功能。
deleteDF // 僅包含要刪除的記錄的數據幀
   .write().format("org.apache.hudi")
   .option(...) // 根據設置須要添加HUDI參數,例如記錄鍵、分區路徑和其餘參數
   // 指定record_key,partition_key,precombine_fieldkey和常規參數
   .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")

存儲管理

Hudi還對存儲在Hudi數據集中的數據執行幾個關鍵的存儲管理功能。在DFS上存儲數據的關鍵方面是管理文件大小和數量以及回收存儲空間。
例如,HDFS在處理小文件上性能不好,這會對Name Node的內存及RPC施加很大的壓力,並可能破壞整個集羣的穩定性。
一般,查詢引擎可在較大的列文件上提供更好的性能,由於它們能夠有效地攤銷得到列統計信息等的成本。
即便在某些雲數據存儲上,列出具備大量小文件的目錄也經常比較慢。

如下是一些有效管理Hudi數據集存儲的方法。

  • Hudi中的小文件處理功能,能夠分析傳入的工做負載並將插入內容分配到現有文件組中,
    而不是建立新文件組。新文件組會生成小文件。
  • 能夠配置Cleaner來清理較舊的文件片,清理的程度能夠調整,
    具體取決於查詢所需的最長時間和增量拉取所需的回溯。
  • 用戶還能夠調整基礎/parquet文件日誌文件的大小
    和預期的壓縮率,使足夠數量的插入被分到同一個文件組中,最終產生大小合適的基礎文件。
  • 智能調整批插入並行度,能夠產生大小合適的初始文件組。
    實際上,正確執行此操做很是關鍵,由於文件組一旦建立後就不能刪除,只能如前所述對其進行擴展。
  • 對於具備大量更新的工做負載,讀取時合併存儲提供了一種很好的機制,
    能夠快速將其攝取到較小的文件中,以後經過壓縮將它們合併爲較大的基礎文件。

相關文章
相關標籤/搜索