public class GraphiteStormReporter extends ScheduledStormReporter { private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class); public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with"; public static final String GRAPHITE_HOST = "graphite.host"; public static final String GRAPHITE_PORT = "graphite.port"; public static final String GRAPHITE_TRANSPORT = "graphite.transport"; @Override public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { LOG.debug("Preparing..."); GraphiteReporter.Builder builder = GraphiteReporter.forRegistry(metricsRegistry); TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf); if (durationUnit != null) { builder.convertDurationsTo(durationUnit); } TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf); if (rateUnit != null) { builder.convertRatesTo(rateUnit); } StormMetricsFilter filter = getMetricsFilter(reporterConf); if(filter != null){ builder.filter(filter); } String prefix = getMetricsPrefixedWith(reporterConf); if (prefix != null) { builder.prefixedWith(prefix); } //defaults to 10 reportingPeriod = getReportPeriod(reporterConf); //defaults to seconds reportingPeriodUnit = getReportPeriodUnit(reporterConf); // Not exposed: // * withClock(Clock) String host = getMetricsTargetHost(reporterConf); Integer port = getMetricsTargetPort(reporterConf); String transport = getMetricsTargetTransport(reporterConf); GraphiteSender sender = null; if (transport.equalsIgnoreCase("udp")) { sender = new GraphiteUDP(host, port); } else { sender = new Graphite(host, port); } reporter = builder.build(sender); } private static String getMetricsPrefixedWith(Map reporterConf) { return Utils.getString(reporterConf.get(GRAPHITE_PREFIXED_WITH), null); } private static String getMetricsTargetHost(Map reporterConf) { return Utils.getString(reporterConf.get(GRAPHITE_HOST), null); } private static Integer getMetricsTargetPort(Map reporterConf) { return Utils.getInt(reporterConf.get(GRAPHITE_PORT), null); } private static String getMetricsTargetTransport(Map reporterConf) { return Utils.getString(reporterConf.get(GRAPHITE_TRANSPORT), "tcp"); } }
public abstract class ScheduledStormReporter implements StormReporter{ private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); protected ScheduledReporter reporter; protected long reportingPeriod; protected TimeUnit reportingPeriodUnit; @Override public void start() { if (reporter != null) { LOG.debug("Starting..."); reporter.start(reportingPeriod, reportingPeriodUnit); } else { throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); } } @Override public void stop() { if (reporter != null) { LOG.debug("Stopping..."); reporter.stop(); } else { throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); } } public static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) { TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); return unit == null ? TimeUnit.SECONDS : unit; } private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { String rateUnitString = Utils.getString(reporterConf.get(configName), null); if (rateUnitString != null) { return TimeUnit.valueOf(rateUnitString); } return null; } public static long getReportPeriod(Map reporterConf) { return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue(); } public static StormMetricsFilter getMetricsFilter(Map reporterConf){ StormMetricsFilter filter = null; Map<String, Object> filterConf = (Map)reporterConf.get("filter"); if(filterConf != null) { String clazz = (String) filterConf.get("class"); if (clazz != null) { filter = Utils.newInstance(clazz); filter.prepare(filterConf); } } return filter; } }