Flink 1.11 新特性之 SQL Hive Streaming 簡單示例

7月7日,Flink 1.11 版本發佈,與 1.10 版本相比,1.11 版本最爲顯著的一個改進是 Hive Integration 顯著加強,也就是真正意義上實現了基於 Hive 的流批一體。php


本文用簡單的本地示例來體驗 Hive Streaming 的便利性並跟你們分享體驗的過程以及個人心得,但願對你們上手使用有所幫助。html


添加相關依賴web


測試集羣上的 Hive 版本爲 1.1.0,Hadoop 版本爲 2.6.0,Kafka 版本爲 1.0.1。

<properties> <scala.bin.version>2.11</scala.bin.version> <flink.version>1.11.0</flink.version> <flink-shaded-hadoop.version>2.6.5-10.0</flink-shaded-hadoop.version> <hive.version>1.1.0</hive.version></properties>
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-kafka_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop-2-uber</artifactId> <version>${flink-shaded-hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> </dependency>

另外,別忘了找到 hdfs-site.xml 和 hive-site.xml,並將其加入項目。

建立執行環境sql


Flink 1.11 的 Table/SQL API 中,FileSystem Connector 是靠加強版 StreamingFileSink 組件實現,在源碼中名爲 StreamingFileWriter。咱們知道,只有在 Checkpoint 成功時,StreamingFileSink 寫入的文件纔會由 Pending 狀態變成 Finished 狀態,從而可以安全地被下游讀取。因此,咱們必定要打開 Checkpointing,並設定合理的間隔。


val streamEnv = StreamExecutionEnvironment.getExecutionEnvironmentstreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)streamEnv.setParallelism(3)
val tableEnvSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build()val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
   

註冊 HiveCatalogapache


val catalogName = "my_catalog"val catalog = new HiveCatalog( catalogName, // catalog name "default", // default database "/Users/lmagic/develop", // Hive config (hive-site.xml) directory "1.1.0" // Hive version)tableEnv.registerCatalog(catalogName, catalog)tableEnv.useCatalog(catalogName)


建立 Kafka 流表json


Kafka Topic 中存儲的是 JSON 格式的埋點日誌,建表時用計算列生成事件時間與水印。1.11 版本 SQL Kafka Connector 的參數相比 1.10 版本有必定簡化。

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp")tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.analytics_access_log_kafka")
tableEnv.executeSql( """ |CREATE TABLE stream_tmp.analytics_access_log_kafka ( | ts BIGINT, | userId BIGINT, | eventType STRING, | fromType STRING, | columnType STRING, | siteId BIGINT, | grouponId BIGINT, | partnerId BIGINT, | merchandiseId BIGINT, | procTime AS PROCTIME(), | eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')), | WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND |) WITH ( | 'connector' = 'kafka', | 'topic' = 'ods_analytics_access_log', | 'properties.bootstrap.servers' = 'kafka110:9092,kafka111:9092,kafka112:9092' | 'properties.group.id' = 'flink_hive_integration_exp_1', | 'scan.startup.mode' = 'latest-offset', | 'format' = 'json', | 'json.fail-on-missing-field' = 'false', | 'json.ignore-parse-errors' = 'true' |) """.stripMargin)

前面已經註冊了 HiveCatalog,故在 Hive 中能夠觀察到建立的 Kafka 流表的元數據(注意該表並無事實上的列)。

hive> DESCRIBE FORMATTED stream_tmp.analytics_access_log_kafka;OK# col_name data_type comment

# Detailed Table InformationDatabase: stream_tmpOwner: nullCreateTime: Wed Jul 15 18:25:09 CST 2020LastAccessTime: UNKNOWNProtect Mode: NoneRetention: 0Location: hdfs://sht-bdmq-cls/user/hive/warehouse/stream_tmp.db/analytics_access_log_kafkaTable Type: MANAGED_TABLETable Parameters: flink.connector kafka flink.format json flink.json.fail-on-missing-field false flink.json.ignore-parse-errors true flink.properties.bootstrap.servers kafka110:9092,kafka111:9092,kafka112:9092 flink.properties.group.id flink_hive_integration_exp_1 flink.scan.startup.mode latest-offset flink.schema.0.data-type BIGINT flink.schema.0.name ts flink.schema.1.data-type BIGINT flink.schema.1.name userId flink.schema.10.data-type TIMESTAMP(3) flink.schema.10.expr TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, 'yyyy-MM-dd HH:mm:ss')) flink.schema.10.name eventTime flink.schema.2.data-type VARCHAR(2147483647) flink.schema.2.name eventType # 略...... flink.schema.9.data-type TIMESTAMP(3) NOT NULL flink.schema.9.expr PROCTIME() flink.schema.9.name procTime flink.schema.watermark.0.rowtime eventTime flink.schema.watermark.0.strategy.data-type TIMESTAMP(3) flink.schema.watermark.0.strategy.expr `eventTime` - INTERVAL '15' SECOND flink.topic ods_analytics_access_log is_generic true transient_lastDdlTime 1594808709
# Storage InformationSerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDeInputFormat: org.apache.hadoop.mapred.TextInputFormatOutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormatCompressed: NoNum Buckets: -1Bucket Columns: []Sort Columns: []Storage Desc Params: serialization.format 1Time taken: 1.797 seconds, Fetched: 61 row(s)


建立 Hive 表bootstrap


Flink SQL 提供了兼容 HiveQL 風格的 DDL,指定 SqlDialect.HIVE 便可( DML 兼容還在開發中)。

爲了方便觀察結果,如下的表採用了天/小時/分鐘的三級分區,實際應用中能夠不用這樣細的粒度(10分鐘甚至1小時的分區可能更合適)。

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp")tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.analytics_access_log_hive")
tableEnv.executeSql( """ |CREATE TABLE hive_tmp.analytics_access_log_hive ( | ts BIGINT, | user_id BIGINT, | event_type STRING, | from_type STRING, | column_type STRING, | site_id BIGINT, | groupon_id BIGINT, | partner_id BIGINT, | merchandise_id BIGINT |) PARTITIONED BY ( | ts_date STRING, | ts_hour STRING, | ts_minute STRING |) STORED AS PARQUET |TBLPROPERTIES ( | 'sink.partition-commit.trigger' = 'partition-time', | 'sink.partition-commit.delay' = '1 min', | 'sink.partition-commit.policy.kind' = 'metastore,success-file', | 'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00' |) """.stripMargin)

Hive 表的參數複用了 SQL FileSystem Connector 的相關參數,與分區提交(Partition Commit)密切相關。僅就上面出現的4個參數簡單解釋一下。

  • sink.partition-commit.trigger :觸發分區提交的時間特徵。默認爲 processing-time,即處理時間,很顯然在有延遲的狀況下,可能會形成數據分區錯亂。因此這裏使用 partition-time,即按照分區時間戳(即分區內數據對應的事件時間)來提交。
  • partition.time-extractor.timestamp-pattern :分區時間戳的抽取格式。須要寫成 yyyy-MM-dd HH:mm:ss 的形式,並用 Hive 表中相應的分區字段作佔位符替換。顯然,Hive 表的分區字段值來自流表中定義好的事件時間,後面會看到。
  • sink.partition-commit.delay :觸發分區提交的延遲。在時間特徵設爲 partition-time 的狀況下,當水印時間戳大於分區建立時間加上此延遲時,分區纔會真正提交。此值最好與分區粒度相同,例如若 Hive 表按1小時分區,此參數可設爲 1 h,若按 10 分鐘分區,可設爲 10 min。
  • sink.partition-commit.policy.kind :分區提交策略,能夠理解爲使分區對下游可見的附加操做。 metastore 表示更新 Hive Metastore 中的表元數據, success-file 則表示在分區內建立 _SUCCESS 標記文件。

固然,SQL FileSystem Connector 的功能並不限於此,還有很大自定義的空間(如能夠自定義分區提交策略以合併小文件等)。具體可參見官方文檔。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#streaming-sinkapi


流式寫入 Hive安全


注意將流表中的事件時間轉化爲 Hive 的分區。

tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)tableEnv.executeSql( """ |INSERT INTO hive_tmp.analytics_access_log_hive |SELECT | ts,userId,eventType,fromType,columnType,siteId,grouponId,partnerId,merchandiseId, | DATE_FORMAT(eventTime,'yyyy-MM-dd'), | DATE_FORMAT(eventTime,'HH'), | DATE_FORMAT(eventTime,'mm') |FROM stream_tmp.analytics_access_log_kafka |WHERE merchandiseId > 0 """.stripMargin)

來觀察一下流式 Sink 的結果吧。


上文設定的 Checkpoint Interval 是 20 秒,能夠看到,上圖中的數據文件剛好是以 20 秒的間隔寫入的。因爲並行度爲 3,因此每次寫入會生成 3 個文件。分區內全部數據寫入完畢後,會同時生成 _SUCCESS 文件。若是是正在寫入的分區,則會看到 .inprogress 文件。

經過 Hive 查詢一下,肯定數據的時間無誤。

  
    
  
  
   
   
            
   
   

  
    
  
  
   
   
            
   
   

hive> SELECT from_unixtime(min(cast(ts / 1000 AS BIGINT))),from_unixtime(max(cast(ts / 1000 AS BIGINT))) > FROM hive_tmp.analytics_access_log_hive > WHERE ts_date = '2020-07-15' AND ts_hour = '23' AND ts_minute = '23';OK2020-07-15 23:23:00 2020-07-15 23:23:59Time taken: 1.115 seconds, Fetched: 1 row(s)

流式讀取 Hive微信


要將 Hive 表做爲流式 Source,須要啓用 Dynamic Table Options,並經過 Table Hints 來指定 Hive 數據流的參數。如下是簡單地經過 Hive 計算商品 PV 的例子。

  
    
  
  
   
   
            
   
   

  
    
  
  
   
   
            
   
   

tableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)
val result = tableEnv.sqlQuery( """ |SELECT merchandise_id,count(1) AS pv |FROM hive_tmp.analytics_access_log_hive |/*+ OPTIONS( | 'streaming-source.enable' = 'true', | 'streaming-source.monitor-interval' = '1 min', | 'streaming-source.consume-start-offset' = '2020-07-15 23:30:00' |) */ |WHERE event_type = 'shtOpenGoodsDetail' |AND ts_date >= '2020-07-15' |GROUP BY merchandise_id |ORDER BY pv DESC LIMIT 10 """.stripMargin)
result.toRetractStream[Row].print().setParallelism(1)streamEnv.execute()
三個 Table Hint 參數的含義解釋以下。

  • streaming-source.enable:設爲 true,表示該 Hive 表能夠做爲 Source。
  • streaming-source.monitor-interval:感知 Hive 表新增數據的週期,以上設爲 1 分鐘。對於分區表而言,則是監控新分區的生成,以增量讀取數據。
  • streaming-source.consume-start-offset:開始消費的時間戳,一樣須要寫成 yyyy-MM-dd HH:mm:ss 的形式。

更加具體的說明仍然可參見官方文檔。

https://links.jianshu.com/go?to=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-release-1.11%2Fdev%2Ftable%2Fhive%2Fhive_streaming.html%23streaming-reading


最後,因爲 SQL 語句中有 ORDER BY 和 LIMIT 邏輯,因此須要調用 toRetractStream() 方法轉化爲回撤流,便可輸出結果。

The End


Flink 1.11 的 Hive Streaming 功能大大提升了 Hive 數倉的實時性,對 ETL 做業很是有利,同時還可以知足流式持續查詢的需求,具備必定的靈活性。

感興趣的同窗也能夠本身上手測試。

原文連接:
https://www.jianshu.com/p/fb7d29abfa14



  一個實踐機會  

Apache Flink 極客挑戰賽


萬衆矚目的第二屆 Apache Flink 極客挑戰賽來啦!本次大賽全面升級,重量級助陣嘉賓專業指導,強大的資源配置供你發揮創意,還有 30w 豐厚獎金等你帶走~聚焦  Flink 與 AI 技術的應用實踐,挑戰疫情防控的世界級難題,你準備好了麼?

(點擊圖片可瞭解更多大賽信息)

點擊「 閱讀 原文 」便可報名

本文分享自微信公衆號 - Flink 中文社區(gh_5efd76d10a8d)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索