Apache Hudi + AWS S3 + Athena實戰

Apache Hudi在阿里巴巴集團、EMIS Health,LinkNovate,Tathastu.AI,騰訊,Uber內使用,而且由Amazon AWS EMR和Google雲平臺支持,最近Amazon Athena支持了在Amazon S3上查詢Apache Hudi數據集的能力,本博客將測試Athena查詢S3上Hudi格式數據集。git

1. 準備-Spark環境,S3 Bucket

須要使用Spark寫入Hudi數據,登錄Amazon EMR並啓動spark-shell:github

$ export SCALA_VERSION=2.12
$ export SPARK_VERSION=2.4.4
$ spark-shell \
--packages org.apache.hudi:hudi-spark-bundle_${SCALA_VERSION}:0.5.3,org.apache.spark:spark-avro_${SCALA_VERSION}:${SPARK_VERSION}\
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
         
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
Type in expressions to have them evaluated.
Type :help for more information.
scala>

接着使用以下scala代碼設置表名,基礎路徑以及數據生成器來生成數據。這裏設置basepaths3://hudi_athena_test/hudi_trips,以便後面進行查詢sql

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips"
val basePath = "s3://hudi_athena_test/hudi_trips"
val dataGen = new DataGenerator

2. 插入數據

生成新的行程數據,導入DataFrame,並將其寫入Hudi表shell

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

3. 建立Athena數據庫/表

Hudi內置表分區支持,因此在建立表後須要添加分區,安裝athenareader工具,其提供Athena多個查詢和其餘有用的特性。數據庫

go get -u github.com/uber/athenadriver/athenareader

接着建立hudi_athena_test.sql文件,內容以下express

DROP DATABASE IF EXISTS hudi_athena_test CASCADE;
create database hudi_athena_test;
CREATE EXTERNAL TABLE `trips`(
  `begin_lat` double,
  `begin_lon` double,
  `driver` string,
  `end_lat` double,
  `end_lon` double,
  `fare` double,
  `rider` string,
  `ts` double,
  `uuid` string
) PARTITIONED BY (`partitionpath` string) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION 's3://hudi_athena_test/hudi_trips'
ALTER TABLE trips ADD
  PARTITION (partitionpath = 'americas/united_states/san_francisco') LOCATION 's3://hudi_athena_test/hudi_trips/americas/united_states/san_francisco'
  PARTITION (partitionpath = 'americas/brazil/sao_paulo') LOCATION 's3://hudi_athena_test/hudi_trips/americas/brazil/sao_paulo'
  PARTITION (partitionpath = 'asia/india/chennai') LOCATION 's3://hudi_athena_test/hudi_trips/asia/india/chennai'

使用以下命令運行SQL語句apache

$ athenareader -q hudi_athena_test.sql

4. 使用Athena查詢Hudi

若是沒有錯誤,那麼說明庫和表在Athena中都已建立好,所以能夠在Athena中查詢Hudi數據集,使用athenareader查詢結果以下json

athenareader -q "select * from trips" -o markdown

也能夠帶條件進行查詢markdown

athenareader -q "select fare,rider from trips where fare>20" -o markdown

5. 更新Hudi表再次查詢

Hudi支持S3中的數據,回到spark-shell並使用以下命令更新部分數據ide

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

運行完成後,使用athenareader再次查詢

athenareader -q "select * from trips" -o markdown

能夠看到數據已經更新了

6. 限制

Athena不支持查詢快照或增量查詢,Hive/SparkSQL支持,爲進行驗證,經過spark-shell建立一個快照

spark.
  read.
  format("hudi").
  load(basePath + "/*/*/*/*").
  createOrReplaceTempView("hudi_trips_snapshot")

使用以下代碼查詢

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2)

使用Athena查詢將會失敗,由於沒有物化

$ athenareader -q "select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime"
SYNTAX_ERROR: line 1:57: Table awsdatacatalog.hudi_athena_test.hudi_trips_snapshot does not exist

根據官方文檔,Athena支持查詢Hudi數據集的Read-Optimized視圖,同時,咱們能夠經過Athena來建立視圖並進行查詢,使用Athena在Hudi表上建立一個視圖

$ athenareader -q "create view fare_greater_than_40 as select * from trips where fare>40" -a

查詢視圖

$ athenareader -q "select fare,rider from fare_greater_than_40"
 FARE                RIDER     
 43.4923811219014    rider-213 
 63.72504913279929   rider-284 
 90.25710109008239   rider-284 
 93.56018115236618   rider-213 
 49.527694252432056  rider-284 
 90.9053809533154    rider-284 
 98.3428192817987    rider-284
相關文章
相關標籤/搜索