你們都知道sql有着簡單,直接,容易上手等優點,因此如今大有用sql去掉api的趨勢。那麼咱們少說廢話,下面先上個sql的列子node
val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(10000) env.setParallelism(1) //注入數據源 var tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env) tableEnv.registerExternalCatalog("kafka", new UDMExternalCatalog()) tableEnv.sqlUpdate( s"""INSERT INTO `kafka.kafka-k8s.pb_sink_test` |select |fstDeptSet, |filedName1, |filedName2, |userId, |brandNames |from kafka.`kafka-k8s`.`pb_internal_test` | """.stripMargin) env.execute("Flink SQL Skeleton")
上面是一個查詢,插入語句,在flink中會被轉爲一個任務進行提交sql
下面咱們大概講一下flink內部kafka的實例化過程api
有圖可知,主要分爲4大步驟,先經過calcite分析sql,轉爲相應的relnode,在根據用戶配置的schema和Java spi,過濾出須要的kafka produce和kafka consumer版本。scala
kafka consumer對應於select部分code
kafka produce對應於insert部分blog