咱們已經在前面的《flume1.8源碼閱讀(一)》中講過加載flume的配置文件,其中有channel、sink和source的組件參數。前面只是講加載相關的參數,下面講source的啓動。
public class EventDrivenSourceRunner extends SourceRunner { private LifecycleState lifecycleState; public EventDrivenSourceRunner() { lifecycleState = LifecycleState.IDLE; } @Override public void start() { Source source = getSource(); ChannelProcessor cp = source.getChannelProcessor(); cp.initialize(); source.start();//這是啓動source lifecycleState = LifecycleState.START; } @Override public void stop() { Source source = getSource(); source.stop(); ChannelProcessor cp = source.getChannelProcessor(); cp.close(); lifecycleState = LifecycleState.STOP; } @Override public String toString() { return "EventDrivenSourceRunner: { source:" + getSource() + " }"; } @Override public LifecycleState getLifecycleState() { return lifecycleState; } }
在這裏以SpoolDirectorySource爲例,來看source的啓動:
@Override public synchronized void start() { logger.info("SpoolDirectorySource source starting with directory: {}", spoolDirectory); executor = Executors.newSingleThreadScheduledExecutor(); File directory = new File(spoolDirectory); try { reader = new ReliableSpoolingFileEventReader.Builder() .spoolDirectory(directory) .completedSuffix(completedSuffix) .includePattern(includePattern) .ignorePattern(ignorePattern) .trackerDirPath(trackerDirPath) .annotateFileName(fileHeader) .fileNameHeader(fileHeaderKey) .annotateBaseName(basenameHeader) .baseNameHeader(basenameHeaderKey) .deserializerType(deserializerType) .deserializerContext(deserializerContext) .deletePolicy(deletePolicy) .inputCharset(inputCharset) .decodeErrorPolicy(decodeErrorPolicy) .consumeOrder(consumeOrder) .recursiveDirectorySearch(recursiveDirectorySearch) .build(); } catch (IOException ioe) { throw new FlumeException("Error instantiating spooling event parser", ioe); } Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter); executor.scheduleWithFixedDelay( runner, 0, pollDelay, TimeUnit.MILLISECONDS);//在這裏啓動線程來讀取數據 super.start(); logger.debug("SpoolDirectorySource source started"); sourceCounter.start(); }
public void start() { PollableSource source = (PollableSource) getSource(); ChannelProcessor cp = source.getChannelProcessor(); cp.initialize(); source.start();//啓動source組件,加載參數和得到文件 runner = new PollingRunner();//讀取數據 runner.source = source; runner.counterGroup = counterGroup; runner.shouldStop = shouldStop; runnerThread = new Thread(runner); runnerThread.setName(getClass().getSimpleName() + "-" + source.getClass().getSimpleName() + "-" + source.getName()); runnerThread.start(); lifecycleState = LifecycleState.START; }
PollableSourceRunner比EventDrivenSourceRunner多了一個自己的處理線程(PollingRunner),由它來負責讀取數據;而EventDrivenSourceRunner只負責啓動source,數據的讀取由source自己本身來完成(例如上面SpoolDirectorySource在start()裏自己啓動線程)。這裏就是source的啓動,我只是簡單地介紹一下啓動的流程,裏面詳細的內容會後續再說明。今天就寫到這,明天繼續寫……加油,天天寫一點,天天就會有一點收穫,再難也要堅持下去……