A Look at Storm's IEventLogger

This article mainly looks into Storm's IEventLogger.

IEventLogger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/IEventLogger.java

/**
 * EventLogger interface for logging the event info to a sink like log file or db for inspecting the events via UI for debugging.
 */
public interface IEventLogger {

    void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context);

    /**
     * This method would be invoked when the {@link EventLoggerBolt} receives a tuple from the spouts or bolts that has event logging
     * enabled.
     *
     * @param e the event
     */
    void log(EventInfo e);

    void close();

    /**
     * A wrapper for the fields that we would log.
     */
    class EventInfo {
        private long ts;
        private String component;
        private int task;
        private Object messageId;
        private List<Object> values;

        public EventInfo(long ts, String component, int task, Object messageId, List<Object> values) {
            this.ts = ts;
            this.component = component;
            this.task = task;
            this.messageId = messageId;
            this.values = values;
        }

        public long getTs() {
            return ts;
        }

        public String getComponent() {
            return component;
        }

        public int getTask() {
            return task;
        }

        public Object getMessageId() {
            return messageId;
        }

        public List<Object> getValues() {
            return values;
        }

        /**
         * Returns a default formatted string with fields separated by ","
         *
         * @return a default formatted string with fields separated by ","
         */
        @Override
        public String toString() {
            return new Date(ts).toString() + "," + component + "," + String.valueOf(task) + ","
                   + (messageId == null ? "" : messageId.toString()) + "," + values.toString();
        }
    }
}
  • IEventLogger defines the log method as well as the EventInfo wrapper class; a minimal custom implementation is sketched below
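
A custom implementation only has to provide these three methods. The following is a minimal sketch (not from the Storm code base; the class name Slf4jEventLogger and the log message are made up for illustration) that simply forwards each event to SLF4J:

import java.util.Map;

import org.apache.storm.metric.IEventLogger;
import org.apache.storm.task.TopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Slf4jEventLogger implements IEventLogger {
    private static final Logger LOG = LoggerFactory.getLogger(Slf4jEventLogger.class);

    @Override
    public void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context) {
        // "arguments" carries the per-logger arguments from Config.TOPOLOGY_EVENT_LOGGER_REGISTER (may be null)
    }

    @Override
    public void log(EventInfo e) {
        // EventInfo.toString() already renders "<date>,<component>,<task>,<messageId>,<values>"
        LOG.info("event: {}", e);
    }

    @Override
    public void close() {
        // nothing to release
    }
}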

FileBasedEventLogger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java

public class FileBasedEventLogger implements IEventLogger {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedEventLogger.class);

    private static final int FLUSH_INTERVAL_MILLIS = 1000;

    private Path eventLogPath;
    private BufferedWriter eventLogWriter;
    private ScheduledExecutorService flushScheduler;
    private volatile boolean dirty = false;

    private void initLogWriter(Path logFilePath) {
        try {
            LOG.info("logFilePath {}", logFilePath);
            eventLogPath = logFilePath;
            eventLogWriter = Files.newBufferedWriter(eventLogPath, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
                                                     StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            LOG.error("Error setting up FileBasedEventLogger.", e);
            throw new RuntimeException(e);
        }
    }


    private void setUpFlushTask() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("event-logger-flush-%d")
            .setDaemon(true)
            .build();

        flushScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    if (dirty) {
                        eventLogWriter.flush();
                        dirty = false;
                    }
                } catch (IOException ex) {
                    LOG.error("Error flushing " + eventLogPath, ex);
                    throw new RuntimeException(ex);
                }
            }
        };

        flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
    }


    @Override
    public void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context) {
        String stormId = context.getStormId();
        int port = context.getThisWorkerPort();

        /*
         * Include the topology name & worker port in the file name so that
         * multiple event loggers can log independently.
         */
        String workersArtifactRoot = ConfigUtils.workerArtifactsRoot(conf, stormId, port);

        Path path = Paths.get(workersArtifactRoot, "events.log");
        File dir = path.toFile().getParentFile();
        if (!dir.exists()) {
            dir.mkdirs();
        }
        initLogWriter(path);
        setUpFlushTask();
    }

    @Override
    public void log(EventInfo event) {
        try {
            //TODO: file rotation
            eventLogWriter.write(buildLogMessage(event));
            eventLogWriter.newLine();
            dirty = true;
        } catch (IOException ex) {
            LOG.error("Error logging event {}", event, ex);
            throw new RuntimeException(ex);
        }
    }

    protected String buildLogMessage(EventInfo event) {
        return event.toString();
    }

    @Override
    public void close() {
        try {
            eventLogWriter.close();

        } catch (IOException ex) {
            LOG.error("Error closing event log.", ex);
        }

        closeFlushScheduler();
    }

    private void closeFlushScheduler() {
        if (flushScheduler != null) {
            flushScheduler.shutdown();
            try {
                if (!flushScheduler.awaitTermination(2, TimeUnit.SECONDS)) {
                    flushScheduler.shutdownNow();
                }
            } catch (InterruptedException ie) {
                // (Re-)Cancel if current thread also interrupted
                flushScheduler.shutdownNow();
                // Preserve interrupt status
                Thread.currentThread().interrupt();
            }
        }
    }
}
  • The default implementation of IEventLogger is FileBasedEventLogger, which starts a scheduled task that flushes the buffered data to disk every FLUSH_INTERVAL_MILLIS (only if it is dirty)
  • The default file path is events.log under the workersArtifactRoot directory; since buildLogMessage is protected, the line format can be customized, as sketched below
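
Because buildLogMessage is protected, the output format can be changed without re-implementing the whole logger. A hedged sketch (the subclass name and the tab-separated format are assumptions, not part of Storm):

import org.apache.storm.metric.FileBasedEventLogger;
import org.apache.storm.metric.IEventLogger.EventInfo;

public class TsvFileBasedEventLogger extends FileBasedEventLogger {
    @Override
    protected String buildLogMessage(EventInfo event) {
        // the default is EventInfo.toString(), i.e. "<date>,<component>,<task>,<messageId>,<values>";
        // here we emit the raw timestamp and use tab separators instead
        return event.getTs() + "\t" + event.getComponent() + "\t" + event.getTask() + "\t"
               + (event.getMessageId() == null ? "" : event.getMessageId()) + "\t" + event.getValues();
    }
}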

StormCommon.addEventLogger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java

public static void addEventLogger(Map<String, Object> conf, StormTopology topology) {
    Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
                                               ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
    if (numExecutors == null || numExecutors == 0) {
        return;
    }
    HashMap<String, Object> componentConf = new HashMap<>();
    componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
    componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
    Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(
        eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);

    for (Object component : allComponents(topology).values()) {
        ComponentCommon common = getComponentCommon(component);
        common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
    }
    topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
}

public static List<String> eventLoggerBoltFields() {
    return Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID,
                         EventLoggerBolt.FIELD_TS, EventLoggerBolt.FIELD_VALUES);
}

public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
    Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
    Set<String> allIds = new HashSet<String>();
    allIds.addAll(topology.get_bolts().keySet());
    allIds.addAll(topology.get_spouts().keySet());

    for (String id : allIds) {
        inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID),
                   Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
    }
    return inputs;
}
  • Here numExecutors is read from Config.TOPOLOGY_EVENTLOGGER_EXECUTORS; if that is null, the value of Config.TOPOLOGY_WORKERS is used instead. The default is 0, which disables the event logger
  • It also reads Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS and uses it as the bolt's Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS
  • It then creates the EventLoggerBolt and, using fieldsGrouping("component-id") together with Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), wires every spout and bolt into the bolt's inputs so that it receives all tuples; the fields are EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS and EventLoggerBolt.FIELD_VALUES. At the same time, each spout and bolt gets a declaration of an output stream named EVENTLOGGER_STREAM_ID, which is what lets the data flow to the EventLoggerBolt (see the enabling sketch after this list)
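
On the topology side, this code path only runs when the executor count resolves to a non-zero value. A minimal configuration sketch (the class name is a placeholder; it uses the setNumEventLoggers helper mentioned in the summary below):

import org.apache.storm.Config;

public class EventLoggerConfigExample {
    public static Config buildConf() {
        Config conf = new Config();
        // setNumEventLoggers is the convenience setter for Config.TOPOLOGY_EVENTLOGGER_EXECUTORS;
        // any value > 0 makes StormCommon.addEventLogger attach the EventLoggerBolt
        conf.setNumEventLoggers(1);
        // equivalent raw form: conf.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, 1);
        return conf;
    }
}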

EventLoggerBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/EventLoggerBolt.java

public class EventLoggerBolt implements IBolt {

    /*
     The below field declarations are also used in common.clj to define the event logger output fields
      */
    public static final String FIELD_TS = "ts";
    public static final String FIELD_VALUES = "values";
    public static final String FIELD_COMPONENT_ID = "component-id";
    public static final String FIELD_MESSAGE_ID = "message-id";
    private static final Logger LOG = LoggerFactory.getLogger(EventLoggerBolt.class);
    private List<IEventLogger> eventLoggers;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        LOG.info("EventLoggerBolt prepare called");

        eventLoggers = new ArrayList<>();
        List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) topoConf.get(Config.TOPOLOGY_EVENT_LOGGER_REGISTER);
        if (registerInfo != null && !registerInfo.isEmpty()) {
            initializeEventLoggers(topoConf, context, registerInfo);
        } else {
            initializeDefaultEventLogger(topoConf, context);
        }
    }

    @Override
    public void execute(Tuple input) {
        LOG.debug("** EventLoggerBolt got tuple from sourceComponent {}, with values {}", input.getSourceComponent(), input.getValues());

        Object msgId = input.getValueByField(FIELD_MESSAGE_ID);
        EventInfo eventInfo = new EventInfo(input.getLongByField(FIELD_TS), input.getSourceComponent(),
                                            input.getSourceTask(), msgId, (List<Object>) input.getValueByField(FIELD_VALUES));

        for (IEventLogger eventLogger : eventLoggers) {
            eventLogger.log(eventInfo);
        }
    }

    @Override
    public void cleanup() {
        for (IEventLogger eventLogger : eventLoggers) {
            eventLogger.close();
        }
    }

    private void initializeEventLoggers(Map<String, Object> topoConf, TopologyContext context, List<Map<String, Object>> registerInfo) {
        for (Map<String, Object> info : registerInfo) {
            String className = (String) info.get(TOPOLOGY_EVENT_LOGGER_CLASS);
            Map<String, Object> arguments = (Map<String, Object>) info.get(TOPOLOGY_EVENT_LOGGER_ARGUMENTS);

            IEventLogger eventLogger;
            try {
                eventLogger = (IEventLogger) Class.forName(className).newInstance();
            } catch (Exception e) {
                throw new RuntimeException("Could not instantiate a class listed in config under section "
                                           + Config.TOPOLOGY_EVENT_LOGGER_REGISTER + " with fully qualified name " + className, e);
            }

            eventLogger.prepare(topoConf, arguments, context);
            eventLoggers.add(eventLogger);
        }
    }

    private void initializeDefaultEventLogger(Map<String, Object> topoConf, TopologyContext context) {
        FileBasedEventLogger eventLogger = new FileBasedEventLogger();
        eventLogger.prepare(topoConf, null, context);
        eventLoggers.add(eventLogger);
    }
}
  • When EventLoggerBolt prepares, it reads Config.TOPOLOGY_EVENT_LOGGER_REGISTER from topoConf; if registerInfo is empty it falls back to the default FileBasedEventLogger, otherwise it initializes the event loggers registered in registerInfo (a registration sketch follows below)
  • Its execute method simply iterates over eventLoggers and calls log on each one
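
To plug in a custom logger instead of the default one, register it so that it ends up in Config.TOPOLOGY_EVENT_LOGGER_REGISTER. A sketch using the registerEventLogger helper mentioned in the summary (Slf4jEventLogger is the hypothetical implementation sketched earlier; the class name here is also a placeholder):

import org.apache.storm.Config;

public class CustomEventLoggerConfigExample {
    public static Config buildConf() {
        Config conf = new Config();
        conf.setNumEventLoggers(1);
        // populates Config.TOPOLOGY_EVENT_LOGGER_REGISTER, which EventLoggerBolt.prepare reads;
        // once a custom logger is registered, FileBasedEventLogger is no longer added by default
        conf.registerEventLogger(Slf4jEventLogger.class);
        return conf;
    }
}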

Summary

  • To enable the EventLogger, set Config.TOPOLOGY_EVENTLOGGER_EXECUTORS to a value greater than 0 (conf.setNumEventLoggers); the default is 0, which disables it. With the event logger enabled, you can click debug on a spout or bolt in the UI and then open the events link to inspect the tuples captured while debug was on.
  • Once Config.TOPOLOGY_EVENTLOGGER_EXECUTORS is greater than 0, if Config.TOPOLOGY_EVENT_LOGGER_REGISTER is not set, the default FileBasedEventLogger is used; when debug is enabled on a spout or bolt, EventInfo records are written to events.log under the workersArtifactRoot directory.
  • If Config.TOPOLOGY_EVENT_LOGGER_REGISTER is customized (conf.registerEventLogger), StormCommon initializes the event loggers from that configuration, and the default FileBasedEventLogger is not initialized unless it is registered as well. When StormCommon runs addEventLogger, it adds to every spout and bolt a declareStream that outputs to EVENTLOGGER_STREAM_ID, and wires all spouts and bolts into the EventLoggerBolt's inputs via the equivalent of fieldsGrouping(componentId, Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), new Fields("component-id")); the tuples delivered to the EventLoggerBolt carry the fields EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS and EventLoggerBolt.FIELD_VALUES.
