Apache Flume是一個分佈式、可靠且可用的系統，用於高效地收集、彙總來自多個不同數據源的大量日誌數據，並將其移動到集中式數據存儲區。Apache Flume的用途不僅限於日誌數據聚合：由於數據源可以定製，Flume也被用於傳輸海量事件數據，包括但不限於網絡流量數據、社交媒體生成的數據、電子郵件消息，以及幾乎任何可能的數據源。
flume-ng-channels、flume-ng-sources、flume-ng-sinks：這三個項目主要包括各種插件，flume-ng-channels包括file、jdbc等；flume-ng-sources包括kafka、jms等插件；flume-ng-sinks包括hdfs、hive等。想了解插件的話，大家可以看看這裏是怎樣實現的，自己也可以寫一些插件滿足自己的需求。 flume-ng-client：這個模塊包括把log4j日誌直接發送給flume的實現，還包括負載均衡的實現。 flume-ng-configuration：這個模塊比較重要，主要負責加載配置文件的參數，大家必須要詳細看一下，看看是怎樣加載配置文件的，還有SourceRunner、SinkRunner等是怎樣生成的。 flume-ng-core：這個模塊是核心模塊，這裏有ChannelFactory、SinkFactory、SourceFactory等，看源碼必須得看懂這裏，這就是爲什麼稱它爲核心，我debug了好多遍纔看懂這裏，這裏有好多接口，大家應該好好理解。 flume-ng-node：這個模塊是flume程序的入口，入口類是Application。
// Excerpt from the agent's entry point. commandLine, agentName, isZkConfigured and
// logger are declared outside this excerpt.

// Reload the configuration automatically when it changes, unless --no-reload-conf is given.
boolean reload = !commandLine.hasOption("no-reload-conf");
if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
  // Take the configuration from ZooKeeper. NOTE(review): the article's author calls
  // this mode experimental and not recommended - verify for the Flume version in use.
  isZkConfigured = true;
}
Application application = null;
if (isZkConfigured) {
  // Load the configuration from ZooKeeper and start the agent with it.
  // get options
  String zkConnectionStr = commandLine.getOptionValue('z');
  String baseZkPath = commandLine.getOptionValue('p');
  if (reload) {
    // Polling provider: it is supervised as a component of the Application and posts
    // configurations on the EventBus; the Application is registered as the subscriber.
    EventBus eventBus = new EventBus(agentName + "-event-bus");
    List<LifecycleAware> components = Lists.newArrayList();
    PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider =
        new PollingZooKeeperConfigurationProvider(
            agentName, zkConnectionStr, baseZkPath, eventBus);
    components.add(zookeeperConfigurationProvider);
    application = new Application(components);
    eventBus.register(application);
  } else {
    // Static provider: the configuration is read once and applied directly.
    StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider =
        new StaticZooKeeperConfigurationProvider(
            agentName, zkConnectionStr, baseZkPath);
    application = new Application();
    application.handleConfigurationEvent(zookeeperConfigurationProvider.getConfiguration());
  }
} else {
  File configurationFile = new File(commandLine.getOptionValue('f'));

  /*
   * The following is to ensure that by default the agent will fail on
   * startup if the file does not exist.
   */
  if (!configurationFile.exists()) {
    // If command line invocation, then need to fail fast
    if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) {
      String path = configurationFile.getPath();
      try {
        path = configurationFile.getCanonicalPath();
      } catch (IOException ex) {
        logger.error("Failed to read canonical path for file: " + path, ex);
      }
      throw new ParseException(
          "The specified configuration file does not exist: " + path);
    }
  }
  List<LifecycleAware> components = Lists.newArrayList();

  if (reload) {
    // Observer pattern: the polling provider re-reads the file when it changes and posts
    // each new configuration on the EventBus; the registered Application applies it.
    // NOTE(review): 30 is presumably the poll interval in seconds - confirm against the
    // provider's constructor.
    EventBus eventBus = new EventBus(agentName + "-event-bus");
    PollingPropertiesFileConfigurationProvider configurationProvider =
        new PollingPropertiesFileConfigurationProvider(
            agentName, configurationFile, eventBus, 30);
    components.add(configurationProvider);
    application = new Application(components);
    eventBus.register(application);
  } else {
    // No reloading: the file is read once here, so configuration changes only take
    // effect after restarting the agent.
    PropertiesFileConfigurationProvider configurationProvider =
        new PropertiesFileConfigurationProvider(agentName, configurationFile);
    application = new Application();
    // Load the configuration file and apply it immediately.
    application.handleConfigurationEvent(configurationProvider.getConfiguration());
  }
}
// Start the application. With reload enabled this presumably starts the polling
// configuration provider passed to new Application(components), which then loads
// the configuration.
application.start();

final Application appReference = application;
// Stop the application when the JVM shuts down.
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
  @Override
  public void run() {
    appReference.stop();
  }
});
public synchronized void start() { for (LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } } public synchronized void supervise(LifecycleAware lifecycleAware, SupervisorPolicy policy, LifecycleState desiredState) {//監督程序 if (this.monitorService.isShutdown() || this.monitorService.isTerminated() || this.monitorService.isTerminating()) { throw new FlumeException("Supervise called on " + lifecycleAware + " " + "after shutdown has been initiated. " + lifecycleAware + " will not" + " be started"); } Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware), "Refusing to supervise " + lifecycleAware + " more than once"); if (logger.isDebugEnabled()) { logger.debug("Supervising service:{} policy:{} desiredState:{}", new Object[] { lifecycleAware, policy, desiredState }); } Supervisoree process = new Supervisoree(); process.status = new Status(); process.policy = policy; process.status.desiredState = desiredState; process.status.error = false; MonitorRunnable monitorRunnable = new MonitorRunnable();//聲明一個線程 monitorRunnable.lifecycleAware = lifecycleAware; monitorRunnable.supervisoree = process; monitorRunnable.monitorService = monitorService; supervisedProcesses.put(lifecycleAware, process); ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS);//每三秒檢查一下程序是否在 monitorFutures.put(lifecycleAware, future); } public static class MonitorRunnable implements Runnable {//定時器會每隔三秒調用一次 public ScheduledExecutorService monitorService; public LifecycleAware lifecycleAware; public Supervisoree supervisoree; @Override public void run() { logger.debug("checking process:{} supervisoree:{}", lifecycleAware, supervisoree); long now = System.currentTimeMillis(); try { if (supervisoree.status.firstSeen == null) { logger.debug("first time seeing {}", lifecycleAware); supervisoree.status.firstSeen = now; } supervisoree.status.lastSeen = 
now; synchronized (lifecycleAware) { if (supervisoree.status.discard) { // Unsupervise has already been called on this. logger.info("Component has already been stopped {}", lifecycleAware); return; } else if (supervisoree.status.error) { logger.info("Component {} is in error state, and Flume will not" + "attempt to change its state", lifecycleAware); return; } supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState(); if (!lifecycleAware.getLifecycleState().equals( supervisoree.status.desiredState)) { logger.debug("Want to transition {} from {} to {} (failures:{})", new Object[] { lifecycleAware, supervisoree.status.lastSeenState, supervisoree.status.desiredState, supervisoree.status.failures }); switch (supervisoree.status.desiredState) { case START: try { lifecycleAware.start();//全部組件的啓動都是在這在這個位置啓動,首先你要確認lifecycleAware的實現類,若是你是用觀察者模式調用配置文件,第一次先是調用配置文件,你能夠看一下LifecycleAware實現關係 } catch (Throwable e) { logger.error("Unable to start " + lifecycleAware + " - Exception follows.", e); if (e instanceof Error) { // This component can never recover, shut it down. supervisoree.status.desiredState = LifecycleState.STOP; try { lifecycleAware.stop(); logger.warn("Component {} stopped, since it could not be" + "successfully started due to missing dependencies", lifecycleAware); } catch (Throwable e1) { logger.error("Unsuccessful attempt to " + "shutdown component: {} due to missing dependencies." + " Please shutdown the agent" + "or disable this component, or the agent will be" + "in an undefined state.", e1); supervisoree.status.error = true; if (e1 instanceof Error) { throw (Error) e1; } // Set the state to stop, so that the conf poller can // proceed. 
} } supervisoree.status.failures++; } break; case STOP: try { lifecycleAware.stop(); } catch (Throwable e) { logger.error("Unable to stop " + lifecycleAware + " - Exception follows.", e); if (e instanceof Error) { throw (Error) e; } supervisoree.status.failures++; } break; default: logger.warn("I refuse to acknowledge {} as a desired state", supervisoree.status.desiredState); } if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) { logger.error( "Policy {} of {} has been violated - supervisor should exit!", supervisoree.policy, lifecycleAware); } } } } catch (Throwable t) { logger.error("Unexpected error", t); } logger.debug("Status check complete"); } }
上面這個函數（supervise）比較重要，channel、source和sink的啓動都會用到它，它是一個監督函數。從圖中我們可以看到PollingPropertiesFileConfigurationProvider類實現了LifecycleAware接口，現在就開始加載相關的參數。
/**
 * Starts the configuration provider.
 *
 * <p>Creates a single-threaded scheduler whose threads are named "conf-file-poller-%d".
 * It then schedules a FileWatcherRunnable over {@code file} to run immediately and then
 * every {@code interval} seconds (fixed delay), and finally marks this provider as START.
 *
 * @throws IllegalStateException if {@code file} is null (Guava Preconditions.checkState)
 */
public void start() {
  LOGGER.info("Configuration provider starting");

  Preconditions.checkState(file != null,
      "The parameter file must not be null");

  executorService = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
          .build());

  // The watcher task that checks the file for changes.
  FileWatcherRunnable fileWatcherRunnable =
      new FileWatcherRunnable(file, counterGroup);

  // First check right away, then every `interval` seconds after the previous check ends.
  executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
      TimeUnit.SECONDS);

  lifecycleState = LifecycleState.START;

  LOGGER.debug("Configuration provider started");
}
在這裏啓動了一個線程任務FileWatcherRunnable，它通過scheduleWithFixedDelay每隔interval秒執行一次，檢查配置文件是否有變化。
/**
 * One poll of the configuration file.
 *
 * <p>If the file's last-modified time is newer than the last one seen, the configuration is
 * re-read via getConfiguration() and posted on the EventBus so that subscribers can apply
 * it. Failures are logged, never thrown.
 */
public void run() {
  LOGGER.debug("Checking file:{} for changes", file);

  counterGroup.incrementAndGet("file.checks");

  long lastModified = file.lastModified();

  if (lastModified > lastChange) {
    LOGGER.info("Reloading configuration file:{}", file);

    counterGroup.incrementAndGet("file.loads");

    // Recorded before loading, so a failed load is not retried until the file
    // changes again.
    lastChange = lastModified;

    try {
      // Load the configuration, then publish it to the EventBus subscribers
      // (observer pattern).
      eventBus.post(getConfiguration());
    } catch (Exception e) {
      LOGGER.error("Failed to load configuration data. Exception follows.",
          e);
    } catch (NoClassDefFoundError e) {
      LOGGER.error("Failed to start agent because dependencies were not " +
          "found in classpath. Error follows.", e);
    } catch (Throwable t) {
      // caught because the caller does not handle or log Throwables
      LOGGER.error("Unhandled error", t);
    }
  }
}
} // presumably closes the enclosing FileWatcherRunnable class (its header is not in this excerpt)
eventBus.post(getConfiguration())首先加載配置文件，然後把得到的配置發佈到EventBus上，交給訂閱者處理（觀察者模式）；訂閱者就是下面帶@Subscribe註解的handleConfigurationEvent。
//這是啓動觀察者模式入口 @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); }
我們看一下加載配置文件的過程：
/**
 * Builds the MaterializedConfiguration for this agent.
 *
 * <p>Channels are loaded first, then sources and sinks, each wired to its channels.
 * Channels that no source or sink is connected to are dropped. The remaining channels and
 * all source and sink runners are registered in the result.
 *
 * @return the materialized configuration. It is empty when no configuration exists for
 *     this agent name. An InstantiationException is logged rather than thrown, and the
 *     configuration built so far (possibly empty) is returned.
 */
public MaterializedConfiguration getConfiguration() {
  MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
  FlumeConfiguration fconfig = getFlumeConfiguration();
  AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
  if (agentConf != null) {
    Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
    Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
    Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
    try {
      // Channels first: sources and sinks look up their channels in channelComponentMap.
      loadChannels(agentConf, channelComponentMap);
      // Create and configure the sources and the SourceRunners that drive them.
      loadSources(agentConf, channelComponentMap, sourceRunnerMap);
      // Create and configure the sinks and fill sinkRunnerMap.
      loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
      // Iterate over a copy of the key set: unused channels are removed from
      // channelComponentMap inside the loop.
      Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
      for (String channelName : channelNames) {
        ChannelComponent channelComponent = channelComponentMap.get(channelName);
        if (channelComponent.components.isEmpty()) {
          // No source or sink references this channel: drop it, also from the channel cache.
          LOGGER.warn(String.format("Channel %s has no components connected" +
              " and has been removed.", channelName));
          channelComponentMap.remove(channelName);
          Map<String, Channel> nameChannelMap =
              channelCache.get(channelComponent.channel.getClass());
          if (nameChannelMap != null) {
            nameChannelMap.remove(channelName);
          }
        } else {
          LOGGER.info(String.format("Channel %s connected to %s",
              channelName, channelComponent.components.toString()));
          conf.addChannel(channelName, channelComponent.channel);
        }
      }
      for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
        conf.addSourceRunner(entry.getKey(), entry.getValue());
      }
      for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
        conf.addSinkRunner(entry.getKey(), entry.getValue());
      }
    } catch (InstantiationException ex) {
      LOGGER.error("Failed to instantiate component", ex);
    } finally {
      // Clear the temporary working maps.
      channelComponentMap.clear();
      sourceRunnerMap.clear();
      sinkRunnerMap.clear();
    }
  } else {
    LOGGER.warn("No configuration found for this host:{}", getAgentName());
  }
  return conf;
}
// Excerpt from loadChannels(): create and configure every channel named in the agent
// configuration.
for (String chName : channelNames) {
  ComponentConfiguration comp = compMap.get(chName);
  if (comp != null) {
    // Obtain the Channel instance for this name/type (presumably reused when
    // possible, otherwise newly created - see getOrCreateChannel).
    Channel channel = getOrCreateChannel(channelsNotReused,
        comp.getComponentName(), comp.getType());
    try {
      Configurables.configure(channel, comp);
      channelComponentMap.put(comp.getComponentName(),
          new ChannelComponent(channel));
      LOGGER.info("Created channel " + chName);
    } catch (Exception e) {
      // A channel that fails to configure is logged and left out of channelComponentMap.
      String msg = String.format("Channel %s has been removed due to an " +
          "error during configuration", chName);
      LOGGER.error(msg, e);
    }
  }
}

// Separate excerpt, presumably from getOrCreateChannel(): channel classes annotated
// @Disposable get a fresh instance from the channel factory.
if (channelClass.isAnnotationPresent(Disposable.class)) {
  Channel channel = channelFactory.create(name, type); // created via the factory
  channel.setName(name);
  return channel;
}
// Excerpt from loadSources(): create each source, wire it to its channels and wrap it in
// a SourceRunner. (The original article cuts this excerpt off mid-loop.)
Map<String, Context> sourceContexts = agentConf.getSourceContext();
for (String sourceName : sourceNames) {
  Context context = sourceContexts.get(sourceName);
  if (context != null) {
    Source source =
        sourceFactory.create(sourceName,
            context.getString(BasicConfigurationConstants.CONFIG_TYPE));
    try {
      // Apply this source's own parameters from its Context.
      Configurables.configure(source, context);
      List<Channel> sourceChannels = new ArrayList<Channel>();
      // A source lists its channels as whitespace-separated channel names.
      String[] channelNames = context.getString(
          BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
      for (String chName : channelNames) {
        ChannelComponent channelComponent = channelComponentMap.get(chName);
        // Names with no loaded channel are skipped.
        if (channelComponent != null) {
          sourceChannels.add(channelComponent.channel);
        }
      }
      if (sourceChannels.isEmpty()) {
        String msg = String.format("Source %s is not connected to a " +
            "channel", sourceName);
        throw new IllegalStateException(msg);
      }
      Map<String, String> selectorConfig = context.getSubProperties(
          BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);

      // Build the channel selector from the source's channels and selector sub-properties.
      ChannelSelector selector = ChannelSelectorFactory.create(
          sourceChannels, selectorConfig);

      ChannelProcessor channelProcessor = new ChannelProcessor(selector);
      Configurables.configure(channelProcessor, context);
      source.setChannelProcessor(channelProcessor);
      // Wrap the source in the SourceRunner that will later start it.
      sourceRunnerMap.put(sourceName,
          SourceRunner.forSource(source));
      // Record this source as a component connected to each of its channels.
      for (Channel channel : sourceChannels) {
        ChannelComponent channelComponent =
            Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
                String.format("Channel %s", channel.getName()));
        channelComponent.components.add(sourceName);
Flume中的source必須由SourceRunner來啓動，而SourceRunner分爲PollableSourceRunner和EventDrivenSourceRunner兩種。每個source屬於其中一種實現（PollableSource或EventDrivenSource），SourceRunner.forSource會據此創建對應的runner。
// Excerpt from loadSinks(): create and configure each sink, attach it to its channel,
// then build the sink runners.
Map<String, Context> sinkContexts = agentConf.getSinkContext();
for (String sinkName : sinkNames) {
  Context context = sinkContexts.get(sinkName);
  if (context != null) {
    Sink sink = sinkFactory.create(sinkName, context.getString(
        BasicConfigurationConstants.CONFIG_TYPE));
    try {
      // Apply this sink's own parameters from its Context.
      Configurables.configure(sink, context);
      // Unlike a source (CONFIG_CHANNELS, a list), a sink names a single channel.
      ChannelComponent channelComponent = channelComponentMap.get(
          context.getString(BasicConfigurationConstants.CONFIG_CHANNEL));
      if (channelComponent == null) {
        String msg = String.format("Sink %s is not connected to a " +
            "channel", sinkName);
        throw new IllegalStateException(msg);
      }
      sink.setChannel(channelComponent.channel);
      sinks.put(sinkName, sink);
      channelComponent.components.add(sinkName);
    } catch (Exception e) {
      // Also catches the IllegalStateException above: a misconfigured or unconnected
      // sink is logged and skipped instead of failing the whole load.
      String msg = String.format("Sink %s has been removed due to an " +
          "error during configuration", sinkName);
      LOGGER.error(msg, e);
    }
  }
}
// Build the SinkRunners for the configured sinks (sink groups; body not shown here).
loadSinkGroups(agentConf, sinks, sinkRunnerMap);
SinkRunner只有這一種，由它來調用指定的sink。
由於用的是觀察模式,因此會從這裏執行,這是guava中實現 @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); } private void startAllComponents(MaterializedConfiguration materializedConfiguration) { logger.info("Starting new configuration:{}", materializedConfiguration); this.materializedConfiguration = materializedConfiguration; for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) { try { logger.info("Starting Channel " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } /* * Wait for all channels to start. */ for (Channel ch : materializedConfiguration.getChannels().values()) { while (ch.getLifecycleState() != LifecycleState.START && !supervisor.isComponentInErrorState(ch)) { try { logger.info("Waiting for channel: " + ch.getName() + " to start. Sleeping for 500 ms"); Thread.sleep(500); } catch (InterruptedException e) { logger.error("Interrupted while waiting for channel to start.", e); Throwables.propagate(e); } } } for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) { try { logger.info("Starting Sink " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners().entrySet()) { try { logger.info("Starting Source " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } this.loadMonitoring(); }
從這裏開始啓動所有的channel、sink和source組件：先啓動所有channel並等待它們啓動完成，然後啓動sink，最後啓動source。