實戰 | 將Kafka流式數據攝取至Hudi

1. 引入

Hudi支持如下存儲數據的視圖sql

  • 讀優化視圖 : 在此視圖上的查詢將查看給定提交或壓縮操做中數據集的最新快照。該視圖僅將最新parquet文件暴露給查詢,因此它有可能看不到最新的數據,並保證與非Hudi列式數據集相比,具備相同的列式查詢性能shell

  • 增量視圖 : 對該視圖的查詢只能看到從某個提交/壓縮後寫入數據集的新數據。該視圖有效地提供了更改流,來支持增量數據管道。apache

  • 實時視圖 : 在此視圖上的查詢將查看某個增量提交操做中數據集的最新快照。該視圖經過動態合併最新的基本文件和增量文件來提供近實時數據集。bootstrap

使用Hudi自帶的DeltaStreamer工具寫數據到Hudi,開啓--enable-hive-sync 便可同步數據到hive表。微信

2. 步驟

2.1 DeltaStreamer啓動命令

  
    
  
  
   
   
            
   
   
  1. app

  2. curl

  3. ide

  4. 工具

  5. oop

spark-submit --master yarn \ --driver-memory 1G \ --num-executors 2 \ --executor-memory 1G \ --executor-cores 4 \ --deploy-mode cluster \ --conf spark.yarn.executor.memoryOverhead=512 \ --conf spark.yarn.driver.memoryOverhead=512 \ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /.../hudi-utilities-bundle_2.11-0.5.2-SNAPSHOT.jar` \ --props hdfs://../kafka.properties \ --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \ --target-base-path hdfs://../business \ --op UPSERT \ --target-table business \ '這裏其實並非hive表的名稱,實際表名是在kafka.properties中配置' --enable-hive-sync \ '開啓同步至hive' --table-type MERGE_ON_READ \ --source-ordering-field create_time \ --source-limit 5000000

2.2 kafka.properties配置實例

  
    
  
  
   
   
            
   
   
hoodie.upsert.shuffle.parallelism=2hoodie.insert.shuffle.parallelism=2hoodie.bulkinsert.shuffle.parallelism=2hoodie.datasource.write.recordkey.field=uuidhoodie.datasource.write.partitionpath.field=create_timehoodie.datasource.write.precombine.field=update_timehoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://../t_business.avschoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://../t3_trip.t_business.avschoodie.deltastreamer.source.kafka.topic=t_business_topicgroup.id=t_business_groupbootstrap.servers=localhostauto.offset.reset=latesthoodie.parquet.max.file.size=134217728hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.keygen.TimestampBasedKeyGeneratorhoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRINGhoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:sshoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/ddhoodie.datasource.hive_sync.database=dwdhoodie.datasource.hive_sync.table=testhoodie.datasource.hive_sync.username=用戶名hoodie.datasource.hive_sync.password=密碼hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://.....hoodie.datasource.hive_sync.partition_fields=分區字段

3. 不一樣模式

3.1 MOR模式

若是使用MOR模式寫入數據會在Hive的dwd庫下面生成兩張表。分別是testro 和 testrt testrt表支持:快照視圖和增量視圖查詢 testro表支持:讀優化視圖查詢

3.1.1 使用Spark查詢

  
    
  
  
   
   
            
   
   

spark-shell --master yarn \--driver-memory 1G \--num-executors 1 \--executor-memory 1G \--executor-cores 1 \--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \--conf spark.sql.hive.convertMetastoreParquet=false '在進行快照視圖查詢的時候須要添加此配置'#快照視圖spark.sql("select count(*) from dwd.test_rt").show() #讀優化視圖spark.sql("select count(*) from dwd.test_ro").show() #增量視圖saprk sql不支持

3.1.2 使用Hive查詢

  
    
  
  
   
   
            
   
   


beeline -u jdbc:hive2://incubator-t3-infra04:10000 -n t3cx -p t3cx \ --hiveconf hive.stats.autogather=false \ #讀優化查詢 select * from dwd.test_ro; #快照查詢 select * from dwd.test_rt; #增量查詢 set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; set hoodie.test.consume.mode=INCREMENTAL; set hoodie.test.consume.max.commits=3; set hoodie.test.consume.start.timestamp=20200427114546; select count(*) from dwd.test_rt where `_hoodie_commit_time` > '20200427114546'; #注意:#一、hudi中parquet作了shaded,我在測試中發現(CDH6.3.0)下必須加載hudi-hadoop-mr中的parquet-avro包才行,clouder用戶須要必需要從新安裝mr所須要的jar#二、set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat 最好顯示設置,不然有可能在某種狀況下沒法加載到hive.input.formate,即使在create-table的時候已經指定

3.2 COW模式

若是使用COW模式寫入數據,會在Hive的dwd庫下面生成一張表,test test表支持:快照視圖和增量視圖

3.2.1 使用Spark查詢

  
    
  
  
   
   
            
   
   

spark-shell --master yarn \--driver-memory 1G \--num-executors 1 \--executor-memory 1G \--executor-cores 1 \--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \--conf spark.sql.hive.convertMetastoreParquet=false#快照視圖spark.sql("select count(*) from dwd.test").show()
  
    
  
  
   
   
            
   
   
//增量視圖 無需遍歷所有數據,便可獲取時間大於20200426140637的數據import org.apache.hudi.DataSourceReadOptionsval hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200426140637").load("hdfs://..../t3_trip_t_business15")spark.sql("select count(*) from dwd.test_rt where _hoodie_commit_time>'20200426140637'").show()

3.2.2 使用Hive查詢

  
    
  
  
   
   
            
   
   

beeline -u jdbc:hive2://incubator-t3-infra04:10000 -n t3cx -p t3cx \ --hiveconf hive.stats.autogather=false \ #快照查詢 select count(*) from dwd.test; #增量查詢 set hoodie.test.consume.mode=INCREMENTAL; set hoodie.test.consume.max.commits=3; set hoodie.test.consume.start.timestamp=20200427114546; select count(*) from dwd.test where `_hoodie_commit_time` > '20200427114546';

4. 總結

DeltaStreamer是Hudi提供的很是實用的工具,經過DeltaStreamer能夠將Kafka、DFS上的數據導入Hudi,而本篇博文主要講解了如何使用DeltaStreamer將數據從Kafka導入Hudi,並演示瞭如何使用Spark和Hive查詢Hudi數據。


本文分享自微信公衆號 - ApacheHudi(ApacheHudi)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索