A look at Storm's LoggingClusterMetricsConsumer

This article takes a look at Storm's LoggingClusterMetricsConsumer.

LoggingClusterMetricsConsumer

storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/LoggingClusterMetricsConsumer.java

public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer {
    public static final Logger LOG = LoggerFactory.getLogger(LoggingClusterMetricsConsumer.class);

    static private String padding = "                       ";

    @Override
    public void prepare(Object registrationArgument) {
    }

    @Override
    public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s\t%40s\t",
                                      clusterInfo.getTimestamp(), "<cluster>", "<cluster>");
        sb.append(header);
        logDataPoints(dataPoints, sb, header);
    }

    @Override
    public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s\t%40s\t",
                                      supervisorInfo.getTimestamp(),
                                      supervisorInfo.getSrcSupervisorHost(),
                                      supervisorInfo.getSrcSupervisorId());
        sb.append(header);
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.getName())
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.getValue());
            LOG.info(sb.toString());
        }
    }

    @Override
    public void cleanup() {
    }

    private void logDataPoints(Collection<DataPoint> dataPoints, StringBuilder sb, String header) {
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.getName())
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.getValue());
            LOG.info(sb.toString());
        }
    }
}
  • This is a cluster-level metrics consumer; it can only be configured in storm.yaml
  • Its handleDataPoints methods are invoked as callbacks by ClusterMetricsConsumerExecutor
  • Here handleDataPoints simply formats the data points and writes them to the log file; a minimal custom consumer is sketched right after this list
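
For comparison, here is a minimal custom consumer. This is only a sketch: StdoutClusterMetricsConsumer is a hypothetical class, not part of Storm, and it prints the data points instead of logging them. It would be registered in storm.yaml the same way as LoggingClusterMetricsConsumer.

import java.util.Collection;

import org.apache.storm.metric.api.DataPoint;
import org.apache.storm.metric.api.IClusterMetricsConsumer;

public class StdoutClusterMetricsConsumer implements IClusterMetricsConsumer {

    @Override
    public void prepare(Object registrationArgument) {
        // registrationArgument is whatever was configured under "argument:"
        // in storm.cluster.metrics.consumer.register; it may be null
    }

    @Override
    public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) {
        for (DataPoint p : dataPoints) {
            System.out.println(clusterInfo.getTimestamp() + " <cluster> " + p.getName() + "=" + p.getValue());
        }
    }

    @Override
    public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) {
        for (DataPoint p : dataPoints) {
            System.out.println(supervisorInfo.getTimestamp() + " " + supervisorInfo.getSrcSupervisorHost()
                    + " " + p.getName() + "=" + p.getValue());
        }
    }

    @Override
    public void cleanup() {
    }
}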

storm.yaml configuration

## Cluster Metrics Consumers
storm.cluster.metrics.consumer.register:
   - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"
#   - class: "com.example.demo.metric.FixedLoggingClusterMetricsConsumer"
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"
#
storm.cluster.metrics.consumer.publish.interval.secs: 5
  • This registers LoggingClusterMetricsConsumer as the consumer class and sets the publish interval to 5 seconds (overriding the 60-second default); the sketch below shows how Nimbus might materialize these entries
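
Nimbus turns each entry of this list into a ClusterMetricsConsumerExecutor (shown in full further below). A simplified sketch of that wiring; the method name and casts here are illustrative, not verbatim Storm source:

@SuppressWarnings("unchecked")
private static List<ClusterMetricsConsumerExecutor> makeClusterMetricsConsumerExecutors(Map<String, Object> conf) {
    // each list element of storm.cluster.metrics.consumer.register is a map
    // with a "class" key and an optional "argument" key, as in the yaml above
    Collection<Map<String, Object>> registered =
        (Collection<Map<String, Object>>) conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_REGISTER);
    List<ClusterMetricsConsumerExecutor> executors = new ArrayList<>();
    if (registered != null) {
        for (Map<String, Object> consumer : registered) {
            executors.add(new ClusterMetricsConsumerExecutor(
                (String) consumer.get("class"), consumer.get("argument")));
        }
    }
    return executors;
}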

cluster.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<configuration monitorInterval="60" shutdownHook="disable" packages="org.apache.logging.log4j.core,io.sentry.log4j2">
<properties>
    <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
    <property name="patternMetrics">%d %-8r %m%n</property>
</properties>
<appenders>
    <RollingFile name="A1" immediateFlush="false"
                 fileName="${sys:storm.log.dir}/${sys:logfile.name}"
                 filePattern="${sys:storm.log.dir}/${sys:logfile.name}.%i.gz">
        <PatternLayout>
            <pattern>${pattern}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="WEB-ACCESS" immediateFlush="false"
                 fileName="${sys:storm.log.dir}/access-web-${sys:daemon.name}.log"
                 filePattern="${sys:storm.log.dir}/access-web-${sys:daemon.name}.log.%i.gz">
        <PatternLayout>
            <pattern>${pattern}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="THRIFT-ACCESS" immediateFlush="false"
                 fileName="${sys:storm.log.dir}/access-${sys:logfile.name}"
                 filePattern="${sys:storm.log.dir}/access-${sys:logfile.name}.%i.gz">
    <PatternLayout>
        <pattern>${pattern}</pattern>
    </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="METRICS"
                 fileName="${sys:storm.log.dir}/${sys:logfile.name}.metrics"
                 filePattern="${sys:storm.log.dir}/${sys:logfile.name}.metrics.%i.gz">
        <PatternLayout>
            <pattern>${patternMetrics}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="2 MB"/>
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
            protocol="UDP" appName="[${sys:daemon.name}]" mdcId="mdc" includeMDC="true"
            facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
            messageId="[${sys:user.name}:S0]" id="storm" immediateFlush="true" immediateFail="true"/>
</appenders>
<loggers>

    <Logger name="org.apache.storm.logging.filters.AccessLoggingFilter" level="info" additivity="false">
        <AppenderRef ref="WEB-ACCESS"/>
        <AppenderRef ref="syslog"/>
    </Logger>
    <Logger name="org.apache.storm.logging.ThriftAccessLogger" level="info" additivity="false">
        <AppenderRef ref="THRIFT-ACCESS"/>
        <AppenderRef ref="syslog"/>
    </Logger>
    <Logger name="org.apache.storm.metric.LoggingClusterMetricsConsumer" level="info" additivity="false">
        <appender-ref ref="METRICS"/>
    </Logger>
    <root level="info"> <!-- We log everything -->
        <appender-ref ref="A1"/>
        <appender-ref ref="syslog"/>
        <appender-ref ref="Sentry" level="ERROR" />
    </root>
</loggers>
</configuration>
  • cluster.xml carries the logging configuration for these metrics. The METRICS appender used here is a RollingFile whose file name is ${sys:storm.log.dir}/${sys:logfile.name}.metrics; for example, nimbus's default logfile.name is nimbus.log and supervisor's is supervisor.log, so the metrics are written to nimbus.log.metrics and supervisor.log.metrics respectively (see the note on the logger-name binding after this list)
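
Note that the routing works purely by logger name: the <Logger> entry in cluster.xml matches the fully qualified class name that LoggingClusterMetricsConsumer passes to LoggerFactory.getLogger. A custom consumer class would need its own matching entry, otherwise its output falls through to the root logger and the main log file:

// In LoggingClusterMetricsConsumer the logger is keyed by the class's FQCN,
// which is exactly the name cluster.xml routes to the METRICS appender:
public static final Logger LOG = LoggerFactory.getLogger(LoggingClusterMetricsConsumer.class);
// -> logger name "org.apache.storm.metric.LoggingClusterMetricsConsumer"
// A custom consumer logs under its own FQCN and needs its own
// <Logger name="..."> entry in cluster.xml to reach the metrics file.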

Sample log output

2018-11-06 07:51:51,488 18628    1541490711           <cluster>                                <cluster>        supervisors             1
2018-11-06 07:51:51,488 18628    1541490711           <cluster>                                <cluster>        topologies              0
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        slotsTotal              4
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        slotsUsed               0
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        slotsFree               4
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        executorsTotal          0
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        tasksTotal              0
2018-11-06 07:51:51,496 18636    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        slotsTotal              4
2018-11-06 07:51:51,497 18637    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        slotsUsed               0
2018-11-06 07:51:51,497 18637    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        totalMem                3072.0
2018-11-06 07:51:51,497 18637    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        totalCpu                400.0
2018-11-06 07:51:51,498 18638    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        usedMem                 0.0
2018-11-06 07:51:51,498 18638    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        usedCpu                 0.0

ClusterMetricsConsumerExecutor

storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java

public class ClusterMetricsConsumerExecutor {
    public static final Logger LOG = LoggerFactory.getLogger(ClusterMetricsConsumerExecutor.class);
    private static final String ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED =
        "Preparation of Cluster Metrics Consumer failed. " +
        "Please check your configuration and/or corresponding systems and relaunch Nimbus. " +
        "Skipping handle metrics.";

    private IClusterMetricsConsumer metricsConsumer;
    private String consumerClassName;
    private Object registrationArgument;

    public ClusterMetricsConsumerExecutor(String consumerClassName, Object registrationArgument) {
        this.consumerClassName = consumerClassName;
        this.registrationArgument = registrationArgument;
    }

    public void prepare() {
        try {
            metricsConsumer = (IClusterMetricsConsumer) Class.forName(consumerClassName).newInstance();
            metricsConsumer.prepare(registrationArgument);
        } catch (Exception e) {
            LOG.error("Could not instantiate or prepare Cluster Metrics Consumer with fully qualified name " +
                      consumerClassName, e);

            if (metricsConsumer != null) {
                metricsConsumer.cleanup();
            }
            metricsConsumer = null;
        }
    }

    public void handleDataPoints(final IClusterMetricsConsumer.ClusterInfo clusterInfo, final Collection<DataPoint> dataPoints) {
        if (metricsConsumer == null) {
            LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
            return;
        }

        try {
            metricsConsumer.handleDataPoints(clusterInfo, dataPoints);
        } catch (Throwable e) {
            LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
        }
    }

    public void handleDataPoints(final IClusterMetricsConsumer.SupervisorInfo supervisorInfo, final Collection<DataPoint> dataPoints) {
        if (metricsConsumer == null) {
            LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
            return;
        }

        try {
            metricsConsumer.handleDataPoints(supervisorInfo, dataPoints);
        } catch (Throwable e) {
            LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
        }
    }

    public void cleanup() {
        if (metricsConsumer != null) {
            metricsConsumer.cleanup();
        }
    }
}
  • In prepare, ClusterMetricsConsumerExecutor reflectively instantiates the IClusterMetricsConsumer implementation named by consumerClassName, then calls metricsConsumer.prepare(registrationArgument) to let it set itself up
  • ClusterMetricsConsumerExecutor's handleDataPoints methods simply delegate to the metricsConsumer's handleDataPoints
  • There are two handleDataPoints overloads; both take a dataPoints collection and differ in the other parameter, one receiving a ClusterInfo and the other a SupervisorInfo, used for nimbus-level and supervisor-level metrics respectively (a usage sketch follows this list)
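
A hedged usage sketch of the executor's lifecycle. This is a driver fragment, not Storm source, and the ClusterInfo(timestamp) constructor is assumed from the getTimestamp() accessor seen earlier:

ClusterMetricsConsumerExecutor executor = new ClusterMetricsConsumerExecutor(
        "org.apache.storm.metric.LoggingClusterMetricsConsumer", null);
executor.prepare();   // reflectively instantiates the consumer; failures are logged, not thrown

List<DataPoint> dataPoints = Arrays.asList(
        new DataPoint("supervisors", 1),
        new DataPoint("topologies", 0));
// assumed constructor: ClusterInfo(long timestamp)
executor.handleDataPoints(
        new IClusterMetricsConsumer.ClusterInfo(Time.currentTimeSecs()), dataPoints);

executor.cleanup();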

Nimbus.launchServer

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

public void launchServer() throws Exception {
        try {
            BlobStore store = blobStore;
            IStormClusterState state = stormClusterState;
            NimbusInfo hpi = nimbusHostPortInfo;

            LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
            validator.prepare(conf);

            //......

            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
                                    () -> {
                                        try {
                                            if (isLeader()) {
                                                sendClusterMetricsToExecutors();
                                            }
                                        } catch (Exception e) {
                                            throw new RuntimeException(e);
                                        }
                                    });
            
            timer.scheduleRecurring(5, 5, clusterMetricSet);
        } catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                throw e;
            }

            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
                throw e;
            }
            LOG.error("Error on initialization of nimbus", e);
            Utils.exitProcess(13, "Error on initialization of nimbus");
        }
    }

    private boolean isLeader() throws Exception {
        return leaderElector.isLeader();
    }
  • Nimbus's launchServer method schedules a recurring task which, when the current node is the leader, calls sendClusterMetricsToExecutors to push metrics to the metrics consumers
  • The scheduling interval is DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS (storm.cluster.metrics.consumer.publish.interval.secs), which defaults to 60 in defaults.yaml and can be overridden in storm.yaml
  • Besides publishing metrics to the consumers, launchServer schedules a second recurring task that runs the ClusterSummaryMetricSet runnable every 5 seconds (an illustration of the publish loop follows this list)
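
The publish loop can be pictured with plain JDK scheduling. Storm itself uses its own StormTimer, so the following fragment is only an illustration of the control flow, reusing isLeader() and sendClusterMetricsToExecutors() from the code above:

ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
int intervalSecs = 60;  // storm.cluster.metrics.consumer.publish.interval.secs (default 60)
timer.scheduleAtFixedRate(() -> {
    try {
        if (isLeader()) {                     // only the leader publishes cluster metrics
            sendClusterMetricsToExecutors();
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}, 0, intervalSecs, TimeUnit.SECONDS);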

Nimbus.sendClusterMetricsToExecutors

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

private void sendClusterMetricsToExecutors() throws Exception {
        ClusterInfo clusterInfo = mkClusterInfo();
        ClusterSummary clusterSummary = getClusterInfoImpl();
        List<DataPoint> clusterMetrics = extractClusterMetrics(clusterSummary);
        Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> supervisorMetrics = extractSupervisorMetrics(clusterSummary);
        for (ClusterMetricsConsumerExecutor consumerExecutor : clusterConsumerExceutors) {
            consumerExecutor.handleDataPoints(clusterInfo, clusterMetrics);
            for (Entry<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> entry : supervisorMetrics.entrySet()) {
                consumerExecutor.handleDataPoints(entry.getKey(), entry.getValue());
            }
        }
    }
  • Nimbus's sendClusterMetricsToExecutors method extracts the metrics via extractClusterMetrics and extractSupervisorMetrics, then hands the data to each consumerExecutor's handleDataPoints (a sketch of the extraction follows)
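
The data point names in the sample log above (supervisors, topologies, slotsTotal, ...) come from extractClusterMetrics. A sketch of what it computes from the ClusterSummary, reconstructed from those names rather than copied from Storm source:

// Sketch of extractClusterMetrics; the aggregation logic is inferred from
// the data point names in the sample log, not verbatim Storm source.
private List<DataPoint> extractClusterMetrics(ClusterSummary summ) {
    List<DataPoint> ret = new ArrayList<>();
    ret.add(new DataPoint("supervisors", summ.get_supervisors_size()));
    ret.add(new DataPoint("topologies", summ.get_topologies_size()));

    int totalSlots = 0;
    int usedSlots = 0;
    for (SupervisorSummary sup : summ.get_supervisors()) {
        totalSlots += sup.get_num_workers();
        usedSlots += sup.get_num_used_workers();
    }
    ret.add(new DataPoint("slotsTotal", totalSlots));
    ret.add(new DataPoint("slotsUsed", usedSlots));
    ret.add(new DataPoint("slotsFree", totalSlots - usedSlots));

    int totalExecutors = 0;
    int totalTasks = 0;
    for (TopologySummary topo : summ.get_topologies()) {
        totalExecutors += topo.get_num_executors();
        totalTasks += topo.get_num_tasks();
    }
    ret.add(new DataPoint("executorsTotal", totalExecutors));
    ret.add(new DataPoint("tasksTotal", totalTasks));
    return ret;
}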

ClusterSummaryMetricSet

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

private class ClusterSummaryMetricSet implements Runnable {
        private static final int CACHING_WINDOW = 5;
        
        private final ClusterSummaryMetrics clusterSummaryMetrics = new ClusterSummaryMetrics();
        
        private final Function<String, Histogram> registerHistogram = (name) -> {
            //This histogram reflects the data distribution across only one ClusterSummary, i.e.,
            // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
            // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update
            final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
            clusterSummaryMetrics.put(name, histogram);
            return histogram;
        };
        private volatile boolean active = false;

        //Nimbus metrics distribution
        private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");

        //Supervisor metrics distribution
        private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
        private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
        private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
        private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
        private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
        private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
        private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");

        //Topology metrics distribution
        private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
        private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
        private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
        private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
        private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
        private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
        private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
        private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
        private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
        private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
        private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
        private final StormMetricsRegistry metricsRegistry;

        /**
         * Constructor to put all items in ClusterSummary in MetricSet as a metric.
         * All metrics are derived from a cached ClusterSummary object,
         * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters.
         * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
         * reporting interval to avoid outdated reporting.
         */
        ClusterSummaryMetricSet(StormMetricsRegistry metricsRegistry) {
            this.metricsRegistry = metricsRegistry;
            //Break the code if out of sync to thrift protocol
            assert ClusterSummary._Fields.values().length == 3
                && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
                && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
                && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;

            final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
                @Override
                protected ClusterSummary loadValue() {
                    try {
                        ClusterSummary newSummary = getClusterInfoImpl();
                        LOG.debug("The new summary is {}", newSummary);
                        /*
                         * Update histograms based on the new summary. Most common implementation of Reporter reports Gauges before
                         * Histograms. Because DerivativeGauge will trigger cache refresh upon reporter's query, histogram will also be
                         * updated before query
                         */
                        updateHistogram(newSummary);
                        return newSummary;
                    } catch (Exception e) {
                        LOG.warn("Get cluster info exception.", e);
                        throw new RuntimeException(e);
                    }
                }
            };

            clusterSummaryMetrics.put("cluster:num-nimbus-leaders", new DerivativeGauge<ClusterSummary, Long>(cachedSummary) {
                @Override
                protected Long transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_nimbuses().stream()
                            .filter(NimbusSummary::is_isLeader)
                            .count();
                }
            });
            clusterSummaryMetrics.put("cluster:num-nimbuses", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_nimbuses_size();
                }
            });
            clusterSummaryMetrics.put("cluster:num-supervisors", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors_size();
                }
            });
            clusterSummaryMetrics.put("cluster:num-topologies", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_topologies_size();
                }
            });
            clusterSummaryMetrics.put("cluster:num-total-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                            .mapToInt(SupervisorSummary::get_num_workers)
                            .sum();
                }
            });
            clusterSummaryMetrics.put("cluster:num-total-used-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                            .mapToInt(SupervisorSummary::get_num_used_workers)
                            .sum();
                }
            });
            clusterSummaryMetrics.put("cluster:total-fragmented-memory-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
                @Override
                protected Double transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                            //Filtered negative value
                            .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_mem(), 0))
                            .sum();
                }
            });
            clusterSummaryMetrics.put("cluster:total-fragmented-cpu-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
                @Override
                protected Double transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                            //Filtered negative value
                            .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_cpu(), 0))
                            .sum();
                }
            });
        }

        private void updateHistogram(ClusterSummary newSummary) {
            for (NimbusSummary nimbusSummary : newSummary.get_nimbuses()) {
                nimbusUptime.update(nimbusSummary.get_uptime_secs());
            }
            for (SupervisorSummary summary : newSummary.get_supervisors()) {
                supervisorsUptime.update(summary.get_uptime_secs());
                supervisorsNumWorkers.update(summary.get_num_workers());
                supervisorsNumUsedWorkers.update(summary.get_num_used_workers());
                supervisorsUsedMem.update(Math.round(summary.get_used_mem()));
                supervisorsUsedCpu.update(Math.round(summary.get_used_cpu()));
                supervisorsFragmentedMem.update(Math.round(summary.get_fragmented_mem()));
                supervisorsFragmentedCpu.update(Math.round(summary.get_fragmented_cpu()));
            }
            for (TopologySummary summary : newSummary.get_topologies()) {
                topologiesNumTasks.update(summary.get_num_tasks());
                topologiesNumExecutors.update(summary.get_num_executors());
                topologiesNumWorker.update(summary.get_num_workers());
                topologiesUptime.update(summary.get_uptime_secs());
                topologiesReplicationCount.update(summary.get_replication_count());
                topologiesRequestedMemOnHeap.update(Math.round(summary.get_requested_memonheap()));
                topologiesRequestedMemOffHeap.update(Math.round(summary.get_requested_memoffheap()));
                topologiesRequestedCpu.update(Math.round(summary.get_requested_cpu()));
                topologiesAssignedMemOnHeap.update(Math.round(summary.get_assigned_memonheap()));
                topologiesAssignedMemOffHeap.update(Math.round(summary.get_assigned_memoffheap()));
                topologiesAssignedCpu.update(Math.round(summary.get_assigned_cpu()));
            }
        }

        void setActive(final boolean active) {
            if (this.active != active) {
                this.active = active;
                if (active) {
                    metricsRegistry.registerAll(clusterSummaryMetrics);
                } else {
                    metricsRegistry.removeAll(clusterSummaryMetrics);
                }
            }
        }

        @Override
        public void run() {
            try {
                setActive(isLeader());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
  • This runnable essentially just calls setActive: it keeps watching for leadership changes, registering clusterSummaryMetrics when this node becomes leader and removing it when the node is no longer leader
  • clusterSummaryMetrics contains the gauges cluster:num-nimbus-leaders, cluster:num-nimbuses, cluster:num-supervisors, cluster:num-topologies, cluster:num-total-workers, cluster:num-total-used-workers, cluster:total-fragmented-memory-non-negative and cluster:total-fragmented-cpu-non-negative (the caching pattern behind them is illustrated below)
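
The CachedGauge/DerivativeGauge combination is the key trick here: one expensive getClusterInfoImpl() call per caching window feeds all the derived gauges. A stripped-down, self-contained illustration of the Dropwizard pattern (the snapshot type and values are made up for the demo):

import java.util.concurrent.TimeUnit;

import com.codahale.metrics.CachedGauge;
import com.codahale.metrics.DerivativeGauge;
import com.codahale.metrics.MetricRegistry;

public class CachedGaugeDemo {
    public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();

        // one cached, expensive snapshot (stand-in for getClusterInfoImpl())...
        CachedGauge<int[]> snapshot = new CachedGauge<int[]>(5, TimeUnit.SECONDS) {
            @Override
            protected int[] loadValue() {
                return new int[]{4 /* slotsTotal */, 1 /* slotsUsed */};
            }
        };

        // ...fans out into several cheap derived gauges; querying a derived
        // gauge refreshes the cache at most once per window
        registry.register("cluster:slots-total", new DerivativeGauge<int[], Integer>(snapshot) {
            @Override
            protected Integer transform(int[] s) {
                return s[0];
            }
        });
        registry.register("cluster:slots-used", new DerivativeGauge<int[], Integer>(snapshot) {
            @Override
            protected Integer transform(int[] s) {
                return s[1];
            }
        });

        System.out.println(registry.getGauges().get("cluster:slots-total").getValue()); // 4
    }
}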

Summary

  • LoggingClusterMetricsConsumer consumes cluster-level metrics and writes them to a log file; its log4j2 configuration is read from cluster.xml and the output files are nimbus.log.metrics and supervisor.log.metrics. LoggingMetricsConsumer, by contrast, works at the topology worker level; its log4j2 configuration is read from worker.xml and it writes to worker.log.metrics
  • When Nimbus launches its server, it sets up a recurring task that, while the current node is the leader, periodically sends metrics to ClusterMetricsConsumerExecutor, which indirectly calls back LoggingClusterMetricsConsumer's handleDataPoints to write the data to the log. LoggingMetricsConsumer instead relies on a recurring task in the Executor that emits metricsTickTuples, triggering SpoutExecutor and BoltExecutor to send metrics to the topology's built-in MetricsConsumerBolt, which then calls LoggingMetricsConsumer.handleDataPoints to consume the data and write it to the log
  • handleDataPoints handles two kinds of info, ClusterInfo and SupervisorInfo. Note that the recurring task calls sendClusterMetricsToExecutors only when the current node is the leader; since nimbus and the supervisors normally run on different nodes, supervisor.log.metrics may well be empty
  • LoggingMetricsConsumer's implementation depends on the legacy IMetric, whereas LoggingClusterMetricsConsumer does not depend on IMetric; it obtains its metrics from IStormClusterState
  • Storm 1.2 introduced a new metrics system based on Dropwizard Metrics; TopologyContext's registerMetric, which returns IMetric, has been marked Deprecated since 1.2, so LoggingMetricsConsumer will likely need to be reworked to obtain its metrics via the metrics2 MetricRegistry
