Apache Flink for Beginners (Part 18): Flink Table API & SQL

What are Flink's relational APIs?

Flink already supports the DataSet and DataStream APIs, but is there a better way to program that does not require dealing with the specifics of those APIs, or with the details of their Java and Scala implementations?

Flink provides three layered APIs. Each API offers a different trade-off between conciseness and expressiveness and targets different use cases.

The lowest layer is stateful, event-driven processing. Developing directly at that level is quite cumbersome.

Many tasks can certainly be accomplished with the DataSet and DataStream APIs, but that requires familiarity with both APIs, and with Java or Scala, which raises the barrier considerably. A framework that cannot be driven through SQL is severely limited: developers may not mind, but for ordinary users it is impractical. SQL, by contrast, is a language for the masses.

Just as MapReduce has Hive SQL and Spark has Spark SQL, Flink has Flink SQL.

Flink supports both batch and stream processing, so how can the two be unified at the API level?

This is where the Table API and SQL come in.

They are, in essence, relational APIs: working with them is as simple as working with MySQL.

Apache Flink features two relational APIs - the Table API and SQL - for unified stream and batch processing. The Table API is a language-integrated query API for Scala and Java that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way.
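To make "language-integrated" concrete, here is a minimal sketch of composing a query from relational operators with the Table API in Scala. The environment tableEnv and the registered table orders (with fields user, product, amount) are hypothetical, and the 'field expression syntax assumes the implicits from org.apache.flink.table.api.scala._ are in scope:

// hypothetical: a table "orders" with fields user, product, amount
// has already been registered on tableEnv
val result = tableEnv
  .scan("orders")                    // read the registered table
  .filter('amount > 100)             // selection
  .select('user, 'product, 'amount)  // projection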

Using the Table API & SQL requires additional dependencies.

Java:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

Scala:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
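In both cases the ${flink.version} placeholder must be defined in the POM. A minimal sketch, with 1.9.0 used only as an example of a release that ships these 2.11 artifacts:

<properties>
    <flink.version>1.9.0</flink.version>
</properties>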

Programming with the Table API & SQL

First add the dependencies above, then read the sales.csv file, whose contents are as follows:

transactionId,customerId,itemId,amountPaid
111,1,1,100.0
112,2,2,505.0
113,1,3,510.0
114,2,4,600.0
115,3,2,500.0
116,4,2,500.0
117,1,2,500.0
118,1,2,500.0
119,1,3,500.0
120,1,2,500.0
121,2,4,500.0
122,1,2,500.0
123,1,4,500.0
124,1,2,500.0

Scala

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object TableSQLAPI {

  def main(args: Array[String]): Unit = {
    val bEnv = ExecutionEnvironment.getExecutionEnvironment
    val bTableEnv = BatchTableEnvironment.create(bEnv)
    val filePath = "E:/test/sales.csv"
    // read the CSV into a DataSet[SalesLog], skipping the header line
    val csv = bEnv.readCsvFile[SalesLog](filePath, ignoreFirstLine = true)
    // DataSet => Table (continued below)
  }

  case class SalesLog(transactionId: String, customerId: String, itemId: String, amountPaid: Double)
}

Having obtained the DataSet, the next step is to convert it into a Table, after which SQL can be executed against it:

// DataSet => Table
val salesTable = bTableEnv.fromDataSet(csv)
// register the Table so it can be referenced from SQL
bTableEnv.registerTable("sales", salesTable)
// run SQL against the registered table
val resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId")
bTableEnv.toDataSet[Row](resultTable).print()
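The same aggregation can also be expressed with the Table API instead of SQL. A sketch, assuming the expression implicits from org.apache.flink.table.api.scala._ are in scope as in the imports above:

// Table API equivalent of the SQL query
val apiResult = salesTable
  .groupBy('customerId)
  .select('customerId, 'amountPaid.sum as 'money)
bTableEnv.toDataSet[Row](apiResult).print()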

The output is as follows:

4,500.0
3,500.0
1,4110.0
2,1605.0

With this approach, functionality that previously required writing MapReduce-style code can be implemented with nothing but SQL, which greatly simplifies development.
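For contrast, the same aggregation written directly against the DataSet API looks like this; a sketch reusing the csv DataSet[SalesLog] from the Scala example above:

// group by customerId and sum amountPaid, without SQL
csv.map(log => (log.customerId, log.amountPaid))
  .groupBy(0) // tuple field 0 = customerId
  .sum(1)     // tuple field 1 = amountPaid
  .print()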

Java

package com.vincent.course06;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class JavaTableSQLAPI {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(bEnv);
        // read the CSV into a DataSource<Sales>, skipping the header line
        DataSource<Sales> salesDataSource = bEnv.readCsvFile("E:/test/sales.csv")
                .ignoreFirstLine()
                .pojoType(Sales.class, "transactionId", "customerId", "itemId", "amountPaid");
        // DataSet => Table, then register it for SQL
        Table sales = bTableEnv.fromDataSet(salesDataSource);
        bTableEnv.registerTable("sales", sales);
        Table resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId");
        // Table => DataSet<Row> and print
        DataSet<Row> rowDataSet = bTableEnv.toDataSet(resultTable, Row.class);
        rowDataSet.print();
    }

    // POJO: public fields + default constructor, as required by Flink's POJO rules
    public static class Sales {
        public String transactionId;
        public String customerId;
        public String itemId;
        public Double amountPaid;

        @Override
        public String toString() {
            return "Sales{" +
                    "transactionId='" + transactionId + '\'' +
                    ", customerId='" + customerId + '\'' +
                    ", itemId='" + itemId + '\'' +
                    ", amountPaid=" + amountPaid +
                    '}';
        }
    }
}