package com.opensourceteams.module.bigdata.flink.example.tableapi.convert.dataset

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    table.first(1000).print()

    /**
      * printed table data:
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      */
  }
}
1,a,10
2,b,20
3,c,30
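The conversion also works in the other direction. Below is a minimal sketch, assuming the same Flink version as the examples here, of turning a Table back into a DataSet with BatchTableEnvironment.toDataSet; the object name Run3 is hypothetical.

package com.opensourceteams.module.bigdata.flink.example.tableapi.convert.dataset

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run3 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // build a Table from a DataSet, as above
    val table = tableEnv.fromDataSet(env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    ))

    // convert the Table back into a typed DataSet and print it
    tableEnv.toDataSet[(Int, String, Int)](table).print()
  }
}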
package com.opensourceteams.module.bigdata.flink.example.tableapi.convert.dataset

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run1 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // register the table
    tableEnv.registerTable("user1", table)

    // query all rows of the table
    tableEnv.scan("user1").first(10)
      // print the result (acts as the sink)
      .print()

    /**
      * output:
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      */
  }
}
1,a,10
2,b,20
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.scan

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // register the table
    tableEnv.registerTable("user1", table)

    // query all rows of the table
    tableEnv.scan("user1").first(100)
      // print the result (acts as the sink)
      .print()

    /**
      * output:
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      */
  }
}
1,a,10
2,b,20
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.select

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // register the table
    tableEnv.registerTable("user1", table)

    // query all rows of the table
    tableEnv.scan("user1")
      // select the required fields
      .select('_1, '_2, '_3)
      .first(100)
      // print the result (acts as the sink)
      .print()

    /**
      * output:
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      */
  }
}
1,a,10
2,b,20
3,c,30
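For reference, the same projection can be written with string expressions instead of Scala symbols. A minimal sketch under the same setup; the object name Run2 is hypothetical.

package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.select

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val table = tableEnv.fromDataSet(env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    ))

    // string-based field expressions are equivalent to the symbol syntax
    table.select("_1, _2, _3").first(100).print()
  }
}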
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.as

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // register the table
    tableEnv.registerTable("user1", table)

    // query all rows of the table
    tableEnv.scan("user1")
      // rename the fields
      .as('id, 'name, 'value)
      // select the required fields
      .select('id, 'name, 'value)
      .first(100)
      // print the result (acts as the sink)
      .print()

    /**
      * output:
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      */
  }
}
1,a,10
2,b,20
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.as

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // register the table
    tableEnv.registerTable("user1", table)

    // query all rows of the table
    tableEnv.scan("user1")
      // rename the fields
      .as('id, 'name, 'value)
      // select the required fields, aliasing one of them
      .select('id, 'name as 'name2, 'value)
      .first(100)
      // print the result (acts as the sink)
      .print()

    /**
      * output:
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      */
  }
}
1,a,10
2,b,20
3,c,30
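Instead of renaming with as after the fact, the fields can also be named while the Table is created, as the join examples further below do. A minimal sketch; the object name Run3 is hypothetical.

package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.as

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run3 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )

    // name the fields while converting, so no separate as(...) step is needed
    val table = tableEnv.fromDataSet(dataSet, 'id, 'name, 'value)

    table.select('id, 'name).first(100).print()
  }
}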
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.where

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30), (4, "c", 20)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // register the table
    tableEnv.registerTable("user1", table)

    // query all rows of the table
    tableEnv.scan("user1")
      // rename the fields
      .as('id, 'name, 'value)
      // select the required fields
      .select('id, 'name, 'value)
      // filter by condition
      .where("value=20")
      .where("id=4")
      .first(100)
      // print the result (acts as the sink)
      .print()

    /**
      * output:
      *
      * 4,c,20
      */
  }
}
4,c,20
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.where

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30), (4, "c", 20)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // register the table
    tableEnv.registerTable("user1", table)

    // query all rows of the table
    tableEnv.scan("user1")
      // rename the fields
      .as('id, 'name, 'value)
      // select the required fields
      .select('id, 'name, 'value)
      // filter by condition, using expression syntax
      .where('value === 20)
      .where('id === 4)
      .first(100)
      // print the result (acts as the sink)
      .print()

    /**
      * output:
      *
      * 4,c,20
      */
  }
}
4,c,20
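The two chained where calls can also be combined into a single predicate with &&. A minimal sketch; the object name Run3 is hypothetical.

package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.where

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run3 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30), (4, "c", 20)
    )
    val table = tableEnv.fromDataSet(dataSet, 'id, 'name, 'value)

    // both conditions in one where call, joined with &&
    table.where('value === 20 && 'id === 4).first(100).print()
  }
}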
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.groupBy

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30), (4, "c", 40)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // register the table
    tableEnv.registerTable("user1", table)

    // query all rows of the table
    tableEnv.scan("user1")
      // rename the fields
      .as('id, 'name, 'value)
      // group by name and sum value per group
      .groupBy('name)
      .select('name, 'value.sum as 'value)
      .first(100)
      // print the result (acts as the sink)
      .print()

    /**
      * output:
      *
      * a,10
      * b,20
      * c,70
      */
  }
}
a,10
b,20
c,70
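A groupBy can compute several aggregates in one select; sum, count and max are all standard Table API aggregations. A minimal sketch on the same data; the object name Run2 and the aliases total, cnt and max are hypothetical.

package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.groupBy

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30), (4, "c", 40)
    )
    val table = tableEnv.fromDataSet(dataSet, 'id, 'name, 'value)

    // several aggregates over the same group in a single select
    table.groupBy('name)
      .select('name, 'value.sum as 'total, 'value.count as 'cnt, 'value.max as 'max)
      .first(100)
      .print()
  }
}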
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // register the table
    tableEnv.registerTable("user1", table)

    // query all rows of the table
    tableEnv.scan("user1")
      // deduplicate rows
      .distinct()
      .first(100)
      // print the result (acts as the sink)
      .print()

    /**
      * output (order may vary):
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      */
  }
}
1,a,10
3,c,30
2,b,20
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (1, "a", 10), (2, "b", 20), (3, "c", 30), (20, "b", 20)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // register the table
    tableEnv.registerTable("user1", table)

    // query all rows of the table
    tableEnv.scan("user1")
      // sum over the distinct values of the column
      .select('_3.sum.distinct)
      .first(100)
      // print the result (acts as the sink)
      .print()

    /**
      * output:
      * 60
      */
  }
}
60
Function description: inner join
Scala program
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.innerJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )
    val dataSet2 = env.fromElements(
      (1, "a", 100), (20, "b", 20), (30, "c", 30)
    )

    // the column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.join(table2).where(" a = d ").first(1000).print()
  }
}
1,a,10,1,a,100
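The join predicate can also be passed to join directly, and the result projected in the same chain. A minimal sketch; the object name Run2 is hypothetical.

package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.innerJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val table = tableEnv.fromDataSet(env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)), 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(env.fromElements(
      (1, "a", 100), (20, "b", 20), (30, "c", 30)), 'd, 'e, 'f)

    // predicate given to join directly, then project only the needed columns
    table.join(table2, 'a === 'd).select('a, 'b, 'f).first(1000).print()
  }
}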
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.leftOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )
    val dataSet2 = env.fromElements(
      (1, "a", 100), (20, "b", 20), (30, "c", 30)
    )

    // the column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    //table.leftOuterJoin(table2, "a=d").first(1000).print()
    table.leftOuterJoin(table2, 'a === 'd).first(1000).print()

    /**
      * output:
      *
      * 2,b,20,null,null,null
      * 1,a,10,1,a,100
      * 3,c,30,null,null,null
      */
  }
}
1,a,10,1,a,100
2,b,20,null,null,null
3,c,30,null,null,null
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.rightOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )
    val dataSet2 = env.fromElements(
      (1, "a", 100), (20, "b", 20), (30, "c", 30)
    )

    // the column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.rightOuterJoin(table2, "a = d").first(1000).print()

    /**
      * output:
      *
      * null,null,null,20,b,20
      * null,null,null,30,c,30
      * 1,a,10,1,a,100
      */
  }
}
null,null,null,20,b,20
null,null,null,30,c,30
1,a,10,1,a,100
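Besides left and right outer joins, the Table API also offers fullOuterJoin, which keeps unmatched rows from both sides. A minimal sketch on the same data; the package name fullOuterJoin is hypothetical.

package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.fullOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val table = tableEnv.fromDataSet(env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)), 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(env.fromElements(
      (1, "a", 100), (20, "b", 20), (30, "c", 30)), 'd, 'e, 'f)

    // unmatched rows from either side are padded with nulls
    table.fullOuterJoin(table2, 'a === 'd).first(1000).print()
  }
}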
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.union

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )
    val dataSet2 = env.fromElements(
      (1, "a", 100), (2, "b", 20), (20, "b", 20), (30, "c", 30)
    )

    // the column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.union(table2).first(1000).print()

    /**
      * output (duplicates removed, order may vary):
      *
      * 30,c,30
      * 1,a,100
      * 2,b,20
      * 20,b,20
      * 1,a,10
      * 3,c,30
      */
  }
}
30,c,30
1,a,100
2,b,20
20,b,20
1,a,10
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.unionAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )
    val dataSet2 = env.fromElements(
      (1, "a", 100), (2, "b", 20), (20, "b", 20), (30, "c", 30)
    )

    // the column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.unionAll(table2).first(1000).print()

    /**
      * output (duplicates kept):
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      * 1,a,100
      * 2,b,20
      * 20,b,20
      * 30,c,30
      */
  }
}
1,a,10
2,b,20
3,c,30
1,a,100
2,b,20
20,b,20
30,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.intersect

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )
    val dataSet2 = env.fromElements(
      (1, "a", 100), (2, "b", 20), (20, "b", 20), (30, "c", 30)
    )

    // the column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.intersect(table2).first(1000).print()

    /**
      * output:
      *
      * 2,b,20
      */
  }
}
2,b,20
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.intersectAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (2, "b", 20), (2, "b", 20), (3, "c", 30)
    )
    val dataSet2 = env.fromElements(
      (1, "a", 100), (2, "b", 20), (2, "b", 20), (20, "b", 20), (30, "c", 30)
    )

    // the column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.intersectAll(table2).first(1000).print()

    /**
      * output (duplicates kept up to the smaller count on either side):
      *
      * 2,b,20
      * 2,b,20
      */
  }
}
2,b,20
2,b,20
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.minus

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (2, "b", 20), (2, "b", 20), (3, "c", 30)
    )
    val dataSet2 = env.fromElements(
      (1, "a", 100), (2, "b", 20), (2, "b", 20), (20, "b", 20), (30, "c", 30)
    )

    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    /**
      * rows of the left table that do not appear in the right table;
      * duplicates are removed
      */
    table.minus(table2).first(1000).print()

    /**
      * output:
      * 1,a,10
      * 3,c,30
      */
  }
}
1,a,10
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.minusAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (2, "b", 20), (2, "b", 20), (2, "b", 20), (3, "c", 30)
    )
    val dataSet2 = env.fromElements(
      (1, "a", 100), (2, "b", 20), (2, "b", 20), (20, "b", 20), (30, "c", 30)
    )

    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    /**
      * rows of the left table that do not appear in the right table, without
      * deduplication: an element that occurs n times in the left table and
      * m times in the right table is returned n - m times
      */
    table.minusAll(table2).first(1000).print()

    /**
      * output:
      *
      * 1,a,10
      * 2,b,20
      * 2,b,20
      * 3,c,30
      */
  }
}
1,a,10
2,b,20
2,b,20
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.in

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )
    val dataSet2 = env.fromElements(
      (1, "a", 100), (20, "b", 20), (30, "c", 30)
    )

    // the column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd)

    /**
      * relation between the table and the subquery:
      * the subquery must consist of exactly one column, and the column used in
      * the where condition must have the same type as that subquery column.
      * A row is returned when its value appears in the subquery's result.
      */
    table.where('a.in(table2))
      .first(1000).print()

    /**
      * output:
      *
      * 1,a,10
      */
  }
}
1,a,10
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.orderBy

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (20, "f", 200), (3, "c", 30)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // register the table
    tableEnv.registerTable("user1", table)

    // query all rows of the table
    tableEnv.scan("user1").as('id, 'name, 'value1)
      //.orderBy('id.asc) // sort ascending by id (note: sorting is per partition; with parallelism 1 the order is global)
      .orderBy('id.desc)
      //.orderBy('value1.asc)
      .first(1000)
      // print the result (acts as the sink)
      .print()

    /**
      * output:
      *
      * 20,f,200
      * 3,c,30
      * 2,b,20
      * 1,a,10
      */
  }
}
20,f,200
3,c,30
2,b,20
1,a,10
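orderBy accepts several sort keys. A minimal sketch sorting by name ascending, then id descending; the object name Run2 is hypothetical.

package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.orderBy

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (20, "f", 200), (3, "c", 30)
    )
    val table = tableEnv.fromDataSet(dataSet, 'id, 'name, 'value1)

    // secondary sort key breaks ties of the primary key
    table.orderBy('name.asc, 'id.desc).first(1000).print()
  }
}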
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.fetch

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (20, "f", 200), (3, "c", 30)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // register the table
    tableEnv.registerTable("user1", table)

    // query all rows of the table
    tableEnv.scan("user1").as('id, 'name, 'value1)
      //.orderBy('id.asc) // sort ascending by id (note: sorting is per partition)
      .orderBy('id.desc)
      .fetch(2) // only allowed on a sorted table; takes only the first 2 rows
      .first(1000)
      // print the result (acts as the sink)
      .print()

    /**
      * output:
      *
      * 20,f,200
      * 3,c,30
      */
  }
}
20,f,200
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.offset

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (20, "f", 200), (3, "c", 30)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // register the table
    tableEnv.registerTable("user1", table)

    // query all rows of the table
    tableEnv.scan("user1").as('id, 'name, 'value1)
      //.orderBy('id.asc) // sort ascending by id (note: sorting is per partition)
      .orderBy('id.desc)
      .offset(2) // only allowed on a sorted table; skips the first 2 rows
      .first(1000)
      // print the result (acts as the sink)
      .print()

    /**
      * output:
      *
      * 2,b,20
      * 1,a,10
      */
  }
}
2,b,20
1,a,10
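offset and fetch combine into the usual pagination pattern: skip a number of rows, then take a fixed page. A minimal sketch; the object name Run2 is hypothetical.

package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.offset

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (20, "f", 200), (3, "c", 30)
    )
    val table = tableEnv.fromDataSet(dataSet, 'id, 'name, 'value1)

    // skip the first row of the sorted result, then take the next 2 rows
    table.orderBy('id.desc).offset(1).fetch(2).first(1000).print()
  }
}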
package com.opensourceteams.module.bigdata.flink.example.tableapi.sink.csv

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    val csvTableSink = new CsvTableSink("sink-data/csv/a.csv",
      ",",                 // field delimiter
      1,                   // number of output files
      WriteMode.OVERWRITE) // overwrite an existing file

    val fieldNames: Array[String] = Array("id", "name", "value")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.INT)

    tableEnv.registerTableSink("csvTableSink", fieldNames, fieldTypes, csvTableSink)
    table.insertInto("csvTableSink")

    env.execute()
  }
}
1,a,10
2,b,20
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.insert

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements(
      (1, "a", 10), (2, "b", 20), (3, "c", 30)
    )

    // convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    val csvTableSink = new CsvTableSink("/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/sink-data/csv/a.csv",
      ",",                 // field delimiter
      1,                   // number of output files
      WriteMode.OVERWRITE) // overwrite an existing file

    val fieldNames: Array[String] = Array("id", "name", "value")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.INT)

    tableEnv.registerTableSink("csvTableSink", fieldNames, fieldTypes, csvTableSink)
    table.insertInto("csvTableSink")

    env.execute()
  }
}
1,a,10
2,b,20
3,c,30
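To read the written CSV file back as a table, a CsvTableSource can be registered and scanned like any other table. A minimal sketch, assuming the relative path used by the first sink example above; the package name source.csv and the table name csvSource are hypothetical.

package com.opensourceteams.module.bigdata.flink.example.tableapi.source.csv

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.CsvTableSource

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // describe the CSV file written by the sink example above
    val csvSource = CsvTableSource.builder()
      .path("sink-data/csv/a.csv")
      .fieldDelimiter(",")
      .field("id", Types.INT)
      .field("name", Types.STRING)
      .field("value", Types.INT)
      .build()

    // register it as a table source and scan it like any other table
    tableEnv.registerTableSource("csvSource", csvSource)
    tableEnv.scan("csvSource").first(1000).print()
  }
}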