本文主要研究一下storm的IEventLogger
storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/IEventLogger.java
/**
 * EventLogger interface for logging the event info to a sink like log file or db for inspecting the events via UI for
 * debugging.
 */
public interface IEventLogger {
    /**
     * Called once before any events are logged, giving the implementation a chance to set up its sink.
     *
     * @param conf      the topology configuration
     * @param arguments implementation-specific arguments from the registration config (may be null)
     * @param context   the topology context of the event logger bolt
     */
    void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context);

    /**
     * This method would be invoked when the {@link EventLoggerBolt} receives a tuple from the spouts or bolts that has
     * event logging enabled.
     *
     * @param e the event
     */
    void log(EventInfo e);

    /**
     * Releases any resources held by the logger (files, connections, threads).
     */
    void close();

    /**
     * A wrapper for the fields that we would log.
     */
    class EventInfo {
        // All fields are assigned exactly once in the constructor and never mutated,
        // so they are declared final to make EventInfo effectively immutable.
        private final long ts;
        private final String component;
        private final int task;
        private final Object messageId;
        private final List<Object> values;

        public EventInfo(long ts, String component, int task, Object messageId, List<Object> values) {
            this.ts = ts;
            this.component = component;
            this.task = task;
            this.messageId = messageId;
            this.values = values;
        }

        public long getTs() {
            return ts;
        }

        public String getComponent() {
            return component;
        }

        public int getTask() {
            return task;
        }

        public Object getMessageId() {
            return messageId;
        }

        public List<Object> getValues() {
            return values;
        }

        /**
         * Returns a default formatted string with fields separated by ","
         *
         * @return a default formatted string with fields separated by ","
         */
        @Override
        public String toString() {
            return new Date(ts).toString() + "," + component + "," + String.valueOf(task) + ","
                   + (messageId == null ? "" : messageId.toString()) + "," + values.toString();
        }
    }
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java
/**
 * Default {@link IEventLogger} implementation that appends each event as one line to an
 * "events.log" file under the worker's artifacts directory, flushing it periodically
 * from a background daemon thread.
 */
public class FileBasedEventLogger implements IEventLogger {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedEventLogger.class);
    // Interval between flushes performed by the background flush task.
    private static final int FLUSH_INTERVAL_MILLIS = 1000;

    private Path eventLogPath;
    private BufferedWriter eventLogWriter;
    private ScheduledExecutorService flushScheduler;
    // Set by log() after each write and cleared by the flush task; volatile so the
    // flush thread sees writes made on the bolt's executor thread.
    private volatile boolean dirty = false;

    // Opens the event log file in UTF-8 for appending, creating it if it does not exist.
    private void initLogWriter(Path logFilePath) {
        try {
            LOG.info("logFilePath {}", logFilePath);
            eventLogPath = logFilePath;
            eventLogWriter = Files.newBufferedWriter(eventLogPath, StandardCharsets.UTF_8,
                                                     StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                                                     StandardOpenOption.APPEND);
        } catch (IOException e) {
            LOG.error("Error setting up FileBasedEventLogger.", e);
            throw new RuntimeException(e);
        }
    }

    // Starts a single daemon thread that flushes the writer whenever log() has marked
    // the buffer dirty since the previous flush.
    private void setUpFlushTask() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("event-logger-flush-%d")
            .setDaemon(true)
            .build();
        flushScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    if (dirty) {
                        eventLogWriter.flush();
                        dirty = false;
                    }
                } catch (IOException ex) {
                    LOG.error("Error flushing " + eventLogPath, ex);
                    // NOTE(review): throwing from a scheduleAtFixedRate task suppresses all
                    // subsequent executions, so one flush failure permanently stops the
                    // periodic flushing — confirm this is the intended failure mode.
                    throw new RuntimeException(ex);
                }
            }
        };
        flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS,
                                           TimeUnit.MILLISECONDS);
    }

    @Override
    public void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context) {
        String stormId = context.getStormId();
        int port = context.getThisWorkerPort();
        /*
         * Include the topology name & worker port in the file name so that
         * multiple event loggers can log independently.
         */
        String workersArtifactRoot = ConfigUtils.workerArtifactsRoot(conf, stormId, port);
        Path path = Paths.get(workersArtifactRoot, "events.log");
        // Ensure the artifacts directory exists before opening the log file in it.
        File dir = path.toFile().getParentFile();
        if (!dir.exists()) {
            dir.mkdirs();
        }
        initLogWriter(path);
        setUpFlushTask();
    }

    @Override
    public void log(EventInfo event) {
        try {
            //TODO: file rotation
            eventLogWriter.write(buildLogMessage(event));
            eventLogWriter.newLine();
            // Signal the flush task that there is buffered data to flush.
            dirty = true;
        } catch (IOException ex) {
            LOG.error("Error logging event {}", event, ex);
            throw new RuntimeException(ex);
        }
    }

    // Formats one event as a log line; protected so subclasses can customize the format.
    protected String buildLogMessage(EventInfo event) {
        return event.toString();
    }

    @Override
    public void close() {
        try {
            eventLogWriter.close();
        } catch (IOException ex) {
            LOG.error("Error closing event log.", ex);
        }
        closeFlushScheduler();
    }

    // Gracefully shuts down the flush thread, waiting briefly before forcing shutdown.
    private void closeFlushScheduler() {
        if (flushScheduler != null) {
            flushScheduler.shutdown();
            try {
                if (!flushScheduler.awaitTermination(2, TimeUnit.SECONDS)) {
                    flushScheduler.shutdownNow();
                }
            } catch (InterruptedException ie) {
                // (Re-)Cancel if current thread also interrupted
                flushScheduler.shutdownNow();
                // Preserve interrupt status
                Thread.currentThread().interrupt();
            }
        }
    }
}
flush任務定時執行,若是是dirty的話,則調用eventLogWriter.flush(),並將dirty重置爲false
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
/**
 * Wires an {@link EventLoggerBolt} into the topology if event logging is enabled.
 *
 * <p>The executor count comes from {@code Config.TOPOLOGY_EVENTLOGGER_EXECUTORS}, falling back
 * to {@code Config.TOPOLOGY_WORKERS}; a null or zero value disables event logging entirely.
 *
 * @param conf     the topology configuration
 * @param topology the topology, mutated in place
 */
public static void addEventLogger(Map<String, Object> conf, StormTopology topology) {
    Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
                                               ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
    if (numExecutors == null || numExecutors == 0) {
        // Event logging disabled: add neither the bolt nor any extra streams.
        return;
    }
    HashMap<String, Object> componentConf = new HashMap<>();
    componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
    componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
                      ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
    Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(
        eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);
    // Every spout and bolt additionally declares the event logger stream so tuples with
    // event logging enabled can be routed to the EventLoggerBolt.
    for (Object component : allComponents(topology).values()) {
        ComponentCommon common = getComponentCommon(component);
        common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
    }
    topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
}

/**
 * Output fields of the event logger stream, matching the fields {@link EventLoggerBolt} reads.
 */
public static List<String> eventLoggerBoltFields() {
    return Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID,
                         EventLoggerBolt.FIELD_TS, EventLoggerBolt.FIELD_VALUES);
}

/**
 * Builds the inputs of the {@link EventLoggerBolt}: the event logger stream of every spout and
 * bolt, fields-grouped by component id so that all events of one component reach the same task.
 */
public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
    Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
    Set<String> allIds = new HashSet<String>();
    allIds.addAll(topology.get_bolts().keySet());
    allIds.addAll(topology.get_spouts().keySet());
    for (String id : allIds) {
        // Use the shared constant rather than the magic string "component-id" so the grouping
        // field stays in sync with eventLoggerBoltFields().
        inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID),
                   Thrift.prepareFieldsGrouping(Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID)));
    }
    return inputs;
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/EventLoggerBolt.java
public class EventLoggerBolt implements IBolt { /* The below field declarations are also used in common.clj to define the event logger output fields */ public static final String FIELD_TS = "ts"; public static final String FIELD_VALUES = "values"; public static final String FIELD_COMPONENT_ID = "component-id"; public static final String FIELD_MESSAGE_ID = "message-id"; private static final Logger LOG = LoggerFactory.getLogger(EventLoggerBolt.class); private List<IEventLogger> eventLoggers; @Override public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) { LOG.info("EventLoggerBolt prepare called"); eventLoggers = new ArrayList<>(); List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) topoConf.get(Config.TOPOLOGY_EVENT_LOGGER_REGISTER); if (registerInfo != null && !registerInfo.isEmpty()) { initializeEventLoggers(topoConf, context, registerInfo); } else { initializeDefaultEventLogger(topoConf, context); } } @Override public void execute(Tuple input) { LOG.debug("** EventLoggerBolt got tuple from sourceComponent {}, with values {}", input.getSourceComponent(), input.getValues()); Object msgId = input.getValueByField(FIELD_MESSAGE_ID); EventInfo eventInfo = new EventInfo(input.getLongByField(FIELD_TS), input.getSourceComponent(), input.getSourceTask(), msgId, (List<Object>) input.getValueByField(FIELD_VALUES)); for (IEventLogger eventLogger : eventLoggers) { eventLogger.log(eventInfo); } } @Override public void cleanup() { for (IEventLogger eventLogger : eventLoggers) { eventLogger.close(); } } private void initializeEventLoggers(Map<String, Object> topoConf, TopologyContext context, List<Map<String, Object>> registerInfo) { for (Map<String, Object> info : registerInfo) { String className = (String) info.get(TOPOLOGY_EVENT_LOGGER_CLASS); Map<String, Object> arguments = (Map<String, Object>) info.get(TOPOLOGY_EVENT_LOGGER_ARGUMENTS); IEventLogger eventLogger; try { eventLogger = (IEventLogger) 
Class.forName(className).newInstance(); } catch (Exception e) { throw new RuntimeException("Could not instantiate a class listed in config under section " + Config.TOPOLOGY_EVENT_LOGGER_REGISTER + " with fully qualified name " + className, e); } eventLogger.prepare(topoConf, arguments, context); eventLoggers.add(eventLogger); } } private void initializeDefaultEventLogger(Map<String, Object> topoConf, TopologyContext context) { FileBasedEventLogger eventLogger = new FileBasedEventLogger(); eventLogger.prepare(topoConf, null, context); eventLoggers.add(eventLogger); } }
可以通過conf.setNumEventLoggers(即Config.TOPOLOGY_EVENTLOGGER_EXECUTORS)設置event logger executor的數目,默認爲0,即禁用。開啓了event logger的話,能夠點擊spout或bolt的debug,而後打開events連接,就能夠在界面上查看debug期間的tuple數據。若是通過conf.registerEventLogger
(即Config.TOPOLOGY_EVENT_LOGGER_REGISTER)註冊了自定義的event logger,則採用的是該配置來初始化EventLogger,默認的FileBasedEventLogger若是沒有被設置進去的話,則不會被初始化;StormCommon在addEventLogger的時候,對全部的spout及bolt增加一個declareStream,輸出到EVENTLOGGER_STREAM_ID;同時對EventLoggerBolt經過相似fieldsGrouping(componentId, Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), new Fields("component-id"))的方式將全部的spout及bolt做爲inputs;輸入到EventLoggerBolt的tuple的字段爲EventLoggerBolt.FIELD_COMPONENT_ID、EventLoggerBolt.FIELD_MESSAGE_ID、EventLoggerBolt.FIELD_TS、EventLoggerBolt.FIELD_VALUES