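Goal: read data from a txt file and write it to Elasticsearch (ES). I am using ES 7.9 here; if you are on a version earlier than ES 7, the code below contains a commented-out .type("_doc") call that you need to enable to set the document type.
If you do not have an ES and Kibana (optional) environment yet, install them first: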
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm.sha512
shasum -a 512 -c elasticsearch-7.9.3-x86_64.rpm.sha512
sudo rpm --install elasticsearch-7.9.3-x86_64.rpm
systemctl restart elasticsearch

wget https://artifacts.elastic.co/downloads/kibana/kibana-7.9.3-x86_64.rpm
sudo rpm --install kibana-7.9.3-x86_64.rpm
systemctl start kibana
First, add the Flink Elasticsearch connector dependency to the pom:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Create a new file, ElasticsearchSinkTest.scala:
package com.mafei.sinktest

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object ElasticsearchSinkTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    env.setParallelism(1)
    inputStream.print()

    // Convert each line to the case class type first.
    // SensorReadingTest5 is not defined in this file; it is assumed to be a case class
    // in this package with fields (id: String, timestamp: Long, temperature: Double).
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") // split the line on commas
        // Build a sensor reading; toLong and toDouble are needed because
        // the fields are strings after the split
        SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble)
      })

    // Define the ES connection information
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("127.0.0.1", 9200))

    // Custom ElasticsearchSinkFunction that writes each record to ES
    val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReadingTest5] {
      override def process(t: SensorReadingTest5, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        // Build a map to use as the document source
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id", t.id)
        dataSource.put("temperature", t.temperature.toString)
        dataSource.put("ts", t.timestamp.toString)

        // Create the index request and specify the index
        val indexRequest = Requests.indexRequest()
        indexRequest.index("sensors") // the index to write to
          .source(dataSource)         // the data to write
        //  .type("_doc")             // not needed on ES 7, which I am using here

        // Submit the index request
        requestIndexer.add(indexRequest)
      }
    }

    dataStream.addSink(
      new ElasticsearchSink.Builder[SensorReadingTest5](httpHosts, myEsSinkFunc)
        .build()
    )

    env.execute()
  }
}
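The map step in the job above throws an exception if a line has fewer than three fields or a non-numeric value. Below is a minimal sketch of a more defensive parse, in plain Scala with no Flink dependency. `SensorReading`, `SensorLineParser` and `parseLine` are hypothetical names introduced only for illustration, and the field order (id, timestamp, temperature) is assumed from the job code.

```scala
import scala.util.Try

// Hypothetical stand-in for SensorReadingTest5; field order assumed from the job above.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SensorLineParser {
  // Returns None for any line that does not have exactly three well-formed fields.
  def parseLine(line: String): Option[SensorReading] =
    line.split(",").map(_.trim) match {
      case Array(id, ts, temp) if id.nonEmpty =>
        // toLong / toDouble throw on bad input; Try turns that into None
        Try(SensorReading(id, ts.toLong, temp.toDouble)).toOption
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val lines = Seq("sensor1,1603766281,41.0", "bad line", "sensor2,notANumber,42.0")
    // Only the first line parses; the other two are silently dropped
    lines.flatMap(l => parseLine(l)).foreach(println)
  }
}
```

Whether to drop malformed lines, log them, or fail the job is a design choice; this sketch simply drops them.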
Code structure: (screenshot of the project layout)
Check the data on the server; the sensors index holds the data we just wrote.
List all indices:
[root@localhost ~]# curl http://127.0.0.1:9200/_cat/indices
green open .kibana-event-log-7.9.3-000001 NvnP2SI9Q_i-z5bNvsgWhA 1 0 1 0 5.5kb 5.5kb
yellow open sensors PGTeT0MZRJ-4hmYkDQnqIw 1 1 6 0 5.4kb 5.4kb
green open .apm-custom-link IdxoOaP9Sh6ssBd0Q9kPsw 1 0 0 0 208b 208b
green open .kibana_task_manager_1 -qAi_8LmTc2eJsWUQwugtw 1 0 6 3195 434.2kb 434.2kb
green open .apm-agent-configuration FG9PE8CARdyKWrdsAg4gbA 1 0 0 0 208b 208b
green open .kibana_1 uVmly8KaQ5uIXZ-IkArnVg 1 0 18 4 10.4mb 10.4mb
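To read the listing above: the default `_cat/indices` columns are health, status, index, uuid, pri, rep, docs.count, docs.deleted, store.size and pri.store.size. The sensors row therefore reports 6 documents, with 1 primary shard and 1 replica; yellow health is the usual state on a single node, where the replica shard cannot be assigned. The following is a minimal sketch that splits one such row into named fields. `IndexRow`, `CatIndices` and `parseRow` are hypothetical names, not part of any ES client.

```scala
import scala.util.Try

// Hypothetical holder for one row of `_cat/indices` output (default columns only).
case class IndexRow(health: String, status: String, index: String, uuid: String,
                    pri: Int, rep: Int, docsCount: Long, docsDeleted: Long,
                    storeSize: String, priStoreSize: String)

object CatIndices {
  // Returns None if the row does not have the ten default columns
  // or if a numeric column fails to parse.
  def parseRow(line: String): Option[IndexRow] =
    line.trim.split("\\s+") match {
      case Array(h, s, i, u, p, r, dc, dd, ss, ps) =>
        Try(IndexRow(h, s, i, u, p.toInt, r.toInt, dc.toLong, dd.toLong, ss, ps)).toOption
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val row = "yellow open sensors PGTeT0MZRJ-4hmYkDQnqIw 1 1 6 0 5.4kb 5.4kb"
    parseRow(row).foreach(r => println(s"${r.index}: ${r.docsCount} docs, health=${r.health}"))
  }
}
```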
View the data that was written:
[root@localhost ~]# curl http://127.0.0.1:9200/sensors/_search
{"took":0,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"sensors","_type":"_doc","_id":"h67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"41.0","id":"sensor1","ts":"1603766281"}},{"_index":"sensors","_type":"_doc","_id":"iK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"42.0","id":"sensor2","ts":"1603766282"}},{"_index":"sensors","_type":"_doc","_id":"ia7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"43.0","id":"sensor3","ts":"1603766283"}},{"_index":"sensors","_type":"_doc","_id":"iq7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.1","id":"sensor4","ts":"1603766240"}},{"_index":"sensors","_type":"_doc","_id":"i67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"20.0","id":"sensor4","ts":"1603766284"}},{"_index":"sensors","_type":"_doc","_id":"jK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.2","id":"sensor4","ts":"1603766249"}}]}}
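In the response above, every field comes back as a JSON string (for example "temperature":"41.0") because the sink builds a HashMap[String, String]. If numeric fields are wanted, one option is to put boxed numbers into the map instead. The sketch below shows that alternative in plain Scala; it is not what the job above does. `SensorReading`, `TypedSource` and `toSource` are hypothetical names, and the field order is assumed from the job code.

```scala
import java.util

// Hypothetical stand-in for SensorReadingTest5; field order assumed from the job above.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object TypedSource {
  // Builds the document source with boxed numeric values so that they
  // can be serialized as JSON numbers rather than strings.
  def toSource(r: SensorReading): util.Map[String, AnyRef] = {
    val source = new util.HashMap[String, AnyRef]()
    source.put("id", r.id)
    source.put("temperature", Double.box(r.temperature)) // java.lang.Double
    source.put("ts", Long.box(r.timestamp))              // java.lang.Long
    source
  }

  def main(args: Array[String]): Unit = {
    val doc = toSource(SensorReading("sensor1", 1603766281L, 41.0))
    println(doc)
  }
}
```

The resulting map can be passed to the index request's source(...) in place of dataSource. Note that an index which already exists keeps its current field mappings, so the effect is easiest to see on a fresh index.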