Building batch and streaming applications with the Flink Table & SQL API (1): basic concepts of Table

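According to Flink's official documentation, the Flink programming model has four layers: SQL is the highest-level API, the Table API sits in the middle, the DataStream/DataSet API is the core API, and stateful stream processing is the lowest-level abstraction.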

 

 

 

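Earlier articles in this series cover the other layers:

"Flink DataSet API: usage and principles" introduces the DataSet API.

"Flink DataStream API: usage and principles" introduces the DataStream API.

"How are timestamps used in Flink? Watermark usage and principles" introduces Watermark, the foundation of the lowest layer.

"Flink window examples" introduces the concept of windows and how they are used.

"State and fault tolerance in Flink" introduces State and the checkpoint and savepoint fault-tolerance mechanisms.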

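0. Basic concepts:

0.1 TableEnvironment

TableEnvironment is the central concept of the Table API and SQL integration. It is responsible for:

  1. Registering a Table in the internal Catalog
  2. Registering an external Catalog
  3. Executing SQL queries
  4. Registering a user-defined function (UDF)
  5. Converting a DataStream or DataSet into a Table
  6. Holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment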
/**
 * The base class for batch and stream TableEnvironments.
 *
 * <p>The TableEnvironment is a central concept of the Table API and SQL integration. It is
 * responsible for:
 *
 * <ul>
 *     <li>Registering a Table in the internal catalog</li>
 *     <li>Registering an external catalog</li>
 *     <li>Executing SQL queries</li>
 *     <li>Registering a user-defined scalar function. For the user-defined table and aggregate
 *     function, use the StreamTableEnvironment or BatchTableEnvironment</li>
 * </ul>
 */

 

0.2 Catalog

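Catalog: all metadata about databases and tables is stored in Flink's internal Catalog structure. It holds all Table-related metadata inside Flink, including table schemas and data source information.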

/**
 * This interface is responsible for reading and writing metadata such as database/table/views/UDFs
 * from a registered catalog. It connects a registered catalog and Flink's Table API.
 */

[Figure: Catalog structure]

 0.3 TableSource

When using the Table API, an external data source can be registered directly as a Table. Such a structure is called a TableSource.

/**
 * Defines an external table with the schema that is provided by {@link TableSource#getTableSchema}.
 *
 * <p>The data of a {@link TableSource} is produced as a {@code DataSet} in case of a {@code BatchTableSource}
 * or as a {@code DataStream} in case of a {@code StreamTableSource}. The type of the produced
 * {@code DataSet} or {@code DataStream} is specified by the {@link TableSource#getProducedDataType()} method.
 *
 * <p>By default, the fields of the {@link TableSchema} are implicitly mapped by name to the fields of
 * the produced {@link DataType}. An explicit mapping can be defined by implementing the
 * {@link DefinedFieldMapping} interface.
 *
 * @param <T> The return type of the {@link TableSource}.
 */

0.4 TableSink

After processing, the results need to be written to external storage. The Table API has a corresponding sink module for this, called TableSink.

/**
 * A {@link TableSink} specifies how to emit a table to an external
 * system or location.
 *
 * <p>The interface is generic such that it can support different storage locations and formats.
 *
 * @param <T> The return type of the {@link TableSink}.
 */

0.5 Table Connector

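From Flink 1.6 onward, Flink introduced the concept of the Table Connector so that the Table API can connect to external systems through configuration and so that the same definitions can be used in the SQL Client. Its main purpose is to separate the definition of a Table Source or Table Sink from its use.

A Table Connector wraps the various built-in Table Sources and Table Sinks into configurable components that can be used from both the Table API and the SQL Client.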

    /**
     * Creates a table source and/or table sink from a descriptor.
     *
     * <p>Descriptors allow for declaring the communication to external systems in an
     * implementation-agnostic way. The classpath is scanned for suitable table factories that match
     * the desired configuration.
     *
     * <p>The following example shows how to read from a connector using a JSON format and
     * register a table source as "MyTable":
     *
     * <pre>
     * {@code
     *
     * tableEnv
     *   .connect(
     *     new ExternalSystemXYZ()
     *       .version("0.11"))
     *   .withFormat(
     *     new Json()
     *       .jsonSchema("{...}")
     *       .failOnMissingField(false))
     *   .withSchema(
     *     new Schema()
     *       .field("user-name", "VARCHAR").from("u_name")
     *       .field("count", "DECIMAL"))
     *   .registerSource("MyTable");
     * }
     *</pre>
     *
     * @param connectorDescriptor connector descriptor describing the external system
     */
    TableDescriptor connect(ConnectorDescriptor connectorDescriptor);

 

This article focuses on SQL and the Table API.

1. SQL

1.1 SQL on the DataSet API

Example:

package org.apache.flink.table.examples.java;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

/**
 * Simple example that shows how the Batch SQL API is used in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataSets to Tables
 *  - Register a Table under a name
 *  - Run a SQL query on the registered Table
 */
public class WordCountSQL {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        DataSet<WC> input = env.fromElements(
            new WC("Hello", 1),
            new WC("Ciao", 1),
            new WC("Hello", 1));

        // register the DataSet as table "WordCount"
        tEnv.registerDataSet("WordCount", input, "word, frequency");

        // run a SQL query on the Table and retrieve the result as a new Table
        Table table = tEnv.sqlQuery(
            "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");

        DataSet<WC> result = tEnv.toDataSet(table, WC.class);

        result.print();
    }

    // *************************************************************************
    //     USER DATA TYPES
    // *************************************************************************

    /**
     * Simple POJO containing a word and its respective count.
     */
    public static class WC {
        public String word;
        public long frequency;

        // public constructor to make it a Flink POJO
        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }
}
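
To make the query's semantics concrete without a Flink runtime, the following plain-Java sketch computes what SELECT word, SUM(frequency) ... GROUP BY word returns for the three input records above. It is only an illustration: the class WordCountSqlSemantics and the method sumByWord are hypothetical names, and Flink plans and executes the query in a very different way.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of Flink.
class WordCountSqlSemantics {

    // Same shape as the WC POJO in the example above.
    static class WC {
        final String word;
        final long frequency;

        WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }
    }

    // Plain-Java equivalent of:
    // SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word
    static Map<String, Long> sumByWord(List<WC> rows) {
        return rows.stream().collect(
            Collectors.groupingBy(
                wc -> wc.word,
                TreeMap::new,
                Collectors.summingLong(wc -> wc.frequency)));
    }

    public static void main(String[] args) {
        List<WC> input = Arrays.asList(
            new WC("Hello", 1),
            new WC("Ciao", 1),
            new WC("Hello", 1));
        // Keys come out sorted only because a TreeMap is used here;
        // the SQL query itself guarantees no particular row order.
        System.out.println(sumByWord(input)); // prints {Ciao=1, Hello=2}
    }
}
```

The two "Hello" records collapse into one row with frequency 2, which is the row the Flink job prints as "WC Hello 2".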

The BatchTableEnvironment used in the example is documented as:

/**
 * The {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works
 * with {@link DataSet}s.
 *
 * <p>A TableEnvironment can be used to:
 * <ul>
 *     <li>convert a {@link DataSet} to a {@link Table}</li>
 *     <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog</li>
 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
 *     <li>scan a registered table to obtain a {@link Table}</li>
 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
 *     <li>convert a {@link Table} into a {@link DataSet}</li>
 *     <li>explain the AST and execution plan of a {@link Table}</li>
 * </ul>
 */

BatchTableSource

/** Defines an external batch table and provides access to its data.
 *
 * @param <T> Type of the {@link DataSet} created by this {@link TableSource}.
 */

BatchTableSink

/** Defines an external {@link TableSink} to emit a batch {@link Table}.
 *
 * @param <T> Type of {@link DataSet} that this {@link TableSink} expects and supports.
 */

 

1.2 SQL on the DataStream API

Example code:

package org.apache.flink.table.examples.java;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.util.Arrays;

/**
 * Simple example for demonstrating the use of SQL on a Stream Table in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataStreams to Tables
 *  - Register a Table under a name
 *  - Run a StreamSQL query on the registered Table
 *
 */
public class StreamSQLExample {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
            new Order(1L, "beer", 3),
            new Order(1L, "diaper", 4),
            new Order(3L, "rubber", 2)));

        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
            new Order(2L, "pen", 3),
            new Order(2L, "rubber", 3),
            new Order(4L, "beer", 1)));

        // convert DataStream to Table
        Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
        // register DataStream as Table
        tEnv.registerDataStream("OrderB", orderB, "user, product, amount");

        // union the two tables
        Table result = tEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " +
                        "SELECT * FROM OrderB WHERE amount < 2");

        tEnv.toAppendStream(result, Order.class).print();

        env.execute();
    }

    // *************************************************************************
    //     USER DATA TYPES
    // *************************************************************************

    /**
     * Simple POJO.
     */
    public static class Order {
        public Long user;
        public String product;
        public int amount;

        public Order() {
        }

        public Order(Long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{" +
                "user=" + user +
                ", product='" + product + '\'' +
                ", amount=" + amount +
                '}';
        }
    }
}
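
As a cross-check on what the UNION ALL query selects, the sketch below applies the same two filters to the same six orders in plain Java, without Flink. The class UnionAllSemantics and the method query are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration class; not part of Flink.
class UnionAllSemantics {

    // Same fields as the Order POJO in the example above.
    static class Order {
        final long user;
        final String product;
        final int amount;

        Order(long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{user=" + user + ", product='" + product + "', amount=" + amount + "}";
        }
    }

    // Plain-Java equivalent of:
    // SELECT * FROM tableA WHERE amount > 2 UNION ALL SELECT * FROM OrderB WHERE amount < 2
    static List<Order> query(List<Order> tableA, List<Order> orderB) {
        return Stream.concat(
                tableA.stream().filter(o -> o.amount > 2),
                orderB.stream().filter(o -> o.amount < 2))
            .collect(Collectors.toList()); // UNION ALL keeps duplicates, so no distinct()
    }

    public static void main(String[] args) {
        List<Order> orderA = Arrays.asList(
            new Order(1L, "beer", 3),
            new Order(1L, "diaper", 4),
            new Order(3L, "rubber", 2));
        List<Order> orderB = Arrays.asList(
            new Order(2L, "pen", 3),
            new Order(2L, "rubber", 3),
            new Order(4L, "beer", 1));
        // Three orders qualify: beer (3) and diaper (4) from A, beer (1) from B.
        query(orderA, orderB).forEach(System.out::println);
    }
}
```

In the streaming job the same three orders are emitted, but the interleaving of rows from the two inputs is not guaranteed.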

The StreamTableEnvironment used in the example is documented as:

/**
 * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
 * {@link DataStream}s.
 *
 * <p>A TableEnvironment can be used to:
 * <ul>
 *     <li>convert a {@link DataStream} to a {@link Table}</li>
 *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
 *     <li>scan a registered table to obtain a {@link Table}</li>
 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
 *     <li>convert a {@link Table} into a {@link DataStream}</li>
 *     <li>explain the AST and execution plan of a {@link Table}</li>
 * </ul>
 */

 StreamTableSource

/** Defines an external stream table and provides read access to its data.
 *
 * @param <T> Type of the {@link DataStream} created by this {@link TableSource}.
 */

StreamTableSink

/**
 * Defines an external stream table and provides write access to its data.
 *
 * @param <T> Type of the {@link DataStream} created by this {@link TableSink}.
 */

 

2. Table API

Example:

package org.apache.flink.table.examples.java;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

/**
 * Simple example for demonstrating the use of the Table API for a Word Count in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataSets to Tables
 *  - Apply group, aggregate, select, and filter operations
 */
public class WordCountTable {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        DataSet<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("Ciao", 1),
                new WC("Hello", 1));

        Table table = tEnv.fromDataSet(input);

        Table filtered = table
                .groupBy("word")
                .select("word, frequency.sum as frequency")
                .filter("frequency = 2");

        DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);

        result.print();
    }

    // *************************************************************************
    //     USER DATA TYPES
    // *************************************************************************

    /**
     * Simple POJO containing a word and its respective count.
     */
    public static class WC {
        public String word;
        public long frequency;

        // public constructor to make it a Flink POJO
        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }
}
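
In the Table API chain above, filter comes after groupBy and select, so it tests the aggregated frequency (much like HAVING in SQL) rather than the input rows. The plain-Java sketch below mirrors that order of operations; WordCountTableSemantics and groupSelectFilter are hypothetical names, and this is an illustration rather than how Flink evaluates the expression.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of Flink.
class WordCountTableSemantics {

    // Same shape as the WC POJO in the example above.
    static class WC {
        final String word;
        final long frequency;

        WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }
    }

    // Mirrors: groupBy("word").select("word, frequency.sum as frequency").filter("frequency = <wanted>")
    static Map<String, Long> groupSelectFilter(List<WC> rows, long wanted) {
        // Step 1 and 2: group by word and sum the frequencies.
        Map<String, Long> sums = rows.stream().collect(
            Collectors.groupingBy(
                wc -> wc.word,
                TreeMap::new,
                Collectors.summingLong(wc -> wc.frequency)));
        // Step 3: keep only groups whose summed frequency equals the wanted value.
        sums.values().removeIf(sum -> sum != wanted);
        return sums;
    }

    public static void main(String[] args) {
        List<WC> input = Arrays.asList(
            new WC("Hello", 1),
            new WC("Ciao", 1),
            new WC("Hello", 1));
        System.out.println(groupSelectFilter(input, 2)); // prints {Hello=2}
    }
}
```

Had the filter been applied before the aggregation, no row would match, because every input record has frequency 1.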

 3. Data conversion

  3.1 Converting between DataSet and Table

    DataSet-->Table

      By registration:

          // register the DataSet as table "WordCount"
          tEnv.registerDataSet("WordCount", input, "word, frequency");

      By conversion:

          Table table = tEnv.fromDataSet(input);

    Table-->DataSet

        DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);

  3.2 Converting between DataStream and Table

    DataStream-->Table

      By registration:

          tEnv.registerDataStream("OrderB", orderB, "user, product, amount");

      By conversion:

          Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");

    Table-->DataStream

        DataStream<Order> stream = tEnv.toAppendStream(result, Order.class);

 

References

【1】https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/programming-model.html

【2】Flink原理、實戰與性能優化 (Flink: Principles, Practice, and Performance Optimization)
