實時分爲處理的實時和數據的實時
即席分析是要求對數據實時的處理,立刻要獲得對應的結果
Flink、Spark Streaming是用來對實時數據的實時處理,數據要求實時,處理也要迅速
數據不實時,處理也不及時的場景則是咱們的數倉T+1數據java
而本文探討的Apache Hudi,對應的場景是數據的實時,而非處理的實時。它旨在將Mysql中的時候以近實時的方式映射到大數據平臺,好比Hive中。mysql
傳統的離線數倉,一般數據是T+1的,不能知足對當日數據分析的需求
而流式計算通常是基於窗口,而且窗口邏輯相對比較固定。
而筆者所在的公司有一類特殊的需求,業務分析比較熟悉現有事務數據庫的數據結構,而且但願有不少即席分析,這些分析包含當日比較實時的數據。慣常他們是基於Mysql從庫,直接經過Sql作相應的分析計算。但不少時候會遇到以下障礙git
所以,一些彌合在OLTP和OLAP之間的技術框架出現,典型有TiDB。它能同時支持OLTP和OLAP。而諸如Apache Hudi和Apache Kudu則至關於現有OLTP和OLAP技術的橋樑。他們可以以現有OLTP中的數據結構存儲數據,支持CRUD,同時提供跟現有OLAP框架的整合(如Hive,Impala),以實現OLAP分析github
Apache Kudu,須要單獨部署集羣。而Apache Hudi則不須要,它能夠利用現有的大數據集羣好比HDFS作數據文件存儲,而後經過Hive作數據分析,相對來講更適合資源受限的環境sql
Hudi 提供了Hudi 表的概念,這些表支持CRUD操做。咱們能夠基於這個特色,將Mysql Binlog的數據重放至Hudi表,而後基於Hive對Hudi表進行查詢分析。數據流向架構以下
數據庫
Hudi表的數據文件,可使用操做系統的文件系統存儲,也可使用HDFS這種分佈式的文件系統存儲。爲了後續分析性能和數據的可靠性,通常使用HDFS進行存儲。以HDFS存儲來看,一個Hudi表的存儲文件分爲兩類。apache
_partition_key
相關的路徑是實際的數據文件,按分區存儲,固然分區的路徑key是能夠指定的,我這裏使用的是_partition_keyHudi真實的數據文件使用Parquet文件格式存儲
session
Hudi把隨着時間流逝,對錶的一系列CRUD操做叫作Timeline。Timeline中某一次的操做,叫作Instant。Instant包含如下信息數據結構
.hoodie文件夾中存放對應操做的狀態記錄
架構
hudi爲了實現數據的CRUD,須要可以惟一標識一條記錄。hudi將把數據集中的惟一字段(record key ) + 數據所在分區 (partitionPath) 聯合起來當作數據的惟一鍵
基於上述基礎概念之上,Hudi提供了兩類表格式COW和MOR。他們會在數據的寫入和查詢性能上有一些不一樣
簡稱COW。顧名思義,他是在數據寫入的時候,複製一份原來的拷貝,在其基礎上添加新數據。正在讀數據的請求,讀取的是是近的完整副本,這相似Mysql 的MVCC的思想。
上圖中,每個顏色都包含了截至到其所在時間的全部數據。老的數據副本在超過必定的個數限制後,將被刪除。這種類型的表,沒有compact instant,由於寫入時至關於已經compact了。
簡稱MOR。新插入的數據存儲在delta log 中。按期再將delta log合併進行parquet數據文件。讀取數據時,會將delta log跟老的數據文件作merge,獲得完整的數據返回。固然,MOR表也能夠像COW表同樣,忽略delta log,只讀取最近的完整數據文件。下圖演示了MOR的兩種數據讀寫方式
我在github上放置了基於Hudi的封裝實現,對應的源碼地址爲 https://github.com/wanqiufeng/hudi-learn。
參數名 | 含義 | 是否必填 | 默認值 |
---|---|---|---|
--base-save-path |
hudi表存放在HDFS的基礎路徑,好比hdfs://192.168.16.181:8020/hudi_data/ | 是 | 無 |
--mapping-mysql-db-name |
指定處理的Mysql庫名 | 是 | 無 |
--mapping-mysql-table-name |
指定處理的Mysql表名 | 是 | 無 |
--store-table-name |
指定Hudi的表名 | 否 | 默認會根據--mapping-mysql-db-name和--mapping-mysql-table-name自動生成。假設--mapping-mysql-db-name 爲crm,--mapping-mysql-table-name爲order。那麼最終的hudi表名爲crm__order |
--real-save-path |
指定hudi表最終存儲的hdfs路徑 | 否 | 默認根據--base-save-path和--store-table-name自動生成,生成格式爲'--base-save-path'+'/'+'--store-table-name' ,推薦默認 |
--primary-key |
指定同步的mysql表中能惟一標識記錄的字段名 | 否 | 默認id |
--partition-key |
指定mysql表中能夠用於分區的時間字段,字段必須是timestamp 或dateime類型 | 是 | 無 |
--precombine-key |
最終用於配置hudi的hoodie.datasource.write.precombine.field |
否 | 默認id |
--kafka-server |
指定Kafka 集羣地址 | 是 | 無 |
--kafka-topic |
指定消費kafka的隊列 | 是 | 無 |
--kafka-group |
指定消費kafka的group | 否 | 默認在存儲表名前加'hudi'前綴,好比'hudi_crm__order' |
--duration-seconds |
因爲本程序使用Spark streaming開發,這裏指定Spark streaming微批的時長 | 否 | 默認10秒 |
一個使用的demo以下
/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit --class com.niceshot.hudi.CanalKafkaImport2Hudi \ --name hudi__goods \ --master yarn \ --deploy-mode cluster \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 1 \ --num-executors 1 \ --queue hudi \ --conf spark.executor.memoryOverhead=2048 \ --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \ --conf spark.core.connection.ack.wait.timeout=300 \ --conf spark.locality.wait=100 \ --conf spark.streaming.backpressure.enabled=true \ --conf spark.streaming.receiver.maxRate=500 \ --conf spark.streaming.kafka.maxRatePerPartition=200 \ --conf spark.ui.retainedJobs=10 \ --conf spark.ui.retainedStages=10 \ --conf spark.ui.retainedTasks=10 \ --conf spark.worker.ui.retainedExecutors=10 \ --conf spark.worker.ui.retainedDrivers=10 \ --conf spark.sql.ui.retainedExecutions=10 \ --conf spark.yarn.submit.waitAppCompletion=false \ --conf spark.yarn.maxAppAttempts=4 \ --conf spark.yarn.am.attemptFailuresValidityInterval=1h \ --conf spark.yarn.max.executor.failures=20 \ --conf spark.yarn.executor.failuresValidityInterval=1h \ --conf spark.task.maxFailures=8 \ /data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar --kafka-server local:9092 --kafka-topic dt_streaming_canal_xxx --base-save-path hdfs://192.168.2.1:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name order --primary-key id --partition-key createDate --duration-seconds 1200
history_import_and_meta_sync
分支提供了將歷史數據同步至hudi表,以及將hudi表數據結構同步至hive meta的操做
這裏採用的思路是
HiveImport2HudiConfig提供了以下一些參數,用於配置程序執行行爲
參數名 | 含義 | 是否必填 | 默認值 |
---|---|---|---|
--base-save-path |
hudi表存放在HDFS的基礎路徑,好比hdfs://192.168.16.181:8020/hudi_data/ | 是 | 無 |
--mapping-mysql-db-name |
指定處理的Mysql庫名 | 是 | 無 |
--mapping-mysql-table-name |
指定處理的Mysql表名 | 是 | 無 |
--store-table-name |
指定Hudi的表名 | 否 | 默認會根據--mapping-mysql-db-name和--mapping-mysql-table-name自動生成。假設--mapping-mysql-db-name 爲crm,--mapping-mysql-table-name爲order。那麼最終的hudi表名爲crm__order |
--real-save-path |
指定hudi表最終存儲的hdfs路徑 | 否 | 默認根據--base-save-path和--store-table-name自動生成,生成格式爲'--base-save-path'+'/'+'--store-table-name' ,推薦默認 |
--primary-key |
指定同步的hive歷史表中能惟一標識記錄的字段名 | 否 | 默認id |
--partition-key |
指定hive歷史表中能夠用於分區的時間字段,字段必須是timestamp 或dateime類型 | 是 | 無 |
--precombine-key |
最終用於配置hudi的hoodie.datasource.write.precombine.field |
否 | 默認id |
--sync-hive-db-name |
全量歷史數據所在hive的庫名 | 是 | 無 |
--sync-hive-table-name |
全量歷史數據所在hive的表名 | 是 | 無 |
--hive-base-path |
hive的全部數據文件存放地址,須要參看具體的hive配置 | 否 | /user/hive/warehouse |
--hive-site-path |
hive-site.xml配置文件所在的地址 | 是 | 無 |
--tmp-data-path |
程序執行過程當中臨時文件存放路徑。通常默認路徑是/tmp。有可能出現/tmp所在磁盤過小,而致使歷史程序執行失敗的狀況。當出現該狀況時,能夠經過該參數自定義執行路徑 | 否 | 默認操做系統臨時目錄 |
一個程序執行demo
nohup java -jar hudi-learn-1.0-SNAPSHOT.jar --sync-hive-db-name hudi_temp --sync-hive-table-name crm__wx_user_info --base-save-path hdfs://192.168.2.2:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name "order" --primary-key "id" --partition-key created_date --hive-site-path /etc/lib/hive/conf/hive-site.xml --tmp-data-path /data/tmp > order.log &
須要將hudi的數據結構和分區,以hive外表的形式同步至Hive meta,才能是Hive感知到hudi數據,並經過sql進行查詢分析。Hudi自己在消費Binlog進行存儲時,能夠順帶將相關表元數據信息同步至hive。但考慮到每條寫入Apache Hudi表的數據,都要讀寫Hive Meta ,對Hive的性能可能影響很大。因此我單獨開發了HiveMetaSyncConfig工具,用於同步hudi表元數據至Hive。考慮到目前程序只支持按天分區,因此同步工具能夠一天執行一次便可。參數配置以下
參數名 | 含義 | 是否必填 | 默認值 |
---|---|---|---|
--hive-db-name |
指定hudi表同步至哪一個hive數據庫 | 是 | 無 |
--hive-table-name |
指定hudi表同步至哪一個hive表 | 是 | 無 |
--hive-jdbc-url |
指定hive meta的jdbc連接地址,例如jdbc:hive2://192.168.16.181:10000 | 是 | 無 |
--hive-user-name |
指定hive meta的連接用戶名 | 否 | 默認hive |
--hive-pwd |
指定hive meta的連接密碼 | 否 | 默認hive |
--hudi-table-path |
指定hudi表所在hdfs的文件路徑 | 是 | 無 |
--hive-site-path |
指定hive的hive-site.xml路徑 | 是 | 無 |
一個程序執行demo
java -jar hudi-learn-1.0-SNAPSHOT.jar --hive-db-name streaming --hive-table-name crm__order --hive-user-name hive --hive-pwd hive --hive-jdbc-url jdbc:hive2://192.168.16.181:10000 --hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order --hive-site-path /lib/hive/conf/hive-site.xml
有些hive集羣的hive.input.format配置,默認是org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,這會致使掛載Hudi數據的Hive外表讀取到全部Hudi的Parquet數據,從而致使最終的讀取結果重複。須要將hive的format改成org.apache.hadoop.hive.ql.io.HiveInputFormat
,爲了不在整個集羣層面上更改對其他離線Hive Sql形成沒必要要的影響,建議只對當前hive session設置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
因爲binlog寫入Hudi表的是基於Spark streaming實現的,這裏給出了一些spark 和spark streaming層面的配置,它能使整個程序工做更穩定
配置 | 含義 |
---|---|
spark.streaming.backpressure.enabled=true | 啓動背壓,該配置能使Spark Streaming消費速率,基於上一次的消費狀況,進行調整,避免程序崩潰 |
spark.ui.retainedJobs=10 spark.ui.retainedStages=10 spark.ui.retainedTasks=10 spark.worker.ui.retainedExecutors=10 spark.worker.ui.retainedDrivers=10 spark.sql.ui.retainedExecutions=10 |
默認狀況下,spark 會在driver中存儲一些spark 程序執行過程當中各stage和task的歷史信息,當driver內存太小時,可能使driver崩潰,經過上述參數,調節這些歷史數據存儲的條數,從而減少對內層使用 |
spark.yarn.maxAppAttempts=4 | 配置當driver崩潰後,嘗試重啓的次數 |
spark.yarn.am.attemptFailuresValidityInterval=1h | 倘若driver執行一週才崩潰一次,那咱們更但願每次都能重啓,而上述配置在累計到重啓4次後,driver就不再會被重啓,該配置則用於重置maxAppAttempts的時間間隔 |
spark.yarn.max.executor.failures=20 | executor執行也可能失敗,失敗後集羣會自動分配新的executor, 該配置用於配置容許executor失敗的次數,超過次數後程序會報(reason: Max number of executor failures (400) reached),並退出 |
spark.yarn.executor.failuresValidityInterval=1h | 指定executor失敗重分配次數重置的時間間隔 |
spark.task.maxFailures=8 | 容許任務執行失敗的次數 |
歡迎關注個人我的公衆號"西北偏北UP",記錄代碼人生,行業思考,科技評論