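Using grafana + prometheus + jmx as a general-purpose monitoring setup works quite well. My earlier articles covered how to set it up.
However, with that setup we could mostly monitor only single-valued data such as request counts or TPS; composite metrics were not easy to monitor.
Such cases usually carry some business meaning, for example monitoring consumption per topic in an MQ, or real-time orders per product category. Of course, anyone who has looked at a full Prometheus scrape will find this perfectly normal, because you see data like the following: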
    # HELP java_lang_MemoryPool_PeakUsage_max java.lang.management.MemoryUsage (java.lang<type=MemoryPool, name=Metaspace><PeakUsage>max)
    # TYPE java_lang_MemoryPool_PeakUsage_max untyped
    java_lang_MemoryPool_PeakUsage_max{name="Metaspace",} -1.0
    java_lang_MemoryPool_PeakUsage_max{name="PS Old Gen",} 1.415053312E9
    java_lang_MemoryPool_PeakUsage_max{name="PS Eden Space",} 6.96778752E8
    java_lang_MemoryPool_PeakUsage_max{name="Code Cache",} 2.5165824E8
    java_lang_MemoryPool_PeakUsage_max{name="Compressed Class Space",} 1.073741824E9
    java_lang_MemoryPool_PeakUsage_max{name="PS Survivor Space",} 5242880.0
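The name here is just an ordinary label, and the same should apply to any other instrumentation point. So it ought to be doable.
Yes, Prometheus makes this easy. But remember that we have been using jmx_exporter as the export tool, with io.dropwizard.metrics:metrics-core as the instrumentation library.
That library focuses on single-valued metrics, so with it alone we cannot monitor labeled data.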
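To make the single-valued versus labeled distinction concrete, here is a minimal sketch that uses only the JDK. The names `LabeledCounterSketch`, `onMessage`, and `countFor` are made up for illustration, and this is not how metrics-core or the Prometheus client is actually implemented; it only shows the shape of the data.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical illustration only; not taken from metrics-core or the Prometheus client.
class LabeledCounterSketch {
    // Single-valued counter: one name, one number.
    static final LongAdder totalMessages = new LongAdder();

    // Labeled counter: one name, one number per label value (the label here is "topic").
    static final Map<String, LongAdder> messagesByTopic = new ConcurrentHashMap<>();

    static void onMessage(String topic) {
        totalMessages.increment();
        // computeIfAbsent creates the per-topic child atomically on first use.
        messagesByTopic.computeIfAbsent(topic, t -> new LongAdder()).increment();
    }

    static long countFor(String topic) {
        LongAdder child = messagesByTopic.get(topic);
        return child == null ? 0L : child.sum();
    }

    public static void main(String[] args) {
        onMessage("orders");
        onMessage("orders");
        onMessage("payments");
        System.out.println("total=" + totalMessages.sum());
        messagesByTopic.forEach((topic, n) ->
                System.out.println("topic=" + topic + " count=" + n.sum()));
    }
}
```

The single-valued counter can only answer "how many messages in total", while the labeled one can also answer "how many for this topic", which is the kind of question the business metrics above are asking.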
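So what can we do? Three options:
1. Replace the existing metrics-core library outright with the Prometheus client library, since this is officially supported;
2. Use prometheus-client alongside metrics-core, each doing its own job;
3. Implement labeled instrumentation ourselves, probably on top of MBeans;
Each option has trade-offs. Option 1 may require too many changes, and may turn out to be infeasible if the features are not compatible. Option 2 may fail to integrate or may conflict with the existing library; if it can be integrated, it is definitely the best. Option 3 is the most complex to implement: maintaining the metric values, thread safety, how the MBean data gets exposed, and so on.
Well, either way, let's look at each of them, starting with option 1.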
1. Add the pom dependencies
    <dependency>
        <groupId>io.prometheus</groupId>
        <artifactId>simpleclient</artifactId>
        <version>0.8.0</version>
    </dependency>
    <dependency>
        <groupId>io.prometheus</groupId>
        <artifactId>simpleclient_hotspot</artifactId>
        <version>0.8.0</version>
    </dependency>
    <dependency>
        <groupId>io.prometheus</groupId>
        <artifactId>simpleclient_servlet</artifactId>
        <version>0.8.0</version>
    </dependency>
2. Register the metrics endpoint with the framework
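    import io.prometheus.client.exporter.MetricsServlet;
    // Package as of Spring Boot 1.4+/2.x
    import org.springframework.boot.web.servlet.ServletRegistrationBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class PrometheusConfig {
        @Bean
        public ServletRegistrationBean servletRegistrationBean() {
            // Expose the instrumented metrics at /metrics
            return new ServletRegistrationBean(new MetricsServlet(), "/metrics");
        }
    }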
3. Instrument the business code
    // Register the metric instance
    io.prometheus.client.Counter c = io.prometheus.client.Counter.build()
            .name("jmx_test_abc_ffff")
            .labelNames("topic")
            .help("topic counter usage.")
            .register();

    public void incTopicMetric(String topic) {
        // c.labels("test").inc();    // for test
        // Increment the child counter for this topic label
        c.labels(topic).inc();
    }
4. Fetch the metric data
    curl http://localhost:8080/metrics
    # The metrics are exposed over an HTTP endpoint; the result looks like this:
    # HELP jmx_test_abc_ffff counter usage.
    # TYPE jmx_test_abc_ffff counter
    jmx_test_abc_ffff{topic="bbb",} 1.0
    jmx_test_abc_ffff{topic="2",} 2.0
    jmx_test_abc_ffff{topic="test",} 1.0
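As you can see, we got the effect we wanted. But for something that is already running, making this change may not be so friendly, mainly for the following reasons:
1. The way data is exposed changes. Data that used to be handled uniformly by the javaagent now comes from the application's own port, which may differ per application, so the collection configuration has to change, and that does not necessarily suit the operations setup;
2. The existing instrumentation has to be replaced;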
Option 2: leave the existing monitoring untouched, and feed the new labeled data into jmx_exporter.
Let's try the same instrumentation as above:
    // Register the metric instance
    io.prometheus.client.Counter c = io.prometheus.client.Counter.build()
            .name("jmx_test_abc_ffff")
            .labelNames("topic")
            .help("topic counter usage.")
            .register();

    public void incTopicMetric(String topic) {
        // c.labels("test").inc();    // for test
        // Increment the child counter for this topic label
        c.labels(topic).inc();
    }
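It seems the data does not make it into jmx_exporter. That is not surprising: we don't understand how it works yet, so were we hoping to win by luck?
Looking closely at how metrics-core implements its instrumentation, we find that it pushes data into MBeans, which jmx_exporter then scrapes.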
    // com.codahale.metrics.jmx.JmxReporter.JmxListener#onCounterAdded
    @Override
    public void onCounterAdded(String name, Counter counter) {
        try {
            if (filter.matches(name, counter)) {
                final ObjectName objectName = createName("counters", name);
                registerMBean(new JmxCounter(counter, objectName), objectName);
            }
        } catch (InstanceAlreadyExistsException e) {
            LOGGER.debug("Unable to register counter", e);
        } catch (JMException e) {
            LOGGER.warn("Unable to register counter", e);
        }
    }

    // Register the metric instance with the mBeanServer.
    // By default, mBeanServer = ManagementFactory.getPlatformMBeanServer();
    private void registerMBean(Object mBean, ObjectName objectName) throws InstanceAlreadyExistsException, JMException {
        ObjectInstance objectInstance = mBeanServer.registerMBean(mBean, objectName);
        if (objectInstance != null) {
            // the websphere mbeanserver rewrites the objectname to include
            // cell, node & server info
            // make sure we capture the new objectName for unregistration
            registered.put(objectName, objectInstance.getObjectName());
        } else {
            registered.put(objectName, objectName);
        }
    }
prometheus-client, on the other hand, registers its instances through CollectorRegistry.defaultRegistry.
    // io.prometheus.client.SimpleCollector.Builder#register()
    /**
     * Create and register the Collector with the default registry.
     */
    public C register() {
        return register(CollectorRegistry.defaultRegistry);
    }

    /**
     * Create and register the Collector with the given registry.
     */
    public C register(CollectorRegistry registry) {
        C sc = create();
        registry.register(sc);
        return sc;
    }
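So the two seem to work on different principles. Exactly why the data is not picked up is still hard to say. At the very least, you could learn from metrics-core and export the data in the form of MBeans; that is what the next option discusses.
Here I can offer a way that is ultimately simple, if somewhat opportunistic, to make the two monitoring libraries coexist and both export through jmx_exporter. It goes as follows:
1. Add the javaagent dependency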
    <!-- The javaagent package, the same one as the jmx_exporter used externally -->
    <dependency>
        <groupId>io.prometheus.jmx</groupId>
        <artifactId>jmx_prometheus_javaagent</artifactId>
        <version>0.12.0</version>
    </dependency>
2. Instrument using the classes bundled in the agent
Because the javaagent ships a complete copy of the client library, we can use it directly.
    // Register the metric instance
    // io.prometheus.client.Counter is replaced with io.prometheus.jmx.shaded.io.prometheus.client.Counter
    io.prometheus.jmx.shaded.io.prometheus.client.Counter c =
            io.prometheus.jmx.shaded.io.prometheus.client.Counter.build()
                    .name("jmx_test_abc_ffff")
                    .labelNames("topic")
                    .help("topic counter usage.")
                    .register();

    public void incTopicMetric(String topic) {
        // c.labels("test").inc();    // for test
        // Increment the child counter for this topic label
        c.labels(topic).inc();
    }
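3. Use jmx_exporter exactly as before, and the metrics are exported
Why does simply swapping the package make this work?
Because jmx_exporter also collects its data by registering with CollectorRegistry.defaultRegistry. As long as we use the very same instance that it does, the data is shared within the same JVM.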
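The reasoning can be sketched with two stand-in classes. `AppRegistry` and `ShadedRegistry` are hypothetical names, not the real CollectorRegistry; they play the role of the application's own copy of the client and the relocated copy inside the agent jar. The sketch assumes that the shaded classes the application uses are the same loaded classes the agent uses, which is my assumption about the setup above rather than something verified here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-ins: neither nested class is the real CollectorRegistry.
class RegistryIdentitySketch {
    // Plays the role of io.prometheus.client.CollectorRegistry (the application's own copy).
    static class AppRegistry {
        static final List<String> DEFAULT = new CopyOnWriteArrayList<>();
    }

    // Plays the role of the relocated copy under io.prometheus.jmx.shaded (what the agent reads).
    static class ShadedRegistry {
        static final List<String> DEFAULT = new CopyOnWriteArrayList<>();
    }

    // The "agent" only ever looks at the shaded registry's static field.
    static boolean agentSees(String metricName) {
        return ShadedRegistry.DEFAULT.contains(metricName);
    }

    public static void main(String[] args) {
        AppRegistry.DEFAULT.add("registered_via_app_copy");
        ShadedRegistry.DEFAULT.add("registered_via_shaded_copy");
        System.out.println("app copy visible to agent: " + agentSees("registered_via_app_copy"));
        System.out.println("shaded copy visible to agent: " + agentSees("registered_via_shaded_copy"));
    }
}
```

Two classes with the same simple name but different packages are different classes, so each has its own static default registry. Registering into one is invisible to anything that reads the other.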
Option 3 is to implement labeled instrumentation ourselves on top of MBeans. A demo:
    import java.lang.management.ManagementFactory;
    import java.util.Hashtable;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    import javax.management.*;

    // Test class
    public class PrometheusMbeanMetricsMain {
        private static ConcurrentHashMap<String, AtomicInteger> topicContainer = new ConcurrentHashMap<>();
        private static MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();

        public static void main(String[] args) throws Exception {
            // Simulate some topic
            String commingTopic = "test_topic";
            AtomicInteger myTopic1Counter = getMetricCounter(commingTopic);
            System.out.println("jmx started!");
            while (true) {
                System.out.println("---");
                // Increment the count
                myTopic1Counter.incrementAndGet();
                Thread.sleep(10000);
            }
        }

        private static AtomicInteger getMetricCounter(String topic) throws MalformedObjectNameException,
                NotCompliantMBeanException, InstanceAlreadyExistsException, MBeanRegistrationException {
            AtomicInteger myTopic1Counter = topicContainer.get(topic);
            if (myTopic1Counter == null) {
                myTopic1Counter = new AtomicInteger(0);
                Hashtable<String, String> tab = new Hashtable<>();
                tab.put("topic", topic);
                // Placeholder. I added it without fully understanding it; judging by the output,
                // the first key property's value ends up in the metric name, which leaves topic as a label.
                tab.put("_", "_value");
                ObjectName objectName = new ObjectName("mydomain_test", tab);
                // Register the metric instance with the MBeanServer
                ObjectInstance objectInstance = mBeanServer.registerMBean(new JmxCounter(myTopic1Counter, objectName), objectName);
                // Cache the counter so the same topic is not registered twice
                topicContainer.put(topic, myTopic1Counter);
            }
            return myTopic1Counter;
        }
    }

    // JmxCounter. MBean requirements: 1. the interface must be public;
    // 2. the naming convention must be followed: if the interface is called XYZMBean, the implementation must be called XYZ.
    // (The alternative is to implement DynamicMBean. Each public type below goes in its own file.)
    public interface JmxCounterMBean {
        public Object getCount() throws Exception;
    }

    public class JmxCounter implements JmxCounterMBean {
        private AtomicInteger metric;
        private ObjectName objectName;

        public JmxCounter(AtomicInteger metric, ObjectName objectName) {
            this.objectName = objectName;
            this.metric = metric;
        }

        @Override
        public Object getCount() throws Exception {
            // Return the current metric value
            return metric.get();
        }
    }
Finally, the moment of truth. The result:
    # HELP mydomain_test_value_Count Attribute exposed for management (mydomain_test<_=_value, topic=b_topic><>Count)
    # TYPE mydomain_test_value_Count untyped
    mydomain_test_value_Count{topic="b_topic",} 1.0
    mydomain_test_value_Count{topic="a_topic",} 88.0
Clearly, this is a poor implementation; don't copy it. It is for demonstration only.
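For what it's worth, here is one hedged way the demo could be tightened with regard to the thread-safety and value-maintenance concerns named for option 3. It registers each topic's MBean exactly once via `computeIfAbsent`, and it implements `DynamicMBean` instead of relying on the XYZ/XYZMBean naming convention. The names `TopicCounters`, `CountMBean`, `inc`, and `read` are made up for this sketch, and I have not verified its output through jmx_exporter; it only reads the value back through the platform MBeanServer.

```java
import java.lang.management.ManagementFactory;
import java.util.Hashtable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical sketch: class, domain, and attribute names are illustrative.
class TopicCounters {
    private static final MBeanServer SERVER = ManagementFactory.getPlatformMBeanServer();
    private static final ConcurrentHashMap<String, AtomicLong> COUNTERS = new ConcurrentHashMap<>();

    // Topics containing , = : " * ? would need ObjectName.quote; not handled here.
    static ObjectName nameFor(String topic) {
        try {
            Hashtable<String, String> props = new Hashtable<>();
            props.put("_", "_value"); // same placeholder as the demo above
            props.put("topic", topic);
            return new ObjectName("mydomain_test", props);
        } catch (JMException e) {
            throw new IllegalArgumentException("bad topic: " + topic, e);
        }
    }

    static void inc(String topic) {
        // The mapping function runs at most once per topic, so the MBean is registered once.
        COUNTERS.computeIfAbsent(topic, t -> {
            AtomicLong counter = new AtomicLong();
            try {
                SERVER.registerMBean(new CountMBean(counter), nameFor(t));
            } catch (JMException e) {
                throw new IllegalStateException("cannot register MBean for topic " + t, e);
            }
            return counter;
        }).incrementAndGet();
    }

    // Reads the value back through JMX, the same path a scraper would take.
    static long read(String topic) {
        try {
            return ((Number) SERVER.getAttribute(nameFor(topic), "Count")).longValue();
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    // Read-only MBean with a single "Count" attribute.
    static final class CountMBean implements DynamicMBean {
        private final AtomicLong counter;

        CountMBean(AtomicLong counter) { this.counter = counter; }

        @Override public Object getAttribute(String attribute) throws AttributeNotFoundException {
            if ("Count".equals(attribute)) return counter.get();
            throw new AttributeNotFoundException(attribute);
        }
        @Override public void setAttribute(Attribute attribute) throws AttributeNotFoundException {
            throw new AttributeNotFoundException("read-only: " + attribute.getName());
        }
        @Override public AttributeList getAttributes(String[] attributes) {
            AttributeList list = new AttributeList();
            for (String a : attributes) {
                if ("Count".equals(a)) list.add(new Attribute(a, counter.get()));
            }
            return list;
        }
        @Override public AttributeList setAttributes(AttributeList attributes) {
            return new AttributeList();
        }
        @Override public Object invoke(String actionName, Object[] params, String[] signature) {
            throw new UnsupportedOperationException(actionName);
        }
        @Override public MBeanInfo getMBeanInfo() {
            MBeanAttributeInfo count = new MBeanAttributeInfo(
                    "Count", "long", "messages seen for this topic", true, false, false);
            return new MBeanInfo(CountMBean.class.getName(), "per-topic counter",
                    new MBeanAttributeInfo[] {count}, null, null, null);
        }
    }

    public static void main(String[] args) {
        inc("orders");
        inc("orders");
        System.out.println("orders=" + read("orders"));
    }
}
```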
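So, to sum up, option 2 is the natural choice. The two libraries coexist, the implementation is simple, and performance is decent. If all you need is something that works, you can stop here. But you should be aware that the approach above involves a bit of a trick.
jmx_exporter can also expose its data through an HTTP server.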
    // io.prometheus.client.exporter.HTTPServer
    /**
     * Start a HTTP server serving Prometheus metrics from the given registry.
     */
    public HTTPServer(InetSocketAddress addr, CollectorRegistry registry, boolean daemon) throws IOException {
        server = HttpServer.create();
        server.bind(addr, 3);
        // Requests are handled by HTTPMetricHandler
        HttpHandler mHandler = new HTTPMetricHandler(registry);
        // Bound to both / and /metrics
        server.createContext("/", mHandler);
        server.createContext("/metrics", mHandler);
        executorService = Executors.newFixedThreadPool(5, DaemonThreadFactory.defaultThreadFactory(daemon));
        server.setExecutor(executorService);
        start(daemon);
    }

    /**
     * Start a HTTP server by making sure that its background thread inherit proper daemon flag.
     */
    private void start(boolean daemon) {
        if (daemon == Thread.currentThread().isDaemon()) {
            server.start();
        } else {
            FutureTask<Void> startTask = new FutureTask<Void>(new Runnable() {
                @Override
                public void run() {
                    server.start();
                }
            }, null);
            DaemonThreadFactory.defaultThreadFactory(daemon).newThread(startTask).start();
            try {
                startTask.get();
            } catch (ExecutionException e) {
                throw new RuntimeException("Unexpected exception on starting HTTPSever", e);
            } catch (InterruptedException e) {
                // This is possible only if the current tread has been interrupted,
                // but in real use cases this should not happen.
                // In any case, there is nothing to do, except to propagate interrupted flag.
                Thread.currentThread().interrupt();
            }
        }
    }
So the main logic is handled by HTTPMetricHandler. Let's take a look.
    // io.prometheus.client.exporter.HTTPServer.HTTPMetricHandler#handle
    public void handle(HttpExchange t) throws IOException {
        String query = t.getRequestURI().getRawQuery();
        ByteArrayOutputStream response = this.response.get();
        response.reset();
        OutputStreamWriter osw = new OutputStreamWriter(response);
        // TextFormat does the formatting of the output;
        // registry.filteredMetricFamilySamples() does the data collection
        TextFormat.write004(osw, registry.filteredMetricFamilySamples(parseQuery(query)));
        osw.flush();
        osw.close();
        response.flush();
        response.close();
        t.getResponseHeaders().set("Content-Type", TextFormat.CONTENT_TYPE_004);
        if (shouldUseCompression(t)) {
            t.getResponseHeaders().set("Content-Encoding", "gzip");
            t.sendResponseHeaders(HttpURLConnection.HTTP_OK, 0);
            final GZIPOutputStream os = new GZIPOutputStream(t.getResponseBody());
            response.writeTo(os);
            os.close();
        } else {
            t.getResponseHeaders().set("Content-Length", String.valueOf(response.size()));
            t.sendResponseHeaders(HttpURLConnection.HTTP_OK, response.size());
            // Write to the client
            response.writeTo(t.getResponseBody());
        }
        t.close();
    }
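To see the same request/response shape in isolation, here is a minimal sketch built directly on the JDK's `com.sun.net.httpserver.HttpServer`, which is what the code above wraps. `TinyMetricsEndpoint`, `start`, and `fetch` are hypothetical names; there is no registry, no gzip, and no query filtering, just a supplier that renders the body on every request.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

// Hypothetical sketch on top of the JDK HttpServer; not the exporter's code.
class TinyMetricsEndpoint {
    // Starts a server on 127.0.0.1 (port 0 = pick a free port) that renders `body` per request.
    static HttpServer start(int port, Supplier<String> body) {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 3);
            server.createContext("/metrics", exchange -> {
                byte[] bytes = body.get().getBytes(StandardCharsets.UTF_8);
                exchange.getResponseHeaders().set("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
                exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, bytes.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(bytes);
                }
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Fetches /metrics from the given server, bypassing any configured proxy.
    static String fetch(HttpServer server) {
        try {
            URL url = URI.create("http://127.0.0.1:" + server.getAddress().getPort() + "/metrics").toURL();
            HttpURLConnection conn = (HttpURLConnection) url.openConnection(Proxy.NO_PROXY);
            try (InputStream in = conn.getInputStream()) {
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                byte[] chunk = new byte[4096];
                int n;
                while ((n = in.read(chunk)) != -1) {
                    buf.write(chunk, 0, n);
                }
                return new String(buf.toByteArray(), StandardCharsets.UTF_8);
            } finally {
                conn.disconnect();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        HttpServer server = start(0, () -> "jmx_test_abc_ffff{topic=\"test\",} 1.0\n");
        System.out.print(fetch(server));
        server.stop(0); // the server thread is non-daemon, so stop it to let the JVM exit
    }
}
```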
jmx_exporter has a JmxScraper that is dedicated to reading values from MBeans.
    // io.prometheus.jmx.JmxScraper#doScrape
    /**
     * Get a list of mbeans on host_port and scrape their values.
     *
     * Values are passed to the receiver in a single thread.
     */
    public void doScrape() throws Exception {
        MBeanServerConnection beanConn;
        JMXConnector jmxc = null;
        // By default, read the local JMX data directly,
        // i.e. communicate by sharing ManagementFactory.getPlatformMBeanServer()
        if (jmxUrl.isEmpty()) {
            beanConn = ManagementFactory.getPlatformMBeanServer();
        } else {
            Map<String, Object> environment = new HashMap<String, Object>();
            if (username != null && username.length() != 0 && password != null && password.length() != 0) {
                String[] credent = new String[] {username, password};
                environment.put(javax.management.remote.JMXConnector.CREDENTIALS, credent);
            }
            if (ssl) {
                environment.put(Context.SECURITY_PROTOCOL, "ssl");
                SslRMIClientSocketFactory clientSocketFactory = new SslRMIClientSocketFactory();
                environment.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, clientSocketFactory);
                environment.put("com.sun.jndi.rmi.factory.socket", clientSocketFactory);
            }
            // For a remote target, the data is fetched over RMI
            jmxc = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment);
            beanConn = jmxc.getMBeanServerConnection();
        }
        try {
            // Query MBean names, see #89 for reasons queryMBeans() is used instead of queryNames()
            Set<ObjectName> mBeanNames = new HashSet<ObjectName>();
            for (ObjectName name : whitelistObjectNames) {
                for (ObjectInstance instance : beanConn.queryMBeans(name, null)) {
                    mBeanNames.add(instance.getObjectName());
                }
            }
            for (ObjectName name : blacklistObjectNames) {
                for (ObjectInstance instance : beanConn.queryMBeans(name, null)) {
                    mBeanNames.remove(instance.getObjectName());
                }
            }
            // Now that we have *only* the whitelisted mBeans, remove any old ones from the cache:
            jmxMBeanPropertyCache.onlyKeepMBeans(mBeanNames);
            for (ObjectName objectName : mBeanNames) {
                long start = System.nanoTime();
                scrapeBean(beanConn, objectName);
                logger.fine("TIME: " + (System.nanoTime() - start) + " ns for " + objectName.toString());
            }
        } finally {
            if (jmxc != null) {
                jmxc.close();
            }
        }
    }

    // io.prometheus.jmx.JmxScraper#scrapeBean
    private void scrapeBean(MBeanServerConnection beanConn, ObjectName mbeanName) {
        MBeanInfo info;
        try {
            info = beanConn.getMBeanInfo(mbeanName);
        } catch (IOException e) {
            logScrape(mbeanName.toString(), "getMBeanInfo Fail: " + e);
            return;
        } catch (JMException e) {
            logScrape(mbeanName.toString(), "getMBeanInfo Fail: " + e);
            return;
        }
        MBeanAttributeInfo[] attrInfos = info.getAttributes();
        Map<String, MBeanAttributeInfo> name2AttrInfo = new LinkedHashMap<String, MBeanAttributeInfo>();
        for (int idx = 0; idx < attrInfos.length; ++idx) {
            MBeanAttributeInfo attr = attrInfos[idx];
            if (!attr.isReadable()) {
                logScrape(mbeanName, attr, "not readable");
                continue;
            }
            name2AttrInfo.put(attr.getName(), attr);
        }
        final AttributeList attributes;
        try {
            // Fetch all attribute values through the MBean connection (details omitted)
            attributes = beanConn.getAttributes(mbeanName, name2AttrInfo.keySet().toArray(new String[0]));
        } catch (Exception e) {
            logScrape(mbeanName, name2AttrInfo.keySet(), "Fail: " + e);
            return;
        }
        for (Attribute attribute : attributes.asList()) {
            MBeanAttributeInfo attr = name2AttrInfo.get(attribute.getName());
            logScrape(mbeanName, attr, "process");
            // Process the value of a single attribute; key properties such as topic=aaa,ip=1 are handled in a further loop
            processBeanValue(
                    mbeanName.getDomain(),
                    // Get the valid key property list; the parsing rule is shown further below
                    jmxMBeanPropertyCache.getKeyPropertyList(mbeanName),
                    new LinkedList<String>(),
                    attr.getName(),
                    attr.getType(),
                    attr.getDescription(),
                    attribute.getValue()
            );
        }
    }

    // Process each mBean attribute and write it to the receiver
    // io.prometheus.jmx.JmxScraper#processBeanValue
    /**
     * Recursive function for exporting the values of an mBean.
     * JMX is a very open technology, without any prescribed way of declaring mBeans
     * so this function tries to do a best-effort pass of getting the values/names
     * out in a way it can be processed elsewhere easily.
     */
    private void processBeanValue(
            String domain,
            LinkedHashMap<String, String> beanProperties,
            LinkedList<String> attrKeys,
            String attrName,
            String attrType,
            String attrDescription,
            Object value) {
        if (value == null) {
            logScrape(domain + beanProperties + attrName, "null");
        }
        // Single value: numbers, strings, and booleans can be handled
        else if (value instanceof Number || value instanceof String || value instanceof Boolean) {
            logScrape(domain + beanProperties + attrName, value.toString());
            // The parsed value is handed to the receiver, which may be the JMX collector or the console
            this.receiver.recordBean(
                    domain,
                    beanProperties,
                    attrKeys,
                    attrName,
                    attrType,
                    attrDescription,
                    value);
        }
        // Composite (multi-valued) case
        else if (value instanceof CompositeData) {
            logScrape(domain + beanProperties + attrName, "compositedata");
            CompositeData composite = (CompositeData) value;
            CompositeType type = composite.getCompositeType();
            attrKeys = new LinkedList<String>(attrKeys);
            attrKeys.add(attrName);
            for (String key : type.keySet()) {
                String typ = type.getType(key).getTypeName();
                Object valu = composite.get(key);
                processBeanValue(
                        domain,
                        beanProperties,
                        attrKeys,
                        key,
                        typ,
                        type.getDescription(),
                        valu);
            }
        }
        // More complex objects
        else if (value instanceof TabularData) {
            // I don't pretend to have a good understanding of TabularData.
            // The real world usage doesn't appear to match how they were
            // meant to be used according to the docs. I've only seen them
            // used as 'key' 'value' pairs even when 'value' is itself a
            // CompositeData of multiple values.
            logScrape(domain + beanProperties + attrName, "tabulardata");
            TabularData tds = (TabularData) value;
            TabularType tt = tds.getTabularType();
            List<String> rowKeys = tt.getIndexNames();
            CompositeType type = tt.getRowType();
            Set<String> valueKeys = new TreeSet<String>(type.keySet());
            valueKeys.removeAll(rowKeys);
            LinkedList<String> extendedAttrKeys = new LinkedList<String>(attrKeys);
            extendedAttrKeys.add(attrName);
            for (Object valu : tds.values()) {
                if (valu instanceof CompositeData) {
                    CompositeData composite = (CompositeData) valu;
                    LinkedHashMap<String, String> l2s = new LinkedHashMap<String, String>(beanProperties);
                    for (String idx : rowKeys) {
                        Object obj = composite.get(idx);
                        if (obj != null) {
                            // Nested tabulardata will repeat the 'key' label, so
                            // append a suffix to distinguish each.
                            while (l2s.containsKey(idx)) {
                                idx = idx + "_";
                            }
                            l2s.put(idx, obj.toString());
                        }
                    }
                    for (String valueIdx : valueKeys) {
                        LinkedList<String> attrNames = extendedAttrKeys;
                        String typ = type.getType(valueIdx).getTypeName();
                        String name = valueIdx;
                        if (valueIdx.toLowerCase().equals("value")) {
                            // Skip appending 'value' to the name
                            attrNames = attrKeys;
                            name = attrName;
                        }
                        processBeanValue(
                                domain,
                                l2s,
                                attrNames,
                                name,
                                typ,
                                type.getDescription(),
                                composite.get(valueIdx));
                    }
                } else {
                    logScrape(domain, "not a correct tabulardata format");
                }
            }
        } else if (value.getClass().isArray()) {
            logScrape(domain, "arrays are unsupported");
        } else {
            // Most unsupported objects end up here, and no JMX metric value is exported, e.g.:
            // mydomain_test{3=3, topic=aaa} java.util.Hashtable is not exported
            logScrape(domain + beanProperties, attrType + " is not exported");
        }
    }

    // How the exporter converts an mbeanName: the key property string is parsed into a property map
    // io.prometheus.jmx.JmxMBeanPropertyCache#getKeyPropertyList
    public LinkedHashMap<String, String> getKeyPropertyList(ObjectName mbeanName) {
        LinkedHashMap<String, String> keyProperties = keyPropertiesPerBean.get(mbeanName);
        if (keyProperties == null) {
            keyProperties = new LinkedHashMap<String, String>();
            // Convert to string form
            String properties = mbeanName.getKeyPropertyListString();
            // Match against the format the exporter recognizes
            Matcher match = PROPERTY_PATTERN.matcher(properties);
            while (match.lookingAt()) {
                keyProperties.put(match.group(1), match.group(2));
                properties = properties.substring(match.end());
                if (properties.startsWith(",")) {
                    properties = properties.substring(1);
                }
                match.reset(properties);
            }
            keyPropertiesPerBean.put(mbeanName, keyProperties);
        }
        return keyProperties;
    }

    // io.prometheus.jmx.JmxMBeanPropertyCache#PROPERTY_PATTERN
    private static final Pattern PROPERTY_PATTERN = Pattern.compile(
            "([^,=:\\*\\?]+)" + // Name - non-empty, anything but comma, equals, colon, star, or question mark
            "=" + // Equals
            "(" + // Either
                "\"" + // Quoted
                    "(?:" + // A possibly empty sequence of
                    "[^\\\\\"]*" + // Greedily match anything but backslash or quote
                    "(?:\\\\.)?" + // Greedily see if we can match an escaped sequence
                    ")*" +
                "\"" +
            "|" + // Or
                "[^,=:\"]*" + // Unquoted - can be empty, anything but comma, equals, colon, or quote
            ")");
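The parsing loop at the end is easier to follow when run on its own. The sketch below reuses the same regular expression and the same `lookingAt` / `substring` / `reset` loop as the quoted code, but takes a plain string instead of an ObjectName and skips the cache. `KeyPropertyParseSketch` and `parse` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-alone version of the key property parsing loop quoted above.
class KeyPropertyParseSketch {
    // Same expression as the PROPERTY_PATTERN quoted above.
    private static final Pattern PROPERTY_PATTERN = Pattern.compile(
            "([^,=:\\*\\?]+)" +      // name: non-empty, no comma, equals, colon, star, question mark
            "=" +
            "(" +
                "\"" + "(?:" + "[^\\\\\"]*" + "(?:\\\\.)?" + ")*" + "\"" + // quoted value
            "|" +
                "[^,=:\"]*" +        // unquoted value, may be empty
            ")");

    // Parses "k1=v1,k2=v2" into an insertion-ordered map.
    static LinkedHashMap<String, String> parse(String properties) {
        LinkedHashMap<String, String> result = new LinkedHashMap<>();
        Matcher match = PROPERTY_PATTERN.matcher(properties);
        while (match.lookingAt()) {
            result.put(match.group(1), match.group(2));
            // Drop the matched pair and the comma after it, then match again from the start.
            properties = properties.substring(match.end());
            if (properties.startsWith(",")) {
                properties = properties.substring(1);
            }
            match.reset(properties);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("_=_value,topic=b_topic"));
        System.out.println(parse("type=MemoryPool,name=\"PS Old Gen\""));
    }
}
```

Note that a quoted value keeps its surrounding quotes in group 2, and that the order of the pairs is preserved, which matters because the first pair is treated differently from the rest when the metric name is built.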
The Prometheus data format looks like the following. How is it produced from the instrumentation data?
    # HELP mydomain_test_value_Count Attribute exposed for management (mydomain_test<_=_value, topic=b_topic><>Count)
    # TYPE mydomain_test_value_Count untyped
    mydomain_test_value_Count{topic="b_topic",} 1.0
    mydomain_test_value_Count{topic="a_topic",} 132.0
It is a matter of output format, and also a matter of protocol.
    // io.prometheus.client.exporter.common.TextFormat#write004
    public static void write004(Writer writer, Enumeration<Collector.MetricFamilySamples> mfs) throws IOException {
        /* See http://prometheus.io/docs/instrumenting/exposition_formats/
         * for the output format specification. */
        while (mfs.hasMoreElements()) {
            Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
            writer.write("# HELP ");
            writer.write(metricFamilySamples.name);
            writer.write(' ');
            writeEscapedHelp(writer, metricFamilySamples.help);
            writer.write('\n');
            writer.write("# TYPE ");
            writer.write(metricFamilySamples.name);
            writer.write(' ');
            writer.write(typeString(metricFamilySamples.type));
            writer.write('\n');
            for (Collector.MetricFamilySamples.Sample sample : metricFamilySamples.samples) {
                writer.write(sample.name);
                // For samples with labelNames, write each label in order
                if (sample.labelNames.size() > 0) {
                    writer.write('{');
                    for (int i = 0; i < sample.labelNames.size(); ++i) {
                        writer.write(sample.labelNames.get(i));
                        writer.write("=\"");
                        writeEscapedLabelValue(writer, sample.labelValues.get(i));
                        writer.write("\",");
                    }
                    writer.write('}');
                }
                writer.write(' ');
                writer.write(Collector.doubleToGoString(sample.value));
                if (sample.timestampMs != null) {
                    writer.write(' ');
                    writer.write(sample.timestampMs.toString());
                }
                writer.write('\n');
            }
        }
    }
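Stripped of the Writer plumbing, the sample line is simple string assembly. The sketch below reproduces just that part with the JDK alone. `ExpositionLineSketch`, `sampleLine`, and `escapeLabelValue` are hypothetical names, and two simplifications are assumptions on my side: the value is rendered with `Double.toString` (so special values such as infinity are not given any special treatment), and timestamps are left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of how one sample line of the text format is assembled.
class ExpositionLineSketch {
    // Escapes backslash, double quote, and newline inside a label value.
    static String escapeLabelValue(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '\\': sb.append("\\\\"); break;
                case '"':  sb.append("\\\""); break;
                case '\n': sb.append("\\n"); break;
                default:   sb.append(c);
            }
        }
        return sb.toString();
    }

    // Builds: name{label1="v1",label2="v2",} value
    static String sampleLine(String name, List<String> labelNames, List<String> labelValues, double value) {
        StringBuilder sb = new StringBuilder(name);
        if (!labelNames.isEmpty()) {
            sb.append('{');
            for (int i = 0; i < labelNames.size(); i++) {
                sb.append(labelNames.get(i)).append("=\"")
                  .append(escapeLabelValue(labelValues.get(i))).append("\",");
            }
            sb.append('}');
        }
        return sb.append(' ').append(Double.toString(value)).toString();
    }

    public static void main(String[] args) {
        System.out.println(sampleLine("jmx_test_abc_ffff",
                Arrays.asList("topic"), Arrays.asList("bbb"), 1.0));
        System.out.println(sampleLine("no_labels_metric",
                Collections.<String>emptyList(), Collections.<String>emptyList(), 5242880.0));
    }
}
```

The trailing comma inside the braces is not a typo; it falls out of writing `",` after every label, which is why it shows up in all the scrape outputs earlier in this article.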
done.