來源於flink社區sql
諮詢一個flink問題。flinsql,能寫入數據到hive表。可是hive表中的數據,都是基於 ".part,,,,inprogress,,,,"
相似的文件。
flink1.12.0 是基於cdh6.2.0編譯的,hive版本是2.1.一、hadoop-3.0.0. 問題截圖以下:
建立hive表:ide
SET table.sql-dialect=hive; CREATE TABLE hive_table ( user_id STRING, order_amount DOUBLE ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 min', 'sink.partition-commit.policy.kind'='metastore,success-file' );
插入數據:oop
INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;
分區的提交須要開啓checkpoint ,若是是jar包發佈,直接在代碼裏寫就能夠。
若是用的sql-client提交sql ,能夠在配置文件: sql-client-defaults.yaml 中加入以下配置:code
configuration: execution.checkpointing.interval: 1000
配置完如上後,hive表成功寫入數據,可是爲啥flink-sql及hive卻讀取不到hive表數據呢,hadoop
SELECT * FROM hive_table WHERE dt='2021-06-21' and hr='18';
第一步:資源
CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/home/admin/hive/conf' );
第二部部署
USE CATALOG myhive;
第三步kafka
select * from hive_table;
猜想可能的問題,咱們本地部署設置的slot都是1,你多是在跑着寫入任務,沒有資源跑讀取任務?
你能夠設置把寫入任務停了,或者設置方言問 :
SET table.sql-dialect=hive;
而後再查詢試試。。。。。it
很是感謝,slot我設置成了4,按照你的方法我排查了下個人問題,應該是我在個人集羣配置文件sql-client-defaults.yaml文件中設置的原始值不對:
我多加了個「hive-version: 2.1.1」,後來把這一行註釋掉,能夠了;並且按照你的方式註冊臨時catalog也能夠了。
這個問題致使我一直卡在這io
sql-client-defaults.yaml文件中設置的錯誤原始值以下:
catalogs: # [] # empty list # A typical catalog definition looks like: - name: myhive type: hive # hive-conf-dir: /opt/hive_conf/ hive-conf-dir: /etc/hive/conf # default-database: ... hive-version: 2.1.1 default-database: myhive
sql-client-defaults.yaml文件中設置的修改後值以下:
catalogs: # [] # empty list # A typical catalog definition looks like: - name: myhive type: hive # hive-conf-dir: /opt/hive_conf/ hive-conf-dir: /etc/hive/conf # default-database: ... # hive-version: 2.1.1 default-database: myhive