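This article assumes some familiarity with Spring Boot or Spring Cloud microservices. With that background, the steps below are easy to follow.
I hope it helps those who need it.
Let's start with some background.
Spring Boot integrates a great many third-party frameworks. Here we only discuss and use the parts related to performance monitoring and JVM monitoring. Related topics are out of scope, which is why I said up front that you need some Spring Boot background. Now for the main topic.
First, the overall data flow, as laid out in the data flow diagram. There are two main lines: one collects performance data for interfaces and methods, the other collects JVM metrics from the Spring Boot microservices. Both end up in the InfluxDB time-series database, and Grafana is then used to display the data and raise alerts.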
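0. Basic services
There are quite a few basic services: RabbitMQ, the Eureka registry, InfluxDB and Grafana (if any of these are unfamiliar, a quick search will bring you up to speed). Briefly, what each one does:
RabbitMQ: a very popular message broker, used here to collect the performance monitoring data from the Spring Boot applications. Why RabbitMQ and not Kafka or something else? Because it is convenient for testing, its performance is sufficient, and Spring Boot integrates with it well.
Eureka: the service registry. Anyone who has read about or used Spring Cloud knows that Eureka is the registry it mainly recommends. Why that is so is not the focus here. In this article it is used to look up information about the applications registered with it.
InfluxDB and Grafana: why these two rather than, say, the ELK combination (ElasticSearch, Logstash, Kibana)? InfluxDB is a time-series database, and its data compression ratio is much better than ElasticSearch's (I have not measured this myself; it is what the documentation I read says). InfluxDB's query language is also very similar to the SQL of relational databases such as MySQL, so it is easy to pick up, and Grafana supports alerting.
That is all for the tools. How to deploy them is not the focus of this article, so please look that up yourself. If you know Docker, you can simply pull the images and start them for testing (that is how I ran all the basic services above, except Eureka).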
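1. The monitored application
The monitored application is of course a Spring Boot project, but it needs a few extra dependencies and annotations, plus some configuration changes.
Dependencies (all of these are required):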
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-sleuth-zipkin-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>
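A quick word on what the dependencies do: spring-cloud-starter-netflix-eureka-client is the registry client, spring-cloud-starter-stream-rabbit sends messages to RabbitMQ, spring-boot-starter-actuator exposes the monitoring REST endpoints,
and spring-cloud-starter-hystrix provides circuit breaking and the performance metrics.
Annotations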
@EnableHystrix // enables Hystrix performance monitoring
@RefreshScope // refreshes configuration; not relevant to this article
@EnableAutoConfiguration
@EnableFeignClients // RPC calls; not relevant to this article
@RestController
@SpringBootApplication
public class ServerTestApplication {

    protected final static Logger logger = LoggerFactory.getLogger(ServerTestApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(ServerTestApplication.class, args);
    }
}
Configuration file
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds: 60000
hystrix.threadpool.default.coreSize: 100
spring:
  application:
    name: spring-cloud-server2-test
  rabbitmq:
    host: 10.10.12.21
    port: 5672
    username: user
    password: password
encrypt:
  failOnError: false
server:
  port: 8081
eureka:
  instance:
    appname: spring-cloud-server2-test
    prefer-ip-address: true
  client:
    serviceUrl:
      defaultZone: http://IP:PORT/eureka/ # registry address
    eureka-server-total-connections-per-host: 500
endpoints:
  refresh:
    sensitive: false
  metrics:
    sensitive: false
  dump:
    sensitive: false
  auditevents:
    sensitive: false
  features:
    sensitive: false
  mappings:
    sensitive: false
  trace:
    sensitive: false
  autoconfig:
    sensitive: false
  loggers:
    sensitive: false
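A quick explanation of the settings under endpoints: these paths normally require authorization. Marking them as not sensitive removes that requirement, so we can easily call the corresponding endpoints of every service registered with the registry. One more thing: to get performance data for an interface, its method needs annotations like the following.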
@Value("${name}")
private String name;

@HystrixCommand(
        commandProperties = {
                @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "20000")
        },
        threadPoolProperties = {
                @HystrixProperty(name = "coreSize", value = "64")
        },
        threadPoolKey = "test1")
@GetMapping("/testpro1")
public String getStringtest1() {
    return name;
}
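At this point your application is basically able to output performance data. You can check by visiting http://IP:port/hystrix.stream.
If the endpoint responds as in the screenshot, your application is basically OK. Why only basically? Because the screenshot does not show whether the performance data is being sent to RabbitMQ; for that you need to check the logs. If it fails for you, let's discuss it in the comments. For now we stay on the main line.
That is it for the Spring Boot application. On to the next topic.
2. Collecting performance metrics
The output of http://IP:port/hystrix.stream that we just visited is the performance data for interfaces and methods. If everything above worked, the data should already be going to RabbitMQ, and we can receive it from there directly.
The collection service for performance metrics mainly uses the following dependencies: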
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.influxdb/influxdb-java -->
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
Here is the code.
package application;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author zyg
 */
@SpringBootApplication
public class RabbitMQApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMQApplication.class, args);
    }
}
package application;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author zyg
 */
@Configuration
public class RabbitMQConfig {

    public final static String QUEUE_NAME = "spring-boot-queue";
    public final static String EXCHANGE_NAME = "springCloudHystrixStream";
    public final static String ROUTING_KEY = "#";

    // Create the queue
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME);
    }

    // Create a topic exchange
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    // Bind the queue to the exchange using the routing key
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        // RabbitMQ IP and port
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("IP", 5672);
        connectionFactory.setUsername("user");
        connectionFactory.setPassword("password");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}
package application;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

/**
 * @author zyg
 */
public class InfluxDBConnect {

    private String username; // user name
    private String password; // password
    private String openurl;  // connection URL
    private String database; // database name
    private InfluxDB influxDB;

    public InfluxDBConnect(String username, String password, String openurl, String database) {
        this.username = username;
        this.password = password;
        this.openurl = openurl;
        this.database = database;
    }

    /** Connects to the time-series database and returns the InfluxDB client. */
    public InfluxDB influxDbBuild() {
        if (influxDB == null) {
            influxDB = InfluxDBFactory.connect(openurl, username, password);
            influxDB.createDatabase(database);
        }
        return influxDB;
    }

    /**
     * Sets the retention policy: "defalut" is the policy name, database is the
     * database name, 30d keeps data for 30 days, 1 is the replica count, and the
     * trailing DEFAULT makes this the default policy.
     */
    public void createRetentionPolicy() {
        String command = String.format(
                "CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
                "defalut", database, "30d", 1);
        this.query(command);
    }

    /**
     * Runs a query.
     *
     * @param command the query statement
     * @return the query result
     */
    public QueryResult query(String command) {
        return influxDB.query(new Query(command, database));
    }

    /**
     * Inserts one point.
     *
     * @param measurement the measurement (table)
     * @param tags        the tags
     * @param fields      the fields; must contain "currentTime" in milliseconds
     */
    public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields) {
        Builder builder = Point.measurement(measurement);
        // currentTime arrives in milliseconds; convert it to nanoseconds
        long millis = ((Number) fields.get("currentTime")).longValue();
        builder.time(millis * 1000000, TimeUnit.NANOSECONDS);
        builder.tag(tags);
        builder.fields(fields);
        // An empty retention policy name means the database default is used
        influxDB.write(database, "", builder.build());
    }

    /**
     * Deletes data.
     *
     * @param command the delete statement
     * @return the error message, if any
     */
    public String deleteMeasurementData(String command) {
        QueryResult result = influxDB.query(new Query(command, database));
        return result.getError();
    }

    /**
     * Creates a database.
     *
     * @param dbName the database name
     */
    public void createDB(String dbName) {
        influxDB.createDatabase(dbName);
    }

    /**
     * Deletes a database.
     *
     * @param dbName the database name
     */
    public void deleteDB(String dbName) {
        influxDB.deleteDatabase(dbName);
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getOpenurl() {
        return openurl;
    }

    public void setOpenurl(String openurl) {
        this.openurl = openurl;
    }

    public void setDatabase(String database) {
        this.database = database;
    }
}
package application;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author zyg
 */
@Configuration
public class InfluxDBConfiguration {

    private String username = "admin";         // user name
    private String password = "admin";         // password
    private String openurl = "http://IP:8086"; // InfluxDB connection URL
    private String database = "test_db";       // database name

    @Bean
    public InfluxDBConnect getInfluxDBConnect() {
        InfluxDBConnect influxDB = new InfluxDBConnect(username, password, openurl, database);
        influxDB.influxDbBuild();
        influxDB.createRetentionPolicy();
        return influxDB;
    }
}
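To make the insert method above less of a black box, here is a rough sketch of the text line InfluxDB ends up receiving for one point: measurement, tags, fields, then a nanosecond timestamp. It is a simplified illustration of the line protocol written with plain JDK classes, not what influxdb-java does internally. The class and method names (LineProtocolSketch, toLine) are made up for this example, and the measurement name is assumed to contain no special characters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

class LineProtocolSketch {

    // Escape commas, spaces and equals signs in tag keys, tag values and field keys.
    static String escape(String s) {
        return s.replace(",", "\\,").replace(" ", "\\ ").replace("=", "\\=");
    }

    // Strings are quoted, integers get an "i" suffix, everything else is written as-is.
    static String fieldValue(Object v) {
        if (v instanceof String) {
            return "\"" + ((String) v).replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
        }
        if (v instanceof Integer || v instanceof Long) {
            return v + "i";
        }
        return String.valueOf(v);
    }

    // Build "measurement,tag=value field=value timestamp" with keys in sorted order.
    static String toLine(String measurement, Map<String, String> tags,
                         Map<String, Object> fields, long epochMillis) {
        StringBuilder sb = new StringBuilder(measurement);
        for (Map.Entry<String, String> t : new TreeMap<>(tags).entrySet()) {
            sb.append(',').append(escape(t.getKey())).append('=').append(escape(t.getValue()));
        }
        sb.append(' ');
        boolean first = true;
        for (Map.Entry<String, Object> f : new TreeMap<>(fields).entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(escape(f.getKey())).append('=').append(fieldValue(f.getValue()));
            first = false;
        }
        // Same milliseconds-to-nanoseconds conversion as the insert method
        sb.append(' ').append(TimeUnit.MILLISECONDS.toNanos(epochMillis));
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> tags = new HashMap<>();
        tags.put("type", "HystrixCommand");
        tags.put("name", "test1");
        Map<String, Object> fields = new HashMap<>();
        fields.put("errorCount", 0);
        fields.put("latency", 1.5);
        System.out.println(toLine("testaaa", tags, fields, System.currentTimeMillis()));
    }
}
```

Tags are indexed and always strings, while fields carry the actual numbers, which is why type, name and instanceId are good tag candidates in this setup.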
package application;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * @author zyg
 */
@Component
public class Consumer {

    protected final static Logger logger = LoggerFactory.getLogger(Consumer.class);

    private ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    private InfluxDBConnect influxDB;

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void sendToSubject(org.springframework.amqp.core.Message message) {
        String payload = new String(message.getBody(), StandardCharsets.UTF_8);
        logger.info(payload);
        if (payload.startsWith("\"")) {
            // Legacy payload from an Angel client
            payload = payload.substring(1, payload.length() - 1);
            payload = payload.replace("\\\"", "\"");
        }
        try {
            if (payload.startsWith("[")) {
                // A JSON array: handle each element separately
                @SuppressWarnings("unchecked")
                List<Map<String, Object>> list = this.objectMapper.readValue(payload, List.class);
                for (Map<String, Object> map : list) {
                    sendMap(map);
                }
            } else {
                @SuppressWarnings("unchecked")
                Map<String, Object> map = this.objectMapper.readValue(payload, Map.class);
                sendMap(map);
            }
        } catch (IOException ex) {
            logger.error("Error receiving hystrix stream payload: " + payload, ex);
        }
    }

    private void sendMap(Map<String, Object> map) {
        Map<String, Object> data = getPayloadData(map);
        // These two entries are removed before the data is written as fields
        data.remove("latencyExecute");
        data.remove("latencyTotal");
        Map<String, String> tags = new HashMap<String, String>();
        tags.put("type", data.get("type").toString());
        tags.put("name", data.get("name").toString());
        tags.put("instanceId", data.get("instanceId").toString());
        // tags.put("group", data.get("group").toString());
        influxDB.insert("testaaa", tags, data);
        // for (String key : data.keySet()) {
        //     logger.info("{}:{}", key, data.get(key));
        // }
    }

    public static Map<String, Object> getPayloadData(Map<String, Object> jsonMap) {
        @SuppressWarnings("unchecked")
        Map<String, Object> origin = (Map<String, Object>) jsonMap.get("origin");
        String instanceId = null;
        if (origin.containsKey("id")) {
            instanceId = origin.get("host") + ":" + origin.get("id").toString();
        }
        if (!StringUtils.hasText(instanceId)) {
            // TODO: instanceid template
            instanceId = origin.get("serviceId") + ":" + origin.get("host") + ":" + origin.get("port");
        }
        @SuppressWarnings("unchecked")
        Map<String, Object> data = (Map<String, Object>) jsonMap.get("data");
        data.put("instanceId", instanceId);
        return data;
    }
}
Not much to add here: it simply receives the messages from RabbitMQ and saves them to InfluxDB.
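The part of the consumer that is easiest to get wrong is how the instanceId tag is derived, because the Grafana queries later filter on exactly that string. The sketch below restates the rule from getPayloadData with plain maps so it can be tried without RabbitMQ. PayloadSplitSketch and extractTags are names invented for this illustration, and the sample origin values are assumptions modelled on the queries in section 4.

```java
import java.util.HashMap;
import java.util.Map;

class PayloadSplitSketch {

    // Prefer "host:id" when origin carries an id, else fall back to "serviceId:host:port".
    static String instanceId(Map<String, Object> origin) {
        if (origin.containsKey("id")) {
            return origin.get("host") + ":" + origin.get("id");
        }
        return origin.get("serviceId") + ":" + origin.get("host") + ":" + origin.get("port");
    }

    // Copy the chosen keys out of the data map as string tags; missing keys are skipped
    // rather than causing a NullPointerException.
    static Map<String, String> extractTags(Map<String, Object> data, String... tagKeys) {
        Map<String, String> tags = new HashMap<>();
        for (String key : tagKeys) {
            Object value = data.get(key);
            if (value != null) {
                tags.put(key, value.toString());
            }
        }
        return tags;
    }

    public static void main(String[] args) {
        Map<String, Object> origin = new HashMap<>();
        origin.put("host", "10.10.12.51");
        origin.put("id", "spring-cloud-server1-test:8082");
        System.out.println(instanceId(origin));

        Map<String, Object> data = new HashMap<>();
        data.put("type", "HystrixCommand");
        data.put("name", "test1");
        System.out.println(extractTags(data, "type", "name", "group"));
    }
}
```

Skipping missing keys is a deliberate difference from the consumer above, which calls toString() directly and would fail on a payload without a type or name.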
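3. Collecting JVM data
Collecting JVM data is very simple. The main idea is to poll the monitored services' endpoints on a schedule and insert what they return into InfluxDB.
I won't go over the dependencies again. Note that this service has to register with the Eureka registry, because it needs to fetch monitoring data from every service.
The InfluxDB insertion code is basically the same as above, with one additional batch insert method.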
package com.zjs.collection;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

/**
 * @author zyg
 */
@EnableEurekaClient
@SpringBootApplication
public class ApplictionCollection {

    public static void main(String[] args) {
        SpringApplication.run(ApplictionCollection.class, args);
    }
}
/**
 * Batch insert.
 *
 * @param measurement the measurement (table)
 * @param tags        the tags
 * @param fieldslist  one field map per point; each must contain "instanceId"
 *                    and "currentTime" (in nanoseconds)
 */
public void batchinsert(String measurement, Map<String, String> tags, List<Map<String, Object>> fieldslist) {
    org.influxdb.dto.BatchPoints.Builder batchbuilder = BatchPoints.database(database);
    for (Map<String, Object> map : fieldslist) {
        Builder builder = Point.measurement(measurement);
        tags.put("instanceId", map.get("instanceId").toString());
        builder.time((long) map.get("currentTime"), TimeUnit.NANOSECONDS);
        builder.tag(tags);
        builder.fields(map);
        batchbuilder.point(builder.build());
    }
    System.out.println(batchbuilder.build().toString());
    influxDB.write(batchbuilder.build());
}
package com.zjs.collection;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

/**
 * Fetches metrics from the microservice instances.
 *
 * @author zyg
 */
@Component
@SpringBootApplication
@EnableScheduling
public class MicServerInstanceInfoHandle {

    protected final static Logger logger = LoggerFactory.getLogger(MicServerInstanceInfoHandle.class);

    final String pathtail = "/metrics/mem.*|heap.*|threads.*|gc.*|nonheap.*|classes.*";

    Map<String, String> tags;
    ThreadPoolExecutor threadpool;

    @Autowired
    DiscoveryClient dc;

    @Autowired
    RestTemplate restTemplate;

    // Buffer between the polling threads and the batch writer
    final static LinkedBlockingQueue<Map<String, Object>> jsonMetrics = new LinkedBlockingQueue<>(1000);

    /**
     * Initializes the instance; these parameters could be moved to the configuration file.
     */
    public MicServerInstanceInfoHandle() {
        tags = new HashMap<String, String>();
        threadpool = new ThreadPoolExecutor(4, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
    }

    @Autowired
    private InfluxDBConnect influxDB;

    /**
     * Fetches metrics data from every registered instance.
     */
    @Scheduled(fixedDelay = 2000)
    public void metricsDataObtain() {
        logger.info("Start fetching metrics data");
        List<String> servicelist = dc.getServices();
        for (String str : servicelist) {
            List<ServiceInstance> silist = dc.getInstances(str);
            for (ServiceInstance serviceInstance : silist) {
                threadpool.execute(new MetricsHandle(serviceInstance));
            }
        }
    }

    /**
     * Inserts the buffered data into InfluxDB.
     */
    @Scheduled(fixedDelay = 5000)
    public void metricsDataToInfluxDB() {
        logger.info("Start batch insert of metrics data into InfluxDB");
        ArrayList<Map<String, Object>> metricslist = new ArrayList<>();
        MicServerInstanceInfoHandle.jsonMetrics.drainTo(metricslist);
        if (!metricslist.isEmpty()) {
            logger.info("Batch insert count: {}", metricslist.size());
            influxDB.batchinsert("metrics", tags, metricslist);
        }
        logger.info("Finished batch insert of metrics data");
    }

    @Bean
    public RestTemplate getRestTemplate() {
        RestTemplate restTemplate = new RestTemplate();
        SimpleClientHttpRequestFactory achrf = new SimpleClientHttpRequestFactory();
        achrf.setConnectTimeout(10000);
        achrf.setReadTimeout(10000);
        restTemplate.setRequestFactory(achrf);
        return restTemplate;
    }

    // One polling task per service instance; it only runs on the thread pool
    class MetricsHandle implements Runnable {

        private ServiceInstance serviceInstanc;

        public MetricsHandle(ServiceInstance serviceInstance) {
            serviceInstanc = serviceInstance;
        }

        @Override
        public void run() {
            try {
                logger.info("Fetching metrics data of application {}:{}:{}", serviceInstanc.getServiceId(),
                        serviceInstanc.getHost(), serviceInstanc.getPort());
                @SuppressWarnings("unchecked")
                Map<String, Object> mapdata = restTemplate
                        .getForObject(serviceInstanc.getUri().toString() + pathtail, Map.class);
                mapdata.put("instanceId", serviceInstanc.getServiceId() + ":" + serviceInstanc.getHost() + ":"
                        + serviceInstanc.getPort());
                mapdata.put("type", "metrics");
                // Milliseconds to nanoseconds
                mapdata.put("currentTime", System.currentTimeMillis() * 1000000);
                MicServerInstanceInfoHandle.jsonMetrics.add(mapdata);
            } catch (Exception e) {
                logger.error("instanceId:{},host:{},port:{},path:{},exception:{}", serviceInstanc.getServiceId(),
                        serviceInstanc.getHost(), serviceInstanc.getPort(), serviceInstanc.getUri(),
                        e.getMessage());
            }
        }
    }
}
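The buffering part of this class can be tried in isolation. Below is a small JDK-only sketch of the same idea: producers put samples into a bounded LinkedBlockingQueue and a writer drains everything in one go. MetricsBufferSketch, submit and drain are hypothetical names for this illustration. One difference from the class above is that the sketch uses offer(), which returns false when the queue is full, whereas add() throws IllegalStateException in that case (the class above catches it in run()).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

class MetricsBufferSketch {

    private final LinkedBlockingQueue<Map<String, Object>> queue;

    MetricsBufferSketch(int capacity) {
        queue = new LinkedBlockingQueue<>(capacity);
    }

    // Returns false instead of throwing when the buffer is full, so the caller can
    // decide whether to drop or log the sample.
    boolean submit(Map<String, Object> sample) {
        return queue.offer(sample);
    }

    // Removes everything currently buffered and returns it as one batch.
    List<Map<String, Object>> drain() {
        List<Map<String, Object>> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) {
        MetricsBufferSketch buffer = new MetricsBufferSketch(2);
        for (int i = 0; i < 3; i++) {
            Map<String, Object> sample = new HashMap<>();
            sample.put("heap.used", 1000 + i);
            System.out.println("accepted: " + buffer.submit(sample));
        }
        System.out.println("batch size: " + buffer.drain().size());
    }
}
```

With the two schedules above (poll every 2 seconds, write every 5), the queue capacity limits how many samples can pile up between two writes.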
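A quick explanation of this line: final String pathtail = "/metrics/mem.*|heap.*|threads.*|gc.*|nonheap.*|classes.*"; The metrics path returns a lot of information, but we do not need all of it, so we fetch selectively to save bandwidth and time. The key class MicServerInstanceInfoHandle polls with multiple threads, mainly because a single thread might not get through a full polling round when the registry holds hundreds or thousands of services. It also buffers results in a queue and inserts them into InfluxDB in batches.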
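To see which metric names that pathtail expression selects, here is a small sketch that applies the same pattern to a map of sample names. It assumes the actuator treats the path segment as a regular expression that must match the whole metric name, which is my understanding of Spring Boot 1.x but is worth checking against your version. MetricNameFilterSketch and the sample keys are made up for the illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

class MetricNameFilterSketch {

    // The same alternatives as in pathtail, without the "/metrics/" prefix
    static final Pattern WANTED =
            Pattern.compile("mem.*|heap.*|threads.*|gc.*|nonheap.*|classes.*");

    // Keep only the entries whose key matches the pattern in full.
    static Map<String, Object> filter(Map<String, Object> all) {
        Map<String, Object> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : all.entrySet()) {
            if (WANTED.matcher(entry.getKey()).matches()) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Object> all = new LinkedHashMap<>();
        all.put("mem.free", 1);
        all.put("heap.used", 2);
        all.put("uptime", 3);
        all.put("processors", 4);
        System.out.println(filter(all).keySet());
    }
}
```

Note that the dot is a regex wildcard here, so mem.* also matches a name such as memory; that is harmless for this use but explains why the pattern is looser than it looks.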
4. Displaying the results
If data collection succeeded, you can now draw charts in Grafana. The corresponding queries are:
SELECT mean("rollingCountFallbackSuccess"), mean("rollingCountSuccess") FROM "testaaa" WHERE ("instanceId" = 'IP:spring-cloud-server1-test:8082' AND "type" = 'HystrixCommand') AND $timeFilter GROUP BY time($__interval) fill(null)

SELECT mean("currentPoolSize") FROM "testaaa" WHERE ("type" = 'HystrixThreadPool' AND "instanceId" = '10.10.12.51:spring-cloud-server1-test:8082') AND $timeFilter GROUP BY time($__interval) fill(null)

SELECT "heap", "heap.committed", "heap.used", "mem", "mem.free", "nonheap", "nonheap.committed", "nonheap.used" FROM "metrics" WHERE ("instanceId" = 'SPRING-CLOUD-SERVER1-TEST:10.10.12.51:8082') AND $timeFilter
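That is basically it.
5. Optimizations and ideas
All the basic services above certainly need to be highly available, and each of them takes some learning. If I find the time I will cover them one by one; you can also search for material yourself.
Some may ask: there is a small plugin called telegraf that can collect this data directly and aggregate it for monitoring.
I did in fact use telegraf before, but ran into a problem:
every time the monitored application restarted, the field names changed,
because it uses the class instance name as the field name. That is very inconvenient: after each restart we had to rewrite the queries, which is not friendly at all.
On top of that, writing the collection code did not seem hard, so I wrote it myself. If anyone who knows telegraf well can solve the problem I described, please leave me a comment. Thanks in advance!
Some things still need tidying up; for example, the IPs and ports could all go into configuration files.
Another idea: could the application collect its JVM information itself, the same way the performance data is collected, send it straight to the MQ, and have it inserted into InfluxDB from there?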
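As a rough starting point for that idea, the JVM numbers can be read in-process with the standard java.lang.management beans, without going through the actuator endpoint. The sketch below only builds the map; publishing it to RabbitMQ is left out. JvmSnapshotSketch and the key names are my own choices, picked to resemble the fields used in the queries above, and the memory values are in bytes, which may differ from the units the actuator reports.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.Map;

class JvmSnapshotSketch {

    // Collect a handful of JVM metrics from the platform MXBeans into one map.
    static Map<String, Object> snapshot() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memory.getHeapMemoryUsage();
        MemoryUsage nonHeap = memory.getNonHeapMemoryUsage();
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();

        Map<String, Object> metrics = new HashMap<>();
        metrics.put("heap.used", heap.getUsed());                // bytes
        metrics.put("heap.committed", heap.getCommitted());      // bytes
        metrics.put("nonheap.used", nonHeap.getUsed());          // bytes
        metrics.put("nonheap.committed", nonHeap.getCommitted());
        metrics.put("threads", threads.getThreadCount());
        metrics.put("threads.peak", threads.getPeakThreadCount());
        metrics.put("classes.loaded",
                ManagementFactory.getClassLoadingMXBean().getLoadedClassCount());
        metrics.put("currentTime", System.currentTimeMillis()); // milliseconds
        return metrics;
    }

    public static void main(String[] args) {
        System.out.println(snapshot());
    }
}
```

A scheduled task in each application could send this map as JSON to an exchange, and a consumer like the one in section 2 could then write it to InfluxDB.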
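6. Summary
In just two or three years, Spring Boot has become hugely popular. Its body of knowledge has matured and development costs keep falling,
so adoption keeps growing. Microservices are great, but we also have to use them well, and monitoring is an important part of that.
Imagine a data center running tens of thousands of services: keeping them stable and spotting the faulty ones in time matters a great deal.
I hope this has been helpful.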