使用flink Table &Sql api來構建批量和流式應用(3)Flink Sql 使用

從flink的官方文檔,咱們知道flink的編程模型分爲四層,sql層是最高層的api,Table api是中間層,DataStream/DataSet Api 是核心,stateful Streaming process層是底層實現。html

 

 

 

其中,java

flink dataset api使用及原理 介紹了DataSet Api sql

flink DataStream API使用及原理介紹了DataStream Api express

flink中的時間戳如何使用?---Watermark使用及原理 介紹了底層實現的基礎Watermarkapache

flink window實例分析 介紹了window的概念及使用原理編程

Flink中的狀態與容錯 介紹了State的概念及checkpoint,savepoint的容錯機制api

 上上篇<使用flink Table &Sql api來構建批量和流式應用(1)Table的基本概念>介紹了Table的基本概念及使用方法框架

上篇<使用flink Table &Sql api來構建批量和流式應用(2)Table API概述>ide

本篇主要看看Flink Sql 有哪些功能及背後的原理oop

1. sql功能

 體如今org.apache.flink.table.api.TableEnvironment,目前flink僅支持select和insert操做

(1) select 

    /**
     * Evaluates a SQL query on registered tables and retrieves the result as a {@link Table}.
     *
     * <p>All tables referenced by the query must be registered in the TableEnvironment.
     * A {@link Table} is automatically registered when its {@link Table#toString()} method is
     * called, for example when it is embedded into a String.
     * Hence, SQL queries can directly reference a {@link Table} as follows:
     *
     * <pre>
     * {@code
     *   Table table = ...;
     *   String tableName = table.toString();
     *   // the table is not registered to the table environment
     *   tEnv.sqlQuery("SELECT * FROM tableName");
     * }
     * </pre>
     *
     * @param query The SQL query to evaluate.
     * @return The result of the query as Table
     */
    Table sqlQuery(String query);

(2) update(當前僅支持insert)

    /**
     * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
     * NOTE: Currently only SQL INSERT statements are supported.
     *
     * <p>All tables referenced by the query must be registered in the TableEnvironment.
     * A {@link Table} is automatically registered when its {@link Table#toString()} method is
     * called, for example when it is embedded into a String.
     * Hence, SQL queries can directly reference a {@link Table} as follows:
     *
     * <pre>
     * {@code
     *   // register the configured table sink into which the result is inserted.
     *   tEnv.registerTableSink("sinkTable", configuredSink);
     *   Table sourceTable = ...
     *   String tableName = sourceTable.toString();
     *   // sourceTable is not registered to the table environment
     *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName");
     * }
     * </pre>
     *
     * @param stmt The SQL statement to evaluate.
     */
    void sqlUpdate(String stmt);

    /**
     * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
     * NOTE: Currently only SQL INSERT statements are supported.
     *
     * <p>All tables referenced by the query must be registered in the TableEnvironment.
     * A {@link Table} is automatically registered when its {@link Table#toString()} method is
     * called, for example when it is embedded into a String.
     * Hence, SQL queries can directly reference a {@link Table} as follows:
     *
     * <pre>
     * {@code
     *   // register the configured table sink into which the result is inserted.
     *   tEnv.registerTableSink("sinkTable", configuredSink);
     *   Table sourceTable = ...
     *   String tableName = sourceTable.toString();
     *   // sourceTable is not registered to the table environment
     *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
     * }
     * </pre>
     *
     * @param stmt The SQL statement to evaluate.
     * @param config The {@link QueryConfig} to use.
     */
    void sqlUpdate(String stmt, QueryConfig config);

2. sql解析原理

Apache Calcite面向Hadoop新的sql引擎,它提供了標準的SQL語言、多種查詢優化和鏈接各類數據源的能力。除此以外,Calcite還提供了OLAP和流處理的查詢引擎。它2013年成爲了Apache孵化項目以來,在Hadoop中愈來愈引人注目,並被衆多項目集成。好比Flink/Storm/Drill/Phoenix都依賴它作sql解析和優化。

先從demo跑起來,看看sql 解析都經歷了什麼工程?

(1) select 

package org.apache.flink.table.examples.java;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.util.Arrays;

/**
 * Simple example for demonstrating the use of SQL on a Stream Table in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataStreams to Tables
 *  - Register a Table under a name
 *  - Run a StreamSQL query on the registered Table
 *
 */
public class StreamSQLExample {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
            new Order(1L, "beer", 3),
            new Order(1L, "diaper", 4),
            new Order(3L, "rubber", 2)));

        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
            new Order(2L, "pen", 3),
            new Order(2L, "rubber", 3),
            new Order(4L, "beer", 1)));

        // convert DataStream to Table
        Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
        // register DataStream as Table
        tEnv.registerDataStream("OrderB", orderB, "user, product, amount");

        // union the two tables
        Table result = tEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " +
                        "SELECT * FROM OrderB WHERE amount < 2");

        tEnv.toAppendStream(result, Order.class).print();

        env.execute();
    }

    // *************************************************************************
    //     USER DATA TYPES
    // *************************************************************************

    /**
     * Simple POJO.
     */
    public static class Order {
        public Long user;
        public String product;
        public int amount;

        public Order() {
        }

        public Order(Long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{" +
                "user=" + user +
                ", product='" + product + '\'' +
                ", amount=" + amount +
                '}';
        }
    }
}

實現代碼以下

 override def sqlQuery(query: String): Table = {
    val planner = getFlinkPlanner
    // parse the sql query
    val parsed = planner.parse(query)
    if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
      // validate the sql query
      val validated = planner.validate(parsed)
      // transform to a relational tree
      val relational = planner.rel(validated)
      new TableImpl(this, new PlannerQueryOperation(relational.rel))
    } else {
      throw new TableException(
        "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " +
          "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.")
    }
  }

>>parse the sql query

在calcite中用SqlNode表示

  public SqlSelect(SqlParserPos pos,
      SqlNodeList keywordList,
      SqlNodeList selectList,
      SqlNode from,
      SqlNode where,
      SqlNodeList groupBy,
      SqlNode having,
      SqlNodeList windowDecls,
      SqlNodeList orderBy,
      SqlNode offset,
      SqlNode fetch) {
    super(pos);
    this.keywordList = Objects.requireNonNull(keywordList != null
        ? keywordList : new SqlNodeList(pos));
    this.selectList = selectList;
    this.from = from;
    this.where = where;
    this.groupBy = groupBy;
    this.having = having;
    this.windowDecls = Objects.requireNonNull(windowDecls != null
        ? windowDecls : new SqlNodeList(pos));
    this.orderBy = orderBy;
    this.offset = offset;
    this.fetch = fetch;
  }

>>validate the sql query

SqlValidatorImpl驗證sqlNode

    public SqlNode validate(SqlNode topNode) {
        SqlValidatorScope scope = new EmptyScope(this);
        scope = new CatalogScope(scope, ImmutableList.of("CATALOG"));
        final SqlNode topNode2 = validateScopedExpression(topNode, scope);
        final RelDataType type = getValidatedNodeType(topNode2);
        Util.discard(type);
        return topNode2;
    }

>>transform to a relational tree

SqlToRelConverter.java

/**
   * Converts an unvalidated query's parse tree into a relational expression.
   *
   * @param query           Query to convert
   * @param needsValidation Whether to validate the query before converting;
   *                        <code>false</code> if the query has already been
   *                        validated.
   * @param top             Whether the query is top-level, say if its result
   *                        will become a JDBC result set; <code>false</code> if
   *                        the query will be part of a view.
   */
  public RelRoot convertQuery(
      SqlNode query,
      final boolean needsValidation,
      final boolean top) {
    if (needsValidation) {
      query = validator.validate(query);
    }

    RelMetadataQuery.THREAD_PROVIDERS.set(
        JaninoRelMetadataProvider.of(cluster.getMetadataProvider()));
    RelNode result = convertQueryRecursive(query, top, null).rel;
    if (top) {
      if (isStream(query)) {
        result = new LogicalDelta(cluster, result.getTraitSet(), result);
      }
    }
    RelCollation collation = RelCollations.EMPTY;
    if (!query.isA(SqlKind.DML)) {
      if (isOrdered(query)) {
        collation = requiredCollation(result);
      }
    }
    checkConvertedType(query, result);

    if (SQL2REL_LOGGER.isDebugEnabled()) {
      SQL2REL_LOGGER.debug(
          RelOptUtil.dumpPlan("Plan after converting SqlNode to RelNode",
              result, SqlExplainFormat.TEXT,
              SqlExplainLevel.EXPPLAN_ATTRIBUTES));
    }

    final RelDataType validatedRowType = validator.getValidatedNodeType(query);
    return RelRoot.of(result, validatedRowType, query.getKind())
        .withCollation(collation);
  }

 

(2)update

代碼實現

  override def sqlUpdate(stmt: String): Unit = {
    sqlUpdate(stmt, this.queryConfig)
  }

  override def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
    val planner = getFlinkPlanner
    // parse the sql query
    val parsed = planner.parse(stmt)
    parsed match {
      case insert: SqlInsert =>
        // validate the SQL query
        val query = insert.getSource
        val validatedQuery = planner.validate(query)

        // get query result as Table
        val queryResult = new TableImpl(this,
          new PlannerQueryOperation(planner.rel(validatedQuery).rel))

        // get name of sink table
        val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names

        // insert query result into sink table
        insertInto(queryResult, config, targetTablePath.asScala:_*)
      case _ =>
        throw new TableException(
          "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
    }
  }

步驟相似,再也不贅述。

3. 總結

Flink Table API&SQL 爲流式數據和靜態數據的關係查詢保留統一的接口,並且利用了Calcite的查詢優化框架和SQL parser。該設計是基於Flink已構建好的API構建的,DataStream API 提供低延時高吞吐的流處理能力並且就有exactly-once語義並且能夠基於event-time進行處理。並且DataSet擁有穩定高效的內存算子和流水線式的數據交換。Flink的core API和引擎的全部改進都會自動應用到Table API和SQL上。

參考資料:

【1】http://blog.chinaunix.net/uid-29038263-id-5765791.html

相關文章
相關標籤/搜索