On July 7, Flink 1.11 was released. Compared with 1.10, the most significant improvement in 1.11 is the much stronger Hive integration, which delivers stream-batch unification on top of Hive in a real sense.
This article uses a simple local example to try out the convenience of Hive Streaming, and shares the process along with some personal takeaways, in the hope that it helps you get started.
Adding the Dependencies
<properties>
    <scala.bin.version>2.11</scala.bin.version>
    <flink.version>1.11.0</flink.version>
    <flink-shaded-hadoop.version>2.6.5-10.0</flink-shaded-hadoop.version>
    <hive.version>1.1.0</hive.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>${flink-shaded-hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
    </dependency>
</dependencies>
Creating the Execution Environment
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(3)
val tableEnvSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
Registering a HiveCatalog
val catalogName = "my_catalog"
val catalog = new HiveCatalog(
  catalogName,              // catalog name
  "default",                // default database
  "/Users/lmagic/develop",  // Hive config (hive-site.xml) directory
  "1.1.0"                   // Hive version
)
tableEnv.registerCatalog(catalogName, catalog)
tableEnv.useCatalog(catalogName)
Creating the Kafka Stream Table
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp")tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.analytics_access_log_kafka")
tableEnv.executeSql(
  """
    |CREATE TABLE stream_tmp.analytics_access_log_kafka (
    |  ts BIGINT,
    |  userId BIGINT,
    |  eventType STRING,
    |  fromType STRING,
    |  columnType STRING,
    |  siteId BIGINT,
    |  grouponId BIGINT,
    |  partnerId BIGINT,
    |  merchandiseId BIGINT,
    |  procTime AS PROCTIME(),
    |  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),
    |  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'ods_analytics_access_log',
    |  'properties.bootstrap.servers' = 'kafka110:9092,kafka111:9092,kafka112:9092',
    |  'properties.group.id' = 'flink_hive_integration_exp_1',
    |  'scan.startup.mode' = 'latest-offset',
    |  'format' = 'json',
    |  'json.fail-on-missing-field' = 'false',
    |  'json.ignore-parse-errors' = 'true'
    |)
    """.stripMargin)
hive> DESCRIBE FORMATTED stream_tmp.analytics_access_log_kafka;
OK
# col_name              data_type               comment

# Detailed Table Information
Database:               stream_tmp
Owner:                  null
CreateTime:             Wed Jul 15 18:25:09 CST 2020
LastAccessTime:         UNKNOWN
Protect Mode:           None
Retention:              0
Location:               hdfs://sht-bdmq-cls/user/hive/warehouse/stream_tmp.db/analytics_access_log_kafka
Table Type:             MANAGED_TABLE
Table Parameters:
    flink.connector                        kafka
    flink.format                           json
    flink.json.fail-on-missing-field       false
    flink.json.ignore-parse-errors         true
    flink.properties.bootstrap.servers     kafka110:9092,kafka111:9092,kafka112:9092
    flink.properties.group.id              flink_hive_integration_exp_1
    flink.scan.startup.mode                latest-offset
    flink.schema.0.data-type               BIGINT
    flink.schema.0.name                    ts
    flink.schema.1.data-type               BIGINT
    flink.schema.1.name                    userId
    flink.schema.10.data-type              TIMESTAMP(3)
    flink.schema.10.expr                   TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, 'yyyy-MM-dd HH:mm:ss'))
    flink.schema.10.name                   eventTime
    flink.schema.2.data-type               VARCHAR(2147483647)
    flink.schema.2.name                    eventType
    # (omitted)......
    flink.schema.9.data-type               TIMESTAMP(3) NOT NULL
    flink.schema.9.expr                    PROCTIME()
    flink.schema.9.name                    procTime
    flink.schema.watermark.0.rowtime       eventTime
    flink.schema.watermark.0.strategy.data-type    TIMESTAMP(3)
    flink.schema.watermark.0.strategy.expr         `eventTime` - INTERVAL '15' SECOND
    flink.topic                            ods_analytics_access_log
    is_generic                             true
    transient_lastDdlTime                  1594808709

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
    serialization.format    1
Time taken: 1.797 seconds, Fetched: 61 row(s)
Creating the Hive Table
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp")tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.analytics_access_log_hive")
tableEnv.executeSql(
  """
    |CREATE TABLE hive_tmp.analytics_access_log_hive (
    |  ts BIGINT,
    |  user_id BIGINT,
    |  event_type STRING,
    |  from_type STRING,
    |  column_type STRING,
    |  site_id BIGINT,
    |  groupon_id BIGINT,
    |  partner_id BIGINT,
    |  merchandise_id BIGINT
    |) PARTITIONED BY (
    |  ts_date STRING,
    |  ts_hour STRING,
    |  ts_minute STRING
    |) STORED AS PARQUET
    |TBLPROPERTIES (
    |  'sink.partition-commit.trigger' = 'partition-time',
    |  'sink.partition-commit.delay' = '1 min',
    |  'sink.partition-commit.policy.kind' = 'metastore,success-file',
    |  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
    |)
    """.stripMargin)
A few notes on the TBLPROPERTIES used above:

- sink.partition-commit.trigger: the time characteristic that triggers partition commits. The default is processing-time, which can obviously misplace data across partitions when events arrive late, so here we use partition-time, i.e. commits are driven by the partition timestamp (the event time of the data inside the partition).
- partition.time-extractor.timestamp-pattern: the pattern used to extract the partition timestamp. It must resolve to the form yyyy-MM-dd HH:mm:ss, with the Hive table's partition columns substituted in as placeholders. The values of those partition columns come from the event time defined on the stream table, as we will see below.
- sink.partition-commit.delay: the delay before a partition commit is triggered. With the trigger set to partition-time, a partition is only committed once the watermark passes the timestamp extracted from the partition plus this delay. The value is best kept equal to the partition granularity: if the Hive table is partitioned by hour, set it to 1 h; if it is partitioned by 10 minutes, set it to 10 min (see the sketch after this list).
- sink.partition-commit.policy.kind: the partition commit policy, which can be understood as the extra action that makes a committed partition visible downstream. metastore updates the table metadata in the Hive Metastore; success-file creates a _SUCCESS marker file inside the partition directory.
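To make the relationship between the commit delay and the partition granularity concrete, here is a minimal sketch (not from the original article) of how the same properties might look for a hypothetical hourly-partitioned variant of the table; the table name and the trimmed column list are made up purely for illustration.

// A hypothetical hourly-partitioned variant, for illustration only.
// Assumes the Hive dialect set above is still active on tableEnv.
// The commit delay matches the 1-hour partition granularity, and the
// timestamp pattern only needs date and hour placeholders.
tableEnv.executeSql(
  """
    |CREATE TABLE hive_tmp.analytics_access_log_hive_hourly (
    |  ts BIGINT,
    |  user_id BIGINT,
    |  event_type STRING,
    |  merchandise_id BIGINT
    |) PARTITIONED BY (
    |  ts_date STRING,
    |  ts_hour STRING
    |) STORED AS PARQUET
    |TBLPROPERTIES (
    |  'sink.partition-commit.trigger' = 'partition-time',
    |  'sink.partition-commit.delay' = '1 h',
    |  'sink.partition-commit.policy.kind' = 'metastore,success-file',
    |  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:00:00'
    |)
    """.stripMargin)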
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#streaming-sink
Streaming Write to Hive
tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
tableEnv.executeSql(
  """
    |INSERT INTO hive_tmp.analytics_access_log_hive
    |SELECT
    |  ts, userId, eventType, fromType, columnType, siteId, grouponId, partnerId, merchandiseId,
    |  DATE_FORMAT(eventTime, 'yyyy-MM-dd'),
    |  DATE_FORMAT(eventTime, 'HH'),
    |  DATE_FORMAT(eventTime, 'mm')
    |FROM stream_tmp.analytics_access_log_kafka
    |WHERE merchandiseId > 0
    """.stripMargin)
hive> SELECT from_unixtime(min(cast(ts / 1000 AS BIGINT))), from_unixtime(max(cast(ts / 1000 AS BIGINT)))
    > FROM hive_tmp.analytics_access_log_hive
    > WHERE ts_date = '2020-07-15' AND ts_hour = '23' AND ts_minute = '23';
OK
2020-07-15 23:23:00     2020-07-15 23:23:59
Time taken: 1.115 seconds, Fetched: 1 row(s)
Streaming Read from Hive
tableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)
val result = tableEnv.sqlQuery(
  """
    |SELECT merchandise_id, count(1) AS pv
    |FROM hive_tmp.analytics_access_log_hive
    |/*+ OPTIONS(
    |  'streaming-source.enable' = 'true',
    |  'streaming-source.monitor-interval' = '1 min',
    |  'streaming-source.consume-start-offset' = '2020-07-15 23:30:00'
    |) */
    |WHERE event_type = 'shtOpenGoodsDetail'
    |AND ts_date >= '2020-07-15'
    |GROUP BY merchandise_id
    |ORDER BY pv DESC LIMIT 10
    """.stripMargin)
result.toRetractStream[Row].print().setParallelism(1)
streamEnv.execute()
- streaming-source.enable: set to true so that this Hive table can act as a streaming source (for what changes when it is left at its default, see the sketch after this list).
- streaming-source.monitor-interval: the interval at which new data in the Hive table is detected, set to 1 minute above. For a partitioned table this means watching for newly created partitions and reading their data incrementally.
- streaming-source.consume-start-offset: the timestamp to start consuming from, again written in the yyyy-MM-dd HH:mm:ss form.
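For comparison, here is a minimal sketch (not from the original article) of the same aggregation without the OPTIONS hint. Since streaming-source.enable is then left at its default, the Hive table is read as an ordinary bounded source: the query scans the partitions that already exist and finishes, instead of continuously watching for new ones.

// A minimal sketch for comparison, assuming the same tableEnv as above.
// Without the OPTIONS hint, streaming-source.enable stays at its default,
// so hive_tmp.analytics_access_log_hive is scanned as a bounded source and
// the query ends once the existing partitions have been read.
val snapshot = tableEnv.sqlQuery(
  """
    |SELECT merchandise_id, count(1) AS pv
    |FROM hive_tmp.analytics_access_log_hive
    |WHERE event_type = 'shtOpenGoodsDetail'
    |AND ts_date >= '2020-07-15'
    |GROUP BY merchandise_id
    """.stripMargin)
snapshot.toRetractStream[Row].print().setParallelism(1)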
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-reading
The End
A Hands-on Opportunity

Apache Flink Geek Challenge
This article is shared from the WeChat official account Flink 中文社區 (gh_5efd76d10a8d).