Because I have recently been studying ELK and Flink, the newest real-time computing framework, I decided to rebuild my earlier heatmap project (flume + kafka + Spark Streaming + MySQL + SSM + AMap heatmap) with a different set of components. The result is no better — if anything it is more cumbersome and slower — this is purely an exercise in applying the components and the computing framework I have been studying.
The component versions used:
filebeat 6.2.0
kafka 0.8.2
Flink 1.6.1
ElasticSearch 6.4.0
springboot 2.1.5
scala 2.11
The data generator script (saved on the Linux host as phoneData_every5second.py, the name used in the startup steps at the end):

import random
import time

phone = [
    "13869555210", "18542360152", "15422556663", "18852487210", "13993584664",
    "18754366522", "15222436542", "13369568452", "13893556666", "15366698558"
]

location = [
    "123.449169, 41.740567", "123.450169, 41.740705", "123.451169, 41.741405",
    "123.452169, 41.741805", "123.453654, 41.742405", "123.454654, 41.742805",
    "123.455654, 41.743405", "123.458654, 41.743705"
]


def sample_phone():
    return random.sample(phone, 1)[0]


def sample_location():
    return random.sample(location, 1)[0]


def generator_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    f = open("/var/log/lyh.log", "a+")
    while count >= 1:
        query_log = "{phone}\t{location}\t{date}".format(phone=sample_phone(),
                                                         location=sample_location(),
                                                         date=time_str)
        f.write(query_log + "\n")
        # print query_log
        count = count - 1
    f.close()  # close the handle so repeated calls do not leak file descriptors


if __name__ == '__main__':
    while True:
        generator_log(100)
        time.sleep(5)
Upload the script to the Linux host and run it; every 5 seconds it appends 100 randomly generated records to /var/log/lyh.log. Each record is a phone number, a longitude/latitude pair, and a timestamp, separated by tabs — the coordinates are what the Flink job will extract later during real-time processing.
filebeat is the log-shipping component of the ELK stack; here we use it to pick up the data produced by the Python script above.
Edit filebeat.yml to configure the file to watch and where to send the output:
filebeat.prospectors:
- type: log                        # each captured line is shipped as a JSON log event
  paths:
    - /var/log/lyh.log             # the file to monitor and collect from
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
# If you do not filter the logs with Logstash, you can also write straight to ES:
#output.elasticsearch:
#  hosts: ["172.24.112.17:9200"]
# Output to Kafka
output.kafka:
  hosts: ["hadoop1:9092", "hadoop2:9092", "hadoop3:9092"]
  topic: 'log'
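For reference, every event filebeat publishes to the log topic is a JSON document, and the Flink job below only reads its message field, which carries the raw tab-separated line. A rough sketch of such an event (the surrounding fields and their exact names depend on the filebeat version and are indicative only):

{
  "@timestamp": "2019-05-26T11:03:59.281Z",
  "beat": { "hostname": "hadoop1", "version": "6.2.0" },
  "source": "/var/log/lyh.log",
  "message": "18542360152\t123.450169, 41.740705\t2019-05-26 19:03:59"
}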
Note: the filebeat and kafka versions must be compatible, otherwise filebeat fails with errors. The official compatibility matrix is at https://www.elastic.co/guide/en/beats/filebeat/6.4/kafka-output.html
Start filebeat:
sudo -u elk ./filebeat -e -c filebeat.yml -d "publish"
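As a quick sanity check (not part of the original post) you can tail the topic with Kafka's console consumer; for Kafka 0.8.x the consumer takes the ZooKeeper address, which is assumed here to be hadoop1:2181:

bin/kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic log

Each line printed should be a JSON event like the sketch above, with the generated record inside the message field.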
Maven dependencies for the Flink job:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.6.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.8 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
        <version>1.6.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/log4j/log4j -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.36</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>1.6.1</version>
    </dependency>
</dependencies>
Flink code:
import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import com.alibaba.fastjson.JSON
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

object Flink_kafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Crucial: make sure checkpointing is enabled
    env.enableCheckpointing(5000)

    // Kafka connection settings
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.199.128:9092,192.168.199.131:9092,192.168.199.132:9092")
    props.setProperty("zookeeper.connect", "192.168.199.128:2181,192.168.199.131:2181,192.168.199.132:2181")
    props.setProperty("group.id", "test")

    // Read from the "log" topic configured in filebeat.yml above
    val consumer = new FlinkKafkaConsumer08[String]("log", new SimpleStringSchema(), props)
    // Only read the latest data
    consumer.setStartFromLatest()

    // Add Kafka as the source. The message field of each event looks like:
    // 18542360152  116.410588, 39.880172  2019-05-24 23:43:38
    val stream = env.addSource(consumer).map(
      x => {
        JSON.parseObject(x)
      }
    ).map(x => {
      x.getString("message")
    }).map(x => {
      val jingwei = x.split("\\t")(1)
      // Note: "wei" below holds the first value of the pair (the longitude in the generated data)
      // and "jing" the second; the front end maps them back to lng/lat accordingly.
      val wei = jingwei.split(",")(0).trim
      val jing = jingwei.split(",")(1).trim
      // Adjust the time format: ES stores dates as UTC by default, +0800 marks the Beijing offset
      val sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS+0800")
      val time = sdf.format(new Date())
      val resultStr = wei + "," + jing + "," + time
      resultStr
    })
    stream.print()
    // After cleaning, a record looks like: 123.450169,41.740705,2019-05-26T19:03:59.281+0800

    // Sink the cleaned data into ES
    val httpHosts = new java.util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("192.168.199.128", 9200, "http")) // the ES client talks to ES over HTTP

    val esSinkBuilder = new ElasticsearchSink.Builder[String](
      httpHosts,
      new ElasticsearchSinkFunction[String] {
        // element is one cleaned record in the format above
        def createIndexRequest(element: String): IndexRequest = {
          val json = new java.util.HashMap[String, String]
          json.put("wei", element.split(",")(0))
          json.put("jing", element.split(",")(1))
          json.put("time", element.split(",")(2))
          return Requests.indexRequest()
            .index("location-index")
            .`type`("location")
            .source(json)
        }

        override def process(element: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          requestIndexer.add(createIndexRequest(element))
        }
      }
    )
    // Bulk flush configuration: emit a request after every element instead of buffering
    esSinkBuilder.setBulkFlushMaxActions(1)
    stream.addSink(esSinkBuilder.build())

    env.execute("Kafka_Flink")
  }
}
Note: the format and timezone of the timestamps stored in ES matter.
Elasticsearch natively supports the date type; in JSON, dates are represented as strings.
So when a date is submitted to Elasticsearch as JSON, ES converts it implicitly: any string it recognizes as a date is stored as a date.
The date type carries timezone information. If the date string does not specify a timezone explicitly, ES itself has no problem with it,
but when the data is displayed through Kibana the times come out 8 hours off.
Kibana renders in the browser and uses JavaScript to detect the client machine's timezone — UTC+8 here — and shifts the dates it gets from ES into that zone, so a timestamp that was actually written as Beijing local time ends up displayed 8 hours later than the real event time.
The best practice is therefore to submit date strings that already carry timezone information,
for example "2016-07-15T12:58:17.136+0800" —
an ISO 8601 timestamp with an explicit offset, which ES accepts by default.
Formatting it this way in Java:
String FULL_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS+0800";
Date now = new Date();
new SimpleDateFormat(FULL_FORMAT).format(now);
Maven dependencies for the whole web project:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-tomcat</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- JavaServer Pages Standard Tag Library (JSTL) -->
    <dependency>
        <groupId>javax.servlet</groupId>
        <artifactId>jstl</artifactId>
    </dependency>
    <!-- JSP support for the embedded Tomcat, needed to compile JSPs -->
    <dependency>
        <groupId>org.apache.tomcat.embed</groupId>
        <artifactId>tomcat-embed-jasper</artifactId>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>6.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.11.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.36</version>
    </dependency>
</dependencies>
4.1 We need to query ES for every document written within the last 20 seconds and aggregate the counts by longitude/latitude.
The ES query (shown only as a screenshot in the original post) looks like this:
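A sketch of the query, reconstructed from the search template used in the controller further down (index location-index; fields time, jing and wei as written by the Flink sink), shown here in Kibana Dev Tools form:

GET location-index/_search
{
  "query": {
    "bool": {
      "filter": {
        "range": {
          "time": {
            "gte": "now-20s",
            "lt": "now"
          }
        }
      }
    }
  },
  "aggs": {
    "group_by_jing": {
      "terms": {
        "script": "doc['jing'].values +'#split#'+ doc['wei'].values"
      }
    }
  }
}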
Before running an aggregation like this you must update the mapping so that fielddata is true on jing and wei (it defaults to false); otherwise the aggregation query fails.
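A sketch of that mapping change, assuming jing and wei were dynamically mapped as text fields (which is what the string values written by the Flink sink produce):

PUT location-index/_mapping/location
{
  "properties": {
    "jing": { "type": "text", "fielddata": true },
    "wei":  { "type": "text", "fielddata": true }
  }
}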
The query first filters for documents whose time falls within the last 20 seconds, then aggregates on jing and wei — the equivalent of SQL's GROUP BY — and counts each bucket. A bucket's count is the number of people currently at that coordinate: the bigger the number, the denser that area.
In the web code we express the query above in Java through the ES client API.
First create a Location entity to hold the query results: the count plus the longitude and latitude.
public class Location {
    private Integer count;
    private double wei;
    private double jing;

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }

    public double getWei() {
        return wei;
    }

    public void setWei(double wei) {
        this.wei = wei;
    }

    public double getJing() {
        return jing;
    }

    public void setJing(double jing) {
        this.jing = jing;
    }
}
Next, an ES utility class that builds the client connection, provides some basic helpers, and implements the query above in Java:
import com.test.flink_web_show.controller.Location;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.script.mustache.SearchTemplateRequest;
import org.elasticsearch.script.mustache.SearchTemplateResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RestClientUtils {

    private RestHighLevelClient client = null;

    public RestClientUtils() {
        if (client == null) {
            synchronized (RestHighLevelClient.class) {
                if (client == null) {
                    client = getClient();
                }
            }
        }
    }

    private RestHighLevelClient getClient() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("192.168.199.128", 9200, "http"),
                        new HttpHost("192.168.199.128", 9201, "http")));
        return client;
    }

    public void closeClient() {
        try {
            if (client != null) {
                client.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /*---------------------- search API: multi-condition query ----------------------*/

    /**
     * Run a search template and collect the aggregation buckets into Location objects.
     */
    public List<Location> searchTemplate(String indexName, String JsonStr, Map<String, Object> scriptParams) throws Exception {
        // Inline template
        SearchTemplateRequest request = new SearchTemplateRequest();
        request.setRequest(new SearchRequest(indexName));
        request.setScriptType(ScriptType.INLINE);
        request.setScript(JsonStr);
        request.setScriptParams(scriptParams);

        // Synchronous execution
        SearchTemplateResponse response = client.searchTemplate(request, RequestOptions.DEFAULT);
        SearchResponse searchResponse = response.getResponse();

        // Retrieve the hits
        SearchHits hits = searchResponse.getHits();
        long totalHits = hits.getTotalHits();
        float maxScore = hits.getMaxScore();
        System.out.println("totalHits: " + totalHits);
        System.out.println("maxScore: " + maxScore);
        System.out.println("------------------------------------------");

        SearchHit[] searchHits = hits.getHits();
        /*for (SearchHit hit : searchHits) {
            // do something with the SearchHit
            String index = hit.getIndex();
            String type = hit.getType();
            String id = hit.getId();
            float score = hit.getScore();
            String sourceAsString = hit.getSourceAsString();
            System.out.println("index: " + index);
            System.out.println("type: " + type);
            System.out.println("id: " + id);
            System.out.println("score: " + score);
            System.out.println(sourceAsString);
            System.out.println("------------------------------------------");
        }*/

        // Walk the buckets under the aggregation
        ArrayList<Location> locations = new ArrayList<>();
        Aggregations aggregations = searchResponse.getAggregations();
        if (aggregations != null) {
            Map<String, Aggregation> aggregationMap = aggregations.getAsMap();
            Terms companyAggregation = (Terms) aggregationMap.get("group_by_jing");
            List<? extends Terms.Bucket> buckets = companyAggregation.getBuckets();
            for (Terms.Bucket bk : buckets) {
                Location location = new Location();
                Object key = bk.getKey();
                long docCount = bk.getDocCount();
                System.out.println("key: " + key.toString());
                System.out.println("doc_count: " + docCount);
                // key is the script result, e.g. "[41.740705]#split#[123.450169]";
                // strip the surrounding brackets before parsing the numbers
                String jingdu = key.toString().split("#split#")[0];
                String substring_jing = jingdu.substring(1, jingdu.length() - 1);
                location.setJing(Double.parseDouble(substring_jing));
                String weidu = key.toString().split("#split#")[1];
                String substring_wei = weidu.substring(1, weidu.length() - 1);
                location.setWei(Double.parseDouble(substring_wei));
                location.setCount((int) docCount);
                locations.add(location);
            }
        }
        return locations;
    }
}
The ES Java API is fairly involved; for details see my other Jianshu post on the ElasticSearch Java API.
Controller code:
import com.alibaba.fastjson.JSON;
import com.test.flink_web_show.es_utils.RestClientUtils;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.servlet.ModelAndView;

import javax.servlet.http.HttpServletResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Controller
public class HomeController {

    @RequestMapping("/")
    public ModelAndView home() {
        ModelAndView modelAndView = new ModelAndView();
        modelAndView.setViewName("index");
        return modelAndView;
    }

    @RequestMapping("/get_map")
    public void getMap(HttpServletResponse response) throws Exception {
        RestClientUtils restClientUtils = new RestClientUtils();
        String searchJSON = "{\n" +
                "  \"query\": {\n" +
                "    \"bool\": {\n" +
                "      \"filter\": {\n" +
                "        \"range\": {\n" +
                "          \"{{time}}\": {\n" +
                "            \"{{gte}}\": \"{{value1}}\", \n" +
                "            \"{{lt}}\": \"{{now}}\"\n" +
                "          }\n" +
                "        }\n" +
                "      }\n" +
                "    }\n" +
                "  },\n" +
                "  \"aggs\": {\n" +
                "    \"{{group_by_jing}}\": {\n" +
                "      \"terms\": {\n" +
                "        \"script\": \"{{doc['jing'].values +'#split#'+ doc['wei'].values}}\"\n" +
                "      }\n" +
                "    }\n" +
                "  }\n" +
                "}";
        Map<String, Object> map = new HashMap<>();
        map.put("time", "time");
        map.put("gte", "gte");
        map.put("value1", "now-20s");
        map.put("lt", "lt");
        map.put("now", "now");
        map.put("group_by_jing", "group_by_jing");
        map.put("doc['jing'].values +'#split#'+ doc['wei'].values", "doc['jing'].values +'#split#'+ doc['wei'].values");
        List<Location> locations = restClientUtils.searchTemplate("location-index", searchJSON, map);
        restClientUtils.closeClient();
        String json = JSON.toJSONString(locations);
        response.getWriter().print(json);
    }
}
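The /get_map endpoint returns the Location list serialized by fastjson; a response looks roughly like this (the values are illustrative):

[{"count":12,"jing":41.740705,"wei":123.450169},{"count":7,"jing":41.741405,"wei":123.451169}]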
Front-end JSP:
<%--
  Created by IntelliJ IDEA.
  User: ttc
  Date: 2018/7/6
  Time: 14:06
  To change this template use File | Settings | File Templates.
--%>
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8"/>
    <title>高德地圖</title>
    <link rel="stylesheet" href="http://cache.amap.com/lbs/static/main1119.css"/>
</head>
<body>
<script src="https://cdn.bootcss.com/echarts/4.1.0.rc2/echarts.min.js"></script>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
<script src="http://webapi.amap.com/maps?v=1.4.9&key=d16808eab90b7545923a1c2f4bb659ef"></script>
<div id="container"></div>
<script>
    // Center position and zoom used when the map first opens
    var map = new AMap.Map("container", {
        resizeEnable: true,
        center: [123.453169, 41.742567],  // longitude/latitude of the center point
        zoom: 17                          // initial zoom level
    });

    var heatmap;
    map.plugin(["AMap.Heatmap"], function () {
        // Load the heatmap plugin and overlay it on the map (see the AMap docs for the options)
        heatmap = new AMap.Heatmap(map, {
            radius: 50,
            opacity: [0, 0.7]
        });
    });

    // Every second, send an ajax request to query ES, fill the points array,
    // and refresh the heatmap's data set
    setInterval(function (args) {
        var points = (function a() {
            //<![CDATA[
            var city = [];
            $.ajax({
                type: "POST",
                url: "/get_map",
                dataType: 'json',
                async: false,
                success: function (result) {
                    for (var i = 0; i < result.length; i++) {
                        city.push({"lng": result[i].wei, "lat": result[i].jing, "count": result[i].count});
                    }
                }
            });
            return city;
        })();//]]>
        heatmap.setDataSet({data: points, max: 70});  // update the heatmap data set
    }, 1000)

    // var map = new AMap.Map('container', {
    //     pitch: 75,       // map pitch, valid range 0-83 degrees
    //     viewMode: '3D'   // map mode
    // });
</script>
</body>
</html>
That is all of the code.
Start the whole pipeline in order:
Start ZooKeeper
zkServer.sh start
Start Kafka
bin/kafka-server-start.sh config/server.properties
Start ES
sudo -u elk bin/elasticsearch
Start filebeat
sudo -u elk ./filebeat -e -c filebeat.yml -d "publish"
Start the Python script that generates the simulated data
python phoneData_every5second.py
Start the Flink job to consume, process, and write the data into ES in real time
啓動web項目完成動態地圖展現
Author: __元昊__  Link: https://www.jianshu.com/p/c148bf91c3ac  Source: 簡書 (Jianshu). Copyright belongs to the author; for any form of reproduction please contact the author for permission and credit the source.