本文主要研究一下Flink的Table API及SQL Programs。
// Skeleton of a Table API & SQL program: create the execution and table environments, register input/output tables, query via Table API (scan/select) and SQL (sqlQuery), emit with insertInto, then execute.
// NOTE(review): line breaks were lost when this snippet was copied, so everything below is a single `//` comment; restore one statement per line before compiling. `...` marks arguments elided by the author, and `registerTable("table1", ...)` lacks a trailing `;` (the three register calls are alternatives).
// NOTE(review): TableEnvironment.getTableEnvironment / registerTable / scan are the older Flink Table API; confirm against the Flink version in use.
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // create a TableEnvironment // for batch programs use BatchTableEnvironment instead of StreamTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // register a Table tableEnv.registerTable("table1", ...) // or tableEnv.registerTableSource("table2", ...); // or tableEnv.registerExternalCatalog("extCat", ...); // register an output Table tableEnv.registerTableSink("outputTable", ...); // create a Table from a Table API query Table tapiResult = tableEnv.scan("table1").select(...); // create a Table from a SQL query Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... "); // emit a Table API result Table to a TableSink, same for SQL result tapiResult.insertInto("outputTable"); // execute env.execute();
// Table API example: scan the registered "Orders" table, keep rows with cCountry 'FRANCE', group by (cID, cName) and sum revenue as revSum.
// NOTE(review): collapsed onto one line, so the whole snippet is a comment; split it back into statements to use it.
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // register Orders table // scan registered Orders table Table orders = tableEnv.scan("Orders"); // compute revenue for all customers from France Table revenue = orders .filter("cCountry === 'FRANCE'") .groupBy("cID, cName") .select("cID, cName, revenue.sum AS revSum"); // emit or convert Table // execute query
// SQL version of the previous query: the same filter / group-by / SUM(revenue) expressed through tableEnv.sqlQuery.
// NOTE(review): collapsed onto one line, so the whole snippet is a comment; split it back into statements to use it.
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // register Orders table // compute revenue for all customers from France Table revenue = tableEnv.sqlQuery( "SELECT cID, cName, SUM(revenue) AS revSum " + "FROM Orders " + "WHERE cCountry = 'FRANCE' " + "GROUP BY cID, cName" ); // emit or convert Table // execute query
使用SQL將結果輸出到TableSink(sqlUpdate
)// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // register "Orders" table // register "RevenueFrance" output table // compute revenue for all customers from France and emit to "RevenueFrance" tableEnv.sqlUpdate( "INSERT INTO RevenueFrance " + "SELECT cID, cName, SUM(revenue) AS revSum " + "FROM Orders " + "WHERE cCountry = 'FRANCE' " + "GROUP BY cID, cName" ); // execute query
// Above: a SQL INSERT INTO statement passed to tableEnv.sqlUpdate writes the French-revenue aggregation directly into the registered output table "RevenueFrance".
// NOTE(review): the leading `)` closes the heading text on the previous line; the snippet itself was collapsed onto one line and is entirely a comment.
將Table輸出到已註冊的TableSink(insertInto
)// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // create a TableSink TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|"); // register the TableSink with a specific schema String[] fieldNames = {"a", "b", "c"}; TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG}; tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink); // compute a result Table using Table API operators and/or SQL queries Table result = ... // emit the result Table to the registered TableSink result.insertInto("CsvSinkTable"); // execute the program
// Above: registers a CsvTableSink under the name "CsvSinkTable" with schema a:INT, b:STRING, c:LONG, then emits a result Table into it with insertInto.
// NOTE(review): `fieldDelim = "|"` is Scala named-argument syntax and does not compile in Java; pass the delimiter positionally, e.g. new CsvTableSink("/path/to/file", "|") — confirm the constructor for the Flink version in use.
// NOTE(review): this snippet uses Types.INT (field style) while the later conversion snippets use Types.STRING() (method style); these likely come from different `Types` classes — verify the import.
DataStream(或DataSet
)與Table轉換// get StreamTableEnvironment // registration of a DataSet in a BatchTableEnvironment is equivalent StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); DataStream<Tuple2<Long, String>> stream = ... // register the DataStream as Table "myTable" with fields "f0", "f1" tableEnv.registerDataStream("myTable", stream); // register the DataStream as table "myTable2" with fields "myLong", "myString" tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
// Above: registerDataStream registers a DataStream<Tuple2<Long, String>> as a named table, either with default field names f0/f1 or with explicit names myLong/myString.
// NOTE(review): the leading ")與Table轉換" is heading text fused onto the code; the snippet itself is one collapsed comment line.
// fromDataStream converts a DataStream<Tuple2<Long, String>> into a Table object (without registering it), using default f0/f1 names or explicit names.
// NOTE(review): collapsed onto one line, so the whole snippet is a comment.
// get StreamTableEnvironment // registration of a DataSet in a BatchTableEnvironment is equivalent StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); DataStream<Tuple2<Long, String>> stream = ... // Convert the DataStream into a Table with default fields "f0", "f1" Table table1 = tableEnv.fromDataStream(stream); // Convert the DataStream into a Table with fields "myLong", "myString" Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
// Converts a Table back into DataStreams: an append stream of Row, an append stream of Tuple2<String, Integer> via a TupleTypeInfo, and a retract stream whose elements are Tuple2<Boolean, Row> (true = INSERT, false = DELETE, per the inline comments).
// NOTE(review): collapsed onto one line, so the whole snippet is a comment.
// get StreamTableEnvironment. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // Table with two fields (String name, Integer age) Table table = ... // convert the Table into an append DataStream of Row by specifying the class DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class); // convert the Table into an append DataStream of Tuple2<String, Integer> // via a TypeInformation TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT()); DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType); // convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream<Tuple2<Boolean, X>>. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
// Batch counterpart: toDataSet converts a Table into a DataSet<Row> or, via a TupleTypeInfo, into a DataSet<Tuple2<String, Integer>>.
// NOTE(review): presumably `env` here is a batch ExecutionEnvironment so that getTableEnvironment yields a BatchTableEnvironment — confirm. Snippet is one collapsed comment line.
// get BatchTableEnvironment BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // Table with two fields (String name, Integer age) Table table = ... // convert the Table into a DataSet of Row by specifying a class DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class); // convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT()); DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);
Tuple類型
)// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); //---Tuple類型--- DataStream<Tuple2<Long, Integer>> stream = ... // convert DataStream into Table with default field names "f0" and "f1" Table table = tableEnv.fromDataStream(stream); // convert DataStream into Table with field names "myLong" and "myInt" Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
// Above: a Tuple2<Long, Integer> stream converted to a Table with default names f0/f1, or renamed positionally to myLong/myInt.
// NOTE(review): `Table table` is declared twice — the two calls are alternatives, not one compilable sequence. The inline "//---Tuple類型---" marker means "Tuple type".
POJO類型
)// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); //---Tuple類型--- DataStream<Tuple2<Long, Integer>> stream = ... // convert DataStream into Table with default field names "f0" and "f1" Table table = tableEnv.fromDataStream(stream); // convert DataStream into Table with field "f1" only Table table = tableEnv.fromDataStream(stream, "f1"); // convert DataStream into Table with swapped fields Table table = tableEnv.fromDataStream(stream, "f1, f0"); // convert DataStream into Table with swapped fields and field names "myInt" and "myLong" Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong"); //---POJO類型--- // Person is a POJO with fields "name" and "age" DataStream<Person> stream = ... // convert DataStream into Table with default field names "age", "name" (fields are ordered by name!) Table table = tableEnv.fromDataStream(stream); // convert DataStream into Table with renamed fields "myAge", "myName" (name-based) Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName"); // convert DataStream into Table with projected field "name" (name-based) Table table = tableEnv.fromDataStream(stream, "name"); // convert DataStream into Table with projected and renamed field "myName" (name-based) Table table = tableEnv.fromDataStream(stream, "name as myName");
// Above: field mapping by name for both a Tuple stream (project f1, swap f1/f0, rename with `as`) and a POJO stream; per the inline comment, POJO default fields are ordered by name ("age", "name").
// NOTE(review): although the preceding heading says POJO, this snippet covers Tuple ("//---Tuple類型---") and POJO ("//---POJO類型---") cases; `stream` and `table` are redeclared because each call is an alternative form.
// Atomic-type stream (DataStream<Long>): converted to a single-field Table named f0 by default, or given the explicit name myLong.
// NOTE(review): collapsed onto one line, so the whole snippet is a comment; `Table table` is declared twice (alternatives).
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); DataStream<Long> stream = ... // convert DataStream into Table with default field name "f0" Table table = tableEnv.fromDataStream(stream); // convert DataStream into Table with field name "myLong" Table table = tableEnv.fromDataStream(stream, "myLong");
// Row stream: field names come from the stream's RowTypeInfo; they can be renamed positionally, renamed by name with `as`, or projected.
// NOTE(review): collapsed onto one line, so the whole snippet is a comment; `Table table` is redeclared for each alternative.
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo` DataStream<Row> stream = ... // convert DataStream into Table with default field names "name", "age" Table table = tableEnv.fromDataStream(stream); // convert DataStream into Table with renamed field names "myName", "myAge" (position-based) Table table = tableEnv.fromDataStream(stream, "myName, myAge"); // convert DataStream into Table with renamed fields "myName", "myAge" (name-based) Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge"); // convert DataStream into Table with projected field "name" (name-based) Table table = tableEnv.fromDataStream(stream, "name"); // convert DataStream into Table with projected and renamed field "myName" (name-based) Table table = tableEnv.fromDataStream(stream, "name as myName");
Flink的Table API及SQL Programs的基本用法如下：首先是創建TableEnvironment(
BatchTableEnvironment或者StreamTableEnvironment
)，之後就是創建Table或者TableSource並註冊到catalog(默認使用的catalog是internal的，也可以自己選擇註冊external catalog
)，然後就進行table的query，之後就是一些轉換操作。query可以使用Table API(scan方法
)，也可以使用SQL query(sqlQuery方法
)，或者是混合使用。