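According to Flink's official documentation [1], Flink's programming model is organized into four layers of abstraction: SQL is the highest-level API, the Table API sits in the middle, the DataStream/DataSet API is the core, and stateful stream processing is the lowest-level building block.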
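Earlier posts in this series covered the other layers:
"Flink DataSet API: usage and internals" introduced the DataSet API.
"Flink DataStream API: usage and internals" introduced the DataStream API.
"How are timestamps used in Flink? Watermarks: usage and internals" introduced watermarks, the foundation of the low-level layer.
"Flink windows by example" introduced the concept of windows and how they work.
"State and fault tolerance in Flink" introduced state and the checkpoint and savepoint fault-tolerance mechanisms.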
0. Basic concepts
0.1 TableEnvironment
TableEnvironment is the central concept of the Table API and SQL integration. It is mainly responsible for:
/**
 * The base class for batch and stream TableEnvironments.
 *
 * <p>The TableEnvironment is a central concept of the Table API and SQL integration. It is
 * responsible for:
 *
 * <ul>
 *     <li>Registering a Table in the internal catalog</li>
 *     <li>Registering an external catalog</li>
 *     <li>Executing SQL queries</li>
 *     <li>Registering a user-defined scalar function. For the user-defined table and aggregate
 *     function, use the StreamTableEnvironment or BatchTableEnvironment</li>
 * </ul>
 */
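To make the register-then-query responsibility more concrete, here is a minimal, Flink-free sketch under simplifying assumptions: `MiniTableEnv`, `registerTable`, `scan` and `row` are hypothetical names, and a table is modelled as a plain list of rows keyed by column name. Flink's real TableEnvironment delegates to a catalog and a query planner instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of a TableEnvironment's table registry.
public class MiniTableEnv {
    // table name -> rows; each row maps column name -> value
    private final Map<String, List<Map<String, Object>>> registry = new HashMap<>();

    // Build a row from alternating column names and values, e.g. row("word", "Hello").
    static Map<String, Object> row(Object... keyValues) {
        Map<String, Object> r = new LinkedHashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            r.put((String) keyValues[i], keyValues[i + 1]);
        }
        return r;
    }

    // Register a table under a unique name.
    void registerTable(String name, List<Map<String, Object>> rows) {
        if (registry.containsKey(name)) {
            throw new IllegalArgumentException("Table already registered: " + name);
        }
        registry.put(name, new ArrayList<>(rows));
    }

    // Look up a registered table by name, roughly what scanning a registered table means.
    List<Map<String, Object>> scan(String name) {
        List<Map<String, Object>> rows = registry.get(name);
        if (rows == null) {
            throw new IllegalArgumentException("Table not found: " + name);
        }
        return rows;
    }

    public static void main(String[] args) {
        MiniTableEnv env = new MiniTableEnv();
        env.registerTable("WordCount", List.of(
                row("word", "Hello", "frequency", 1L),
                row("word", "Ciao", "frequency", 1L)));
        // prints [{word=Hello, frequency=1}, {word=Ciao, frequency=1}]
        System.out.println(env.scan("WordCount"));
    }
}
```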
0.2 Catalog
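Catalog: all metadata about databases and tables is stored in Flink's internal catalog structure, which holds every piece of Table-related metadata in Flink, including table schemas, data source information, and so on.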
/**
 * This interface is responsible for reading and writing metadata such as database/table/views/UDFs
 * from a registered catalog. It connects a registered catalog and Flink's Table API.
 */
Its structure is as follows:
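The hierarchy named in the Catalog Javadoc (databases that contain tables, views and UDFs) can be sketched as nested maps. This is a hypothetical in-memory model assuming a two-level database → table path and keeping only tables; `SimpleCatalog`, `TableMeta` and their methods are illustrative names, not Flink's `Catalog` interface.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory catalog: database name -> (table name -> table metadata).
public class SimpleCatalog {
    // Metadata kept per table: its schema (column names) and a description of its data source.
    static final class TableMeta {
        final List<String> columns;
        final String source;

        TableMeta(List<String> columns, String source) {
            this.columns = columns;
            this.source = source;
        }
    }

    private final Map<String, Map<String, TableMeta>> databases = new LinkedHashMap<>();

    void createDatabase(String db) {
        databases.putIfAbsent(db, new LinkedHashMap<>());
    }

    void createTable(String db, String table, TableMeta meta) {
        Map<String, TableMeta> tables = databases.get(db);
        if (tables == null) {
            throw new IllegalArgumentException("Database does not exist: " + db);
        }
        tables.put(table, meta);
    }

    // Resolve a table by its (database, table) path; returns null if either level is missing.
    TableMeta getTable(String db, String table) {
        Map<String, TableMeta> tables = databases.get(db);
        return tables == null ? null : tables.get(table);
    }

    public static void main(String[] args) {
        SimpleCatalog catalog = new SimpleCatalog();
        catalog.createDatabase("default_db");
        catalog.createTable("default_db", "WordCount",
                new TableMeta(List.of("word", "frequency"), "in-memory DataSet"));
        System.out.println(catalog.getTable("default_db", "WordCount").columns); // prints [word, frequency]
    }
}
```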
0.3 TableSource
When using the Table API, an external data source can be registered directly as a Table. Such a source is called a TableSource.
/**
 * Defines an external table with the schema that is provided by {@link TableSource#getTableSchema}.
 *
 * <p>The data of a {@link TableSource} is produced as a {@code DataSet} in case of a {@code BatchTableSource}
 * or as a {@code DataStream} in case of a {@code StreamTableSource}. The type of the produced
 * {@code DataSet} or {@code DataStream} is specified by the {@link TableSource#getProducedDataType()} method.
 *
 * <p>By default, the fields of the {@link TableSchema} are implicitly mapped by name to the fields of
 * the produced {@link DataType}. An explicit mapping can be defined by implementing the
 * {@link DefinedFieldMapping} interface.
 *
 * @param <T> The return type of the {@link TableSource}.
 */
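A minimal sketch of the TableSource contract under a simplified, Flink-free model: a source declares its schema, the record type it produces, and the records themselves, and the default by-name mapping is checked by requiring each schema field to exist as a field of the produced type. `SimpleTableSource`, `InMemoryWcSource` and `schemaMatchesByName` are hypothetical; Flink's real interface works with `TableSchema`, `DataType` and a DataSet or DataStream.

```java
import java.util.List;

// Hypothetical, Flink-free model of the TableSource idea.
public class TableSourceSketch {
    // A source declares its schema, the type of record it produces, and the records.
    interface SimpleTableSource<T> {
        List<String> getSchemaFields();
        Class<T> getProducedType();
        List<T> produce();
    }

    // Example record type with public fields, like a Flink POJO.
    public static class WC {
        public String word;
        public long frequency;

        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }
    }

    // An in-memory source exposing WC records as a table with columns (word, frequency).
    static final class InMemoryWcSource implements SimpleTableSource<WC> {
        private final List<WC> data;

        InMemoryWcSource(List<WC> data) { this.data = data; }

        @Override public List<String> getSchemaFields() { return List.of("word", "frequency"); }
        @Override public Class<WC> getProducedType() { return WC.class; }
        @Override public List<WC> produce() { return data; }
    }

    // Default by-name mapping: every schema field must be a public field of the produced type.
    static boolean schemaMatchesByName(SimpleTableSource<?> source) {
        for (String field : source.getSchemaFields()) {
            try {
                source.getProducedType().getField(field);
            } catch (NoSuchFieldException e) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        InMemoryWcSource source = new InMemoryWcSource(List.of(new WC("Hello", 1), new WC("Ciao", 1)));
        System.out.println(schemaMatchesByName(source)); // prints true
        System.out.println(source.produce().size());     // prints 2
    }
}
```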
0.4 TableSink
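Once processing is finished, the results usually have to be written to external storage. The Table API provides a corresponding sink module for this, called TableSink.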
/**
 * A {@link TableSink} specifies how to emit a table to an external
 * system or location.
 *
 * <p>The interface is generic such that it can support different storage locations and formats.
 *
 * @param <T> The return type of the {@link TableSink}.
 */
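A sink is the mirror image of a source: it only needs to know the field names and how to emit each row. The sketch below assumes a hypothetical `SimpleTableSink` interface and a `CollectingSink` that writes CSV-like lines into an in-memory list; a real TableSink would write to an external system such as a file or a message queue, and Flink's actual interface differs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical, Flink-free model of the TableSink idea.
public class TableSinkSketch {
    // A sink is configured with the field names once, then receives one row at a time.
    interface SimpleTableSink {
        void configure(List<String> fieldNames);
        void emit(List<Object> row);
    }

    // A sink that "writes" each row as a CSV-like line into an in-memory list.
    static final class CollectingSink implements SimpleTableSink {
        final List<String> lines = new ArrayList<>();
        private List<String> fieldNames = List.of();

        @Override
        public void configure(List<String> fieldNames) {
            this.fieldNames = fieldNames;
            lines.add(String.join(",", fieldNames)); // header line
        }

        @Override
        public void emit(List<Object> row) {
            if (row.size() != fieldNames.size()) {
                throw new IllegalArgumentException("Row arity " + row.size() + " does not match " + fieldNames);
            }
            StringJoiner joiner = new StringJoiner(",");
            for (Object value : row) {
                joiner.add(String.valueOf(value));
            }
            lines.add(joiner.toString());
        }
    }

    public static void main(String[] args) {
        CollectingSink sink = new CollectingSink();
        sink.configure(List.of("word", "frequency"));
        sink.emit(List.of("Hello", 2L));
        sink.emit(List.of("Ciao", 1L));
        System.out.println(sink.lines); // prints [word,frequency, Hello,2, Ciao,1]
    }
}
```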
0.5 Table Connector
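Starting with Flink 1.6, Flink introduced the concept of Table Connectors so that the Table API can connect to external systems through configuration and the same definitions can also be used in the SQL Client. The main goal is to separate the definition of TableSources and TableSinks from their use.
Table Connectors wrap the various built-in TableSources and TableSinks into configurable components that can be used from both the Table API and the SQL Client.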
/**
 * Creates a table source and/or table sink from a descriptor.
 *
 * <p>Descriptors allow for declaring the communication to external systems in an
 * implementation-agnostic way. The classpath is scanned for suitable table factories that match
 * the desired configuration.
 *
 * <p>The following example shows how to read from a connector using a JSON format and
 * register a table source as "MyTable":
 *
 * <pre>
 * {@code
 *
 * tableEnv
 *   .connect(
 *     new ExternalSystemXYZ()
 *       .version("0.11"))
 *   .withFormat(
 *     new Json()
 *       .jsonSchema("{...}")
 *       .failOnMissingField(false))
 *   .withSchema(
 *     new Schema()
 *       .field("user-name", "VARCHAR").from("u_name")
 *       .field("count", "DECIMAL"))
 *   .registerSource("MyTable");
 * }
 * </pre>
 *
 * @param connectorDescriptor connector descriptor describing the external system
 */
TableDescriptor connect(ConnectorDescriptor connectorDescriptor);
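The Javadoc says suitable table factories are found on the classpath by matching them against the desired configuration. A minimal sketch of that matching step, assuming the descriptors have been flattened into string key/value properties: the `TableFactory` interface, the property keys and the factory names below are hypothetical, and classpath scanning is replaced by an explicit list of candidates.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of descriptor-driven factory lookup.
public class ConnectorSketch {
    // A factory declares the properties it requires and can build a (described) source.
    interface TableFactory {
        Map<String, String> requiredContext();
        String createSource(Map<String, String> properties);
    }

    static TableFactory factory(String type, String format) {
        return new TableFactory() {
            @Override
            public Map<String, String> requiredContext() {
                return Map.of("connector.type", type, "format.type", format);
            }

            @Override
            public String createSource(Map<String, String> properties) {
                return type + " source reading " + format;
            }
        };
    }

    // Pick the first factory whose required context is fully contained in the properties.
    static Optional<TableFactory> find(List<TableFactory> candidates, Map<String, String> properties) {
        return candidates.stream()
                .filter(f -> f.requiredContext().entrySet().stream()
                        .allMatch(e -> e.getValue().equals(properties.get(e.getKey()))))
                .findFirst();
    }

    public static void main(String[] args) {
        List<TableFactory> available = List.of(factory("kafka", "json"), factory("filesystem", "csv"));
        // Illustrative properties a connect(...).withFormat(...) chain might flatten to.
        Map<String, String> props = Map.of(
                "connector.type", "filesystem", "format.type", "csv", "connector.path", "/tmp/in");
        // prints: filesystem source reading csv
        System.out.println(find(available, props).map(f -> f.createSource(props)).orElse("no matching factory"));
    }
}
```

Matching on required properties rather than on concrete classes is what lets the same descriptor be resolved from either the Table API or a SQL Client configuration in this model.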
This article focuses on SQL and the Table API.
1. SQL
1.1 SQL on the DataSet API
Example:
package org.apache.flink.table.examples.java;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

/**
 * Simple example that shows how the Batch SQL API is used in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataSets to Tables
 *  - Register a Table under a name
 *  - Run a SQL query on the registered Table
 */
public class WordCountSQL {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        DataSet<WC> input = env.fromElements(
            new WC("Hello", 1),
            new WC("Ciao", 1),
            new WC("Hello", 1));

        // register the DataSet as table "WordCount"
        tEnv.registerDataSet("WordCount", input, "word, frequency");

        // run a SQL query on the Table and retrieve the result as a new Table
        Table table = tEnv.sqlQuery(
            "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");

        DataSet<WC> result = tEnv.toDataSet(table, WC.class);

        result.print();
    }

    // *************************************************************************
    // USER DATA TYPES
    // *************************************************************************

    /**
     * Simple POJO containing a word and its respective count.
     */
    public static class WC {
        public String word;
        public long frequency;

        // public constructor to make it a Flink POJO
        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }
}
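To see what the query returns, the same GROUP BY/SUM can be reproduced with plain Java collections over the three input records. This is only a semantic check of the SQL, not how Flink executes it; `WordCountSemantics` and `sumByWord` are illustrative names, and a sorted map is used so the printed order is deterministic (the Flink job may print rows in another order).

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class WordCountSemantics {
    static final class WC {
        final String word;
        final long frequency;

        WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }
    }

    // Same result as: SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word
    static Map<String, Long> sumByWord(List<WC> input) {
        return input.stream().collect(Collectors.groupingBy(
                wc -> wc.word, TreeMap::new, Collectors.summingLong(wc -> wc.frequency)));
    }

    public static void main(String[] args) {
        List<WC> input = List.of(new WC("Hello", 1), new WC("Ciao", 1), new WC("Hello", 1));
        System.out.println(sumByWord(input)); // prints {Ciao=1, Hello=2}
    }
}
```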
Key classes involved: BatchTableEnvironment
/**
 * The {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works
 * with {@link DataSet}s.
 *
 * <p>A TableEnvironment can be used to:
 * <ul>
 *     <li>convert a {@link DataSet} to a {@link Table}</li>
 *     <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog</li>
 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
 *     <li>scan a registered table to obtain a {@link Table}</li>
 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
 *     <li>convert a {@link Table} into a {@link DataSet}</li>
 *     <li>explain the AST and execution plan of a {@link Table}</li>
 * </ul>
 */
BatchTableSource
/**
 * Defines an external batch table and provides access to its data.
 *
 * @param <T> Type of the {@link DataSet} created by this {@link TableSource}.
 */
BatchTableSink
/**
 * Defines an external {@link TableSink} to emit a batch {@link Table}.
 *
 * @param <T> Type of {@link DataSet} that this {@link TableSink} expects and supports.
 */
1.2 SQL on the DataStream API
Example code:
package org.apache.flink.table.examples.java;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.util.Arrays;

/**
 * Simple example for demonstrating the use of SQL on a Stream Table in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataStreams to Tables
 *  - Register a Table under a name
 *  - Run a StreamSQL query on the registered Table
 *
 */
public class StreamSQLExample {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
            new Order(1L, "beer", 3),
            new Order(1L, "diaper", 4),
            new Order(3L, "rubber", 2)));

        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
            new Order(2L, "pen", 3),
            new Order(2L, "rubber", 3),
            new Order(4L, "beer", 1)));

        // convert DataStream to Table
        Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
        // register DataStream as Table
        tEnv.registerDataStream("OrderB", orderB, "user, product, amount");

        // union the two tables
        Table result = tEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " +
                        "SELECT * FROM OrderB WHERE amount < 2");

        tEnv.toAppendStream(result, Order.class).print();

        env.execute();
    }

    // *************************************************************************
    // USER DATA TYPES
    // *************************************************************************

    /**
     * Simple POJO.
     */
    public static class Order {
        public Long user;
        public String product;
        public int amount;

        public Order() {
        }

        public Order(Long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{" +
                "user=" + user +
                ", product='" + product + '\'' +
                ", amount=" + amount +
                '}';
        }
    }
}
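The query keeps the rows of tableA with amount > 2 and the rows of OrderB with amount < 2, then concatenates them with UNION ALL, which (unlike UNION) does not remove duplicates. Reproducing that over the same six orders in plain Java shows which three rows should reach the sink. This is a semantic check only, with illustrative class and method names; the streaming job may print the rows in a different order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class UnionAllSemantics {
    static final class Order {
        final long user;
        final String product;
        final int amount;

        Order(long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return user + ":" + product + ":" + amount;
        }
    }

    // Same result as: SELECT * FROM A WHERE amount > 2 UNION ALL SELECT * FROM B WHERE amount < 2
    static List<Order> query(List<Order> a, List<Order> b) {
        List<Order> result = new ArrayList<>();
        result.addAll(a.stream().filter(o -> o.amount > 2).collect(Collectors.toList()));
        result.addAll(b.stream().filter(o -> o.amount < 2).collect(Collectors.toList()));
        return result; // UNION ALL is plain concatenation, duplicates are kept
    }

    public static void main(String[] args) {
        List<Order> orderA = List.of(new Order(1L, "beer", 3), new Order(1L, "diaper", 4), new Order(3L, "rubber", 2));
        List<Order> orderB = List.of(new Order(2L, "pen", 3), new Order(2L, "rubber", 3), new Order(4L, "beer", 1));
        System.out.println(query(orderA, orderB)); // prints [1:beer:3, 1:diaper:4, 4:beer:1]
    }
}
```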
Key classes involved: StreamTableEnvironment
/**
 * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
 * {@link DataStream}s.
 *
 * <p>A TableEnvironment can be used to:
 * <ul>
 *     <li>convert a {@link DataStream} to a {@link Table}</li>
 *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
 *     <li>scan a registered table to obtain a {@link Table}</li>
 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
 *     <li>convert a {@link Table} into a {@link DataStream}</li>
 *     <li>explain the AST and execution plan of a {@link Table}</li>
 * </ul>
 */
StreamTableSource
/**
 * Defines an external stream table and provides read access to its data.
 *
 * @param <T> Type of the {@link DataStream} created by this {@link TableSource}.
 */
StreamTableSink
/**
 * Defines an external stream table and provides write access to its data.
 *
 * @param <T> Type of the {@link DataStream} created by this {@link TableSink}.
 */
2. Table API
Example:
package org.apache.flink.table.examples.java;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

/**
 * Simple example for demonstrating the use of the Table API for a Word Count in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataSets to Tables
 *  - Apply group, aggregate, select, and filter operations
 */
public class WordCountTable {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        DataSet<WC> input = env.fromElements(
            new WC("Hello", 1),
            new WC("Ciao", 1),
            new WC("Hello", 1));

        Table table = tEnv.fromDataSet(input);

        Table filtered = table
            .groupBy("word")
            .select("word, frequency.sum as frequency")
            .filter("frequency = 2");

        DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);

        result.print();
    }

    // *************************************************************************
    // USER DATA TYPES
    // *************************************************************************

    /**
     * Simple POJO containing a word and its respective count.
     */
    public static class WC {
        public String word;
        public long frequency;

        // public constructor to make it a Flink POJO
        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }
}
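In this chain the filter runs after the aggregation, so `filter("frequency = 2")` tests the summed frequency, much like a SQL `HAVING SUM(frequency) = 2` clause. A plain-Java sketch of that ordering over the same three records (class and method names are illustrative) shows that only Hello survives, because Ciao sums to 1.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupSelectFilter {
    static final class WC {
        final String word;
        final long frequency;

        WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }

    // Mirrors: table.groupBy("word").select("word, frequency.sum as frequency").filter("frequency = 2")
    static List<WC> run(List<WC> input) {
        // groupBy + select: sum frequency per word, keeping first-seen order
        Map<String, Long> sums = new LinkedHashMap<>();
        for (WC wc : input) {
            sums.merge(wc.word, wc.frequency, Long::sum);
        }
        // filter: evaluated on the aggregated rows, not on the input records
        List<WC> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : sums.entrySet()) {
            if (e.getValue() == 2L) {
                out.add(new WC(e.getKey(), e.getValue()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<WC> input = List.of(new WC("Hello", 1), new WC("Ciao", 1), new WC("Hello", 1));
        System.out.println(run(input)); // prints [WC Hello 2]
    }
}
```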
3. Data conversion
3.1 Converting between DataSet and Table
DataSet-->Table
By registration:
// register the DataSet as table "WordCount"
tEnv.registerDataSet("WordCount", input, "word, frequency");
By conversion:
Table table = tEnv.fromDataSet(input);
Table-->DataSet
DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);
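Both directions rely on WC being usable as a Flink POJO (a public class with a public no-argument constructor and public fields), so that table columns line up with POJO fields by name. A rough sketch of that round trip with plain reflection follows; `toRow` and `fromRow` are hypothetical helpers, and Flink's own type extraction is considerably more involved.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

public class PojoRowMapping {
    public static class WC {
        public String word;
        public long frequency;

        public WC() {} // no-arg constructor, needed to rebuild objects from rows

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }
    }

    // "DataSet -> Table" direction: copy every public field into a column of the same name.
    static Map<String, Object> toRow(Object pojo) {
        Map<String, Object> row = new LinkedHashMap<>();
        try {
            for (Field f : pojo.getClass().getFields()) {
                row.put(f.getName(), f.get(pojo));
            }
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
        return row;
    }

    // "Table -> DataSet" direction: instantiate the POJO and set fields whose names match columns.
    static <T> T fromRow(Map<String, Object> row, Class<T> type) {
        try {
            T pojo = type.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, Object> column : row.entrySet()) {
                type.getField(column.getKey()).set(pojo, column.getValue()); // unboxes Long -> long
            }
            return pojo;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot map row to " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        WC back = fromRow(toRow(new WC("Hello", 2)), WC.class);
        System.out.println(back.word + " " + back.frequency); // prints Hello 2
    }
}
```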
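3.2 Converting between DataStream and Table
DataStream-->Table
By registration:
tEnv.registerDataStream("OrderB", orderB, "user, product, amount");
By conversion:
Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
Table-->DataStream
DataStream<Order> resultStream = tEnv.toAppendStream(result, Order.class);
toAppendStream only works for append-only results such as the UNION ALL query in section 1.2; results that update previously emitted rows, such as a streaming GROUP BY, have to be converted with toRetractStream instead.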
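The practical difference between the two styles is whether the table has a name in the environment. A registered table is referenced by name in SQL, while a converted Table is just an object; the streaming example still concatenates `tableA` into a SQL string, which only works because that string form resolves to a name the environment knows. A minimal sketch of this idea follows; `NamingEnv`, `TableHandle` and the `anonymous_N` naming scheme are hypothetical, and the "SQL" resolver is a toy that only reads the name after `FROM`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of named (registered) versus anonymous (converted) tables.
public class NamingEnv {
    private final Map<String, List<String>> registered = new HashMap<>();
    private int counter = 0;

    // A converted table; its string form is the name it was registered under.
    static final class TableHandle {
        private final String name;

        TableHandle(String name) { this.name = name; }

        @Override
        public String toString() { return name; }
    }

    // Registration style: the caller picks the name, like registerDataStream("OrderB", ...).
    void register(String name, List<String> rows) {
        registered.put(name, rows);
    }

    // Conversion style: no name is given, so one is generated and registered behind the scenes.
    TableHandle fromRows(List<String> rows) {
        String generated = "anonymous_" + counter++;
        registered.put(generated, rows);
        return new TableHandle(generated);
    }

    // Toy resolver: returns the rows of the table named right after "FROM ".
    List<String> resolveFrom(String sql) {
        int start = sql.indexOf("FROM ");
        if (start < 0) {
            throw new IllegalArgumentException("No FROM clause: " + sql);
        }
        String name = sql.substring(start + 5).trim().split("\\s+")[0];
        List<String> rows = registered.get(name);
        if (rows == null) {
            throw new IllegalArgumentException("Unknown table: " + name);
        }
        return rows;
    }

    public static void main(String[] args) {
        NamingEnv env = new NamingEnv();
        env.register("OrderB", List.of("pen", "rubber", "beer"));
        TableHandle tableA = env.fromRows(List.of("beer", "diaper", "rubber"));
        System.out.println(env.resolveFrom("SELECT * FROM OrderB"));     // prints [pen, rubber, beer]
        System.out.println(env.resolveFrom("SELECT * FROM " + tableA)); // prints [beer, diaper, rubber]
    }
}
```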
References
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/programming-model.html
[2] Flink原理、實戰與性能優化 (Flink: Principles, Practice and Performance Optimization)