Flink使用HiveCatalog能夠經過批或者流的方式來處理Hive中的表。這就意味着Flink既能夠做爲Hive的一個批處理引擎,也能夠經過流處理的方式來讀寫Hive中的表,從而爲實時數倉的應用和流批一體的落地實踐奠基了堅實的基礎。本文將以Flink1.12爲例,介紹Flink集成Hive的另一個很是重要的方面——Hive維表JOIN(Temporal Table Join)與Flink讀寫Hive表的方式。如下是全文,但願本文對你有所幫助。java
Flink支持以批處理(Batch)和流處理(Streaming)的方式寫入Hive表。當以批處理的方式寫入Hive表時,只有當寫入做業結束時,才能夠看到寫入的數據。批處理的方式寫入支持append模式和overwrite模式。正則表達式
Flink SQL> use catalog myhive; -- 使用catalog Flink SQL> INSERT INTO users SELECT 2,'tom'; Flink SQL> set execution.type=batch; -- 使用批處理模式 Flink SQL> INSERT OVERWRITE users SELECT 2,'tom';
-- 向靜態分區表寫入數據 Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25; -- 向動態分區表寫入數據 Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';
流式寫入Hive表,不支持Insert overwrite 方式,不然報以下錯誤:sql
[ERROR] Could not execute SQL statement. Reason: java.lang.IllegalStateException: Streaming mode not support overwrite.
下面的示例是將kafka的數據流式寫入Hive的分區表json
-- 使用流處理模式 Flink SQL> set execution.type=streaming; -- 使用Hive方言 Flink SQL> SET table.sql-dialect=hive; -- 建立一張Hive分區表 CREATE TABLE user_behavior_hive_tbl ( `user_id` BIGINT, -- 用戶id `item_id` BIGINT, -- 商品id `cat_id` BIGINT, -- 品類id `action` STRING, -- 用戶行爲 `province` INT, -- 用戶所在的省份 `ts` BIGINT -- 用戶行爲發生的時間戳 ) PARTITIONED BY (dt STRING,hr STRING,mi STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='0S', 'sink.partition-commit.policy.kind'='metastore,success-file' ); -- 使用默認SQL方言 Flink SQL> SET table.sql-dialect=default; -- 建立一張kafka數據源表 CREATE TABLE user_behavior ( `user_id` BIGINT, -- 用戶id `item_id` BIGINT, -- 商品id `cat_id` BIGINT, -- 品類id `action` STRING, -- 用戶行爲 `province` INT, -- 用戶所在的省份 `ts` BIGINT, -- 用戶行爲發生的時間戳 `proctime` AS PROCTIME(), -- 經過計算列產生一個處理時間列 `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時間 WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定義watermark ) WITH ( 'connector' = 'kafka', -- 使用 kafka connector 'topic' = 'user_behaviors', -- kafka主題 'scan.startup.mode' = 'earliest-offset', -- 偏移量 'properties.group.id' = 'group1', -- 消費者組 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 'format' = 'json', -- 數據源格式爲json 'json.fail-on-missing-field' = 'true', 'json.ignore-parse-errors' = 'false' );
關於Hive表的一些屬性解釋:bootstrap
partition.time-extractor.timestamp-pattern緩存
$year-$month-$day $hour:00:00
,若是是按天時進行分區,則該屬性值爲:$dt $hour:00:00
;sink.partition-commit.triggerbash
解釋:分區觸發器類型,可選 process-time 或partition-time。app
sink.partition-commit.delay性能
sink.partition-commit.policy.kind大數據
解釋:提交分區的策略,用於通知下游的應用該分區已經完成了寫入,也就是說該分區的數據能夠被訪問讀取。可選的值以下:
_SUCCESS
文件能夠同時配置上面的兩個值,好比metastore,success-file
執行流式寫入Hive表
-- streaming sql,將數據寫入Hive表 INSERT INTO user_behavior_hive_tbl SELECT user_id, item_id, cat_id, action, province, ts, FROM_UNIXTIME(ts, 'yyyy-MM-dd'), FROM_UNIXTIME(ts, 'HH'), FROM_UNIXTIME(ts, 'mm') FROM user_behavior; -- batch sql,查詢Hive表的分區數據 SELECT * FROM user_behavior_hive_tbl WHERE dt='2021-01-04' AND hr='16' AND mi = '46';
同時查看Hive表的分區數據:
尖叫提示:1.Flink讀取Hive表默認使用的是batch模式,若是要使用流式讀取Hive表,須要而外指定一些參數,見下文。
2.只有在完成 Checkpoint 以後,文件纔會從 In-progress 狀態變成 Finish 狀態,同時生成
_SUCCESS
文件,因此,Flink流式寫入Hive表須要開啓並配置 Checkpoint。對於Flink SQL Client而言,須要在flink-conf.yaml中開啓CheckPoint,配置內容爲:state.backend: filesystem
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
state.savepoints.dir: hdfs://kms-1:8020/flink-savepoints
Flink支持以批處理(Batch)和流處理(Streaming)的方式讀取Hive中的表。批處理的方式與Hive的自己查詢相似,即只在提交查詢的時刻查詢一次Hive表。流處理的方式將會持續地監控Hive表,而且會增量地提取新的數據。默認狀況下,Flink是以批處理的方式讀取Hive表。
關於流式讀取Hive表,Flink既支持分區表又支持非分區表。對於分區表而言,Flink將會監控新產生的分區數據,並以增量的方式讀取這些數據。對於非分區表,Flink會監控Hive表存儲路徑文件夾裏面的新文件,並以增量的方式讀取新的數據。
Flink讀取Hive表能夠配置一下參數:
streaming-source.enable
streaming-source.partition.include
streaming-source.monitor-interval
streaming-source.partition-order
streaming-source.consume-start-offset
partition.time-extractor.timestamp-pattern
配置時間戳提取的正則表達式。在 SQL Client 中須要顯示地開啓 SQL Hint 功能
Flink SQL> set table.dynamic-table-options.enabled= true;
使用SQLHint流式查詢Hive表
SELECT * FROM user_behavior_hive_tbl /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2021-01-03') */;
Flink 1.12 支持了 Hive 最新的分區做爲時態表的功能,能夠經過 SQL 的方式直接關聯 Hive 分區表的最新分區,而且會自動監聽最新的 Hive 分區,當監控到新的分區後,會自動地作維表數據的全量替換。
Flink支持的是processing-time的temporal join,也就是說老是與最新版本的時態表進行JOIN。另外,Flink既支持非分區表的temporal join,又支持分區表的temporal join。對於分區表而言,Flink會監聽Hive表的最新分區數據。值得注意的是,Flink尚不支持 event-time temporal join。
對於一張隨着時間變化的Hive分區表,Flink能夠讀取該表的數據做爲一個無界流。若是Hive分區表的每一個分區都包含全量的數據,那麼每一個分區將作爲一個時態表的版本數據,即將最新的分區數據做爲一個全量維表數據。值得注意的是,該功能特色僅支持Flink的STREAMING模式。
使用 Hive 最新分區做爲 Tempmoral table 以前,須要設置必要的兩個參數:
'streaming-source.enable' = 'true', 'streaming-source.partition.include' = 'latest'
除此以外還有一些其餘的參數,關於參數的解釋見上面的分析。咱們在使用Hive維表的時候,既能夠在建立Hive表時指定具體的參數,也可使用SQL Hint的方式動態指定參數。一個Hive維表的建立模板以下:
-- 使用Hive的sql方言 SET table.sql-dialect=hive; CREATE TABLE dimension_table ( product_id STRING, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP(3), update_user STRING, ... ) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES ( -- 方式1:按照分區名排序來識別最新分區(推薦使用該種方式) 'streaming-source.enable' = 'true', -- 開啓Streaming source 'streaming-source.partition.include' = 'latest',-- 選擇最新分區 'streaming-source.monitor-interval' = '12 h',-- 每12小時加載一次最新分區數據 'streaming-source.partition-order' = 'partition-name', -- 按照分區名排序 -- 方式2:分區文件的建立時間排序來識別最新分區 'streaming-source.enable' = 'true', 'streaming-source.partition.include' = 'latest', 'streaming-source.partition-order' = 'create-time',-- 分區文件的建立時間排序 'streaming-source.monitor-interval' = '12 h' -- 方式3:按照分區時間排序來識別最新分區 'streaming-source.enable' = 'true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '12 h', 'streaming-source.partition-order' = 'partition-time', -- 按照分區時間排序 'partition.time-extractor.kind' = 'default', 'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00' );
有了上面的Hive維表,咱們就可使用該維表與Kafka的實時流數據進行JOIN,獲得相應的寬表數據。
-- 使用default sql方言 SET table.sql-dialect=default; -- kafka實時流數據表 CREATE TABLE orders_table ( order_id STRING, order_amount DOUBLE, product_id STRING, log_ts TIMESTAMP(3), proctime as PROCTIME() ) WITH (...); -- 將流表與hive最新分區數據關聯 SELECT * FROM orders_table AS orders JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim ON orders.product_id = dim.product_id;
除了在定義Hive維表時指定相關的參數,咱們還能夠經過SQL Hint的方式動態指定相關的參數,具體方式以下:
SELECT * FROM orders_table AS orders JOIN dimension_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '1 h', 'streaming-source.partition-order' = 'partition-name') */ FOR SYSTEM_TIME AS OF orders.proctime AS dim -- 時態表(維表) ON orders.product_id = dim.product_id;
對於Hive的非分區表,當使用temporal join時,整個Hive表會被緩存到Slot內存中,而後根據流中的數據對應的key與其進行匹配。使用最新的Hive表進行temporal join不須要進行額外的配置,咱們只須要配置一個Hive表緩存的TTL時間,該時間的做用是:當緩存過時時,就會從新掃描Hive表並加載最新的數據。
lookup.join.cache.ttl
尖叫提示:當使用此種方式時,Hive表必須是有界的lookup表,即非Streaming Source的時態表,換句話說,該表的屬性streaming-source.enable = false。
若是要使用Streaming Source的時態表,記得配置streaming-source.monitor-interval的值,即數據更新的時間間隔。
-- Hive維表數據使用批處理的方式按天裝載 SET table.sql-dialect=hive; CREATE TABLE dimension_table ( product_id STRING, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP(3), update_user STRING, ... ) TBLPROPERTIES ( 'streaming-source.enable' = 'false', -- 關閉streaming source 'streaming-source.partition.include' = 'all', -- 讀取全部數據 'lookup.join.cache.ttl' = '12 h' ); -- kafka事實表 SET table.sql-dialect=default; CREATE TABLE orders_table ( order_id STRING, order_amount DOUBLE, product_id STRING, log_ts TIMESTAMP(3), proctime as PROCTIME() ) WITH (...); -- Hive維表join,Flink會加載該維表的全部數據到內存中 SELECT * FROM orders_table AS orders JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim ON orders.product_id = dim.product_id;
尖叫提示:1.每個子任務都須要緩存一份維表的全量數據,必定要確保TM的task Slot 大小可以容納維表的數據量;
2.推薦將streaming-source.monitor-interval和lookup.join.cache.ttl的值設爲一個較大的數,由於頻繁的更新和加載數據會影響性能。
3.當緩存的維表數據須要從新刷新時,目前的作法是將整個表進行加載,所以不可以將新數據與舊數據區分開來。
假設維表的數據是經過批處理的方式(好比天天)裝載至Hive中,而Kafka中的事實流數據須要與該維表進行JOIN,從而構建一個寬表數據,這個時候就可使用Hive的維表JOIN。
SET table.sql-dialect=default; CREATE TABLE fact_user_behavior ( `user_id` BIGINT, -- 用戶id `item_id` BIGINT, -- 商品id `action` STRING, -- 用戶行爲 `province` INT, -- 用戶所在的省份 `ts` BIGINT, -- 用戶行爲發生的時間戳 `proctime` AS PROCTIME(), -- 經過計算列產生一個處理時間列 `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時間 WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定義watermark ) WITH ( 'connector' = 'kafka', -- 使用 kafka connector 'topic' = 'user_behaviors', -- kafka主題 'scan.startup.mode' = 'earliest-offset', -- 偏移量 'properties.group.id' = 'group1', -- 消費者組 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 'format' = 'json', -- 數據源格式爲json 'json.fail-on-missing-field' = 'true', 'json.ignore-parse-errors' = 'false' );
SET table.sql-dialect=hive; CREATE TABLE dim_item ( item_id BIGINT, item_name STRING, unit_price DECIMAL(10, 4) ) PARTITIONED BY (dt STRING) TBLPROPERTIES ( 'streaming-source.enable' = 'true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '12 h', 'streaming-source.partition-order' = 'partition-name' );
SELECT fact.item_id, dim.item_name, count(*) AS buy_cnt FROM fact_user_behavior AS fact LEFT JOIN dim_item FOR SYSTEM_TIME AS OF fact.proctime AS dim ON fact.item_id = dim.item_id WHERE fact.action = 'buy' GROUP BY fact.item_id,dim.item_name;
使用SQL Hint方式,關聯非分區的Hive維表:
set table.dynamic-table-options.enabled= true; SELECT fact.item_id, dim.item_name, count(*) AS buy_cnt FROM fact_user_behavior AS fact LEFT JOIN dim_item1 /*+ OPTIONS('streaming-source.enable'='false', 'streaming-source.partition.include' = 'all', 'lookup.join.cache.ttl' = '12 h') */ FOR SYSTEM_TIME AS OF fact.proctime AS dim ON fact.item_id = dim.item_id WHERE fact.action = 'buy' GROUP BY fact.item_id,dim.item_name;
本文以最新版本的Flink1.12爲例,介紹了Flink讀寫Hive的不一樣方式,並對每種方式給出了相應的使用示例。在實際應用中,一般有將實時數據流與 Hive 維表 join 來構造寬表的需求,Flink提供了Hive維表JOIN,能夠簡化用戶使用的複雜度。本文在最後詳細說明了Flink進行Hive維表JOIN的基本步驟以及使用示例,但願對你有所幫助。
公衆號『大數據技術與數倉』,回覆『資料』領取大數據資料包