Following on from https://blog.51cto.com/mapengfei/2554700, where output went to Kafka and files: both of those sinks only support append mode. To get retract mode and upsert mode, in most scenarios you write to a database such as MySQL, ES, or MongoDB. This post implements output to MySQL and to ES.
A quick recap of the differences between the update modes: append mode only emits INSERT messages; retract mode encodes an update as a retract (delete) message for the old row followed by an add message for the new row; upsert mode requires a unique key and encodes each update as a single upsert message, which is more efficient.
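To make the difference concrete, here is a minimal sketch of the stream conversions behind these modes, assuming the sensorTable/aggTable definitions from the jobs further below:

// Append mode: every row is an INSERT; only works for queries that never update results.
simpleTransformTable.toAppendStream[(String, Double)].print("append")

// Retract mode: emits (Boolean, row) pairs; false retracts the old row,
// true adds the new one, so every update arrives as two messages.
aggTable.toRetractStream[(String, Long)].print("retract")

// There is no toUpsertStream on the Table API side; upsert mode is chosen
// per sink (see inUpsertMode() on the ES sink below), keyed on a unique key.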
Start a MySQL server, create a database test_mafei, and inside it create a table to store the output:
-- Note: the Flink 1.10 JDBC sink performs upserts via INSERT ... ON DUPLICATE KEY UPDATE,
-- so `id` should carry a unique index for updates to overwrite rows; as defined here,
-- every update would land as a new row.
DROP TABLE IF EXISTS `sensor_count`;
CREATE TABLE `sensor_count` (
  `id` varchar(1000) COLLATE utf8_unicode_ci DEFAULT NULL,
  `counts` bigint(20) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
SET FOREIGN_KEY_CHECKS = 1;
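Both jobs below read the same sensor.txt as CSV. The file itself isn't shown here, but a hypothetical sample matching the declared schema (id, timestamp, temper) would look like:

sensor1,1603766281,41.0
sensor2,1603766282,42.5
sensor3,1603766283,40.1
sensor4,1603766284,38.2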
package com.mafei.apitest.tabletest

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

object MysqlOutputTest {
  def main(args: Array[String]): Unit = {
    // 1. Set up the environments
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv = StreamTableEnvironment.create(env)

    // 2. Read the source file
    val filePath = "/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt"
    tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv()) // the txt is comma-separated, just like CSV, so the Csv format works
      .withSchema(new Schema() // the schema has to line up with the contents of the txt
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temper", DataTypes.DOUBLE())
      ).createTemporaryTable("inputTable")

    val sensorTable = tableEnv.from("inputTable")

    // Simple transformation
    val simpleTransformTable = sensorTable
      .select("id,temper")
      .filter("id='sensor1'")

    // Aggregating transformation
    val aggTable = sensorTable
      .groupBy('id)
      .select('id, 'id.count as 'count)

    // Print directly to stdout:
    simpleTransformTable.toAppendStream[(String, Double)].print("simpleTransformTable: ")

    // Write to MySQL
    val sinkMysql: String =
      """
        |create table mysqlOutput (
        |  id varchar(20) not null,
        |  counts bigint not null
        |) with (
        |  'connector.type' = 'jdbc',
        |  'connector.url' = 'jdbc:mysql://10.0.0.97:3306/test_mafei',
        |  'connector.table' = 'sensor_count',
        |  'connector.driver' = 'com.mysql.jdbc.Driver',
        |  'connector.username' = 'root',
        |  'connector.password' = ''
        |)
        |""".stripMargin
    tableEnv.sqlUpdate(sinkMysql)
    aggTable.insertInto("mysqlOutput")

    env.execute("mysqlOutputTest")
  }
}
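With the job running, you can check the sink from the mysql client (this also assumes flink-jdbc_2.11 and mysql-connector-java are on the classpath; the counts below are hypothetical and depend on your sensor.txt):

mysql> SELECT * FROM test_mafei.sensor_count;
+---------+--------+
| id      | counts |
+---------+--------+
| sensor1 |      1 |
| sensor2 |      1 |
+---------+--------+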
The ES sink below serializes rows as JSON, so the flink-json format dependency is needed:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.10.1</version>
</dependency>
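The descriptor-based Elasticsearch sink additionally needs the ES connector itself on the classpath. Assuming Flink 1.10.1 built against Scala 2.11, the artifact would be something like (adjust version/suffix to your build):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>1.10.1</version>
</dependency>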
package com.mafei.apitest.tabletest

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, Elasticsearch, FileSystem, Json, Schema}

object EsOutputTest {
  def main(args: Array[String]): Unit = {
    // 1. Set up the environments
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv = StreamTableEnvironment.create(env)

    // 2. Read the source file
    val filePath = "/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt"
    tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv()) // the txt is comma-separated, just like CSV, so the Csv format works
      .withSchema(new Schema() // the schema has to line up with the contents of the txt
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temper", DataTypes.DOUBLE())
      ).createTemporaryTable("inputTable")

    val sensorTable = tableEnv.from("inputTable")

    // Simple transformation
    val simpleTransformTable = sensorTable
      .select("id,temper")
      .filter("id='sensor1'")

    // Aggregating transformation
    val aggTable = sensorTable
      .groupBy('id)
      .select('id, 'id.count as 'count)

    // Print directly to stdout:
    simpleTransformTable.toAppendStream[(String, Double)].print("simpleTransformTable: ")

    // Write to ES
    tableEnv.connect(
      new Elasticsearch()
        .version("7")
        .host("localhost", 9200, "http")
        .index("sensor_test")
        .documentType("temperature") // ES 7 ignores this and always writes to _doc
    )
      .inUpsertMode() // upsert mode: the grouping key becomes the document id
      .withFormat(new Json())
      .withSchema(
        new Schema()
          .field("id", DataTypes.STRING())
          .field("count", DataTypes.BIGINT())
      )
      .createTemporaryTable("esOutputTable")

    aggTable.insertInto("esOutputTable")

    env.execute("esOutputTest")
  }
}
# List the indices — a new sensor_test index has appeared
[root@node71 yum.repos.d]# curl http://127.0.0.1:9200/_cat/indices
yellow open sensor_test stoACXcQRl66Xnpcu4e4AQ 1 1 4 2 10.6kb 10.6kb

# Query the data; note that in upsert mode the grouping key becomes the document _id
[root@node71 yum.repos.d]# curl http://127.0.0.1:9200/sensor_test/_search?pretty
{
  "took" : 86,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "sensor_test",
        "_type" : "_doc",
        "_id" : "sensor1",
        "_score" : 1.0,
        "_source" : {
          "id" : "sensor1",
          "count" : 1
        }
      },
      {
        "_index" : "sensor_test",
        "_type" : "_doc",
        "_id" : "sensor2",
        "_score" : 1.0,
        "_source" : {
          "id" : "sensor2",
          "count" : 1
        }
      }
    ]
  }
}