本文主要研究一下storm的LoggingClusterMetricsConsumer
storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/LoggingClusterMetricsConsumer.java
public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer { public static final Logger LOG = LoggerFactory.getLogger(LoggingClusterMetricsConsumer.class); static private String padding = " "; @Override public void prepare(Object registrationArgument) { } @Override public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) { StringBuilder sb = new StringBuilder(); String header = String.format("%d\t%15s\t%40s\t", clusterInfo.getTimestamp(), "<cluster>", "<cluster>"); sb.append(header); logDataPoints(dataPoints, sb, header); } @Override public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) { StringBuilder sb = new StringBuilder(); String header = String.format("%d\t%15s\t%40s\t", supervisorInfo.getTimestamp(), supervisorInfo.getSrcSupervisorHost(), supervisorInfo.getSrcSupervisorId()); sb.append(header); for (DataPoint p : dataPoints) { sb.delete(header.length(), sb.length()); sb.append(p.getName()) .append(padding).delete(header.length() + 23, sb.length()).append("\t") .append(p.getValue()); LOG.info(sb.toString()); } } @Override public void cleanup() { } private void logDataPoints(Collection<DataPoint> dataPoints, StringBuilder sb, String header) { for (DataPoint p : dataPoints) { sb.delete(header.length(), sb.length()); sb.append(p.getName()) .append(padding).delete(header.length() + 23, sb.length()).append("\t") .append(p.getValue()); LOG.info(sb.toString()); } } }
## Cluster Metrics Consumers
storm.cluster.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"
#  - class: "com.example.demo.metric.FixedLoggingClusterMetricsConsumer"
#    argument:
#      - endpoint: "metrics-collector.mycompany.org"
#
storm.cluster.metrics.consumer.publish.interval.secs: 5
<?xml version="1.0" encoding="UTF-8"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <configuration monitorInterval="60" shutdownHook="disable" packages="org.apache.logging.log4j.core,io.sentry.log4j2"> <properties> <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property> <property name="patternMetrics">%d %-8r %m%n</property> </properties> <appenders> <RollingFile name="A1" immediateFlush="false" fileName="${sys:storm.log.dir}/${sys:logfile.name}" filePattern="${sys:storm.log.dir}/${sys:logfile.name}.%i.gz"> <PatternLayout> <pattern>${pattern}</pattern> </PatternLayout> <Policies> <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB --> </Policies> <DefaultRolloverStrategy max="9"/> </RollingFile> <RollingFile name="WEB-ACCESS" immediateFlush="false" fileName="${sys:storm.log.dir}/access-web-${sys:daemon.name}.log" filePattern="${sys:storm.log.dir}/access-web-${sys:daemon.name}.log.%i.gz"> <PatternLayout> <pattern>${pattern}</pattern> </PatternLayout> <Policies> <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB --> </Policies> <DefaultRolloverStrategy max="9"/> </RollingFile> <RollingFile name="THRIFT-ACCESS" immediateFlush="false" fileName="${sys:storm.log.dir}/access-${sys:logfile.name}" 
filePattern="${sys:storm.log.dir}/access-${sys:logfile.name}.%i.gz"> <PatternLayout> <pattern>${pattern}</pattern> </PatternLayout> <Policies> <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB --> </Policies> <DefaultRolloverStrategy max="9"/> </RollingFile> <RollingFile name="METRICS" fileName="${sys:storm.log.dir}/${sys:logfile.name}.metrics" filePattern="${sys:storm.log.dir}/${sys:logfile.name}.metrics.%i.gz"> <PatternLayout> <pattern>${patternMetrics}</pattern> </PatternLayout> <Policies> <SizeBasedTriggeringPolicy size="2 MB"/> </Policies> <DefaultRolloverStrategy max="9"/> </RollingFile> <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514" protocol="UDP" appName="[${sys:daemon.name}]" mdcId="mdc" includeMDC="true" facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}" messageId="[${sys:user.name}:S0]" id="storm" immediateFlush="true" immediateFail="true"/> </appenders> <loggers> <Logger name="org.apache.storm.logging.filters.AccessLoggingFilter" level="info" additivity="false"> <AppenderRef ref="WEB-ACCESS"/> <AppenderRef ref="syslog"/> </Logger> <Logger name="org.apache.storm.logging.ThriftAccessLogger" level="info" additivity="false"> <AppenderRef ref="THRIFT-ACCESS"/> <AppenderRef ref="syslog"/> </Logger> <Logger name="org.apache.storm.metric.LoggingClusterMetricsConsumer" level="info" additivity="false"> <appender-ref ref="METRICS"/> </Logger> <root level="info"> <!-- We log everything --> <appender-ref ref="A1"/> <appender-ref ref="syslog"/> <appender-ref ref="Sentry" level="ERROR" /> </root> </loggers> </configuration>
2018-11-06 07:51:51,488 18628 1541490711 <cluster> <cluster> supervisors 1
2018-11-06 07:51:51,488 18628 1541490711 <cluster> <cluster> topologies 0
2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> slotsTotal 4
2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> slotsUsed 0
2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> slotsFree 4
2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> executorsTotal 0
2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> tasksTotal 0
2018-11-06 07:51:51,496 18636 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 slotsTotal 4
2018-11-06 07:51:51,497 18637 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 slotsUsed 0
2018-11-06 07:51:51,497 18637 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 totalMem 3072.0
2018-11-06 07:51:51,497 18637 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 totalCpu 400.0
2018-11-06 07:51:51,498 18638 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 usedMem 0.0
2018-11-06 07:51:51,498 18638 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 usedCpu 0.0
storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java
/**
 * Holds one cluster metrics consumer and forwards data points to it, isolating the caller from
 * consumer failures. The consumer is created reflectively from its class name.
 *
 * <p>If the consumer cannot be instantiated or its {@code prepare} fails, the consumer is discarded.
 * Every later {@code handleDataPoints} call then only logs an error. Any {@link Throwable} thrown by the
 * consumer's {@code handleDataPoints} is logged and swallowed.
 */
public class ClusterMetricsConsumerExecutor {
    public static final Logger LOG = LoggerFactory.getLogger(ClusterMetricsConsumerExecutor.class);
    private static final String ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED =
        "Preparation of Cluster Metrics Consumer failed. "
            + "Please check your configuration and/or corresponding systems and relaunch Nimbus. "
            + "Skipping handle metrics.";

    // null until prepare() succeeds; reset to null when preparation fails.
    private IClusterMetricsConsumer metricsConsumer;
    private final String consumerClassName;
    private final Object registrationArgument;

    /**
     * Creates an executor. The consumer itself is not instantiated until {@link #prepare()}.
     *
     * @param consumerClassName fully qualified name of the {@link IClusterMetricsConsumer}
     *     implementation; it must have an accessible no-argument constructor
     * @param registrationArgument argument passed unchanged to the consumer's {@code prepare}
     */
    public ClusterMetricsConsumerExecutor(String consumerClassName, Object registrationArgument) {
        this.consumerClassName = consumerClassName;
        this.registrationArgument = registrationArgument;
    }

    /**
     * Instantiates the consumer through its no-argument constructor and calls its {@code prepare}.
     * Any failure is logged and leaves this executor without a consumer. A consumer that was created
     * but failed to prepare is cleaned up.
     */
    public void prepare() {
        try {
            // Class.newInstance() is deprecated (Java 9+) because it can propagate checked constructor
            // exceptions undeclared; Constructor.newInstance() wraps them in InvocationTargetException.
            metricsConsumer = (IClusterMetricsConsumer) Class.forName(consumerClassName)
                .getDeclaredConstructor()
                .newInstance();
            metricsConsumer.prepare(registrationArgument);
        } catch (Exception e) {
            LOG.error("Could not instantiate or prepare Cluster Metrics Consumer with fully qualified name "
                + consumerClassName, e);
            IClusterMetricsConsumer failedConsumer = metricsConsumer;
            // Clear the field before cleanup so that an exception thrown by cleanup() cannot leave an
            // unprepared consumer behind for handleDataPoints to use.
            metricsConsumer = null;
            if (failedConsumer != null) {
                failedConsumer.cleanup();
            }
        }
    }

    /**
     * Forwards cluster-wide data points to the consumer. Failures are logged, not rethrown.
     *
     * @param clusterInfo cluster-level information for this batch
     * @param dataPoints data points to forward
     */
    public void handleDataPoints(final IClusterMetricsConsumer.ClusterInfo clusterInfo,
                                 final Collection<DataPoint> dataPoints) {
        if (metricsConsumer == null) {
            LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
            return;
        }

        try {
            metricsConsumer.handleDataPoints(clusterInfo, dataPoints);
        } catch (Throwable e) {
            LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
        }
    }

    /**
     * Forwards one supervisor's data points to the consumer. Failures are logged, not rethrown.
     *
     * @param supervisorInfo supervisor-level information for this batch
     * @param dataPoints data points to forward
     */
    public void handleDataPoints(final IClusterMetricsConsumer.SupervisorInfo supervisorInfo,
                                 final Collection<DataPoint> dataPoints) {
        if (metricsConsumer == null) {
            LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
            return;
        }

        try {
            metricsConsumer.handleDataPoints(supervisorInfo, dataPoints);
        } catch (Throwable e) {
            // Previously a copy-paste of the cluster message; name the supervisor path explicitly.
            LOG.error("Error while handling supervisor data points, consumer class: " + consumerClassName, e);
        }
    }

    /** Cleans up the consumer, if one was successfully prepared. */
    public void cleanup() {
        if (metricsConsumer != null) {
            metricsConsumer.cleanup();
        }
    }
}
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
/**
 * Starts Nimbus (excerpt: part of the original body is elided at the "//......" marker).
 *
 * <p>Among other things it schedules two recurring timer tasks:
 * one pushes cluster metrics to the registered cluster metrics consumers (leader only), and one runs
 * {@code clusterMetricSet}. Any exception during initialization, other than those caused by an
 * InterruptedException or InterruptedIOException (which are rethrown), is logged and the process is
 * exited via {@code Utils.exitProcess(13, ...)}.
 *
 * @throws Exception when the failure's cause is an InterruptedException or InterruptedIOException
 */
public void launchServer() throws Exception {
    try {
        // NOTE(review): store/state/hpi are not used in this excerpt; presumably used in the elided part.
        BlobStore store = blobStore;
        IStormClusterState state = stormClusterState;
        NimbusInfo hpi = nimbusHostPortInfo;

        LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
        validator.prepare(conf);

        //......

        // Periodically push metrics to the cluster metrics consumers. The first argument is 0 and the second is
        // STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS from conf; presumably these are the initial
        // delay and the period in seconds (TODO confirm against the timer API). Only the leader publishes;
        // checked exceptions are rethrown wrapped in RuntimeException.
        timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
            () -> {
                try {
                    if (isLeader()) {
                        sendClusterMetricsToExecutors();
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });

        // Run clusterMetricSet recurrently (arguments 5, 5 — presumably delay and period in seconds).
        timer.scheduleRecurring(5, 5, clusterMetricSet);
    } catch (Exception e) {
        // Let interruption propagate to the caller instead of exiting the process.
        if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
            throw e;
        }

        if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
            throw e;
        }
        LOG.error("Error on initialization of nimbus", e);
        Utils.exitProcess(13, "Error on initialization of nimbus");
    }
}

/**
 * Returns whether this Nimbus currently holds leadership, as reported by {@code leaderElector}.
 */
private boolean isLeader() throws Exception {
    return leaderElector.isLeader();
}
这里定时任务的执行间隔由配置项(storm.cluster.metrics.consumer.publish.interval.secs)指定,在defaults.yaml文件中默认为60秒,也可以自己在storm.yaml中指定
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
/**
 * Takes one snapshot of the cluster and hands it to every configured cluster metrics consumer
 * executor.
 *
 * <p>Each executor first receives the cluster-wide data points, then one call per supervisor with
 * that supervisor's data points.
 *
 * @throws Exception if building the cluster info or the cluster summary fails
 */
private void sendClusterMetricsToExecutors() throws Exception {
    final ClusterInfo info = mkClusterInfo();
    final ClusterSummary summary = getClusterInfoImpl();

    final List<DataPoint> clusterPoints = extractClusterMetrics(summary);
    final Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> pointsBySupervisor =
        extractSupervisorMetrics(summary);

    for (ClusterMetricsConsumerExecutor executor : clusterConsumerExceutors) {
        executor.handleDataPoints(info, clusterPoints);
        pointsBySupervisor.forEach(executor::handleDataPoints);
    }
}
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
/**
 * Runnable that publishes aggregates of the current {@link ClusterSummary} as metrics in a
 * {@link StormMetricsRegistry}, but only while this Nimbus is the leader.
 *
 * <p>Cluster-wide totals are exposed as {@link DerivativeGauge}s derived from one shared
 * {@link CachedGauge}. Per-entity distributions (nimbuses, supervisors, topologies) are exposed as
 * {@link Histogram}s, refreshed each time the cached summary is reloaded. Every {@link #run()}
 * re-checks leadership and registers or removes the whole metric set accordingly.
 */
private class ClusterSummaryMetricSet implements Runnable {
    // Cache lifetime, in seconds, of the ClusterSummary loaded by the CachedGauge in the constructor.
    private static final int CACHING_WINDOW = 5;
    // Every metric of this set, keyed by metric name; registered/removed as a unit in setActive.
    private final ClusterSummaryMetrics clusterSummaryMetrics = new ClusterSummaryMetrics();
    // Creates a histogram, stores it in clusterSummaryMetrics under the given name, and returns it.
    private final Function<String, Histogram> registerHistogram = (name) -> {
        // This histogram reflects the data distribution across only one ClusterSummary, i.e.,
        // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
        // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update.
        // NOTE(review): CACHING_WINDOW / 2 is integer division (5 / 2 == 2), so the window is 2 s, not 2.5 s.
        final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
        clusterSummaryMetrics.put(name, histogram);
        return histogram;
    };
    // True while the metrics are registered in metricsRegistry; changed only through setActive.
    private volatile boolean active = false;

    // Nimbus metrics distribution
    private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");
    // Supervisor metrics distribution
    private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
    private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
    private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
    private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
    private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
    private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
    private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");
    // Topology metrics distribution
    private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
    private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
    private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
    private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
    private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
    private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
    private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
    private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
    private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
    private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
    private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
    // Registry that the metric set is added to / removed from in setActive.
    private final StormMetricsRegistry metricsRegistry;

    /**
     * Constructor to put all items in ClusterSummary in MetricSet as a metric.
     * All metrics are derived from a cached ClusterSummary object, which
     * expires {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after the first query by a reporter.
     * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
     * the reporting interval to avoid outdated reporting.
     *
     * @param metricsRegistry registry the metrics are registered in while this Nimbus is the leader
     */
    ClusterSummaryMetricSet(StormMetricsRegistry metricsRegistry) {
        this.metricsRegistry = metricsRegistry;
        // Break the code if out of sync with the thrift protocol (only evaluated when assertions are enabled, -ea).
        assert ClusterSummary._Fields.values().length == 3
            && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
            && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
            && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;

        // Shared source for all gauges below: reloads the cluster summary at most once per CACHING_WINDOW seconds.
        final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
            @Override
            protected ClusterSummary loadValue() {
                try {
                    ClusterSummary newSummary = getClusterInfoImpl();
                    LOG.debug("The new summary is {}", newSummary);
                    /*
                     * Update histograms based on the new summary. Most common implementation of Reporter reports Gauges before
                     * Histograms. Because DerivativeGauge will trigger cache refresh upon reporter's query, histogram will also be
                     * updated before query
                     */
                    updateHistogram(newSummary);
                    return newSummary;
                } catch (Exception e) {
                    LOG.warn("Get cluster info exception.", e);
                    throw new RuntimeException(e);
                }
            }
        };

        // Number of nimbuses whose summary reports them as leader.
        clusterSummaryMetrics.put("cluster:num-nimbus-leaders", new DerivativeGauge<ClusterSummary, Long>(cachedSummary) {
            @Override
            protected Long transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_nimbuses().stream()
                        .filter(NimbusSummary::is_isLeader)
                        .count();
            }
        });
        // Number of nimbuses in the summary.
        clusterSummaryMetrics.put("cluster:num-nimbuses", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_nimbuses_size();
            }
        });
        // Number of supervisors in the summary.
        clusterSummaryMetrics.put("cluster:num-supervisors", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors_size();
            }
        });
        // Number of topologies in the summary.
        clusterSummaryMetrics.put("cluster:num-topologies", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_topologies_size();
            }
        });
        // Sum of num_workers over all supervisors.
        clusterSummaryMetrics.put("cluster:num-total-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors().stream()
                        .mapToInt(SupervisorSummary::get_num_workers)
                        .sum();
            }
        });
        // Sum of num_used_workers over all supervisors.
        clusterSummaryMetrics.put("cluster:num-total-used-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors().stream()
                        .mapToInt(SupervisorSummary::get_num_used_workers)
                        .sum();
            }
        });
        // Sum of fragmented memory over all supervisors, with negative per-supervisor values counted as 0.
        clusterSummaryMetrics.put("cluster:total-fragmented-memory-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
            @Override
            protected Double transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors().stream()
                        //Filtered negative value
                        .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_mem(), 0))
                        .sum();
            }
        });
        // Sum of fragmented CPU over all supervisors, with negative per-supervisor values counted as 0.
        clusterSummaryMetrics.put("cluster:total-fragmented-cpu-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
            @Override
            protected Double transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors().stream()
                        //Filtered negative value
                        .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_cpu(), 0))
                        .sum();
            }
        });
    }

    /**
     * Feeds one value per nimbus, supervisor and topology in {@code newSummary} into the matching histograms.
     * Floating-point resource values are rounded with Math.round before being recorded.
     */
    private void updateHistogram(ClusterSummary newSummary) {
        for (NimbusSummary nimbusSummary : newSummary.get_nimbuses()) {
            nimbusUptime.update(nimbusSummary.get_uptime_secs());
        }
        for (SupervisorSummary summary : newSummary.get_supervisors()) {
            supervisorsUptime.update(summary.get_uptime_secs());
            supervisorsNumWorkers.update(summary.get_num_workers());
            supervisorsNumUsedWorkers.update(summary.get_num_used_workers());
            supervisorsUsedMem.update(Math.round(summary.get_used_mem()));
            supervisorsUsedCpu.update(Math.round(summary.get_used_cpu()));
            supervisorsFragmentedMem.update(Math.round(summary.get_fragmented_mem()));
            supervisorsFragmentedCpu.update(Math.round(summary.get_fragmented_cpu()));
        }
        for (TopologySummary summary : newSummary.get_topologies()) {
            topologiesNumTasks.update(summary.get_num_tasks());
            topologiesNumExecutors.update(summary.get_num_executors());
            topologiesNumWorker.update(summary.get_num_workers());
            topologiesUptime.update(summary.get_uptime_secs());
            topologiesReplicationCount.update(summary.get_replication_count());
            topologiesRequestedMemOnHeap.update(Math.round(summary.get_requested_memonheap()));
            topologiesRequestedMemOffHeap.update(Math.round(summary.get_requested_memoffheap()));
            topologiesRequestedCpu.update(Math.round(summary.get_requested_cpu()));
            topologiesAssignedMemOnHeap.update(Math.round(summary.get_assigned_memonheap()));
            topologiesAssignedMemOffHeap.update(Math.round(summary.get_assigned_memoffheap()));
            topologiesAssignedCpu.update(Math.round(summary.get_assigned_cpu()));
        }
    }

    /**
     * Registers all metrics of this set when switching to active and removes them when switching to
     * inactive. Does nothing if the state is unchanged.
     * NOTE(review): the check-then-set on the volatile flag is not atomic; safe only if callers do not
     * invoke this concurrently — confirm it is only driven from a single timer thread.
     */
    void setActive(final boolean active) {
        if (this.active != active) {
            this.active = active;
            if (active) {
                metricsRegistry.registerAll(clusterSummaryMetrics);
            } else {
                metricsRegistry.removeAll(clusterSummaryMetrics);
            }
        }
    }

    /** Re-evaluates leadership and (de)activates the metric set; checked exceptions are wrapped. */
    @Override
    public void run() {
        try {
            setActive(isLeader());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}