今天我們通過閱讀Flume-NG的源碼來看看Flume的整個啓動流程,廢話不多說,翠花,上源碼!!
在這裏我貼出Application中跟啓動有關的方法,其他的大家可以自己看源碼,畢竟源碼解析解的是思路。
org.apache.flume.node.Application
/*主函數*/ public static void main(String[] args) { try { boolean isZkConfigured = false; Options options = new Options(); Option option = new Option("n", "name", true, "the name of this agent"); option.setRequired(true); options.addOption(option); option = new Option("f", "conf-file", true, "specify a config file (required if -z missing)"); option.setRequired(false); options.addOption(option); option = new Option(null, "no-reload-conf", false, "do not reload config file if changed"); options.addOption(option); // Options for Zookeeper option = new Option("z", "zkConnString", true, "specify the ZooKeeper connection to use (required if -f missing)"); option.setRequired(false); options.addOption(option); option = new Option("p", "zkBasePath", true, "specify the base path in ZooKeeper for agent configs"); option.setRequired(false); options.addOption(option); option = new Option("h", "help", false, "display help text"); options.addOption(option); CommandLineParser parser = new GnuParser(); CommandLine commandLine = parser.parse(options, args); if (commandLine.hasOption('h')) { new HelpFormatter().printHelp("flume-ng agent", options, true); return; } String agentName = commandLine.getOptionValue('n'); boolean reload = !commandLine.hasOption("no-reload-conf"); if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) { isZkConfigured = true; } Application application = null; if (isZkConfigured) { // get options String zkConnectionStr = commandLine.getOptionValue('z'); String baseZkPath = commandLine.getOptionValue('p'); if (reload) { EventBus eventBus = new EventBus(agentName + "-event-bus"); List<LifecycleAware> components = Lists.newArrayList(); PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider = new PollingZooKeeperConfigurationProvider( agentName, zkConnectionStr, baseZkPath, eventBus); components.add(zookeeperConfigurationProvider); application = new Application(components); eventBus.register(application); } else { 
StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider = new StaticZooKeeperConfigurationProvider( agentName, zkConnectionStr, baseZkPath); application = new Application(); application.handleConfigurationEvent(zookeeperConfigurationProvider.getConfiguration()); } } else { File configurationFile = new File(commandLine.getOptionValue('f')); /* * 確保當文件不存在時agent會啓動失敗 */ if (!configurationFile.exists()) { // If command line invocation, then need to fail fast if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) { String path = configurationFile.getPath(); try { path = configurationFile.getCanonicalPath(); } catch (IOException ex) { logger.error("Failed to read canonical path for file: " + path, ex); } throw new ParseException( "The specified configuration file does not exist: " + path); } } List<LifecycleAware> components = Lists.newArrayList(); if (reload) { EventBus eventBus = new EventBus(agentName + "-event-bus"); PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider( agentName, configurationFile, eventBus, 30); components.add(configurationProvider); application = new Application(components); eventBus.register(application); } else { PropertiesFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider(agentName, configurationFile); application = new Application(); application.handleConfigurationEvent(configurationProvider.getConfiguration()); } } application.start(); final Application appReference = application; Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") { @Override public void run() { appReference.stop(); } }); } catch (Exception e) { logger.error("A fatal error occurred while running. 
Exception follows.", e); } } /*啓動方法*/ public synchronized void start() { for (LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } } /*響應EventBus的方法*/ @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); } /*中止方法*/ public synchronized void stop() { supervisor.stop(); if (monitorServer != null) { monitorServer.stop(); } } /*中止全部組件*/ private void stopAllComponents() { if (this.materializedConfiguration != null) { logger.info("Shutting down configuration: {}", this.materializedConfiguration); for (Entry<String, SourceRunner> entry : this.materializedConfiguration.getSourceRunners().entrySet()) { try { logger.info("Stopping Source " + entry.getKey()); supervisor.unsupervise(entry.getValue()); } catch (Exception e) { logger.error("Error while stopping {}", entry.getValue(), e); } } for (Entry<String, SinkRunner> entry : this.materializedConfiguration.getSinkRunners().entrySet()) { try { logger.info("Stopping Sink " + entry.getKey()); supervisor.unsupervise(entry.getValue()); } catch (Exception e) { logger.error("Error while stopping {}", entry.getValue(), e); } } for (Entry<String, Channel> entry : this.materializedConfiguration.getChannels().entrySet()) { try { logger.info("Stopping Channel " + entry.getKey()); supervisor.unsupervise(entry.getValue()); } catch (Exception e) { logger.error("Error while stopping {}", entry.getValue(), e); } } } if (monitorServer != null) { monitorServer.stop(); } } /*啓動全部組件*/ private void startAllComponents(MaterializedConfiguration materializedConfiguration) { logger.info("Starting new configuration:{}", materializedConfiguration); this.materializedConfiguration = materializedConfiguration; /*啓動Channel*/ for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) { try { logger.info("Starting Channel " + entry.getKey()); 
supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } /* * Wait for all channels to start. */ for (Channel ch : materializedConfiguration.getChannels().values()) { while (ch.getLifecycleState() != LifecycleState.START && !supervisor.isComponentInErrorState(ch)) { try { logger.info("Waiting for channel: " + ch.getName() + " to start. Sleeping for 500 ms"); Thread.sleep(500); } catch (InterruptedException e) { logger.error("Interrupted while waiting for channel to start.", e); Throwables.propagate(e); } } } /*啓動SinkRunner*/ for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) { try { logger.info("Starting Sink " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } /*啓動SourceRunner*/ for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners().entrySet()) { try { logger.info("Starting Source " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } this.loadMonitoring(); }
1)40行:檢查Shell命令,如果含有'h'選項則輸出幫助信息並返回
2)52行:判斷命令行中是否有ZooKeeper相關信息,如果有則獲取zookeeper相關信息,通過PollingZooKeeperConfigurationProvider或者StaticZooKeeperConfigurationProvider 去讀取存儲在zookeeper中的配置信息,如果沒有的話則調用PollingPropertiesFileConfigurationProvider或者PropertiesFileConfigurationProvider去指定路徑讀取配置文件進行加載。
3)57行和96行:不管從zookeeper中還是file中獲取配置文件,都需要判斷命令行中是否有「no-reload-conf」,根據reload = !commandLine.hasOption("no-reload-conf")獲得結果,如果reload爲真,那麼程序會每隔30秒檢查一次配置文件,如果檢查到配置文件發生變化則關閉原有組件,重新啓動,這部分細節在XXProvider中體現。
4)如果是動態輪詢的方式,那麼會將application註冊到EventBus中,然後調用Application類中的start()來啓動components,如果是隻加載一次的話,則使用handleConfigurationEvent()來啓動。handleConfigurationEvent()同時也是EventBus的回調函數,當eventbus執行post時,該方法就會被調用。
5)所有Application的start()實際上調用了XXProvider中的start(),來執行真正組件的啓動。
6)不管是reload還是一次加載,我們可以看到的是都是調用handleConfigurationEvent()來執行,該方法最終是調用stopAllComponents()和startAllComponents()來完成的
7)stopAllComponents我們就不說了,重點來看看startAllComponents,它的啓動順序是有講究的,先啓動channels,再啓動SinkRunner,最後啓動SourceRunner。
這裏面有兩個問題:
1)啓動順序的問題:爲什麼要先啓動channel?因爲sink和source的連接紐帶就是channel,而且sink和source的啓動都要判斷是否存在連接的channel,所以channel要先啓動;至於sink比source先啓動,我認爲應該是sink作爲最後的消費者,而source作爲數據來源,爲了防止數據堆積在channel中,所以先啓動消費者,再啓動生產者,需求促進供給嘛,瞎扯的。
2)SourceRunner和SinkRunner是什麼玩意?source和sink哪裏去了?這裏就簡單說下,SourceRunner和SinkRunner是用於啓動source和sink的驅動類,我們在下一篇source、sink和channel的分析中再來細說
該類是啓動組件時主要的類,這裏就以PollingPropertiesFileConfigurationProvider舉例說明,上代碼
public class PollingPropertiesFileConfigurationProvider extends PropertiesFileConfigurationProvider implements LifecycleAware { private static final Logger LOGGER = LoggerFactory.getLogger(PollingPropertiesFileConfigurationProvider.class); private final EventBus eventBus; private final File file; private final int interval; private final CounterGroup counterGroup; private LifecycleState lifecycleState; private ScheduledExecutorService executorService; public PollingPropertiesFileConfigurationProvider(String agentName, File file, EventBus eventBus, int interval) { super(agentName, file); this.eventBus = eventBus; this.file = file; this.interval = interval; counterGroup = new CounterGroup(); lifecycleState = LifecycleState.IDLE; } @Override public void start() { LOGGER.info("Configuration provider starting"); Preconditions.checkState(file != null, "The parameter file must not be null"); executorService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d") .build()); FileWatcherRunnable fileWatcherRunnable = new FileWatcherRunnable(file, counterGroup); executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval, TimeUnit.SECONDS); lifecycleState = LifecycleState.START; LOGGER.debug("Configuration provider started"); } @Override public void stop() { LOGGER.info("Configuration provider stopping"); executorService.shutdown(); try { while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { LOGGER.debug("Waiting for file watcher to terminate"); } } catch (InterruptedException e) { LOGGER.debug("Interrupted while waiting for file watcher to terminate"); Thread.currentThread().interrupt(); } lifecycleState = LifecycleState.STOP; LOGGER.debug("Configuration provider stopped"); } @Override public synchronized LifecycleState getLifecycleState() { return lifecycleState; } @Override public String toString() { return "{ file:" + file + " counterGroup:" + counterGroup + " provider:" + 
getClass().getCanonicalName() + " agentName:" + getAgentName() + " }"; } public class FileWatcherRunnable implements Runnable { private final File file; private final CounterGroup counterGroup; private long lastChange; public FileWatcherRunnable(File file, CounterGroup counterGroup) { super(); this.file = file; this.counterGroup = counterGroup; this.lastChange = 0L; } @Override public void run() { LOGGER.debug("Checking file:{} for changes", file); counterGroup.incrementAndGet("file.checks"); long lastModified = file.lastModified(); if (lastModified > lastChange) { LOGGER.info("Reloading configuration file:{}", file); counterGroup.incrementAndGet("file.loads"); lastChange = lastModified; try { eventBus.post(getConfiguration()); } catch (Exception e) { LOGGER.error("Failed to load configuration data. Exception follows.", e); } catch (NoClassDefFoundError e) { LOGGER.error("Failed to start agent because dependencies were not " + "found in classpath. Error follows.", e); } catch (Throwable t) { // caught because the caller does not handle or log Throwables LOGGER.error("Unhandled error", t); } } } } }
1)咱們在Application類中看看它的start()
/**
 * Registers each configured component with the supervisor, asking for it
 * to be brought to the {@code START} state under an always-restart policy.
 */
public synchronized void start() {
  for (LifecycleAware supervised : components) {
    supervisor.supervise(
        supervised,
        new SupervisorPolicy.AlwaysRestartPolicy(),
        LifecycleState.START);
  }
}
這裏使用supervisor.supervise()來啓動component,那麼我就不帶大家去看LifecycleSupervisor這個類,在這裏就告訴大家,這個方法內部還是調用了LifecycleAware的start()來進行啓動。既然講到了LifecycleAware接口,怎麼說都得看看代碼
/**
 * Contract for a component that can be started, stopped and asked for its
 * current lifecycle state.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface LifecycleAware {

  /** Starts the component. */
  void start();

  /** Stops the component. */
  void stop();

  /**
   * @return the lifecycle state the component currently reports
   */
  LifecycleState getLifecycleState();

}
非常簡單,就是三個方法:start()、stop()和getLifecycleState(),但是flume中大多數涉及啓動關閉的類都實現了它。
2)我們可以看到PollingPropertiesFileConfigurationProvider也是實現了LifecycleAware接口,那麼application的start()實際上就是調用PollingPropertiesFileConfigurationProvider的start()
3)48行:啓動了一個每隔30秒執行一次的定時任務,也就是30秒檢查一次配置文件。這個任務由內部類FileWatcherRunnable(77行)實現,運行在一個單線程的調度線程池中
4)99行:判斷文件是否有改動,如果有則調用eventBus.post(getConfiguration())。那麼訂閱了事件的Application類則會調用handleConfigurationEvent()執行組件的全部關閉和重啓。
5)同時我們注意到該類中是沒有getConfiguration()的,該方法是在它的上層父類AbstractConfigurationProvider中定義的(它的直接父類是PropertiesFileConfigurationProvider),用於獲取配置文件信息,這裏就不帶大家看了,有興趣可以看一下。
啓動流程的分析就到此爲止,這裏顯示不了行號,尷尬