Flink SQL 鏈接Hive並寫入/讀取數據

1. 添加依賴

<properties>
        <flink.version>1.11.2</flink.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--       添加flink table api 集成Hive的依賴-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Hive Dependency -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.6.5-7.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.4.0</version>
        </dependency>
    </dependencies>

2. 建立blink版本的批處理Table執行環境

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
  • 通過實際測試,目前HiveTableSink 不支持流式寫入(未實現 AppendStreamTableSink),必須是批處理環境才能夠往hive裏面寫入數據,而不能將流式數據寫入hive。例如將kafka建立一張臨時表,而後將表中的數據流持續插入hive,這是不能夠的,官網上1.11版本經過flink sql-client能夠實現hive的流式寫入,還有待驗證。

3. 鏈接文件系統,建立hive catalog,對錶進行操做,相似於Spark on Hive,flink能夠直接獲取Hive的元數據,並使用flink進行計算。

// 鏈接外部文件
        bbTableEnv.connect(new FileSystem().path("file:///E:/d.txt"))
                .withFormat(new Csv().fieldDelimiter(','))
                .withSchema(new Schema().field("id", DataTypes.STRING()))
                .createTemporaryTable("output");

        // 設置 hive 方言
        bbTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // 獲取hive-site.xml目錄
        String hiveConfDir = Thread.currentThread().getContextClassLoader().getResource("").getPath().substring(1);
        HiveCatalog hive = new HiveCatalog("hive", "warningplatform", hiveConfDir);
        bbTableEnv.registerCatalog("hive", hive);

        bbTableEnv.useCatalog("hive");
        bbTableEnv.useDatabase("warningplatform");

        bbTableEnv.executeSql("insert into  test select id from    default_catalog.default_database.output");
  • 經過bbTableEnv.connect()去建立臨時表的方式已通過時了,建議使用bbTableEnv.executeSql()的方式,經過DDL去建立臨時表,臨時表究竟是屬於哪個catalog目前還不太肯定,究竟是什麼規則目前還不清楚。 查資料得知,臨時表與單個Flink會話的生命週期相關,臨時表始終存儲在內存中。 永久表須要一個catalog來管理表對應的元數據,好比hive metastore,該表將一直存在,直到明確刪除該表爲止。 所以猜想:default_catalog是存儲在內存中,若是在切換成hive catalog以前建立臨時表,那咱們就能夠使用default_catalog.default_database.tableName來獲取這個臨時表。 若是切換了catalog再去建立臨時表,那咱們就沒法獲取到臨時表了,由於它不在default_catalog中,並且保存在內存裏面,直接查詢臨時表會去當前的catalog裏面去查找臨時表,所以必定要在default_catalog 裏面建立臨時表。 而臨時視圖好像是存儲在當前的catalog裏面java

  • 經過bbTableEnv.createTemporaryView()建立的視圖則是屬於當前的database的sql

    bbTableEnv.createTemporaryView("output",bbTableEnv.sqlQuery("select * from default_catalog.default_database.output"));
  • 注意1.11版本的執行sql的方法發生了改變,經過執行環境的executeSql(),executeInsert()等來進行插入或者執行sql語句apache

相關文章
相關標籤/搜索