本文主要研究一下springboot2如何上報metrics到statsdhtml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-statsd</artifactId> </dependency>
# Whether exporting of metrics to StatsD is enabled. management.metrics.export.statsd.enabled=true # StatsD line protocol to use. datalog or esty management.metrics.export.statsd.flavor=etsy # Host of the StatsD server to receive exported metrics. management.metrics.export.statsd.host=192.168.99.100 # Port of the StatsD server to receive exported metrics. management.metrics.export.statsd.port=8125 # Total length of a single payload should be kept within your network's MTU. management.metrics.export.statsd.max-packet-length=1400 # How often gauges will be polled. When a gauge is polled, its value is recalculated and if the value has changed (or publishUnchangedMeters is true), it is sent to the StatsD server. management.metrics.export.statsd.polling-frequency=10s # Whether to send unchanged meters to the StatsD server. management.metrics.export.statsd.publish-unchanged-meters=true # Maximum size of the queue of items waiting to be sent to the StatsD server. management.metrics.export.statsd.queue-size=2147483647
micrometer-registry-statsd-1.0.1-sources.jar!/io/micrometer/statsd/StatsdFlavor.javajava
public enum StatsdFlavor { /** * https://github.com/etsy/statsd/blob/master/docs/metric_types.md */ ETSY, /** * https://docs.datadoghq.com/guides/dogstatsd/#datagram-format */ DATADOG, /** * https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/ * * For gauges to work as expected, you should set `delete_gauges = false` in your input options as documented here: * https://github.com/influxdata/telegraf/tree/master/plugins/inputs/statsd */ TELEGRAF }
flavor有好幾種,默認是DATALOG,這裏使用ETSY
spring-boot-actuator-autoconfigure-2.0.0.RELEASE-sources.jar!/org/springframework/boot/actuate/autoconfigure/metrics/export/statsd/StatsdProperties.javareact
@ConfigurationProperties(prefix = "management.metrics.export.statsd") public class StatsdProperties { /** * Whether exporting of metrics to StatsD is enabled. */ private boolean enabled = true; /** * StatsD line protocol to use. */ private StatsdFlavor flavor = StatsdFlavor.DATADOG; /** * Host of the StatsD server to receive exported metrics. */ private String host = "localhost"; /** * Port of the StatsD server to receive exported metrics. */ private Integer port = 8125; /** * Total length of a single payload should be kept within your network's MTU. */ private Integer maxPacketLength = 1400; /** * How often gauges will be polled. When a gauge is polled, its value is recalculated * and if the value has changed (or publishUnchangedMeters is true), it is sent to the * StatsD server. */ private Duration pollingFrequency = Duration.ofSeconds(10); /** * Maximum size of the queue of items waiting to be sent to the StatsD server. */ private Integer queueSize = Integer.MAX_VALUE; /** * Whether to send unchanged meters to the StatsD server. */ private boolean publishUnchangedMeters = true; //...... }
注意這裏的queueSize默認是無限大。不過仔細看源碼貌似沒看到調用的地方。
curl -i http://localhost:8080/actuator/metrics/jvm.memory.used?tag=area:heap
返回git
{ "name": "jvm.memory.used", "measurements": [ { "statistic": "VALUE", "value": 442224240 } ], "availableTags": [ { "tag": "id", "values": [ "PS Eden Space", "PS Old Gen", "PS Survivor Space" ] } ] }
curl -i http://localhost:8080/actuator/metrics/jvm.memory.used?tag=area:heap&tag=id:PS Eden Space
返回github
{ "name": "jvm.memory.used", "measurements": [ { "statistic": "VALUE", "value": 388454976 } ], "availableTags": [] }
因爲esty的statsd不支持tag,所以輸出時將tag轉爲了name的一部分
micrometer-registry-statsd-1.0.1-sources.jar!/io/micrometer/statsd/internal/FlavorStatsdLineBuilder.javaspring
/** * A Statsd serializer for a particular {@link Meter} that formats the line in different * ways depending on the prevailing {@link StatsdFlavor}. * * @author Jon Schneider */ public class FlavorStatsdLineBuilder implements StatsdLineBuilder { private final Meter.Id id; private final StatsdFlavor flavor; private final HierarchicalNameMapper nameMapper; private final MeterRegistry.Config config; private final Function<NamingConvention, String> datadogTagString; private final Function<NamingConvention, String> telegrafTagString; public FlavorStatsdLineBuilder(Meter.Id id, StatsdFlavor flavor, HierarchicalNameMapper nameMapper, MeterRegistry.Config config) { this.id = id; this.flavor = flavor; this.nameMapper = nameMapper; this.config = config; // service:payroll,region:us-west this.datadogTagString = memoize(convention -> id.getTags().iterator().hasNext() ? id.getConventionTags(convention).stream() .map(t -> t.getKey() + ":" + t.getValue()) .collect(Collectors.joining(",")) : null ); // service=payroll,region=us-west this.telegrafTagString = memoize(convention -> id.getTags().iterator().hasNext() ? id.getConventionTags(convention).stream() .map(t -> t.getKey() + "=" + t.getValue()) .collect(Collectors.joining(",")) : null ); } @Override public String count(long amount, Statistic stat) { return line(Long.toString(amount), stat, "c"); } @Override public String gauge(double amount, Statistic stat) { return line(DoubleFormat.decimalOrNan(amount), stat, "g"); } @Override public String histogram(double amount) { return line(DoubleFormat.decimalOrNan(amount), null, "h"); } @Override public String timing(double timeMs) { return line(DoubleFormat.decimalOrNan(timeMs), null, "ms"); } private String line(String amount, @Nullable Statistic stat, String type) { switch (flavor) { case ETSY: return metricName(stat) + ":" + amount + "|" + type; case DATADOG: return metricName(stat) + ":" + amount + "|" + type + tags(stat, datadogTagString.apply(config.namingConvention()),":", "|#"); case TELEGRAF: default: return metricName(stat) + tags(stat, telegrafTagString.apply(config.namingConvention()),"=", ",") + ":" + amount + "|" + type; } } private String tags(@Nullable Statistic stat, String otherTags, String keyValueSeparator, String preamble) { String tags = of(stat == null ? null : "statistic" + keyValueSeparator + stat.getTagValueRepresentation(), otherTags) .filter(Objects::nonNull) .collect(Collectors.joining(",")); if(!tags.isEmpty()) tags = preamble + tags; return tags; } private String metricName(@Nullable Statistic stat) { switch (flavor) { case ETSY: return nameMapper.toHierarchicalName(stat != null ? id.withTag(stat) : id, config.namingConvention()); case DATADOG: case TELEGRAF: default: return config.namingConvention().name(id.getName(), id.getType(), id.getBaseUnit()); } } }
重點看tags方法
spring-boot-actuator-autoconfigure-2.0.0.RELEASE-sources.jar!/org/springframework/boot/actuate/autoconfigure/metrics/export/statsd/StatsdMetricsExportAutoConfiguration.javaspringboot
@Configuration @AutoConfigureBefore({ CompositeMeterRegistryAutoConfiguration.class, SimpleMetricsExportAutoConfiguration.class }) @AutoConfigureAfter(MetricsAutoConfiguration.class) @ConditionalOnBean(Clock.class) @ConditionalOnClass(StatsdMeterRegistry.class) @ConditionalOnProperty(prefix = "management.metrics.export.statsd", name = "enabled", havingValue = "true", matchIfMissing = true) @EnableConfigurationProperties(StatsdProperties.class) public class StatsdMetricsExportAutoConfiguration { @Bean @ConditionalOnMissingBean(StatsdConfig.class) public StatsdConfig statsdConfig(StatsdProperties statsdProperties) { return new StatsdPropertiesConfigAdapter(statsdProperties); } @Bean @ConditionalOnMissingBean public StatsdMeterRegistry statsdMeterRegistry(StatsdConfig statsdConfig, HierarchicalNameMapper hierarchicalNameMapper, Clock clock) { return new StatsdMeterRegistry(statsdConfig, hierarchicalNameMapper, clock); } @Bean @ConditionalOnMissingBean public HierarchicalNameMapper hierarchicalNameMapper() { return HierarchicalNameMapper.DEFAULT; } @Bean public StatsdMetrics statsdMetrics() { return new StatsdMetrics(); } }
注意這裏使用StatsdPropertiesConfigAdapter將statsdProperties適配爲statsdConfig
這裏還建立了StatsdMeterRegistry
micrometer-registry-statsd-1.0.1-sources.jar!/io/micrometer/statsd/StatsdMeterRegistry.javaapp
public class StatsdMeterRegistry extends MeterRegistry { //...... private StatsdMeterRegistry(StatsdConfig config, HierarchicalNameMapper nameMapper, NamingConvention namingConvention, Clock clock, @Nullable Function<Meter.Id, StatsdLineBuilder> lineBuilderFunction, @Nullable Consumer<String> lineSink) { super(clock); this.statsdConfig = config; this.nameMapper = nameMapper; this.lineBuilderFunction = lineBuilderFunction; this.lineSink = lineSink; config().namingConvention(namingConvention); UnicastProcessor<String> processor = UnicastProcessor.create(Queues.<String>unboundedMultiproducer().get()); try { Class.forName("ch.qos.logback.classic.turbo.TurboFilter", false, getClass().getClassLoader()); this.publisher = new LogbackMetricsSuppressingUnicastProcessor(processor); } catch (ClassNotFoundException e) { this.publisher = processor; } if (lineSink != null) { publisher.subscribe(new Subscriber<String>() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(String line) { if (started.get()) { lineSink.accept(line); } } @Override public void onError(Throwable t) { } @Override public void onComplete() { meterPoller.dispose(); } }); // now that we're connected, start polling gauges and other pollable meter types meterPoller.replace(Flux.interval(statsdConfig.pollingFrequency()) .doOnEach(n -> pollableMeters.forEach(StatsdPollable::poll)) .subscribe()); } if (config.enabled()) start(); } public void start() { if (started.compareAndSet(false, true) && lineSink == null) { UdpClient.create(statsdConfig.host(), statsdConfig.port()) .newHandler((in, out) -> out .options(NettyPipeline.SendOptions::flushOnEach) .sendString(publisher) .neverComplete() ) .subscribe(client -> { this.udpClient.replace(client); // now that we're connected, start polling gauges and other pollable meter types meterPoller.replace(Flux.interval(statsdConfig.pollingFrequency()) .doOnEach(n -> pollableMeters.forEach(StatsdPollable::poll)) .subscribe()); }); } } public void stop() { if (started.compareAndSet(true, false)) { udpClient.dispose(); meterPoller.dispose(); } } @Override public void close() { stop(); super.close(); } //...... }
1.能夠看到底層是使用reactor的UdpClient,processor使用的是UnicastProcessor,用的隊列是無界的MpscLinkedQueue
2.這裏我看半天沒看到配置文件設置的queueSize做用在哪裏
3.具體的數據中轉是經過這個processor來處理,UdpClient這裏訂閱processor,而後各個metrics往processor產生數據
好比
micrometer-registry-statsd-1.0.1-sources.jar!/io/micrometer/statsd/StatsdGauge.javacurl
public class StatsdGauge<T> extends AbstractMeter implements Gauge, StatsdPollable { private final StatsdLineBuilder lineBuilder; private final Subscriber<String> publisher; private final WeakReference<T> ref; private final ToDoubleFunction<T> value; private final AtomicReference<Double> lastValue = new AtomicReference<>(Double.NaN); private final boolean alwaysPublish; StatsdGauge(Id id, StatsdLineBuilder lineBuilder, Subscriber<String> publisher, @Nullable T obj, ToDoubleFunction<T> value, boolean alwaysPublish) { super(id); this.lineBuilder = lineBuilder; this.publisher = publisher; this.ref = new WeakReference<>(obj); this.value = value; this.alwaysPublish = alwaysPublish; } @Override public double value() { T obj = ref.get(); return obj != null ? value.applyAsDouble(ref.get()) : 0; } @Override public void poll() { double val = value(); if (alwaysPublish || lastValue.getAndSet(val) != val) { publisher.onNext(lineBuilder.gauge(val)); } } @SuppressWarnings("EqualsWhichDoesntCheckParameterClass") @Override public boolean equals(Object o) { return MeterEquivalence.equals(this, o); } @Override public int hashCode() { return MeterEquivalence.hashCode(this); } }
能夠看到這裏的poll方法往publisheronNext數據
springboot2目前貌似不支持直接在配置文件指定statsd的prefix(1.x版本的spring.metrics.export.statsd.prefix在2版本中已經被標記爲廢棄
),這樣會形成多個應用服務上報指標的時候,沒法區分開來是哪一個服務的指標(目測將來版本會支持,2.0.0版本要支持的話,也能夠寫點代碼擴展支持prefix
)。jvm