flink1.7.2 tableapi批處理示例

flink1.7.2 tableapi批處理示例

源碼

概述

  • 本文爲flink1.7.2 tableapi批處理示例
  • 主要操做包括: print table,DataSet 轉換成table,Scan,select,as,where / filter,groupBy,distinct,join,leftOuterJoin,rightOuterJoin,union,unionAll,intersect,intersectAll,minus,minusAll,in,orderBy,fetch,offset,Sink csv,insert

print table

  • 功能描述: 打印輸出表數據
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.convert.dataset

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {

  /**
    * Builds a Table from an in-memory DataSet and prints its rows.
    *
    * Expected output:
    *   1,a,10
    *   2,b,20
    *   3,c,30
    */
  def main(args: Array[String]): Unit = {
    // Batch execution environment and the table environment layered on top of it.
    val environment = ExecutionEnvironment.getExecutionEnvironment
    val tableEnvironment = TableEnvironment.getTableEnvironment(environment)

    // Sample rows as tuples of (id, name, amount).
    val rows = environment.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet to a Table, take up to 1000 rows, and print
    // them (print acts as the sink here).
    tableEnvironment
      .fromDataSet(rows)
      .first(1000)
      .print()
  }

}
  • 輸出結果
1,a,10
2,b,20
3,c,30

DataSet 轉換成table

package com.opensourceteams.module.bigdata.flink.example.tableapi.convert.dataset

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run1 {


  /** Converts a DataSet into a Table, registers it, scans it and prints the rows. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Sample rows: tuples of (id, name, amount).
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under the name "user1"
    tableEnv.registerTable("user1",table)


    // Scan all rows of the registered table (up to 10)
    tableEnv.scan("user1").first(10)

      // print acts as the sink
      .print()


    /**
      * Output:
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      */



  }

}
  • 輸出結果
1,a,10
2,b,20
3,c,30

Scan

  • 功能描述: 查詢表中全部數據
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.scan

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /** Demonstrates `scan`: reads every row of a registered table and prints it. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Sample rows: tuples of (id, name, amount).
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under the name "user1"
    tableEnv.registerTable("user1",table)


    // Scan all rows of the registered table (up to 100)
    tableEnv.scan("user1").first(100)

      // print acts as the sink
      .print()


    /**
      * Output:
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      */



  }

}
  • 輸出結果
1,a,10
2,b,20
3,c,30

select

  • 功能描述: 選擇表中須要的字段
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.select

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /** Demonstrates `select`: projects the wanted columns from a table. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Sample rows: tuples of (id, name, amount).
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under the name "user1"
    tableEnv.registerTable("user1",table)


    // Scan the registered table
    tableEnv.scan("user1")
      // Select the columns we need (tuple fields default to _1, _2, _3)
      .select('_1,'_2,'_3)
      .first(100)

      // print acts as the sink
      .print()


    /**
      * Output:
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      */



  }

}
  • 輸出結果
1,a,10
2,b,20
3,c,30

as

  • 功能描述: 重命名字段名稱
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.as

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /** Demonstrates `as`: renames the columns of a table. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Sample rows: tuples of (id, name, amount).
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under the name "user1"
    tableEnv.registerTable("user1",table)


    // Scan the registered table
    tableEnv.scan("user1")

      // Rename the columns from _1,_2,_3 to id,name,value
      .as('id,'name,'value)
      // Select the columns we need, using the new names
      .select('id,'name,'value)
      .first(100)

      // print acts as the sink
      .print()


    /**
      * Output:
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      */



  }

}
  • 輸出結果
1,a,10
2,b,20
3,c,30

as

  • 功能描述: 重命名字段名稱
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.as

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /** Demonstrates `as` inside `select`: renames a single column in the projection. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Sample rows: tuples of (id, name, amount).
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under the name "user1"
    tableEnv.registerTable("user1",table)


    // Scan the registered table
    tableEnv.scan("user1")

      // Rename the columns from _1,_2,_3 to id,name,value
      .as('id,'name,'value)
      // Select the columns; 'name is additionally aliased to 'name2 in the result schema
       .select('id,'name as 'name2,'value)
      .first(100)

      // print acts as the sink
      .print()


    /**
      * Output (same row values; only the column header changes):
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      */



  }

}
  • 輸出結果
1,a,10
2,b,20
3,c,30

where / filter (過濾字段,字符串)

  • 功能描述: 條件過濾
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.where

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /** Demonstrates `where` with string predicates: keeps only rows matching both conditions. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Sample rows: tuples of (id, name, amount).
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30), (4,"c",20) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under the name "user1"
    tableEnv.registerTable("user1",table)


    // Scan the registered table
    tableEnv.scan("user1")

      // Rename the columns from _1,_2,_3 to id,name,value
      .as('id,'name,'value)
      // Select the columns we need
      .select('id,'name,'value)
      // Filter: both conditions must hold (value = 20 AND id = 4)
      .where("value=20")
      .where("id=4")

      .first(100)

      // print acts as the sink
      .print()


    /**
      * Output (only the row matching both predicates):
      *
      * 4,c,20
      */



  }

}
  • 輸出結果
4,c,20

where / filter (過濾字段,表達式)

  • 功能描述: 過濾數據
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.where

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {


  /** Demonstrates `where` with expression predicates ('col === value). */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Sample rows: tuples of (id, name, amount).
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30), (4,"c",20) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under the name "user1"
    tableEnv.registerTable("user1",table)


    // Scan the registered table
    tableEnv.scan("user1")

      // Rename the columns from _1,_2,_3 to id,name,value
      .as('id,'name,'value)
      // Select the columns we need
      .select('id,'name,'value)
      // Filter using the expression DSL: value === 20 AND id === 4
      .where('value === 20)
      .where('id === 4)


      .first(100)

      // print acts as the sink
      .print()


    /**
      * Output (only the row matching both predicates):
      *
      * 4,c,20
      */



  }

}
  • 輸出結果
4,c,20

groupBy

  • 功能描述: 分組統計
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.groupBy

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /** Demonstrates `groupBy`: groups rows by name and sums the value column per group. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Sample rows: name "c" appears twice (30 + 40) so its sum is 70.
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30), (4,"c",40) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under the name "user1"
    tableEnv.registerTable("user1",table)


    // Scan the registered table
    tableEnv.scan("user1")

      // Rename the columns from _1,_2,_3 to id,name,value
      .as('id,'name,'value)

      // Group by name, then aggregate value with sum


      .groupBy('name)

      .select('name,'value.sum as 'value)





      .first(100)

      // print acts as the sink
      .print()


    /**
      * Output (one row per group; c = 30 + 40 = 70):
      *
      * a,10
      * b,20
      * c,70
      */



  }

}
  • 輸出結果
  • 70 = 30 + 40
a,10
b,20
c,70

distinct

  • 功能描述: 查詢記錄去重
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /** Demonstrates `distinct`: removes duplicate rows from the result. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Sample rows: (1,"a",10) appears twice and is deduplicated below.
    val dataSet = env.fromElements( (1,"a",10),(1,"a",10),(2,"b",20), (3,"c",30) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under the name "user1"
    tableEnv.registerTable("user1",table)


    // Scan the registered table
    tableEnv.scan("user1")
      // Deduplicate full rows
      .distinct()


      .first(100)

      // print acts as the sink
      .print()


    /**
      * Output (row order may vary across runs):
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      */



  }

}
  • 輸出結果
1,a,10
3,c,30
2,b,20

distinct

  • 功能描述: sum.distinct ,去掉字段重複的再求和
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {


  /** Demonstrates `sum.distinct`: sums only the distinct values of a column. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Distinct _3 values are 10, 20, 30 -> sum is 60 (duplicates 10 and 20 counted once).
    val dataSet = env.fromElements( (1,"a",10),(1,"a",10),(2,"b",20), (3,"c",30),(20,"b",20) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under the name "user1"
    tableEnv.registerTable("user1",table)


    // Scan the registered table
    tableEnv.scan("user1")
      // Sum column _3 over its distinct values only
      .select('_3.sum.distinct)


      .first(100)

      // print acts as the sink
      .print()


    /**
      * Output:
      * 60
      */



  }

}
  • 輸出結果
60

join

  • 功能描述: 內鏈接

  • scala 程序

package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.innerJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  /**
    * Inner join of two Tables on the condition a = d.
    *
    * Expected output:
    *   1,a,10,1,a,100
    */
  def main(args: Array[String]): Unit = {
    // Batch execution environment and the table environment layered on top of it.
    val environment = ExecutionEnvironment.getExecutionEnvironment
    val tableEnvironment = TableEnvironment.getTableEnvironment(environment)

    // Left and right inputs; only id 1 exists on both sides.
    val leftRows = environment.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    val rightRows = environment.fromElements((1, "a", 100), (20, "b", 20), (30, "c", 30))

    // Column names must not overlap between the two tables.
    val leftTable = tableEnvironment.fromDataSet(leftRows, 'a, 'b, 'c)
    val rightTable = tableEnvironment.fromDataSet(rightRows, 'd, 'e, 'f)

    // Join, keep only matching rows, and print up to 1000 of them.
    leftTable.join(rightTable).where(" a = d ").first(1000).print()
  }

}
  • 輸出結果
1,a,10,1,a,100

leftOuterJoin

  • 功能描述: 左外鏈接,用左表中的每個元素,去鏈接右表中的元素,若是右表中存在,就匹配值,如果不存在就爲空值
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.leftOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /**
    * Demonstrates `leftOuterJoin`: every row of the left table is kept;
    * unmatched right-side columns come back as null.
    */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Only id 1 exists on both sides; ids 2 and 3 have no right-side match.
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )
    val dataSet2 = env.fromElements( (1,"a",100),(20,"b",20), (30,"c",30) )



    // Column names must not overlap between the two tables
    val table = tableEnv.fromDataSet(dataSet,'a,'b,'c)
    val table2 = tableEnv.fromDataSet(dataSet2,'d,'e,'f)



   // Equivalent string form: table.leftOuterJoin(table2,"a=d").first(1000).print()
   table.leftOuterJoin(table2,'a === 'd).first(1000).print()


    /**
      * Output (row order may vary):
      *
      * 2,b,20,null,null,null
      * 1,a,10,1,a,100
      * 3,c,30,null,null,null
      */



  }

}
  • 輸出結果
1,a,10,1,a,100
2,b,20,null,null,null
3,c,30,null,null,null

rightOuterJoin

  • 功能描述: 右外鏈接,用右表中的每個元素,去鏈接左表中的元素,若是左表中存在,就匹配值,如果不存在就爲空值
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.rightOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /**
    * Demonstrates `rightOuterJoin`: every row of the right table is kept;
    * unmatched left-side columns come back as null.
    */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Only id 1 exists on both sides; right ids 20 and 30 have no left-side match.
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )
    val dataSet2 = env.fromElements( (1,"a",100),(20,"b",20), (30,"c",30) )



    // Column names must not overlap between the two tables
    val table = tableEnv.fromDataSet(dataSet,'a,'b,'c)
    val table2 = tableEnv.fromDataSet(dataSet2,'d,'e,'f)



   table.rightOuterJoin(table2,"a = d").first(1000).print()


    /**
      * Output (row order may vary):
      *
      *
      * null,null,null,20,b,20
      * null,null,null,30,c,30
      * 1,a,10,1,a,100
      */



  }

}
  • 輸出結果
null,null,null,20,b
null,null,null,30,c
1,a,10,1,a,100

union

  • 功能描述: 兩個表串連,取並集(會去重)
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.union

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /** Demonstrates `union`: concatenates two tables and removes duplicate rows. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // (2,"b",20) appears in both inputs and is emitted only once by union.
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )
    val dataSet2 = env.fromElements( (1,"a",100),(2,"b",20),(20,"b",20), (30,"c",30) )



    // Column names must not overlap between the two tables
    val table = tableEnv.fromDataSet(dataSet,'a,'b,'c)
    val table2 = tableEnv.fromDataSet(dataSet2,'d,'e,'f)



   table.union(table2).first(1000).print()

    /**
      * Output (deduplicated; row order may vary):
      *
      * 30,c,30
      * 1,a,100
      * 2,b,20
      * 20,b,20
      * 1,a,10
      * 3,c,30
      */






  }

}
  • 輸出結果
30,c,30
1,a,100
2,b,20
20,b,20
1,a,10
3,c,30

unionAll 兩個表串連,取並集(不會去重)

  • 功能描述:
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.unionAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /** Demonstrates `unionAll`: concatenates two tables keeping duplicate rows. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // (2,"b",20) appears in both inputs and is emitted twice by unionAll.
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )
    val dataSet2 = env.fromElements( (1,"a",100),(2,"b",20),(20,"b",20), (30,"c",30) )



    // Column names must not overlap between the two tables
    val table = tableEnv.fromDataSet(dataSet,'a,'b,'c)
    val table2 = tableEnv.fromDataSet(dataSet2,'d,'e,'f)



   table.unionAll(table2).first(1000).print()

    /**
      * Output (all 7 rows, duplicates kept):
      *
      * 1,a,10
      * 2,b,20
      * 3,c,30
      * 1,a,100
      * 2,b,20
      * 20,b,20
      * 30,c,30
      */






  }

}
  • 輸出結果
1,a,10
2,b,20
3,c,30
1,a,100
2,b,20
20,b,20
30,c,30

intersect,兩個表相鏈接,取交集 (會去重)

  • 功能描述:
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.intersect

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /** Demonstrates `intersect`: rows present in both tables, deduplicated. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Only (2,"b",20) exists in both inputs.
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )
    val dataSet2 = env.fromElements( (1,"a",100),(2,"b",20),(20,"b",20), (30,"c",30) )



    // Column names must not overlap between the two tables
    val table = tableEnv.fromDataSet(dataSet,'a,'b,'c)
    val table2 = tableEnv.fromDataSet(dataSet2,'d,'e,'f)



   table.intersect(table2).first(1000).print()

    /**
      * Output:
      *
      * 2,b,20
      */






  }

}
  • 輸出結果
2,b,20

intersectAll,兩個表相鏈接,取交集 (不會去重)

  • 功能描述:
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.intersectAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /**
    * Demonstrates `intersectAll`: rows present in both tables, keeping
    * duplicates — a row occurring n times on the left and m times on the
    * right appears min(n, m) times.
    */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)


    // (2,"b",20): 3 times on the left, 2 times on the right -> emitted twice.
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20),(2,"b",20),(2,"b",20), (3,"c",30) )
    val dataSet2 = env.fromElements( (1,"a",100),(2,"b",20),(2,"b",20),(20,"b",20), (30,"c",30) )



    // Column names must not overlap between the two tables
    val table = tableEnv.fromDataSet(dataSet,'a,'b,'c)
    val table2 = tableEnv.fromDataSet(dataSet2,'d,'e,'f)



   table.intersectAll(table2).first(1000).print()

    /**
      * Output:
      *
      * 2,b,20
      * 2,b,20
      */






  }

}
  • 輸出結果
2,b,20
2,b,20

minus

  • 功能描述: 左表不存在於右表中的數據,會去重
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.minus

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /** Demonstrates `minus`: rows of the left table not present in the right table, deduplicated. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // Single-threaded so the printed row order is stable.
    env.setParallelism(1)


    // (1,"a",10) and (3,"c",30) have no exact match on the right.
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20),(2,"b",20),(2,"b",20), (3,"c",30) )
    val dataSet2 = env.fromElements( (1,"a",100),(2,"b",20),(2,"b",20),(20,"b",20), (30,"c",30) )




    val table = tableEnv.fromDataSet(dataSet,'a,'b,'c)
    val table2 = tableEnv.fromDataSet(dataSet2,'d,'e,'f)


    /**
      * Left rows not present in the right table; the result is deduplicated.
      */
   table.minus(table2).first(1000).print()

    /**
      * Output:
      * 1,a,10
      * 3,c,30
      */






  }

}
  • 輸出結果
1,a,10
3,c,30

minusAll

  • 功能描述: 左表不存在於右表中的數據,不會去重,若是左表某個元素有n次,右表中有m次,那這個元素出現的是n - m次
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.minusAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /**
    * Demonstrates `minusAll`: rows of the left table not present in the
    * right table, keeping duplicates — a row occurring n times on the left
    * and m times on the right appears n - m times.
    */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // Single-threaded so the printed row order is stable.
    env.setParallelism(1)


    // (2,"b",20): 4 times on the left, 2 times on the right -> emitted 4 - 2 = 2 times.
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20),(2,"b",20),(2,"b",20),(2,"b",20), (3,"c",30) )
    val dataSet2 = env.fromElements( (1,"a",100),(2,"b",20),(2,"b",20),(20,"b",20), (30,"c",30) )




    val table = tableEnv.fromDataSet(dataSet,'a,'b,'c)
    val table2 = tableEnv.fromDataSet(dataSet2,'d,'e,'f)


    /**
      * Left rows not in the right table, duplicates kept (n - m occurrences).
      */
   table.minusAll(table2).first(1000).print()

    /**
      * Output:
      *
      * 1,a,10
      * 2,b,20
      * 2,b,20
      * 3,c,30
      */






  }

}
  • 輸出結果
1,a,10
2,b,20
2,b,20
3,c,30

in

  • 功能描述:表和子表的關係,子查詢只能由一列組成, 表的查詢條件的列類型須要和子查詢保持一致, 若是子查詢中的值在表中存在就返回真,這個元素就知足條件能夠被返回來
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.in

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /**
    * Demonstrates `in` (sub-query): keeps a row when its column value
    * appears in the single-column sub-query table.
    */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Only left id 1 appears among the right-hand first elements (1, 20, 30).
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )
    val dataSet2 = env.fromElements( (1,"a",100),(20,"b",20), (30,"c",30) )



    // Column names must not overlap between the two tables
    val table = tableEnv.fromDataSet(dataSet,'a,'b,'c)
    // Sub-query table: only the first tuple element is exposed, as column 'd
    val table2 = tableEnv.fromDataSet(dataSet2,'d)


    /**
      * Sub-query relationship:
      * the sub-query must consist of exactly one column, and the filtered
      * column must have the same type as the sub-query column.
      * A row is kept when its value exists in the sub-query result.
      */
   table.where('a.in(table2))

      .first(1000).print()


    /**
      * Output:
      *
      * 1,a,10
      */



  }

}
  • 輸出結果
1,a,10

orderBy

  • 功能描述: 按指定列的升序或降序排序(是按分區來排序的)
  • 經測試只能按一列進行排序
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.orderBy

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /**
    * Demonstrates `orderBy`: sorts by one column, ascending or descending.
    * Note: sorting happens per partition; parallelism is set to 1 so the
    * global order is visible.
    */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Single partition so orderBy yields a total order.
    env.setParallelism(1)

    // Sample rows: tuples of (id, name, amount).
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20) ,(20,"f",200),(3,"c",30) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under the name "user1"
    tableEnv.registerTable("user1",table)


    // Scan the registered table and rename the columns
    tableEnv.scan("user1").as('id,'name,'value1)
      //.orderBy('id.asc)  // sort ascending by id (per partition)
      .orderBy('id.desc)
      //.orderBy('value1.asc)

      .first(1000)

      // print acts as the sink
      .print()


    /**
      * Output (descending by id):
      * 
      * 20,f,200
      * 3,c,30
      * 2,b,20
      * 1,a,10
      */



  }

}
  • 輸出結果
20,f,200
3,c,30
2,b,20
1,a,10

fetch

  • 功能描述: 先進行排序後,取前幾個元素
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.fetch

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /** Demonstrates `fetch`: after sorting, keeps only the first n rows. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Single partition so orderBy yields a total order.
    env.setParallelism(1)

    // Sample rows: tuples of (id, name, amount).
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20) ,(20,"f",200),(3,"c",30) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under the name "user1"
    tableEnv.registerTable("user1",table)


    // Scan the registered table and rename the columns
    tableEnv.scan("user1").as('id,'name,'value1)
      //.orderBy('id.asc)  // sort ascending by id (per partition)
       .orderBy('id.desc)

      .fetch(2)  // only valid on a sorted result; keeps just the first 2 rows

      .first(1000)
      // print acts as the sink
      .print()


    /**
      * Output:
      *
      * 20,f,200
      * 3,c,30
      */



  }

}
  • 輸出結果
20,f,200
3,c,30

offset

  • 功能描述: 只有有序的才能用,偏移了2個元素
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.offset

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  /** Demonstrates `offset`: after sorting, skips the first n rows. */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Single partition so orderBy yields a total order.
    env.setParallelism(1)

    // Sample rows: tuples of (id, name, amount).
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20) ,(20,"f",200),(3,"c",30) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under the name "user1"
    tableEnv.registerTable("user1",table)


    // Scan the registered table and rename the columns
    tableEnv.scan("user1").as('id,'name,'value1)
      //.orderBy('id.asc)  // sort ascending by id (per partition)
       .orderBy('id.desc)

      .offset(2)  // only valid on a sorted result; skips the first 2 rows

      .first(1000)
      // print acts as the sink
      .print()


    /**
      * Output:
      *
      * 2,b,20
      * 1,a,10
      */



  }

}
  • 輸出結果
2,b,20
1,a,10

Sink csv

  • 功能描述:
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.sink.csv

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink

object Run {


  /**
    * Demonstrates writing a Table to a CSV file via a registered
    * CsvTableSink. Unlike print(), a sink requires env.execute().
    */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Sample rows: tuples of (id, name, amount).
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)


    // CSV sink: relative output path, "," as field delimiter,
    // 1 output file, overwrite if the file already exists.
    val cvsTableSink = new CsvTableSink("sink-data/csv/a.csv",
      ",",
      1,
      WriteMode.OVERWRITE
        )

    // Column names and types for the sink's schema.
    val fieldNames: Array[String] = Array("id", "name", "value")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.INT)


    // Register the sink, then emit the table into it.
    tableEnv.registerTableSink("cvsTableSink",fieldNames,fieldTypes,cvsTableSink)

    table.insertInto("cvsTableSink")


    // Trigger the job; nothing is written until execute() runs.
    env.execute()





  }

}
  • 輸出結果
1,a,10
2,b,20
3,c,30

insert

  • 功能描述: 往一個表中插入數據,至關於sink
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.insert

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

object Run {


  /**
    * Demonstrates `insertInto`: inserts a Table's rows into a registered
    * sink (here a CsvTableSink with an absolute path) — equivalent to a sink.
    */
  def main(args: Array[String]): Unit = {

    // Batch execution environment plus the table environment built on top of it.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Sample rows: tuples of (id, name, amount).
    val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )



    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)


    // CSV sink: absolute output path, "," as field delimiter,
    // 1 output file, overwrite if the file already exists.
    val cvsTableSink = new CsvTableSink("/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/sink-data/csv/a.csv",
      ",",
      1,
      WriteMode.OVERWRITE
        )

    // Column names and types for the sink's schema.
    val fieldNames: Array[String] = Array("id", "name", "value")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.INT)


    // Register the sink, then emit the table into it.
    tableEnv.registerTableSink("cvsTableSink",fieldNames,fieldTypes,cvsTableSink)

    table.insertInto("cvsTableSink")


    // Trigger the job; nothing is written until execute() runs.
    env.execute()





  }

}
  • 輸出結果
  • a.csv
1,a,10
2,b,20
3,c,30
相關文章
相關標籤/搜索