wget https://github.com/apache/hudi/archive/release-0.5.2-incubating.tar.gz
tar zxvf release-0.5.2-incubating.tar.gz
cd hudi-release-0.5.2-incubating
mvn clean package -DskipTests -DskipITs
cp ./hudi-hadoop-mr/target/hudi-hadoop-mr-0.5.2-incubating.jar $HIVE_HOME/lib/
The dependency jar is copied into Hive's lib directory so that Hive can read Hudi data properly. With that, the server environment is ready; the environment setup is fairly straightforward.
CREATE TABLE ods.ods_user_event (
  uuid STRING,
  name STRING,
  addr STRING,
  update_time STRING,
  `date` STRING
)
STORED AS PARQUET;
Next come the Maven dependencies; for the full code, reply "hudi" in the public account backend.
<dependency>
  <groupId>org.apache.hudi</groupId>
  <artifactId>hudi-spark_2.11</artifactId>
  <version>0.5.2-incubating</version>
</dependency>
<dependency>
  <groupId>org.apache.hudi</groupId>
  <artifactId>hudi-common</artifactId>
  <version>0.5.2-incubating</version>
</dependency>
The write job itself has three steps:

- Initialize the SparkSession and set the relevant configuration options.
- Build the DataFrame. You can do this however you like; in this example it is built by reading from Hive.
- Write the DataFrame to Hudi. At bottom this just writes the data to an HDFS path, but it takes a pile of options, and those options are exactly where Hudi's features show up:
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

def main(args: Array[String]): Unit = {
  val sss = SparkSession.builder.appName("hudi")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("hive.metastore.uris", "thrift://ip:port")
    .enableHiveSupport()
    .getOrCreate()

  // Build the DataFrame from the Hive source table.
  val sql = "select * from ods.ods_user_event"
  val df: DataFrame = sss.sql(sql)

  // Write the DataFrame to the Hudi dataset on HDFS.
  df.write.format("org.apache.hudi")
    // Field used as the record key (the unique key for upserts).
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "recordKey")
    // Field used to pick the latest version when record keys collide.
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "update_time")
    // Field used to build the partition path.
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "date")
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
    .option("hoodie.insert.shuffle.parallelism", "10")
    .option("hoodie.upsert.shuffle.parallelism", "10")
    .option(HoodieWriteConfig.TABLE_NAME, "ods.ods_user_event_hudi")
    .mode(SaveMode.Append)
    .save("/user/hudi/lake/ods.db/ods_user_event_hudi")
}
Found 4 items
drwxr-xr-x   - hadoop hadoop          0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200501
drwxr-xr-x   - hadoop hadoop          0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200502
drwxr-xr-x   - hadoop hadoop          0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200503
drwxr-xr-x   - hadoop hadoop          0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200504
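With the write finished, the partition directories are laid out by date on HDFS as above. The dataset can also be read back directly through the Hudi datasource; the following is a minimal sketch, reusing the sss session from the write job, and the single wildcard in the path is an assumption based on the one-level date partitioning used here (adjust it to your partition depth):

// Read the Hudi dataset back with the Spark datasource (Hudi 0.5.x style path glob).
val hudiDf = sss.read.format("org.apache.hudi")
  .load("/user/hudi/lake/ods.db/ods_user_event_hudi/*")
hudiDf.select("uuid", "name", "update_time").show(10, false)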
Note that the record key field must not be null or empty for any row, otherwise the write fails with an exception like:

Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "user_uid" cannot be null or empty.
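A minimal pre-write guard, where "uuid" is an assumed example column name (use whatever field you actually mapped to RECORDKEY_FIELD_OPT_KEY):

import org.apache.spark.sql.functions.col

// Drop rows whose record key is null or empty before writing to Hudi.
val cleanDf = df.filter(col("uuid").isNotNull && col("uuid") =!= "")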
To let Hive query the data written by Hudi, create a table over the same HDFS location using HoodieParquetInputFormat:

CREATE TABLE ods.ods_user_event_hudi (
  uuid STRING,
  name STRING,
  addr STRING,
  update_time STRING,
  `date` STRING
)
PARTITIONED BY (`dt` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
  INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/user/hudi/lake/ods.db/ods_user_event_hudi';
Newly written partitions are not registered in the Hive metastore automatically, so they have to be added by hand:

alter table ods.ods_user_event_hudi add if not exists partition(dt='20200504') location '/user/hudi/lake/ods.db/ods_user_event_hudi/20200504';
Sorry, I have not found a better way around this either, so here is a simple script instead.
#!/bin/bash
# Register one partition per day between start_date and end_date.
start_date=20190101
end_date=20200520
start=$(date -d "$start_date" "+%s")
end=$(date -d "$end_date" "+%s")
for ((i = start; i <= end; i += 86400)); do
  dt=$(date -d "@$i" "+%Y%m%d")
  hive -e "alter table ods.ods_user_event_hudi add if not exists partition(dt='${dt}') location '/user/hudi/lake/ods.db/ods_user_event_hudi/${dt}';"
done
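If you would rather drive this from Spark than shell out to hive -e day by day, the same loop can be expressed with spark.sql; a minimal sketch, assuming the sss SparkSession from earlier with Hive support enabled and the same table and path as above:

import java.time.LocalDate
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyyMMdd")
val startDay = LocalDate.parse("20190101", fmt)
val endDay = LocalDate.parse("20200520", fmt)

// Walk the date range one day at a time and register each partition in the metastore.
Iterator.iterate(startDay)(_.plusDays(1))
  .takeWhile(!_.isAfter(endDay))
  .foreach { day =>
    val dt = day.format(fmt)
    sss.sql(
      s"alter table ods.ods_user_event_hudi add if not exists partition (dt='$dt') " +
      s"location '/user/hudi/lake/ods.db/ods_user_event_hudi/$dt'")
  }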
This article was originally shared from the WeChat public account 老懞大數據 (simon_bigdata).