Flume-ng源碼解析之啓動流程

今天咱們經過閱讀Flume-NG的源碼來看看Flume的整個啓動流程,廢話很少說,翠花,上源碼!!node

1 主類也是啓動類

在這裏我貼出Application中跟啓動有關的方法,其餘大家能夠本身看源碼,畢竟源碼解析解的是思路。apache

org.apache.flume.node.Applicationapp

/*主函數*/
  public static void main(String[] args) {

    try {

      boolean isZkConfigured = false;

      Options options = new Options();

      Option option = new Option("n", "name", true, "the name of this agent");
      option.setRequired(true);
      options.addOption(option);

      option = new Option("f", "conf-file", true,
          "specify a config file (required if -z missing)");
      option.setRequired(false);
      options.addOption(option);

      option = new Option(null, "no-reload-conf", false,
          "do not reload config file if changed");
      options.addOption(option);

      // Options for Zookeeper
      option = new Option("z", "zkConnString", true,
          "specify the ZooKeeper connection to use (required if -f missing)");
      option.setRequired(false);
      options.addOption(option);

      option = new Option("p", "zkBasePath", true,
          "specify the base path in ZooKeeper for agent configs");
      option.setRequired(false);
      options.addOption(option);

      option = new Option("h", "help", false, "display help text");
      options.addOption(option);

      CommandLineParser parser = new GnuParser();
      CommandLine commandLine = parser.parse(options, args);

      if (commandLine.hasOption('h')) {
        new HelpFormatter().printHelp("flume-ng agent", options, true);
        return;
      }

      String agentName = commandLine.getOptionValue('n');
      boolean reload = !commandLine.hasOption("no-reload-conf");

      if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
        isZkConfigured = true;
      }
      
      Application application = null;
      if (isZkConfigured) {
        // get options
        String zkConnectionStr = commandLine.getOptionValue('z');
        String baseZkPath = commandLine.getOptionValue('p');

        if (reload) {
          EventBus eventBus = new EventBus(agentName + "-event-bus");
          List<LifecycleAware> components = Lists.newArrayList();
          PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider =
              new PollingZooKeeperConfigurationProvider(
                  agentName, zkConnectionStr, baseZkPath, eventBus);
          components.add(zookeeperConfigurationProvider);
          application = new Application(components);
          eventBus.register(application);
        } else {
          StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider =
              new StaticZooKeeperConfigurationProvider(
                  agentName, zkConnectionStr, baseZkPath);
          application = new Application();
          application.handleConfigurationEvent(zookeeperConfigurationProvider.getConfiguration());
        }
      } else {
        File configurationFile = new File(commandLine.getOptionValue('f'));
        /*
         * 確保當文件不存在時agent會啓動失敗
         */
        if (!configurationFile.exists()) {
          // If command line invocation, then need to fail fast
          if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
              null) {
            String path = configurationFile.getPath();
            try {
              path = configurationFile.getCanonicalPath();
            } catch (IOException ex) {
              logger.error("Failed to read canonical path for file: " + path,
                  ex);
            }
            throw new ParseException(
                "The specified configuration file does not exist: " + path);
          }
        }
        List<LifecycleAware> components = Lists.newArrayList();

        if (reload) {
          EventBus eventBus = new EventBus(agentName + "-event-bus");
          PollingPropertiesFileConfigurationProvider configurationProvider =
              new PollingPropertiesFileConfigurationProvider(
                  agentName, configurationFile, eventBus, 30);
          components.add(configurationProvider);
          application = new Application(components);
          eventBus.register(application);
        } else {
          PropertiesFileConfigurationProvider configurationProvider =
              new PropertiesFileConfigurationProvider(agentName, configurationFile);
          application = new Application();
          application.handleConfigurationEvent(configurationProvider.getConfiguration());
        }
      }
      application.start();

      final Application appReference = application;
      Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
        @Override
        public void run() {
          appReference.stop();
        }
      });

    } catch (Exception e) {
      logger.error("A fatal error occurred while running. Exception follows.", e);
    }
  }
  
  /*啓動方法*/
  public synchronized void start() {
    for (LifecycleAware component : components) {
      supervisor.supervise(component,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }
  }

  /*響應EventBus的方法*/
  @Subscribe
  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents();
    startAllComponents(conf);
  }

  /*中止方法*/
  public synchronized void stop() {
    supervisor.stop();
    if (monitorServer != null) {
      monitorServer.stop();
    }
  }

  /*中止全部組件*/
  private void stopAllComponents() {
    if (this.materializedConfiguration != null) {
      logger.info("Shutting down configuration: {}", this.materializedConfiguration);
      for (Entry<String, SourceRunner> entry :
           this.materializedConfiguration.getSourceRunners().entrySet()) {
        try {
          logger.info("Stopping Source " + entry.getKey());
          supervisor.unsupervise(entry.getValue());
        } catch (Exception e) {
          logger.error("Error while stopping {}", entry.getValue(), e);
        }
      }

      for (Entry<String, SinkRunner> entry :
           this.materializedConfiguration.getSinkRunners().entrySet()) {
        try {
          logger.info("Stopping Sink " + entry.getKey());
          supervisor.unsupervise(entry.getValue());
        } catch (Exception e) {
          logger.error("Error while stopping {}", entry.getValue(), e);
        }
      }

      for (Entry<String, Channel> entry :
           this.materializedConfiguration.getChannels().entrySet()) {
        try {
          logger.info("Stopping Channel " + entry.getKey());
          supervisor.unsupervise(entry.getValue());
        } catch (Exception e) {
          logger.error("Error while stopping {}", entry.getValue(), e);
        }
      }
    }
    if (monitorServer != null) {
      monitorServer.stop();
    }
  }

  /*啓動全部組件*/
  private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
    logger.info("Starting new configuration:{}", materializedConfiguration);

    this.materializedConfiguration = materializedConfiguration;
    /*啓動Channel*/
    for (Entry<String, Channel> entry :
        materializedConfiguration.getChannels().entrySet()) {
      try {
        logger.info("Starting Channel " + entry.getKey());
        supervisor.supervise(entry.getValue(),
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    /*
     * Wait for all channels to start.
     */
    for (Channel ch : materializedConfiguration.getChannels().values()) {
      while (ch.getLifecycleState() != LifecycleState.START
          && !supervisor.isComponentInErrorState(ch)) {
        try {
          logger.info("Waiting for channel: " + ch.getName() +
              " to start. Sleeping for 500 ms");
          Thread.sleep(500);
        } catch (InterruptedException e) {
          logger.error("Interrupted while waiting for channel to start.", e);
          Throwables.propagate(e);
        }
      }
    }
    
    /*啓動SinkRunner*/
    for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
      try {
        logger.info("Starting Sink " + entry.getKey());
        supervisor.supervise(entry.getValue(),
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }
    /*啓動SourceRunner*/
    for (Entry<String, SourceRunner> entry :
         materializedConfiguration.getSourceRunners().entrySet()) {
      try {
        logger.info("Starting Source " + entry.getKey());
        supervisor.supervise(entry.getValue(),
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    this.loadMonitoring();
  }

1)40行:檢查Shell命令,若是含有'h'字符則返回幫助命令ide

2)52行:判斷命令行中是否有ZooKeeper相關信息,若是有則獲取zookeeper相關信息,經過PollingZooKeeperConfigurationProvider或者StaticZooKeeperConfigurationProvider 去調用存儲在zookeeper中的配置信息,若是沒有的話則調用PollingPropertiesFileConfigurationProvider或者PropertiesFileConfigurationProvider去指定路徑讀取配置文件進行加載。函數

3)57行和96行:不管從zookeeper中仍是file中獲取配置文件,都須要判斷命令行中是否有「no-reload-conf」,根據reload = !commandLine.hasOption("no-reload-conf")得到結果,若是reload爲真,那麼程序會每隔30秒檢查一次配置文件,若是檢查到配置文件發生變化則關閉原有組件,從新啓動,這部分細節在XXProvider中體現。post

4)若是是動態輪詢的方式,那麼會將application註冊到EventBus中,而後調用Application類中的start()來啓動components,若是是隻加載一次的話,則使用handleConfigurationEvent()來啓動。handleConfigurationEvent()同時也是EventBus的回調函數,當eventbus執行post時,該方法就會被調用。ui

5)全部Application的start()實際上調用了XXProvider中的start(),來執行真正組件的啓動。this

6)不管是reload仍是一次加載,咱們能夠看到的是都是調用handleConfigurationEvent()來執行,該方法最終是調用stopAllComponents()和startAllComponent來完成的spa

7)stopAllComponent咱們就不說了,重點來看看startAllComponents,它的啓動順序是有講究的,先啓動channels,再啓動SinkRunner,最後啓動SourceRunner。命令行

這裏面有兩個問題:
1)啓動順序的問題:爲何要先啓動channel,由於sink和source的鏈接紐帶就是channel,並且sink和source的啓動都要判斷是否存在鏈接channel,因此channel要先啓動,至於sink比source先啓動,我認爲應該是sink做爲最後消費者,而source做爲數據來源,那麼防止數據堆積在channel中,因此先啓動消費者,再啓動生產者,需求促進供給嘛,瞎扯的。
2)SourceRunner和SinkRunner是什麼玩意,source和sink哪裏去了,這裏就簡單說下,SourceRunner和SinkRunner是用於啓動source和sink的驅動類,咱們在下一篇source、sink和channel的分析中再來細說

2 XXProvider

該類是啓動組件時主要的類,這裏就以PollingPropertiesFileConfigurationProvider舉例說明,上代碼

public class PollingPropertiesFileConfigurationProvider
    extends PropertiesFileConfigurationProvider
    implements LifecycleAware {

  private static final Logger LOGGER =
      LoggerFactory.getLogger(PollingPropertiesFileConfigurationProvider.class);

  private final EventBus eventBus;
  private final File file;
  private final int interval;
  private final CounterGroup counterGroup;
  private LifecycleState lifecycleState;

  private ScheduledExecutorService executorService;

  public PollingPropertiesFileConfigurationProvider(String agentName,
      File file, EventBus eventBus, int interval) {
    super(agentName, file);
    this.eventBus = eventBus;
    this.file = file;
    this.interval = interval;
    counterGroup = new CounterGroup();
    lifecycleState = LifecycleState.IDLE;
  }

  @Override
  public void start() {
    LOGGER.info("Configuration provider starting");

    Preconditions.checkState(file != null,
        "The parameter file must not be null");

    executorService = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
                .build());

    FileWatcherRunnable fileWatcherRunnable =
        new FileWatcherRunnable(file, counterGroup);

    executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
        TimeUnit.SECONDS);

    lifecycleState = LifecycleState.START;

    LOGGER.debug("Configuration provider started");
  }

  @Override
  public void stop() {
    LOGGER.info("Configuration provider stopping");

    executorService.shutdown();
    try {
      while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
        LOGGER.debug("Waiting for file watcher to terminate");
      }
    } catch (InterruptedException e) {
      LOGGER.debug("Interrupted while waiting for file watcher to terminate");
      Thread.currentThread().interrupt();
    }
    lifecycleState = LifecycleState.STOP;
    LOGGER.debug("Configuration provider stopped");
  }

  @Override
  public synchronized  LifecycleState getLifecycleState() {
    return lifecycleState;
  }


  @Override
  public String toString() {
    return "{ file:" + file + " counterGroup:" + counterGroup + "  provider:"
        + getClass().getCanonicalName() + " agentName:" + getAgentName() + " }";
  }

  public class FileWatcherRunnable implements Runnable {

    private final File file;
    private final CounterGroup counterGroup;

    private long lastChange;

    public FileWatcherRunnable(File file, CounterGroup counterGroup) {
      super();
      this.file = file;
      this.counterGroup = counterGroup;
      this.lastChange = 0L;
    }

    @Override
    public void run() {
      LOGGER.debug("Checking file:{} for changes", file);

      counterGroup.incrementAndGet("file.checks");

      long lastModified = file.lastModified();

      if (lastModified > lastChange) {
        LOGGER.info("Reloading configuration file:{}", file);

        counterGroup.incrementAndGet("file.loads");

        lastChange = lastModified;

        try {
          eventBus.post(getConfiguration());
        } catch (Exception e) {
          LOGGER.error("Failed to load configuration data. Exception follows.",
              e);
        } catch (NoClassDefFoundError e) {
          LOGGER.error("Failed to start agent because dependencies were not " +
              "found in classpath. Error follows.", e);
        } catch (Throwable t) {
          // caught because the caller does not handle or log Throwables
          LOGGER.error("Unhandled error", t);
        }
      }
    }
  }

}

1)咱們在Application類中看看它的start()

public synchronized void start() {
    for (LifecycleAware component : components) {
      supervisor.supervise(component,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }
  }

這裏使用supervisor.supervise()來啓動component,那麼我就不帶你們去看LifecycleSupervisor這個類,在這裏就告訴你們,這個方法內部仍是調用了LifecycleAware的start()來進行啓動。既然講到了LifecycleAware接口,怎麼說都得看看代碼

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface LifecycleAware {

  public void start();
  public void stop();
  public LifecycleState getLifecycleState();

}

很是簡單,就是三個方法,start()、stop()和getLifecycleState,可是flume大多數涉及啓動關閉的類都實現了它。

2)咱們能夠看到PollingPropertiesFileConfigurationProvider也是實現了LifecycleAware接口,那麼appliaction的start()實際上就是調用PollingPropertiesFileConfigurationProvider的start()

3)48行:啓動了一個30秒執行一次的線程,也就是30秒檢查一次配置文件。這個線程是一個內部類FileWatcherRunnable(77行)

4)99行:判斷文件是否有改動,若是有則調用eventBus.post(getConfiguration()).那麼訂閱了事件的Application類則會調用handleConfigurationEvent()執行組件的所有關閉和重啓。

5)同時咱們注意到該類中是沒有getConfiguration()的,該方法是它的父類AbstractConfigurationProvider中定義的,用於獲取配置文件信息,這裏就不帶你們看了,有興趣能夠看一下。

啓動流程的分析就到此爲止,這裏顯示不了行號,尷尬

相關文章
相關標籤/搜索