package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女")) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex) tableEnv.sqlQuery(s"select name,age FROM user1") .first(100).print() /** * 輸出結果 * * 小明,15 * 小王,45 * 小李,25 * 小慧,35 */ } }
小明,15 小王,45 小李,25 小慧,35
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女")) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex) tableEnv.sqlQuery(s"select t1.name,t1.age FROM user1 as t1") .first(100).print() /** * 輸出結果 * * 小明,15 * 小王,45 * 小李,25 * 小慧,35 */ } }
小明,15 小王,45 小李,25 小慧,35
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女")) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex) tableEnv.sqlQuery(s"select name a,age as b FROM user1 ") .first(100).print() /** * 輸出結果 * * 小明,15 * 小王,45 * 小李,25 * 小慧,35 */ } }
小明,15 小王,45 小李,25 小慧,35
功能描述:查詢一個表的數據,只返回指定的前幾行(爭對並行度而言,因此並行度不同,結果不同)git
scala 程序github
package com.opensourceteams.mo`dule.bigdata.flink.example.sql.dataset.operations.limit import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment env.setParallelism(2) val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女")) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex) /** * 先排序,按age的降序排序,輸出前100位結果,注意是按同一個並行度中的數據進行排序,也就是同一個分區 */ tableEnv.sqlQuery(s"select name,age FROM user1 ORDER BY age desc LIMIT 100 ") .first(100).print() /** * 輸出結果 並行度設置爲2 * * 小明,15 * 小王,45 * 小慧,35 * 小李,25 */ /** * 輸出結果 並行度設置爲1 * * 小王,45 * 小慧,35 * 小李,25 * 小明,15 */ } }
小明,15 小王,45 小慧,35 小李,25
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.where import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女")) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex) tableEnv.sqlQuery(s"select name,age,sex FROM user1 where sex = '女'") .first(100).print() /** * 輸出結果 * * 小李,25,女 * 小慧,35,女 */ } }
小李,25,女 小慧,35,女
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.whereBetweenAnd import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女")) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex) tableEnv.sqlQuery(s"select name,age,sex FROM user1 where age between 20 and 35") .first(100).print() /** * 結果 * * 小李,25,女 * 小慧,35,女 */ } }
小李,25,女 小慧,35,女
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.sum import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500)) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary) //彙總全部數據 tableEnv.sqlQuery(s"select sum(salary) FROM user1") .first(100).print() /** * 輸出結果 * * 6800 */ } }
6800
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.max import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500)) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary) //彙總全部數據 tableEnv.sqlQuery(s"select max(salary) FROM user1 ") .first(100).print() /** * 輸出結果 * * 4000 */ } }
4000
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.min import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500)) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary) tableEnv.sqlQuery(s"select min(salary) FROM user1 ") .first(100).print() /** * 輸出結果 * * 500 */ } }
500
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.group import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500)) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary) //彙總全部數據 tableEnv.sqlQuery(s"select sex,sum(salary) FROM user1 group by sex") .first(100).print() /** * 輸出結果 * * 女,1300 * 男,5500 */ } }
女,1300 男,5500
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.group_having import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500)) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary) //分組統計,having是分組條件查詢 tableEnv.sqlQuery(s"select sex,sum(salary) FROM user1 group by sex having sum(salary) >1500") .first(100).print() /** * 輸出結果 * * */ } }
男,5500
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.distinct import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements(("a",15,"male"),("a",45,"female"),("d",25,"male"),("c",35,"female")) val tableEnv = TableEnvironment.getTableEnvironment(env) tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex) /** * 對數據去重 */ tableEnv.sqlQuery("select distinct name FROM user1 ") .first(100).print() /** * 輸出結果 * * a * c * d */ } }
a c d
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.innerJoin import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500)) val dataSetGrade = env.fromElements((1,"語文",100),(2,"數學",80),(1,"外語",50) ) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary) tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction) //內鏈接,兩個表 // tableEnv.sqlQuery("select * FROM `user` INNER JOIN grade on `user`.id = grade.userId ") tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` INNER JOIN grade on `user`.id = grade.userId ") .first(100).print() /** * 輸出結果 * 2,小王,45,男,4000,數學,80 * 1,小明,15,男,1500,語文,100 * 1,小明,15,男,1500,外語,50 */ } }
2,小王,45,男,4000,數學,80 1,小明,15,男,1500,語文,100 1,小明,15,男,1500,外語,50
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.leftJoin import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500)) val dataSetGrade = env.fromElements((1,"語文",100),(2,"數學",80),(1,"外語",50) ) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary) tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction) //左鏈接,拿左邊的表中的每一行數據,去關聯右邊的數據,若是有相同的匹配數據,就都匹配出來,若是沒有,就匹配一條,不過右邊的數據爲空 tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` LEFT JOIN grade on `user`.id = grade.userId ") .first(100).print() /** * 輸出結果 * * 1,小明,15,男,1500,語文,100 * 1,小明,15,男,1500,外語,50 * 2,小王,45,男,4000,數學,80 * 4,小慧,35,女,500,null,null * 3,小李,25,女,800,null,null * * */ } }
1,小明,15,男,1500,語文,100 1,小明,15,男,1500,外語,50 2,小王,45,男,4000,數學,80 4,小慧,35,女,500,null,null 3,小李,25,女,800,null,null
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.rightJoin import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500)) val dataSetGrade = env.fromElements((1,"語文",100),(2,"數學",80),(1,"外語",50),(10,"外語",90) ) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary) tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction) //左鏈接,拿左邊的表中的每一行數據,去關聯右邊的數據,若是有相同的匹配數據,就都匹配出來,若是沒有,就匹配一條,不過右邊的數據爲空 tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` RIGHT JOIN grade on `user`.id = grade.userId ") .first(100).print() /** * 輸出結果 * * 1,小明,15,男,1500,外語,50 * 1,小明,15,男,1500,語文,100 * 2,小王,45,男,4000,數學,80 * null,null,null,null,null,外語,90 * * */ } }
1,小明,15,男,1500,外語,50 1,小明,15,男,1500,語文,100 2,小王,45,男,4000,數學,80 null,null,null,null,null,外語,90
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.fullOuterJoin import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500)) val dataSetGrade = env.fromElements((1,"語文",100),(2,"數學",80),(1,"外語",50),(10,"外語",90) ) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary) tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction) //左,右,全匹配全部數據 tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` FULL OUTER JOIN grade on `user`.id = grade.userId ") .first(100).print() /** * 輸出結果 * * * 3,小李,25,女,800,null,null * 1,小明,15,男,1500,外語,50 * 1,小明,15,男,1500,語文,100 * 2,小王,45,男,4000,數學,80 * 4,小慧,35,女,500,null,null * null,null,null,null,null,外語,90 * * * */ } }
3,小李,25,女,800,null,null 1,小明,15,男,1500,外語,50 1,小明,15,男,1500,語文,100 2,小王,45,男,4000,數學,80 4,小慧,35,女,500,null,null null,null,null,null,null,外語,90
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.union import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500)) val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500)) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary) tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary) /** * union 鏈接兩個表,會去重 */ tableEnv.sqlQuery( "select * from (" +"select t1.* FROM `user` as t1 ) " + + " UNION " + " ( select t2.* FROM t2 )" ) .first(100).print() /** * 輸出結果 * * 30,小李,25,女,800 * 40,小慧,35,女,500 * 2,小王,45,男,4000 * 4,小慧,35,女,500 * 3,小李,25,女,800 * 1,小明,15,男,1500 * */ } }
30,小李,25,女,800 40,小慧,35,女,500 2,小王,45,男,4000 4,小慧,35,女,500 3,小李,25,女,800 1,小明,15,男,1500
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.unionAll import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500)) val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500)) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary) tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary) /** * union 鏈接兩個表,不會去重 */ tableEnv.sqlQuery( "select * from (" +"select t1.* FROM `user` as t1 ) " + + " UNION ALL " + " ( select t2.* FROM t2 )" ) .first(100).print() /** * 輸出結果 * * 1,小明,15,男,1500 * 2,小王,45,男,4000 * 3,小李,25,女,800 * 4,小慧,35,女,500 * 1,小明,15,男,1500 * 2,小王,45,男,4000 * 30,小李,25,女,800 * 40,小慧,35,女,500 * */ } }
1,小明,15,男,1500 2,小王,45,男,4000 3,小李,25,女,800 4,小慧,35,女,500 1,小明,15,男,1500 2,小王,45,男,4000 30,小李,25,女,800 40,小慧,35,女,500
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.intersect import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500)) val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500)) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary) tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary) /** * INTERSECT 鏈接兩個表,找相同的數據(相交的數據,重疊的數據) */ tableEnv.sqlQuery( "select * from (" +"select t1.* FROM `user` as t1 ) " + + " INTERSECT " + " ( select t2.* FROM t2 )" ) .first(100).print() /** * 輸出結果 * * 1,小明,15,男,1500 * 2,小王,45,男,4000 * */ } }
1,小明,15,男,1500 2,小王,45,男,4000
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.in import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500)) val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500)) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary) tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary) /** * in ,子查詢 */ tableEnv.sqlQuery( "select t1.* FROM `user` t1 where t1.id in " + " (select t2.id from t2) " ) .first(100).print() /** * 輸出結果 * * 1,小明,15,男,1500 * 2,小王,45,男,4000 * */ } }
1,小明,15,男,1500 2,小王,45,男,4000
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.except import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500)) val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500)) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary) tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary) /** * EXCEPT 鏈接兩個表,找不相同的數據(不相交的數據,不重疊的數據) */ tableEnv.sqlQuery( "select * from (" +"select t1.* FROM `user` as t1 ) " + + " EXCEPT " + " ( select t2.* FROM t2 )" ) .first(100).print() /** * 輸出結果 * * 3,小李,25,女,800 * 4,小慧,35,女,500 * */ } }
3,小李,25,女,800 4,小慧,35,女,500
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.insert import org.apache.flink.api.scala.typeutils.Types import org.apache.flink.api.scala.{ExecutionEnvironment, _} import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ import org.apache.flink.api.scala._ import org.apache.flink.table.sinks.CsvTableSink import org.apache.flink.api.common.typeinfo.TypeInformation object Run { def main(args: Array[String]): Unit = { //獲得批環境 val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女")) //獲得Table環境 val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊table tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex) // create a TableSink val csvSink = new CsvTableSink("sink-data/csv/a.csv",",",1,WriteMode.OVERWRITE); val fieldNames = Array("name", "age", "sex") val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.STRING) tableEnv.registerTableSink("t2",fieldNames,fieldTypes,csvSink) tableEnv.sqlUpdate(s" insert into t2 select name,age,sex FROM user1 ") env.execute() /** * 輸出結果 * a.csv * * 小明,15,男 * 小王,45,男 * 小李,25,女 * 小慧,35,女 */ } }
小明,15,男 小王,45,男 小李,25,女 小慧,35,女