本文主要研究一下storm的LoggingMetricsConsumer
storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/LoggingMetricsConsumer.java
public class LoggingMetricsConsumer implements IMetricsConsumer { public static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsConsumer.class); static private String padding = " "; @Override public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { } @Override public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) { StringBuilder sb = new StringBuilder(); String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t", taskInfo.timestamp, taskInfo.srcWorkerHost, taskInfo.srcWorkerPort, taskInfo.srcTaskId, taskInfo.srcComponentId); sb.append(header); for (DataPoint p : dataPoints) { sb.delete(header.length(), sb.length()); sb.append(p.name) .append(padding).delete(header.length() + 23, sb.length()).append("\t") .append(p.value); LOG.info(sb.toString()); } } @Override public void cleanup() { } }
<?xml version="1.0" encoding="UTF-8"?>
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements. See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License. You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<!-- Logging configuration: four rolling-file appenders (A1, STDOUT, STDERR, METRICS) plus a syslog appender. -->
<configuration monitorInterval="60" shutdownHook="disable">
<properties>
    <!-- Layout used by the main log file (appender A1). -->
    <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
    <!-- Message-only layout used by the STDOUT and STDERR appenders. -->
    <property name="patternNoTime">%msg%n</property>
    <!-- Layout used by the METRICS appender. -->
    <property name="patternMetrics">%d %-8r %m%n</property>
</properties>
<appenders>
    <!-- Main log file; rolls at 100 MB. -->
    <RollingFile name="A1"
                 fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}"
                 filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz">
        <PatternLayout>
            <pattern>${pattern}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <!-- File with the ".out" suffix; rolls at 100 MB. -->
    <RollingFile name="STDOUT"
                 fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out"
                 filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz">
        <PatternLayout>
            <pattern>${patternNoTime}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="4"/>
    </RollingFile>
    <!-- File with the ".err" suffix; rolls at 100 MB. -->
    <RollingFile name="STDERR"
                 fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err"
                 filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz">
        <PatternLayout>
            <pattern>${patternNoTime}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="4"/>
    </RollingFile>
    <!-- Metrics file with the ".metrics" suffix; rolls at 2 MB. This is the target of the LoggingMetricsConsumer logger below. -->
    <RollingFile name="METRICS"
                 fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics"
                 filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics.%i.gz">
        <PatternLayout>
            <pattern>${patternMetrics}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="2 MB"/>
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <!-- Syslog appender sending RFC5424 messages over UDP to localhost:514. -->
    <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
            protocol="UDP" appName="[${sys:storm.id}:${sys:worker.port}]" mdcId="mdc" includeMDC="true"
            facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
            messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFail="true" immediateFlush="true"/>
</appenders>
<loggers>
    <root level="info"> <!-- We log everything -->
        <appender-ref ref="A1"/>
        <appender-ref ref="syslog"/>
    </root>
    <!-- Output of LoggingMetricsConsumer goes to the METRICS appender; additivity="false" keeps it out of the root logger's appenders. -->
    <Logger name="org.apache.storm.metric.LoggingMetricsConsumer" level="info" additivity="false">
        <appender-ref ref="METRICS"/>
    </Logger>
    <Logger name="STDERR" level="INFO">
        <appender-ref ref="STDERR"/>
        <appender-ref ref="syslog"/>
    </Logger>
    <Logger name="STDOUT" level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="syslog"/>
    </Logger>
</loggers>
</configuration>
// Programmatic registration of LoggingMetricsConsumer on the topology config.
// NOTE(review): the second argument (1) is presumably the parallelism hint - confirm against Config.registerMetricsConsumer.
conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
# Metrics consumers to register; each list entry describes one consumer.
topology.metrics.consumer.register:
  # Logs data points through the LoggingMetricsConsumer logger.
  - class: "org.apache.storm.metric.LoggingMetricsConsumer"
    # Capacity of the consumer bolt's pending-task queue (a value > 0 makes the queue bounded).
    max.retain.metric.tuples: 100
    parallelism.hint: 1
  # "argument" is handed to the consumer as its registration argument.
  - class: "org.apache.storm.metric.HttpForwardingMetricsConsumer"
    parallelism.hint: 1
    argument: "http://example.com:8080/metrics/my-topology/"
storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
public class MetricsConsumerBolt implements IBolt { public static final Logger LOG = LoggerFactory.getLogger(MetricsConsumerBolt.class); private final int _maxRetainMetricTuples; private final Predicate<IMetricsConsumer.DataPoint> _filterPredicate; private final DataPointExpander _expander; private final BlockingQueue<MetricsTask> _taskQueue; IMetricsConsumer _metricsConsumer; String _consumerClassName; OutputCollector _collector; Object _registrationArgument; private Thread _taskExecuteThread; private volatile boolean _running = true; public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples, Predicate<IMetricsConsumer.DataPoint> filterPredicate, DataPointExpander expander) { _consumerClassName = consumerClassName; _registrationArgument = registrationArgument; _maxRetainMetricTuples = maxRetainMetricTuples; _filterPredicate = filterPredicate; _expander = expander; if (_maxRetainMetricTuples > 0) { _taskQueue = new LinkedBlockingDeque<>(_maxRetainMetricTuples); } else { _taskQueue = new LinkedBlockingDeque<>(); } } @Override public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) { try { _metricsConsumer = (IMetricsConsumer) Class.forName(_consumerClassName).newInstance(); } catch (Exception e) { throw new RuntimeException("Could not instantiate a class listed in config under section " + Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e); } _metricsConsumer.prepare(topoConf, _registrationArgument, context, collector); _collector = collector; _taskExecuteThread = new Thread(new MetricsHandlerRunnable()); _taskExecuteThread.setDaemon(true); _taskExecuteThread.start(); } @Override public void execute(Tuple input) { IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) input.getValue(0); Collection<IMetricsConsumer.DataPoint> dataPoints = (Collection) input.getValue(1); Collection<IMetricsConsumer.DataPoint> 
expandedDataPoints = _expander.expandDataPoints(dataPoints); List<IMetricsConsumer.DataPoint> filteredDataPoints = getFilteredDataPoints(expandedDataPoints); MetricsTask metricsTask = new MetricsTask(taskInfo, filteredDataPoints); while (!_taskQueue.offer(metricsTask)) { _taskQueue.poll(); } _collector.ack(input); } private List<IMetricsConsumer.DataPoint> getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) { return Lists.newArrayList(Iterables.filter(dataPoints, _filterPredicate)); } @Override public void cleanup() { _running = false; _metricsConsumer.cleanup(); _taskExecuteThread.interrupt(); } static class MetricsTask { private IMetricsConsumer.TaskInfo taskInfo; private Collection<IMetricsConsumer.DataPoint> dataPoints; public MetricsTask(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) { this.taskInfo = taskInfo; this.dataPoints = dataPoints; } public IMetricsConsumer.TaskInfo getTaskInfo() { return taskInfo; } public Collection<IMetricsConsumer.DataPoint> getDataPoints() { return dataPoints; } } class MetricsHandlerRunnable implements Runnable { @Override public void run() { while (_running) { try { MetricsTask task = _taskQueue.take(); _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints()); } catch (InterruptedException e) { break; } catch (Throwable t) { LOG.error("Exception occurred during handle metrics", t); } } } } }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException { validateBasic(topology); StormTopology ret = topology.deepCopy(); addAcker(topoConf, ret); if (hasEventLoggers(topoConf)) { addEventLogger(topoConf, ret); } addMetricComponents(topoConf, ret); addSystemComponents(topoConf, ret); addMetricStreams(ret); addSystemStreams(ret); validateStructure(ret); return ret; } public static void addMetricComponents(Map<String, Object> conf, StormTopology topology) { Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology); for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) { topology.put_to_bolts(entry.getKey(), entry.getValue()); } } public static void addMetricStreams(StormTopology topology) { for (Object component : allComponents(topology).values()) { ComponentCommon common = getComponentCommon(component); StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points")); common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo); } } public static Map<String, Bolt> metricsConsumerBoltSpecs(Map<String, Object> conf, StormTopology topology) { Map<String, Bolt> metricsConsumerBolts = new HashMap<>(); Set<String> componentIdsEmitMetrics = new HashSet<>(); componentIdsEmitMetrics.addAll(allComponents(topology).keySet()); componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID); Map<GlobalStreamId, Grouping> inputs = new HashMap<>(); for (String componentId : componentIdsEmitMetrics) { inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping()); } List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER); if (registerInfo != null) { Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>(); for (Map<String, Object> info : registerInfo) { String className = (String) info.get(TOPOLOGY_METRICS_CONSUMER_CLASS); 
Object argument = info.get(TOPOLOGY_METRICS_CONSUMER_ARGUMENT); Integer maxRetainMetricTuples = ObjectReader.getInt(info.get( TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES), 100); Integer phintNum = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT), 1); Map<String, Object> metricsConsumerConf = new HashMap<String, Object>(); metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum); List<String> whitelist = (List<String>) info.get( TOPOLOGY_METRICS_CONSUMER_WHITELIST); List<String> blacklist = (List<String>) info.get( TOPOLOGY_METRICS_CONSUMER_BLACKLIST); FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist); Boolean expandMapType = ObjectReader.getBoolean(info.get( TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE), false); String metricNameSeparator = ObjectReader.getString(info.get( TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR), "."); DataPointExpander expander = new DataPointExpander(expandMapType, metricNameSeparator); MetricsConsumerBolt boltInstance = new MetricsConsumerBolt(className, argument, maxRetainMetricTuples, filterPredicate, expander); Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs, boltInstance, null, phintNum, metricsConsumerConf); String id = className; if (classOccurrencesMap.containsKey(className)) { // e.g. [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]" int occurrenceNum = classOccurrencesMap.get(className); occurrenceNum++; classOccurrencesMap.put(className, occurrenceNum); id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum; } else { id = Constants.METRICS_COMPONENT_ID_PREFIX + className; classOccurrencesMap.put(className, 1); } metricsConsumerBolts.put(id, metricsConsumerBolt); } } return metricsConsumerBolts; }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry; protected void setupMetrics() { for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) { StormTimer timerTask = workerData.getUserTimer(); timerTask.scheduleRecurring(interval, interval, () -> { TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(interval), Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID); AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple); try { receiveQueue.publish(metricsTickTuple); receiveQueue.flush(); // avoid buffering } catch (InterruptedException e) { LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag."); Thread.currentThread().interrupt(); return; } } ); } } public Map<Integer, Map<Integer, Map<String, IMetric>>> getIntervalToTaskToMetricToRegistry() { return intervalToTaskToMetricToRegistry; }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java
/**
 * Creates the TopologyContext for this task from worker-level data, the task id and
 * executor-level data.
 *
 * @param topology the topology passed as the context's first constructor argument
 * @return a new TopologyContext
 * @throws IOException declared by this method; the call that raises it is not identifiable from this excerpt
 */
private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
    Map<String, Object> conf = workerData.getConf();
    return new TopologyContext(
        topology,
        workerData.getTopologyConf(),
        workerData.getTaskToComponent(),
        workerData.getComponentToSortedTasks(),
        workerData.getComponentToStreamToFields(),
        // This is updated by the Worker and the topology has shared access to it
        workerData.getBlobToLastKnownVersion(),
        workerData.getTopologyId(),
        ConfigUtils.supervisorStormResourcesPath(
            ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
        ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
        taskId,
        workerData.getPort(), workerData.getLocalTaskIds(),
        workerData.getDefaultSharedResources(),
        workerData.getUserSharedResources(),
        executor.getSharedExecutorData(),
        // The executor's metric registry is handed over by reference.
        // NOTE(review): presumably it becomes the context's _registeredMetrics - confirm in the TopologyContext constructor.
        executor.getIntervalToTaskToMetricToRegistry(),
        executor.getOpenOrPrepareWasCalled());
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
public class TopologyContext extends WorkerTopologyContext implements IMetricsContext { private Integer _taskId; private Map<String, Object> _taskData = new HashMap<>(); private List<ITaskHook> _hooks = new ArrayList<>(); private Map<String, Object> _executorData; private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics; public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) { if (_openOrPrepareWasCalled.get()) { throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " + "IBolt::prepare() or ISpout::open() method."); } if (metric == null) { throw new IllegalArgumentException("Cannot register a null metric"); } if (timeBucketSizeInSecs <= 0) { throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " + "greater than or equal to 1 second."); } if (getRegisteredMetricByName(name) != null) { throw new RuntimeException("The same metric name `" + name + "` was registered twice."); } Map<Integer, Map<Integer, Map<String, IMetric>>> m1 = _registeredMetrics; if (!m1.containsKey(timeBucketSizeInSecs)) { m1.put(timeBucketSizeInSecs, new HashMap<Integer, Map<String, IMetric>>()); } Map<Integer, Map<String, IMetric>> m2 = m1.get(timeBucketSizeInSecs); if (!m2.containsKey(_taskId)) { m2.put(_taskId, new HashMap<String, IMetric>()); } Map<String, IMetric> m3 = m2.get(_taskId); if (m3.containsKey(name)) { throw new RuntimeException("The same metric name `" + name + "` was registered twice."); } else { m3.put(name, metric); } return metric; } //...... }
topology.builtin.metrics.bucket.size.secs參數的值,在defaults.yaml中默認爲60,即Executor每隔60秒發射一次metricsTickTuple,其streamId爲Constants.METRICS_TICK_STREAM_ID
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
/**
 * Handles a metrics tick for one task: reads and resets every metric registered for the tick's
 * interval and task, and emits the non-null values on the metrics stream.
 *
 * <p>Nothing is emitted when the task has no metrics for the interval or when every metric
 * returned null.
 *
 * @param task  the task whose metrics are collected
 * @param tuple the tick tuple; its first value is the interval
 */
public void metricsTick(Task task, TupleImpl tuple) {
    try {
        Integer interval = tuple.getInteger(0);
        int taskId = task.getTaskId();

        Map<Integer, Map<String, IMetric>> metricsByTask = intervalToTaskToMetricToRegistry.get(interval);
        if (metricsByTask == null) {
            return;
        }
        Map<String, IMetric> metricsByName = metricsByTask.get(taskId);
        if (metricsByName == null) {
            return;
        }

        IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
            hostname, workerTopologyContext.getThisWorkerPort(),
            componentId, taskId, Time.currentTimeSecs(), interval);

        List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
        for (Map.Entry<String, IMetric> registered : metricsByName.entrySet()) {
            Object value = registered.getValue().getValueAndReset();
            if (value == null) {
                // Metrics without a value for this bucket are skipped.
                continue;
            }
            dataPoints.add(new IMetricsConsumer.DataPoint(registered.getKey(), value));
        }

        if (dataPoints.isEmpty()) {
            return;
        }
        task.sendUnanchored(Constants.METRICS_STREAM_ID, new Values(taskInfo, dataPoints), executorTransfer, pendingEmits);
        executorTransfer.flush();
    } catch (Exception e) {
        throw Utils.wrapInRuntime(e);
    }
}
這個使用的是舊版的metrics,非V2版本
topology.builtin.metrics.bucket.size.secs參數,默認爲60,即60秒發射一次metricsTickTuple