Flume 1.8 Source Code Reading (Part 2)

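In Flume 1.8 Source Code Reading (Part 1) we covered how Flume loads its configuration file, which holds the component parameters for channels, sinks, and sources. That part only dealt with loading those parameters; this part looks at how a source is started.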

Starting a source through EventDrivenSourceRunner

public class EventDrivenSourceRunner extends SourceRunner {

  private LifecycleState lifecycleState;

  public EventDrivenSourceRunner() {
    lifecycleState = LifecycleState.IDLE;
  }

  @Override
  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start(); // this is where the source is started
    lifecycleState = LifecycleState.START;
  }

  @Override
  public void stop() {
    Source source = getSource();
    source.stop();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.close();
    lifecycleState = LifecycleState.STOP;
  }

  @Override
  public String toString() {
    return "EventDrivenSourceRunner: { source:" + getSource() + " }";
  }

  @Override
  public LifecycleState getLifecycleState() {
    return lifecycleState;
  }

}
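To make the call order in start() and stop() easier to see, here is a minimal sketch. RecordingSource, RecordingProcessor, and Runner are hypothetical stand-ins written for this illustration, not Flume classes; they only record which method was called and when.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the runner's call order; none of these types are Flume classes.
class RunnerOrderSketch {

  // Stand-in for the channel processor: records initialize/close calls.
  static final class RecordingProcessor {
    private final List<String> log;
    RecordingProcessor(List<String> log) { this.log = log; }
    void initialize() { log.add("processor.initialize"); }
    void close() { log.add("processor.close"); }
  }

  // Stand-in for the source: records start/stop calls.
  static final class RecordingSource {
    private final List<String> log;
    RecordingSource(List<String> log) { this.log = log; }
    void start() { log.add("source.start"); }
    void stop() { log.add("source.stop"); }
  }

  // Mirrors the order shown in the listing above.
  static final class Runner {
    private final RecordingSource source;
    private final RecordingProcessor processor;
    Runner(RecordingSource source, RecordingProcessor processor) {
      this.source = source;
      this.processor = processor;
    }
    void start() {
      processor.initialize(); // the processor is ready before the source emits anything
      source.start();
    }
    void stop() {
      source.stop();          // stop producing first
      processor.close();      // then close the processor
    }
  }

  // Runs one start/stop cycle and returns the recorded call order.
  static List<String> runOnce() {
    List<String> log = new ArrayList<>();
    Runner runner = new Runner(new RecordingSource(log), new RecordingProcessor(log));
    runner.start();
    runner.stop();
    return log;
  }

  public static void main(String[] args) {
    System.out.println(runOnce());
  }
}
```

The point of the ordering is that the processor is initialized before the source can hand it events, and it is closed only after the source has stopped.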

Here we take SpoolDirectorySource as the example of a source being started:

  @Override
  public synchronized void start() {
    logger.info("SpoolDirectorySource source starting with directory: {}",
        spoolDirectory);

    executor = Executors.newSingleThreadScheduledExecutor();

    File directory = new File(spoolDirectory);
    try {
      reader = new ReliableSpoolingFileEventReader.Builder()
          .spoolDirectory(directory)
          .completedSuffix(completedSuffix)
          .includePattern(includePattern)
          .ignorePattern(ignorePattern)
          .trackerDirPath(trackerDirPath)
          .annotateFileName(fileHeader)
          .fileNameHeader(fileHeaderKey)
          .annotateBaseName(basenameHeader)
          .baseNameHeader(basenameHeaderKey)
          .deserializerType(deserializerType)
          .deserializerContext(deserializerContext)
          .deletePolicy(deletePolicy)
          .inputCharset(inputCharset)
          .decodeErrorPolicy(decodeErrorPolicy)
          .consumeOrder(consumeOrder)
          .recursiveDirectorySearch(recursiveDirectorySearch)
          .build();
    } catch (IOException ioe) {
      throw new FlumeException("Error instantiating spooling event parser",
          ioe);
    }

    Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
    executor.scheduleWithFixedDelay(
        runner, 0, pollDelay, TimeUnit.MILLISECONDS); // this schedules the thread that reads the data

    super.start();
    logger.debug("SpoolDirectorySource source started");
    sourceCounter.start();
  }
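The key step above is handing a Runnable to a single-threaded scheduled executor with scheduleWithFixedDelay. The sketch below shows only that scheduling pattern using the standard java.util.concurrent API. The class name FixedDelayPollSketch, the method pollTimes, and the counter that stands in for "read one batch" are assumptions made for this illustration; no Flume code is involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of the fixed-delay scheduling pattern; not Flume code.
class FixedDelayPollSketch {

  // Schedules a stand-in "read one batch" task and waits until it has run
  // at least `runs` times. Returns how many times the task actually ran.
  static int pollTimes(int runs, long pollDelayMillis) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger batches = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(runs);

    Runnable runner = () -> {
      batches.incrementAndGet(); // a real runnable would read and forward events here
      done.countDown();
    };

    // First run immediately, then wait pollDelayMillis after each run finishes.
    executor.scheduleWithFixedDelay(runner, 0, pollDelayMillis, TimeUnit.MILLISECONDS);

    try {
      boolean finished = done.await(5, TimeUnit.SECONDS);
      executor.shutdownNow();
      executor.awaitTermination(1, TimeUnit.SECONDS);
      if (!finished) {
        throw new IllegalStateException("task did not run " + runs + " times in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      executor.shutdownNow();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return batches.get();
  }

  public static void main(String[] args) {
    System.out.println("runs: " + pollTimes(3, 10));
  }
}
```

With a fixed delay, the next run is timed from the end of the previous one, so a slow run never overlaps with the next run on the single worker thread.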

Starting a source through PollableSourceRunner

  public void start() {
    PollableSource source = (PollableSource) getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start(); // starts the source component: loads its parameters and obtains the files

    runner = new PollingRunner(); // the runner that reads the data

    runner.source = source;
    runner.counterGroup = counterGroup;
    runner.shouldStop = shouldStop;

    runnerThread = new Thread(runner);
    runnerThread.setName(getClass().getSimpleName() + "-" + 
        source.getClass().getSimpleName() + "-" + source.getName());
    runnerThread.start();

    lifecycleState = LifecycleState.START;
  }
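The listing above creates a PollingRunner, gives it the source and a shared shouldStop flag, and runs it on its own named thread. As a rough sketch of what such a loop could look like, the code below polls a source until the flag is set. It is a simplified model under stated assumptions: the Pollable interface, its boolean return value, and the 10 ms back-off are invented for this example and are not Flume's actual PollableSource contract.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a polling loop; the types here are not Flume classes.
class PollingLoopSketch {

  // Assumed contract for this sketch: returns true if an event was handled.
  interface Pollable {
    boolean process();
  }

  static final class PollingRunner implements Runnable {
    private final Pollable source;
    private final AtomicBoolean shouldStop;

    PollingRunner(Pollable source, AtomicBoolean shouldStop) {
      this.source = source;
      this.shouldStop = shouldStop;
    }

    @Override
    public void run() {
      while (!shouldStop.get()) {
        boolean handled = source.process();
        if (!handled) {
          try {
            Thread.sleep(10); // back off briefly when there was nothing to read
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    }
  }

  // Polls until the demo source has been called `target` times, then returns the call count.
  // In a real runner, stop() would set the flag; here the demo source sets it itself.
  static int pollUntil(int target) {
    AtomicBoolean shouldStop = new AtomicBoolean(false);
    AtomicInteger calls = new AtomicInteger();
    Pollable source = () -> {
      if (calls.incrementAndGet() >= target) {
        shouldStop.set(true);
      }
      return true;
    };

    Thread runnerThread = new Thread(new PollingRunner(source, shouldStop));
    runnerThread.setName("PollingLoopSketch-runner");
    runnerThread.start();
    try {
      runnerThread.join(5000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return calls.get();
  }

  public static void main(String[] args) {
    System.out.println("process() calls: " + pollUntil(5));
  }
}
```

The shared flag is what lets another thread ask the loop to finish: the loop checks it before every poll and exits cleanly once it is set.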

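Compared with EventDrivenSourceRunner, PollableSourceRunner adds an event-processing step of its own: it creates a PollingRunner and runs it on a dedicated thread to read the data. With EventDrivenSourceRunner, the source brings its own mechanism instead; SpoolDirectorySource, for example, schedules its reader thread inside its own start() method.

That is how a source gets started. I have only walked through the startup flow briefly here; the details inside will be explained in later posts. That's all for today, and I will continue tomorrow. Keep going: write a little every day and you gain a little every day, and however hard it gets, keep at it.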
