Apache Hudi Build and Test

Test environment: Ubuntu 16.04
Hudi version: 0.5.2
Spark version: 2.4.0

Download, Build, and Test

  • Download
    git clone https://github.com/apache/incubator-hudi.git && cd incubator-hudi
  • Build
    mvn clean package -Dmaven.test.skip=true

  • Hudi CLI test
    After a successful build, the CLI can be launched from the repository root with hudi-cli/hudi-cli.sh.

Write and Query Tests

  • Launch the Spark shell

    ./spark-shell --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'


  • Insert data test

    import org.apache.hudi.QuickstartUtils._
    import scala.collection.JavaConversions._
    import org.apache.spark.sql.SaveMode._
    import org.apache.hudi.DataSourceReadOptions._
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    
    val tableName = "hudi_trips_cow"
    val basePath = "file:///tmp/hudi_trips_cow"
    val dataGen = new DataGenerator
    val inserts = convertToStringList(dataGen.generateInserts(10))
    val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Overwrite).
      save(basePath)

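    As a quick sanity check (an addition, not in the original post), the files Hudi wrote can be listed from the same shell session; the quickstart's DataGenerator partitions records under region/country/city paths:

    // List the data files Hudi produced under the base path
    // (assumes the default local path used above).
    import java.nio.file.{Files, Paths}
    import scala.collection.JavaConverters._
    Files.walk(Paths.get("/tmp/hudi_trips_cow")).iterator().asScala.
      filter(p => Files.isRegularFile(p)).
      foreach(println)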

  • Read data test

    val tripsSnapshotDF = spark.
      read.
      format("hudi").
      load(basePath + "/*/*/*/*")
    tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
    
    spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
    spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

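    To see the metadata columns from the second query at a glance (an addition, not in the original post), print the schema; Hudi prepends _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key, _hoodie_partition_path, and _hoodie_file_name to every record:

    // The five _hoodie_* metadata fields appear before the payload columns.
    tripsSnapshotDF.printSchema()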

  • Update test

    val updates = convertToStringList(dataGen.generateUpdates(10))
    val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Append).
      save(basePath)

    Comparing the data before and after the update: every field except _hoodie_record_key changed.
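
    To reproduce that comparison without screenshots (a sketch, assuming the same shell session), order by record key and inspect the commit time next to a payload field before and after running the update:

    // Show the latest state of each record: _hoodie_commit_time reflects the
    // most recent commit that touched the row.
    spark.read.format("hudi").
      load(basePath + "/*/*/*/*").
      select("_hoodie_commit_time", "_hoodie_record_key", "fare", "ts").
      orderBy("_hoodie_record_key").
      show(false)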



  • Incremental query test

    spark.
      read.
      format("hudi").
      load(basePath + "/*/*/*/*").
      createOrReplaceTempView("hudi_trips_snapshot")
    
    val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(10)
    val beginTime = commits(commits.length - 2) // commit time we are interested in
    
    val tripsIncrementalDF = spark.read.format("hudi").
      option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
      option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
      load(basePath)
    tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
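
    The screenshots presumably correspond to the quickstart's follow-up query against the incremental view:

    // Only records written after beginTime (here, the second-to-last commit)
    // are visible through the incremental view.
    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()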

    First insert another batch of data (a sketch of this step follows below), then run the incremental query and, for comparison, a query over the full snapshot: the incremental view returns only the newly inserted records.
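
    The batch insert itself is not spelled out in the post; presumably it repeats the earlier insert step in Append mode, along these lines:

    // Hypothetical re-run of the insert step (not shown in the original post):
    // generate ten more records and append them as a new commit.
    val newInserts = convertToStringList(dataGen.generateInserts(10))
    val newDf = spark.read.json(spark.sparkContext.parallelize(newInserts, 2))
    newDf.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Append).
      save(basePath)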


  • Query data within a given time range

    val beginTime = "000" // Represents all commits > this time.
    val endTime = commits(commits.length - 2) // commit time we are interested in
    
    //incrementally query data
    val tripsPointInTimeDF = spark.read.format("hudi").
      option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
      option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
      option(END_INSTANTTIME_OPT_KEY, endTime).
      load(basePath)
    tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

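    As a sanity check (an addition, not in the original post), the point-in-time view should contain no commit newer than endTime:

    // The maximum commit time in the view must be <= endTime.
    spark.sql("select max(`_hoodie_commit_time`) from hudi_trips_point_in_time").show()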

  • Delete data

    // fetch total records count
    spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
    // fetch two records to be deleted
    val ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
    
    // issue deletes
    val deletes = dataGen.generateDeletes(ds.collectAsList())
    val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(OPERATION_OPT_KEY,"delete").
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Append).
      save(basePath)
    
    // run the same read query as above.
    val roAfterDeleteViewDF = spark.
      read.
      format("hudi").
      load(basePath + "/*/*/*/*")
    roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
    // fetch should return (total - 2) records
    spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()

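    To confirm the two chosen keys are gone (a sketch, assuming the same shell session), filter the snapshot by the deleted uuids; the count should be 0:

    // Collect the uuids of the two deleted rows and look them up again.
    import org.apache.spark.sql.functions.col
    val deletedKeys = ds.select("uuid").collect().map(_.getString(0))
    spark.sql("select uuid from hudi_trips_snapshot").
      filter(col("uuid").isin(deletedKeys: _*)).
      count()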

https://hudi.apache.org/docs/quick-start-guide.html
