Hive 集成 Hudi 實踐(含代碼)| 數據湖系列據湖系列


公衆號後臺愈來愈多人問關於數據湖相關的內容,看來你們對新技術仍是很感興趣的。關於數據湖的資料網絡上仍是比較少的,特別是實踐系列,對於新技術來講,基礎的入門文檔仍是頗有必要的,因此這一篇但願可以幫助到想使用Hudi的同窗入門。

本篇的Hudi使用的是孵化版本 0.5.2;其餘依賴 Spark-2.4.4,Hive-1.1.0

Hudi 服務器環境準備
    
      
    
    
     
     
              
     
     
 
      
wget https://github.com/apache/hudi/archive/release-0.5.2-incubating.tar.gztar zxvf release-0.5.2-incubating.tar.gzcd release-0.5.2-incubatingmvn clean package -DskipTests -DskipITscp ./hudi-hadoop-mr/target/hudi-hadoop-mr-0.5.2-incubating.jar $HIVE_HOME/lib/

拷貝依賴包到 Hive 路徑是爲了 Hive 可以正常讀到 Hudi 的數據,至此服務器環境準備完畢,環境的初始化仍是比較簡單的。php


用 Spark 寫一段數據
一切準備完畢先寫一段數據到 Hudi 裏,首先數據源 ods.ods_user_event 的表結構爲:
  
    
  
  
   
   
            
   
   

 
    

而後是 Maven 的依賴,詳細代碼後臺回覆 hudi 後便可獲取。
CREATE TABLE ods.ods_user_event( uuid STRING, name STRING, addr STRING, update_time STRING, date STRING)stored as parquet;
  
    
  
  
   
   
            
   
   

 
    
<dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-spark_2.11</artifactId> <version>0.5.2-incubating</version> </dependency> <dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-common</artifactId> <version>0.5.2-incubating</version> </dependency>
代碼邏輯:
  1. 初始化 SparkSession,配置相關配置項
  2. 構建 DataFrame,你們能夠自由發揮,這裏的案例是從Hive讀數據構建。
  3. DataFrame寫入Hudi,這一塊說到底就是把數據寫入 HDFS 路徑下,可是須要一堆配置,這些配置就體現了 Hudi 的特性:
  4. DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY :指定惟一id的列名
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY:指定更新時間,該字段數值大的數據會覆蓋小的
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY :指定分區列,和Hive的分區概念相似
    HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH :設置當分區變動時,當前數據的分區目錄是否變動
    HoodieIndexConfig.INDEX_TYPE_PROP :設置索引類型目前有 HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四種索引。例子中,選擇了 HoodieGlobalBloomIndex(全局索引),會在全部分區內查找指定的 recordKey。而 HoodieBloomIndex 只在指定的分區內查找。
  
    
  
  
   
   
            
   
   
 
    
def main(args: Array[String]): Unit = { val sss = SparkSession.builder.appName("hudi") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("hive.metastore.uris", "thrift://ip:port") .enableHiveSupport().getOrCreate()
val sql = "select * from ods.ods_user_event" val df: DataFrame = sss.sql(sql)
df.write.format("org.apache.hudi") .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "recordKey") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "update_time") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "date") .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) .option("hoodie.insert.shuffle.parallelism", "10") .option("hoodie.upsert.shuffle.parallelism", "10") .option(HoodieWriteConfig.TABLE_NAME, "ods.ods_user_event_hudi") .mode(SaveMode.Append) .save("/user/hudi/lake/ods.db/ods_user_event_hudi") }
執行成功後會有以下結果,由於咱們是按照date分區,每一天的數據會生成一個文件夾和Hive相似。
  
    
  
  
   
   
            
   
   
 
    
[hadoop@hadoop31 ~]# hdfs dfs -ls /user/hudi/lake/ods.db/ods_user_event_hudi/Found 4 itemsdrwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200501drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200502drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200503drwxr-xr-x   - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200504
另外,注意 recordKey 必須惟一,否則數據會被覆蓋,且值不能爲 null,不然會有如下報錯。
  
    
  
  
   
   
            
   
   
 
    
Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "user_uid" cannot be null or empty.


Hive 建立外部表讀數據

上一步中 Spark 將數據寫到了 hudi,想要經過Hive訪問到這塊數據,就須要建立一個Hive外部表了,由於 Hudi 配置了分區,因此爲了能讀到全部的數據,我們的外部表也得分區,分區字段名可隨意配置。
  
    
  
  
   
   
            
   
   

 
    
CREATE TABLE ods.ods_user_event_hudi( uuid STRING, name STRING, addr STRING, update_time STRING, date STRING)PARTITIONED BY ( `dt` string)ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'LOCATION '/user/hudi/lake/ods.db/ods_user_event_hudi'
至此,直接讀數據確定是空的,由於咱們建立的是個分區表,因此還須要指定分區。
  
    
  
  
   
   
            
   
   
 
    
alter table ods.ods_user_event_hudi add if not exists partition(dt='20200504') location '/user/hudi/lake/ods.db/ods_user_event_hudi/20200504'

那麼這個時候問題來了,一年有365個分區,要一個一個創建手動建立分區嗎?
抱歉我也沒發現更好的辦法,只能送你個簡單的腳本了。
  
    
  
  
   
   
            
   
   
 
    
#!/bin/bashstart_date=20190101end_date=20200520start=`date -d "$start_date" "+%s"`end=`date -d "$end_date" "+%s"`for((i=start;i<=end;i+=86400)); do dt=$(date -d "@$i" "+%Y%m%d") hive -e "alter table ods.ods_user_event_hudi add if not exists partition(dt='${dt}') location '/user/hudi/lake/ods.db/ods_user_event_hudi/${dt}'; "done


後記

最後,執行 select * from ods.ods_user_event_hudi 要是沒有數據你來找我。另外值得注意的是,若是此時直接用 Hive 將數據 insert into ods.ods_user_event_hudi,雖然數據會寫入到 hudi 的目錄下,可是相同的 recordKey 是不會覆蓋原有數據的。

下一篇詳細寫 Spark 操做 Hudi 的相關內容,敬請期待。本篇詳細代碼後臺回覆 hudi 後便可獲取。




- END -

有收穫就點個「在看」吧 ▼

本文分享自微信公衆號 - 老懞大數據(simon_bigdata)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。git

相關文章
相關標籤/搜索