Flume NG startup process: a source code analysis

The bin/flume-ng script shows that Flume's entry point is the org.apache.flume.node.Application class, so that is where we start.

CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);

File configurationFile = new File(commandLine.getOptionValue('f'));
String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf");

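This reads the command-line arguments. The f option specifies the configuration file and n specifies the agent name. The no-reload-conf option decides whether the configuration file is reloaded dynamically:

- If it is absent, reload is true and the file is reloaded whenever it changes.
- Otherwise the file is loaded only once.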
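To make the flag logic concrete, here is a minimal stdlib-only sketch. It is not Flume's code: Flume uses Apache Commons CLI (GnuParser). The hypothetical AgentArgs class below hand-parses only the three options discussed here. It assumes they are written as -f <file>, -n <name> and --no-reload-conf.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, stdlib-only stand-in for the Commons CLI parsing in
// Application.main(); it understands only -f <file>, -n <name> and --no-reload-conf.
final class AgentArgs {
  final String configFile;
  final String agentName;
  final boolean reload;

  private AgentArgs(String configFile, String agentName, boolean reload) {
    this.configFile = configFile;
    this.agentName = agentName;
    this.reload = reload;
  }

  static AgentArgs parse(String[] args) {
    Map<String, String> values = new HashMap<>();
    boolean noReload = false;
    for (int i = 0; i < args.length; i++) {
      String arg = args[i];
      if (arg.equals("--no-reload-conf")) {
        noReload = true;                          // a flag, takes no value
      } else if (arg.equals("-f") || arg.equals("-n")) {
        if (i + 1 >= args.length) {
          throw new IllegalArgumentException("Missing value for " + arg);
        }
        values.put(arg.substring(1), args[++i]);  // store under "f" or "n"
      } else {
        throw new IllegalArgumentException("Unrecognized argument: " + arg);
      }
    }
    // Mirrors: boolean reload = !commandLine.hasOption("no-reload-conf");
    return new AgentArgs(values.get("f"), values.get("n"), !noReload);
  }
}
```

For example, parsing `-n a1 -f conf/a1.properties` yields reload == true. In Application, that selects the PollingPropertiesFileConfigurationProvider branch.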

List<LifecycleAware> components = Lists.newArrayList();
Application application;
if(reload) {
    EventBus eventBus = new EventBus(agentName + "-event-bus");
    PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30);
    components.add(configurationProvider);
    application = new Application(components);
    eventBus.register(application);
} else {
    PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName,
                configurationFile);
    application = new Application();
    application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
application.start();

Dynamic reloading is implemented with the publish/subscribe pattern, using Guava's EventBus.

For more on EventBus, see: http://my.oschina.net/u/2311010/blog/515188
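The publish/subscribe idea can be sketched without Guava. TinyEventBus below is a hypothetical, stdlib-only stand-in, not Guava's API. Handlers register for an event type, and post() synchronously calls every handler whose type matches the event, including supertypes. In Flume the roles map as follows:

- the file poller is the publisher;
- Application is the subscriber;
- the event is a MaterializedConfiguration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for Guava's EventBus, for illustration only.
final class TinyEventBus {
  // event type -> handlers interested in that type
  private final Map<Class<?>, List<Consumer<Object>>> handlers = new ConcurrentHashMap<>();

  <T> void register(Class<T> type, Consumer<? super T> handler) {
    handlers.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>())
        .add(event -> handler.accept(type.cast(event)));
  }

  // Delivers the event synchronously to every handler registered for its
  // class or any supertype; events nobody subscribed to are simply dropped.
  void post(Object event) {
    for (Map.Entry<Class<?>, List<Consumer<Object>>> e : handlers.entrySet()) {
      if (e.getKey().isInstance(event)) {
        for (Consumer<Object> h : e.getValue()) {
          h.accept(event);
        }
      }
    }
  }
}
```

In Flume, the equivalent of register() is eventBus.register(application). Guava finds the handler through the @Subscribe annotation on handleConfigurationEvent(), rather than through an explicit type argument.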

Next, let's look at how EventBus is used to implement dynamic reloading.

LifecycleAware is the lifecycle interface for components. It has three methods: start(), stop() and getLifecycleState(). Sources, channels and sinks all implement it.
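Below is a simplified stand-in for that contract, for illustration only. The real LifecycleAware and LifecycleState live in org.apache.flume.lifecycle. The names here are hypothetical, and the four state values are assumed to mirror Flume's IDLE, START, STOP and ERROR.

```java
// Hypothetical simplified state enum (assumed to mirror Flume's LifecycleState values).
enum SimpleLifecycleState { IDLE, START, STOP, ERROR }

// Hypothetical simplified lifecycle contract with the same three methods.
interface SimpleLifecycleAware {
  void start();
  void stop();
  SimpleLifecycleState getLifecycleState();
}

// A do-nothing component that only tracks its state. The supervisor decides
// whether to call start() or stop() by reading getLifecycleState().
final class DummyComponent implements SimpleLifecycleAware {
  private volatile SimpleLifecycleState state = SimpleLifecycleState.IDLE;

  @Override public void start() { state = SimpleLifecycleState.START; }
  @Override public void stop()  { state = SimpleLifecycleState.STOP; }
  @Override public SimpleLifecycleState getLifecycleState() { return state; }
}
```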

components.add(configurationProvider) adds the PollingPropertiesFileConfigurationProvider object. At this point components holds only this one object, and the list is passed to the Application constructor. Next, let's look at Application.start().

public synchronized void start() {
    for(LifecycleAware component : components) {
      supervisor.supervise(component,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }
}  // here, component is the PollingPropertiesFileConfigurationProvider added above

The supervise method creates a MonitorRunnable task for the component. It schedules that task on monitorService, a scheduled thread pool with 10 threads by default:

    Supervisoree process = new Supervisoree();
    process.status = new Status();

    process.policy = policy;
    process.status.desiredState = desiredState;
    process.status.error = false;

    MonitorRunnable monitorRunnable = new MonitorRunnable();
    monitorRunnable.lifecycleAware = lifecycleAware;
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);

    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS);
    // run monitorRunnable now (initial delay 0), then again 3 s after each run finishes
    monitorFutures.put(lifecycleAware, future);
    // keep the ScheduledFuture (a handle to the scheduled task, not a result) so it can be cancelled later
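The supervise-and-monitor pattern can be sketched with a plain ScheduledExecutorService. MiniSupervisor and Supervisable are hypothetical names. The sketch keeps only the core idea: a periodic task compares the actual state with the desired state and calls start() or stop(). It leaves out Flume's policies, error counting and status bookkeeping. The delay is in milliseconds so the sketch can be exercised quickly; Flume uses 3 seconds.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Minimal lifecycle contract for this sketch only (not Flume's LifecycleAware).
interface Supervisable {
  void start();
  void stop();
  boolean isStarted();
}

// Hypothetical, much-simplified take on LifecycleSupervisor.supervise().
final class MiniSupervisor {
  // 10 daemon threads, echoing the default size of Flume's monitorService
  private final ScheduledExecutorService monitorService =
      Executors.newScheduledThreadPool(10, r -> {
        Thread t = new Thread(r, "mini-supervisor");
        t.setDaemon(true);
        return t;
      });
  private final Map<Supervisable, Boolean> desiredStarted = new ConcurrentHashMap<>();
  private final Map<Supervisable, ScheduledFuture<?>> monitorFutures = new ConcurrentHashMap<>();

  void supervise(Supervisable component, boolean wantStarted, long delayMillis) {
    desiredStarted.put(component, wantStarted);
    // initial delay 0, then a fixed delay between runs, like
    // scheduleWithFixedDelay(monitorRunnable, 0, 3, TimeUnit.SECONDS)
    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        () -> reconcile(component), 0, delayMillis, TimeUnit.MILLISECONDS);
    monitorFutures.put(component, future);
  }

  // One monitoring pass: act only when the actual state differs from the desired one.
  private void reconcile(Supervisable component) {
    try {
      boolean want = desiredStarted.getOrDefault(component, false);
      if (want && !component.isStarted()) {
        component.start();   // also restarts a component that stopped on its own
      } else if (!want && component.isStarted()) {
        component.stop();
      }
    } catch (RuntimeException e) {
      // an exception escaping a periodic task would suppress all its later runs
      System.err.println("Monitor pass failed: " + e);
    }
  }

  void shutdown() {
    monitorFutures.values().forEach(f -> f.cancel(false));
    monitorService.shutdownNow();
  }
}
```

Because the monitor keeps running, a component that stops by itself is started again on the next pass. That is the behaviour the name AlwaysRestartPolicy suggests.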

Next comes MonitorRunnable.run(). Depending on supervisoree.status.desiredState, it calls the lifecycleAware's start() or stop() method. Here the lifecycleAware is the PollingPropertiesFileConfigurationProvider object, whose start() is:

  public void start() {
    LOGGER.info("Configuration provider starting");

    Preconditions.checkState(file != null,
        "The parameter file must not be null");

    executorService = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
                .build());

    FileWatcherRunnable fileWatcherRunnable =
        new FileWatcherRunnable(file, counterGroup);

    executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
        TimeUnit.SECONDS);

    lifecycleState = LifecycleState.START;

    LOGGER.debug("Configuration provider started");
  }

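PollingPropertiesFileConfigurationProvider.start() creates a single-threaded scheduled executor. That executor runs FileWatcherRunnable.run() with a fixed delay of interval seconds, which is 30 s given the value passed in by Application. The run() method looks like this: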
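The executor setup uses Guava's ThreadFactoryBuilder only to give the poller thread a readable name. Below is a stdlib-only approximation; NamedThreadFactory and PollerSketch are hypothetical names. With a fixed delay, the next check starts interval seconds after the previous one finishes, so a slow check never overlaps the next one.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stdlib-only approximation of
// new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d").build():
// each new thread is named from the format plus a counter starting at 0.
final class NamedThreadFactory implements ThreadFactory {
  private final String nameFormat;
  private final AtomicInteger count = new AtomicInteger(0);

  NamedThreadFactory(String nameFormat) { this.nameFormat = nameFormat; }

  @Override
  public Thread newThread(Runnable r) {
    Thread t = Executors.defaultThreadFactory().newThread(r);
    t.setName(String.format(nameFormat, count.getAndIncrement()));
    return t;
  }
}

final class PollerSketch {
  // Runs `check` once immediately, then `intervalSeconds` after each run ends,
  // always on the same single, named thread.
  static ScheduledExecutorService startPolling(Runnable check, long intervalSeconds) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
        new NamedThreadFactory("conf-file-poller-%d"));
    executor.scheduleWithFixedDelay(check, 0, intervalSeconds, TimeUnit.SECONDS);
    return executor;
  }
}
```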

    public void run() {
      LOGGER.debug("Checking file:{} for changes", file);

      counterGroup.incrementAndGet("file.checks");

      long lastModified = file.lastModified();

      if (lastModified > lastChange) {
        LOGGER.info("Reloading configuration file:{}", file);

        counterGroup.incrementAndGet("file.loads");

        lastChange = lastModified;

        try {
          eventBus.post(getConfiguration());
        } catch (Exception e) {
          LOGGER.error("Failed to load configuration data. Exception follows.",
              e);
        } catch (NoClassDefFoundError e) {
          LOGGER.error("Failed to start agent because dependencies were not " +
              "found in classpath. Error follows.", e);
        } catch (Throwable t) {
          // caught because the caller does not handle or log Throwables
          LOGGER.error("Unhandled error", t);
        }
      }
    }

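If the configuration file's last-modified time is later than the last change the watcher has seen (lastChange), the watcher does two things:

- it records the new timestamp;
- it calls eventBus.post(getConfiguration()).

Any error while loading is logged rather than propagated, so polling continues.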
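The change check boils down to comparing timestamps. The sketch below isolates it, under these assumptions:

- FileWatcherSketch is a hypothetical name.
- The reload callback stands in for eventBus.post(getConfiguration()).
- lastChange is assumed to start at 0, so the first poll always loads the file. The first poll runs immediately, given the initial delay of 0.

File.lastModified() returns 0L for a missing file, so a missing file never triggers a load.

```java
import java.io.File;
import java.util.function.LongSupplier;

// Hypothetical sketch of the FileWatcherRunnable change check.
final class FileWatcherSketch implements Runnable {
  private final LongSupplier lastModified; // e.g. file::lastModified
  private final Runnable reload;           // stands in for eventBus.post(getConfiguration())
  private long lastChange = 0L;            // assumed initial value: first poll always loads

  FileWatcherSketch(File file, Runnable reload) {
    this(file::lastModified, reload);
  }

  FileWatcherSketch(LongSupplier lastModified, Runnable reload) {
    this.lastModified = lastModified;
    this.reload = reload;
  }

  @Override
  public void run() {
    long modified = lastModified.getAsLong();
    if (modified > lastChange) {
      lastChange = modified;   // remember this version before reloading
      try {
        reload.run();
      } catch (RuntimeException e) {
        // like Flume, log and keep polling instead of killing the scheduled task
        System.err.println("Failed to load configuration: " + e);
      }
    }
  }
}
```

Taking a LongSupplier makes the check testable without touching the file system. Flume itself simply calls file.lastModified() inside run().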

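getConfiguration() reads the configuration file and builds the channels, source runners and sink runners it describes. It stores them in a SimpleMaterializedConfiguration object and returns that object. Channels that no source or sink is connected to are dropped with a warning.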

  public MaterializedConfiguration getConfiguration() {
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
    FlumeConfiguration fconfig = getFlumeConfiguration();
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        loadChannels(agentConf, channelComponentMap);
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
        Set<String> channelNames =
            new HashSet<String>(channelComponentMap.keySet());
        for(String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.
              get(channelName);
          if(channelComponent.components.isEmpty()) {
            LOGGER.warn(String.format("Channel %s has no components connected" +
                " and has been removed.", channelName));
            channelComponentMap.remove(channelName);
            Map<String, Channel> nameChannelMap = channelCache.
                get(channelComponent.channel.getClass());
            if(nameChannelMap != null) {
              nameChannelMap.remove(channelName);
            }
          } else {
            LOGGER.info(String.format("Channel %s connected to %s",
                channelName, channelComponent.components.toString()));
            conf.addChannel(channelName, channelComponent.channel);
          }
        }
        for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        }
        for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          conf.addSinkRunner(entry.getKey(), entry.getValue());
        }
      } catch (InstantiationException ex) {
        LOGGER.error("Failed to instantiate component", ex);
      } finally {
        channelComponentMap.clear();
        sourceRunnerMap.clear();
        sinkRunnerMap.clear();
      }
    } else {
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
    }
    return conf;
  }
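The channel-pruning loop in the middle of getConfiguration() is the least obvious part. Below is an illustrative version; ChannelPruner is a hypothetical name. The data is reduced to a map from channel name to the names of the sources and sinks connected to it, a simplification of ChannelComponent. As in the original, the loop iterates over a copy of the key set. Removing entries from a HashMap while iterating over its own keySet() would typically fail with ConcurrentModificationException.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative version of the pruning step: a channel survives only if at
// least one source or sink was wired to it while the config was loaded.
final class ChannelPruner {
  static Set<String> prune(Map<String, List<String>> channelComponents) {
    Set<String> kept = new HashSet<>();
    // iterate over a copy of the keys because entries are removed inside the loop
    for (String channel : new HashSet<>(channelComponents.keySet())) {
      List<String> connected = channelComponents.get(channel);
      if (connected.isEmpty()) {
        System.out.printf("Channel %s has no components connected and has been removed.%n", channel);
        channelComponents.remove(channel);
      } else {
        System.out.printf("Channel %s connected to %s%n", channel, connected);
        kept.add(channel);
      }
    }
    return kept;
  }
}
```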

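eventBus.post(getConfiguration()) delivers the new configuration to Application.handleConfigurationEvent(). This works because Application was registered as a subscriber with eventBus.register(application):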

@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents();
    startAllComponents(conf);
}

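stopAllComponents() stops the running components in the order source, sink, channel. This way no channel is shut down while a source or sink is still using it, which helps avoid data loss.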

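startAllComponents(conf) starts every component in the new configuration in exactly the reverse order:

1. Channels first, then waiting until each channel reports START or the supervisor flags it as being in an error state.
2. Then sinks.
3. Then sources.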

  private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
    logger.info("Starting new configuration:{}", materializedConfiguration);

    this.materializedConfiguration = materializedConfiguration;

    for (Entry<String, Channel> entry :
      materializedConfiguration.getChannels().entrySet()) {
      try{
        logger.info("Starting Channel " + entry.getKey());
        supervisor.supervise(entry.getValue(),
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e){
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    /*
     * Wait for all channels to start.
     */
    for(Channel ch: materializedConfiguration.getChannels().values()){
      while(ch.getLifecycleState() != LifecycleState.START
          && !supervisor.isComponentInErrorState(ch)){
        try {
          logger.info("Waiting for channel: " + ch.getName() +
              " to start. Sleeping for 500 ms");
          Thread.sleep(500);
        } catch (InterruptedException e) {
          logger.error("Interrupted while waiting for channel to start.", e);
          Throwables.propagate(e);
        }
      }
    }

    for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
        .entrySet()) {
      try{
        logger.info("Starting Sink " + entry.getKey());
        supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    for (Entry<String, SourceRunner> entry : materializedConfiguration
        .getSourceRunners().entrySet()) {
      try{
        logger.info("Starting Source " + entry.getKey());
        supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    this.loadMonitoring();
  }
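The start half of a reload can be sketched as follows. Startable and StartAllSketch are hypothetical names. The sketch keeps the channel, sink, source order and the 500 ms wait loop. It omits the supervisor and its error-state check, so unlike Flume it would wait forever on a channel that never starts.

```java
import java.util.List;
import java.util.concurrent.locks.LockSupport;

// Hypothetical component handle for this sketch: start() may finish
// asynchronously, so readiness is polled through isStarted().
interface Startable {
  void start();
  boolean isStarted();
}

final class StartAllSketch {
  // Start channels and wait for each to report started, then sinks, then
  // sources, following the order of startAllComponents().
  static void startAll(List<Startable> channels, List<Startable> sinks,
                       List<Startable> sources) {
    channels.forEach(Startable::start);
    for (Startable ch : channels) {
      while (!ch.isStarted()) {
        LockSupport.parkNanos(500_000_000L); // Flume sleeps 500 ms between checks
      }
    }
    sinks.forEach(Startable::start);     // sinks are ready to drain before...
    sources.forEach(Startable::start);   // ...sources begin producing events
  }

  // Demo helper: a component that starts synchronously and logs its name.
  static Startable recording(String name, List<String> log) {
    return new Startable() {
      private volatile boolean started;
      @Override public void start() { log.add(name); started = true; }
      @Override public boolean isStarted() { return started; }
    };
  }
}
```

Starting sinks before sources means that when a source produces its first event, something is already draining the channel it writes to.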

Each component is started through supervisor.supervise(). Via MonitorRunnable, that calls the corresponding lifecycleAware's start() method.

That completes the analysis of Flume's startup and dynamic configuration reloading.
