1. 添加依賴
<properties> <flink.version>1.11.2</flink.version> <scala.version>2.11</scala.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- 添加flink table api 集成Hive的依賴--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- Hive Dependency --> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop-2-uber</artifactId> <version>2.6.5-7.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.4.0</version> </dependency> </dependencies>
2. 建立blink版本的批處理Table執行環境
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode() .build(); TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
- 通過實際測試,目前HiveTableSink 不支持流式寫入(未實現 AppendStreamTableSink),必須是批處理環境才能夠往hive裏面寫入數據,而不能將流式數據寫入hive。例如將kafka建立一張臨時表,而後將表中的數據流持續插入hive,這是不能夠的,官網上1.11版本經過flink sql-client能夠實現hive的流式寫入,還有待驗證。
3. 鏈接文件系統,建立hive catalog,對錶進行操做,相似於Spark on Hive,flink能夠直接獲取Hive的元數據,並使用flink進行計算。
// 鏈接外部文件 bbTableEnv.connect(new FileSystem().path("file:///E:/d.txt")) .withFormat(new Csv().fieldDelimiter(',')) .withSchema(new Schema().field("id", DataTypes.STRING())) .createTemporaryTable("output"); // 設置 hive 方言 bbTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); // 獲取hive-site.xml目錄 String hiveConfDir = Thread.currentThread().getContextClassLoader().getResource("").getPath().substring(1); HiveCatalog hive = new HiveCatalog("hive", "warningplatform", hiveConfDir); bbTableEnv.registerCatalog("hive", hive); bbTableEnv.useCatalog("hive"); bbTableEnv.useDatabase("warningplatform"); bbTableEnv.executeSql("insert into test select id from default_catalog.default_database.output");
-
經過bbTableEnv.connect()去建立臨時表的方式已通過時了,建議使用bbTableEnv.executeSql()的方式,經過DDL去建立臨時表,臨時表究竟是屬於哪個catalog目前還不太肯定,究竟是什麼規則目前還不清楚。 查資料得知,臨時表與單個Flink會話的生命週期相關,臨時表始終存儲在內存中。 永久表須要一個catalog來管理表對應的元數據,好比hive metastore,該表將一直存在,直到明確刪除該表爲止。 所以猜想:default_catalog是存儲在內存中,若是在切換成hive catalog以前建立臨時表,那咱們就能夠使用default_catalog.default_database.tableName來獲取這個臨時表。 若是切換了catalog再去建立臨時表,那咱們就沒法獲取到臨時表了,由於它不在default_catalog中,並且保存在內存裏面,直接查詢臨時表會去當前的catalog裏面去查找臨時表,所以必定要在default_catalog 裏面建立臨時表。 而臨時視圖好像是存儲在當前的catalog裏面java
-
經過bbTableEnv.createTemporaryView()建立的視圖則是屬於當前的database的sql
bbTableEnv.createTemporaryView("output",bbTableEnv.sqlQuery("select * from default_catalog.default_database.output"));
-
注意1.11版本的執行sql的方法發生了改變,經過執行環境的executeSql(),executeInsert()等來進行插入或者執行sql語句apache