經過bin/flume-ng 腳本能夠看到Flume啓動的入口是org.apache.flume.node.Application類,那麼就從Application類開始研究。java
CommandLineParser parser = new GnuParser(); CommandLine commandLine = parser.parse(options, args); File configurationFile = new File(commandLine.getOptionValue('f')); String agentName = commandLine.getOptionValue('n'); boolean reload = !commandLine.hasOption("no-reload-conf");
讀取命令行參數,f指定配置文件,n指定agent name ,no-reload-conf 參數決定是否採用動態加載配置文件,若沒有配置,reload爲true,採用動態加載,不然只加載一次。node
List<LifecycleAware> components = Lists.newArrayList(); Application application; if(reload) { EventBus eventBus = new EventBus(agentName + "-event-bus"); PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30); components.add(configurationProvider); application = new Application(components); eventBus.register(application); } else { PropertiesFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider(agentName, configurationFile); application = new Application(); application.handleConfigurationEvent(configurationProvider.getConfiguration()); } application.start();
實現動態加載功能採用了發佈訂閱模式,使用guava中的EventBus實現。apache
關於EventBus能夠參考:http://my.oschina.net/u/2311010/blog/515188 app
接下來繼續看EventBus是如何實現動態加載的。ide
LifecycleAware生命週期組件,有start(),stop(),getLifecycleState()三個方法,source,channel,sink都實現了該接口post
components.add(configurationProvider);添加PollingPropertiesFileConfigurationProvider對象,目前components只添加了一個對象,並做爲參數傳遞給Application的構造方法,接下來看下Application的start()。ui
public synchronized void start() { for(LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } }//此處的component 就是上面的PollingPropertiesFileConfigurationProvider對象
supervise 方法會對 component 建立一個 MonitorRunnable 進程,並放入默認有10個線程的 monitorService 去執行this
Supervisoree process = new Supervisoree(); process.status = new Status(); process.policy = policy; process.status.desiredState = desiredState; process.status.error = false; MonitorRunnable monitorRunnable = new MonitorRunnable(); monitorRunnable.lifecycleAware = lifecycleAware; monitorRunnable.supervisoree = process; monitorRunnable.monitorService = monitorService; supervisedProcesses.put(lifecycleAware, process); ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS); //定時調度MonitorRunnable monitorFutures.put(lifecycleAware, future); //MonitorRunnable返回的結果保存到monitorFutures
接下來看MonitorRunnable的run(),根據supervisoree.status.desiredState的狀態去調用lifecycleAware的start或者stop方法,此處的lifecycleAware就是PollingPropertiesFileConfigurationProvider對象spa
public void start() { LOGGER.info("Configuration provider starting"); Preconditions.checkState(file != null, "The parameter file must not be null"); executorService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d") .build()); FileWatcherRunnable fileWatcherRunnable = new FileWatcherRunnable(file, counterGroup); executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval, TimeUnit.SECONDS); lifecycleState = LifecycleState.START; LOGGER.debug("Configuration provider started"); }
PollingPropertiesFileConfigurationProvider的start()建立一個單線程,每隔30s執行FileWatcherRunnable的run方法。.net
public void run() { LOGGER.debug("Checking file:{} for changes", file); counterGroup.incrementAndGet("file.checks"); long lastModified = file.lastModified(); if (lastModified > lastChange) { LOGGER.info("Reloading configuration file:{}", file); counterGroup.incrementAndGet("file.loads"); lastChange = lastModified; try { eventBus.post(getConfiguration()); } catch (Exception e) { LOGGER.error("Failed to load configuration data. Exception follows.", e); } catch (NoClassDefFoundError e) { LOGGER.error("Failed to start agent because dependencies were not " + "found in classpath. Error follows.", e); } catch (Throwable t) { // caught because the caller does not handle or log Throwables LOGGER.error("Unhandled error", t); } } }
若是配置文件最後的修改時間晚於文件上次的修改時間,則會調用eventBus.post(getConfiguration());
getConfiguration()會讀取配置文件,把sources,channel,sink相信的配置信息保存在SimpleMaterializedConfiguration對象中,並返回
public MaterializedConfiguration getConfiguration() { MaterializedConfiguration conf = new SimpleMaterializedConfiguration(); FlumeConfiguration fconfig = getFlumeConfiguration(); AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName()); if (agentConf != null) { Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap(); Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap(); Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap(); try { loadChannels(agentConf, channelComponentMap); loadSources(agentConf, channelComponentMap, sourceRunnerMap); loadSinks(agentConf, channelComponentMap, sinkRunnerMap); Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet()); for(String channelName : channelNames) { ChannelComponent channelComponent = channelComponentMap. get(channelName); if(channelComponent.components.isEmpty()) { LOGGER.warn(String.format("Channel %s has no components connected" + " and has been removed.", channelName)); channelComponentMap.remove(channelName); Map<String, Channel> nameChannelMap = channelCache. get(channelComponent.channel.getClass()); if(nameChannelMap != null) { nameChannelMap.remove(channelName); } } else { LOGGER.info(String.format("Channel %s connected to %s", channelName, channelComponent.components.toString())); conf.addChannel(channelName, channelComponent.channel); } } for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) { conf.addSourceRunner(entry.getKey(), entry.getValue()); } for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) { conf.addSinkRunner(entry.getKey(), entry.getValue()); } } catch (InstantiationException ex) { LOGGER.error("Failed to instantiate component", ex); } finally { channelComponentMap.clear(); sourceRunnerMap.clear(); sinkRunnerMap.clear(); } } else { LOGGER.warn("No configuration found for this host:{}", getAgentName()); } return conf; }
eventBus.post(getConfiguration())會調用Application 的handleConfigurationEvent方法
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); }
其中stopAllComponents()用來中止正在運行的組件,順序是:source、sink、channel,這樣能夠避免中止組件致使的數據丟失。
startAllComponents(conf)會根據返回的配置文件內容啓動全部組件,啓動順序正好於中止順序相反
private void startAllComponents(MaterializedConfiguration materializedConfiguration) { logger.info("Starting new configuration:{}", materializedConfiguration); this.materializedConfiguration = materializedConfiguration; for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) { try{ logger.info("Starting Channel " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e){ logger.error("Error while starting {}", entry.getValue(), e); } } /* * Wait for all channels to start. */ for(Channel ch: materializedConfiguration.getChannels().values()){ while(ch.getLifecycleState() != LifecycleState.START && !supervisor.isComponentInErrorState(ch)){ try { logger.info("Waiting for channel: " + ch.getName() + " to start. Sleeping for 500 ms"); Thread.sleep(500); } catch (InterruptedException e) { logger.error("Interrupted while waiting for channel to start.", e); Throwables.propagate(e); } } } for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners() .entrySet()) { try{ logger.info("Starting Sink " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } for (Entry<String, SourceRunner> entry : materializedConfiguration .getSourceRunners().entrySet()) { try{ logger.info("Starting Source " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } this.loadMonitoring(); }
啓動組件會調用supervisor.supervise()方法,supervise方法會調用對應的lifecycleAware的start()方法。
flume的啓動和動態加載就已經分析完畢了