flink sql使用kafka做爲source和sink

你們都知道sql有着簡單,直接,容易上手等優點,因此如今大有用sql去掉api的趨勢。那麼咱們少說廢話,下面先上個sql的列子node

val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(10000)
    env.setParallelism(1)
    //注入數據源
    var tableEnv: StreamTableEnvironment  = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerExternalCatalog("kafka", new UDMExternalCatalog())
    tableEnv.sqlUpdate(
      s"""INSERT INTO `kafka.kafka-k8s.pb_sink_test`
         |select
         |fstDeptSet,
         |filedName1,
         |filedName2,
         |userId,
         |brandNames
         |from kafka.`kafka-k8s`.`pb_internal_test`
         | """.stripMargin)
    env.execute("Flink SQL Skeleton")

上面是一個查詢,插入語句,在flink中會被轉爲一個任務進行提交sql

下面咱們大概講一下flink內部kafka的實例化過程api

有圖可知,主要分爲4大步驟,先經過calcite分析sql,轉爲相應的relnode,在根據用戶配置的schema和Java spi,過濾出須要的kafka produce和kafka consumer版本。scala

kafka consumer對應於select部分code

kafka produce對應於insert部分blog

相關文章
相關標籤/搜索