From Flink's official documentation we know that Flink's programming model has four layers: SQL is the topmost API, the Table API is the middle layer, the DataStream/DataSet APIs are the core, and the stateful stream processing layer is the underlying implementation.
Earlier posts in this series:

<flink DataSet API: usage and internals> introduced the DataSet API.
<flink DataStream API: usage and internals> introduced the DataStream API.
<How are timestamps used in flink? Watermark usage and internals> introduced watermarks, a foundation of the underlying implementation.
<flink window examples and analysis> introduced the window concept and how windowing works.
<State and fault tolerance in Flink> introduced state and the checkpoint/savepoint fault-tolerance mechanisms.
Part (1), <Building batch and streaming applications with the flink Table & SQL API (1): basic Table concepts>, introduced the Table abstraction and how to use it.
Part (2), <Building batch and streaming applications with the flink Table & SQL API (2): Table API overview>, covered the Table API.
This post looks at what Flink SQL offers and at the principles behind it.
1. SQL features
The SQL entry points live on org.apache.flink.table.api.TableEnvironment; currently Flink supports only select and insert operations.
(1) select
/**
 * Evaluates a SQL query on registered tables and retrieves the result as a {@link Table}.
 *
 * <p>All tables referenced by the query must be registered in the TableEnvironment.
 * A {@link Table} is automatically registered when its {@link Table#toString()} method is
 * called, for example when it is embedded into a String.
 * Hence, SQL queries can directly reference a {@link Table} as follows:
 *
 * <pre>
 * {@code
 *   Table table = ...;
 *   String tableName = table.toString();
 *   // the table is not registered to the table environment
 *   tEnv.sqlQuery("SELECT * FROM tableName");
 * }
 * </pre>
 *
 * @param query The SQL query to evaluate.
 * @return The result of the query as Table
 */
Table sqlQuery(String query);
(2) update (currently only INSERT is supported)
/**
 * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
 * NOTE: Currently only SQL INSERT statements are supported.
 *
 * <p>All tables referenced by the query must be registered in the TableEnvironment.
 * A {@link Table} is automatically registered when its {@link Table#toString()} method is
 * called, for example when it is embedded into a String.
 * Hence, SQL queries can directly reference a {@link Table} as follows:
 *
 * <pre>
 * {@code
 *   // register the configured table sink into which the result is inserted.
 *   tEnv.registerTableSink("sinkTable", configuredSink);
 *   Table sourceTable = ...
 *   String tableName = sourceTable.toString();
 *   // sourceTable is not registered to the table environment
 *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName");
 * }
 * </pre>
 *
 * @param stmt The SQL statement to evaluate.
 */
void sqlUpdate(String stmt);

/**
 * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
 * NOTE: Currently only SQL INSERT statements are supported.
 *
 * <p>All tables referenced by the query must be registered in the TableEnvironment.
 * A {@link Table} is automatically registered when its {@link Table#toString()} method is
 * called, for example when it is embedded into a String.
 * Hence, SQL queries can directly reference a {@link Table} as follows:
 *
 * <pre>
 * {@code
 *   // register the configured table sink into which the result is inserted.
 *   tEnv.registerTableSink("sinkTable", configuredSink);
 *   Table sourceTable = ...
 *   String tableName = sourceTable.toString();
 *   // sourceTable is not registered to the table environment
 *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
 * }
 * </pre>
 *
 * @param stmt The SQL statement to evaluate.
 * @param config The {@link QueryConfig} to use.
 */
void sqlUpdate(String stmt, QueryConfig config);
2. SQL parsing internals
Apache Calcite is a SQL engine for the Hadoop ecosystem that provides standard SQL, a range of query optimizations, and connectivity to many kinds of data sources; beyond that, it offers query engines for OLAP and stream processing. Since it became an Apache incubator project in 2013 it has attracted growing attention in the Hadoop world and has been integrated by many projects: Flink, Storm, Drill, and Phoenix all rely on it for SQL parsing and optimization.
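To watch the parsing step in isolation, here is a minimal sketch using Calcite's standalone parser; the Orders table and its columns are made up for illustration:

import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;

public class CalciteParseSketch {
    public static void main(String[] args) throws Exception {
        SqlParser parser = SqlParser.create(
            "SELECT product, amount FROM Orders WHERE amount > 2");
        // parsing checks syntax only; the Orders table does not need to exist yet
        SqlNode sqlNode = parser.parseQuery();
        System.out.println(sqlNode.getKind());                           // SELECT
        System.out.println(sqlNode.getKind().belongsTo(SqlKind.QUERY));  // true
    }
}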
Let's start from a runnable demo and trace what a SQL statement actually goes through during parsing.
(1) select
package org.apache.flink.table.examples.java;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.util.Arrays;

/**
 * Simple example for demonstrating the use of SQL on a Stream Table in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataStreams to Tables
 *  - Register a Table under a name
 *  - Run a StreamSQL query on the registered Table
 */
public class StreamSQLExample {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
            new Order(1L, "beer", 3),
            new Order(1L, "diaper", 4),
            new Order(3L, "rubber", 2)));

        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
            new Order(2L, "pen", 3),
            new Order(2L, "rubber", 3),
            new Order(4L, "beer", 1)));

        // convert DataStream to Table
        Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
        // register DataStream as Table
        tEnv.registerDataStream("OrderB", orderB, "user, product, amount");

        // union the two tables
        Table result = tEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " +
            "SELECT * FROM OrderB WHERE amount < 2");

        tEnv.toAppendStream(result, Order.class).print();

        env.execute();
    }

    // *************************************************************************
    //     USER DATA TYPES
    // *************************************************************************

    /**
     * Simple POJO.
     */
    public static class Order {
        public Long user;
        public String product;
        public int amount;

        public Order() {
        }

        public Order(Long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{" +
                "user=" + user +
                ", product='" + product + '\'' +
                ", amount=" + amount +
                '}';
        }
    }
}
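Running it, the first branch keeps the two tableA orders with amount > 2 and the second keeps the one OrderB order with amount < 2, so the printed result looks roughly like this (the "n>" subtask prefixes and the row order vary with parallelism):

2> Order{user=1, product='beer', amount=3}
3> Order{user=1, product='diaper', amount=4}
1> Order{user=4, product='beer', amount=1}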
The implementation is as follows:
override def sqlQuery(query: String): Table = {
  val planner = getFlinkPlanner
  // parse the sql query
  val parsed = planner.parse(query)
  if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
    // validate the sql query
    val validated = planner.validate(parsed)
    // transform to a relational tree
    val relational = planner.rel(validated)
    new TableImpl(this, new PlannerQueryOperation(relational.rel))
  } else {
    throw new TableException(
      "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " +
        "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.")
  }
}
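One consequence of the parsed.getKind.belongsTo(SqlKind.QUERY) check above: a statement can parse successfully and still be rejected by sqlQuery(). A small sketch reusing the tEnv from StreamSQLExample (the sink table name is made up):

// INSERT parses fine, but SqlKind.INSERT is not in the SqlKind.QUERY set,
// so sqlQuery() falls into the else branch above
try {
    tEnv.sqlQuery("INSERT INTO ResultTable SELECT * FROM OrderB");
} catch (org.apache.flink.table.api.TableException e) {
    // "Unsupported SQL query! sqlQuery() only accepts SQL queries of type ..."
}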
>>parse the sql query
In Calcite the parse tree is represented by SqlNode; a SELECT statement, for instance, is parsed into a SqlSelect:
public SqlSelect(SqlParserPos pos,
    SqlNodeList keywordList,
    SqlNodeList selectList,
    SqlNode from,
    SqlNode where,
    SqlNodeList groupBy,
    SqlNode having,
    SqlNodeList windowDecls,
    SqlNodeList orderBy,
    SqlNode offset,
    SqlNode fetch) {
  super(pos);
  this.keywordList = Objects.requireNonNull(keywordList != null
      ? keywordList : new SqlNodeList(pos));
  this.selectList = selectList;
  this.from = from;
  this.where = where;
  this.groupBy = groupBy;
  this.having = having;
  this.windowDecls = Objects.requireNonNull(windowDecls != null
      ? windowDecls : new SqlNodeList(pos));
  this.orderBy = orderBy;
  this.offset = offset;
  this.fetch = fetch;
}
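Continuing the standalone-parser sketch from above, the constructor parameters map directly onto the clauses of the parsed statement (Orders is still a made-up table):

SqlSelect select = (SqlSelect) SqlParser
    .create("SELECT product, amount FROM Orders WHERE amount > 2")
    .parseQuery();

SqlNodeList projection = select.getSelectList(); // product, amount
SqlNode from = select.getFrom();                 // Orders
SqlNode where = select.getWhere();               // amount > 2
System.out.println(where.getKind());             // GREATER_THAN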
>>validate the sql query
SqlValidatorImpl validates the SqlNode, resolving table and column names against the catalog and checking that operand types are consistent:
public SqlNode validate(SqlNode topNode) {
  SqlValidatorScope scope = new EmptyScope(this);
  scope = new CatalogScope(scope, ImmutableList.of("CATALOG"));
  final SqlNode topNode2 = validateScopedExpression(topNode, scope);
  final RelDataType type = getValidatedNodeType(topNode2);
  Util.discard(type);
  return topNode2;
}
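Validation is where unresolved names surface: a query over an unknown column parses fine but fails in this step. A sketch against the tables of StreamSQLExample (Flink wraps the Calcite error, typically as an org.apache.flink.table.api.ValidationException):

// `no_such_column` survives parsing but cannot be resolved against OrderB
try {
    tEnv.sqlQuery("SELECT no_such_column FROM OrderB");
} catch (ValidationException e) {
    // e.g. "SQL validation failed. ... Column 'no_such_column' not found ..."
}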
>>transform to a relational tree
SqlToRelConverter.java
/**
 * Converts an unvalidated query's parse tree into a relational expression.
 *
 * @param query           Query to convert
 * @param needsValidation Whether to validate the query before converting;
 *                        <code>false</code> if the query has already been
 *                        validated.
 * @param top             Whether the query is top-level, say if its result
 *                        will become a JDBC result set; <code>false</code> if
 *                        the query will be part of a view.
 */
public RelRoot convertQuery(
    SqlNode query,
    final boolean needsValidation,
    final boolean top) {
  if (needsValidation) {
    query = validator.validate(query);
  }

  RelMetadataQuery.THREAD_PROVIDERS.set(
      JaninoRelMetadataProvider.of(cluster.getMetadataProvider()));
  RelNode result = convertQueryRecursive(query, top, null).rel;
  if (top) {
    if (isStream(query)) {
      result = new LogicalDelta(cluster, result.getTraitSet(), result);
    }
  }
  RelCollation collation = RelCollations.EMPTY;
  if (!query.isA(SqlKind.DML)) {
    if (isOrdered(query)) {
      collation = requiredCollation(result);
    }
  }
  checkConvertedType(query, result);

  if (SQL2REL_LOGGER.isDebugEnabled()) {
    SQL2REL_LOGGER.debug(
        RelOptUtil.dumpPlan("Plan after converting SqlNode to RelNode",
            result, SqlExplainFormat.TEXT,
            SqlExplainLevel.EXPPLAN_ATTRIBUTES));
  }

  final RelDataType validatedRowType = validator.getValidatedNodeType(query);
  return RelRoot.of(result, validatedRowType, query.getKind())
      .withCollation(collation);
}
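The same three steps can be reproduced with Calcite's own Planner facade, which is essentially what Flink's planner wraps. A sketch, assuming an Orders(user, product, amount) table has been registered into rootSchema (that registration is elided here):

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;

public class SqlToRelSketch {
    public static void main(String[] args) throws Exception {
        SchemaPlus rootSchema = Frameworks.createRootSchema(true);
        // ... register an Orders(user, product, amount) table into rootSchema ...
        FrameworkConfig config = Frameworks.newConfigBuilder()
            .defaultSchema(rootSchema)
            .build();
        Planner planner = Frameworks.getPlanner(config);

        SqlNode parsed = planner.parse("SELECT product, amount FROM Orders WHERE amount > 2");
        SqlNode validated = planner.validate(parsed); // resolve names, check types
        RelRoot root = planner.rel(validated);        // SqlNode -> RelNode tree
        // prints a logical plan of roughly this shape:
        //   LogicalProject(product=[$1], amount=[$2])
        //     LogicalFilter(condition=[>($2, 2)])
        //       LogicalTableScan(table=[[Orders]])
        System.out.println(RelOptUtil.toString(root.rel));
    }
}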
(2) update

The implementation:
override def sqlUpdate(stmt: String): Unit = {
  sqlUpdate(stmt, this.queryConfig)
}

override def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
  val planner = getFlinkPlanner
  // parse the sql query
  val parsed = planner.parse(stmt)
  parsed match {
    case insert: SqlInsert =>
      // validate the SQL query
      val query = insert.getSource
      val validatedQuery = planner.validate(query)

      // get query result as Table
      val queryResult = new TableImpl(this,
        new PlannerQueryOperation(planner.rel(validatedQuery).rel))

      // get name of sink table
      val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names

      // insert query result into sink table
      insertInto(queryResult, config, targetTablePath.asScala: _*)
    case _ =>
      throw new TableException(
        "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
  }
}
The steps mirror those of sqlQuery, so we won't walk through them again; a minimal usage sketch follows.
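For completeness, here is a minimal sqlUpdate sketch continuing StreamSQLExample, using org.apache.flink.table.sinks.CsvTableSink and org.apache.flink.api.common.typeinfo.Types; the sink name, file path, and field layout are made up for illustration:

// register a sink table that INSERT statements can target
tEnv.registerTableSink(
    "ResultTable",
    new String[]{"product", "amount"},
    new TypeInformation[]{Types.STRING, Types.INT},
    new CsvTableSink("/tmp/result.csv", ","));

// parsed as SqlInsert: the SELECT becomes the source, ResultTable the sink
tEnv.sqlUpdate("INSERT INTO ResultTable SELECT product, amount FROM OrderB WHERE amount > 2");

env.execute();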
3. Summary
Flink's Table API & SQL provide a unified interface for relational queries over both streaming and static data, building on Calcite's SQL parser and query-optimization framework. The design sits on top of Flink's existing APIs: the DataStream API delivers low-latency, high-throughput stream processing with exactly-once semantics and event-time support, while the DataSet API offers stable, efficient in-memory operators and pipelined data exchange. Every improvement to Flink's core APIs and engine is automatically inherited by the Table API and SQL.