ExecSource: collecting logs locally and sending them to a fileChannel

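There are two common ways to collect logs. One uses a local log file as the intermediary and ships entries to a remote log store asynchronously. The other collects logs synchronously over RPC and sends them straight to the remote log store. This post looks at how Flume NG collects logs from a local log file.

ExecSource runs a local shell command and wraps the data read from the local log file into Events that flow through Flume NG. A typical configuration is shown below: it sets the source type to exec, names the Channel downstream of the Source, and gives the shell command to run. The most commonly used command is tail -F, which picks up lines newly appended to a local log file.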

producer.sources.s1.type = exec
producer.sources.s1.channels = channel
producer.sources.s1.command = tail -F /data/logs/test.log

Let's walk through how ExecSource is implemented.

  1. ExecSource maintains a single-threaded thread pool (executor), along with the configured shell command, counters, and other properties.
public void start() {
  logger.info("Exec source starting with command:{}", command);
  executor = Executors.newSingleThreadExecutor();
  runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
      restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);
  // FIXME: Use a callback-like executor / future to signal us upon failure.
  runnerFuture = executor.submit(runner);
  sourceCounter.start();
  super.start();
  logger.debug("Exec source started");
}
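
The pattern in start() (one long-lived worker submitted to a single-threaded pool, with the returned Future kept for later inspection) can be sketched in isolation. This is a minimal sketch, not Flume code: SingleWorkerDemo, linesSeen, and awaitAndStop are hypothetical names made up for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a source that runs exactly one worker, as start() does above.
final class SingleWorkerDemo {
    private ExecutorService executor;
    private Future<?> runnerFuture;
    final AtomicInteger linesSeen = new AtomicInteger();

    // Creates the single-threaded pool and hands it the worker.
    void start(Runnable runner) {
        executor = Executors.newSingleThreadExecutor();
        runnerFuture = executor.submit(runner);
    }

    boolean isDone() {
        return runnerFuture.isDone();
    }

    // Waits for the worker to finish (or fail), then shuts the pool down.
    void awaitAndStop(long timeoutMs) {
        try {
            runnerFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("worker failed or timed out", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        SingleWorkerDemo demo = new SingleWorkerDemo();
        demo.start(() -> {
            for (int i = 0; i < 3; i++) {
                demo.linesSeen.incrementAndGet(); // pretend one line was read
            }
        });
        demo.awaitAndStop(5000);
        System.out.println("worker done, lines seen: " + demo.linesSeen.get());
    }
}
```

Keeping the Future is what lets the owner find out later whether the worker is still running or has died with an exception, which is the concern the FIXME comment in start() points at.
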
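  2. ExecRunnable implements the Runnable interface and is run by the executor thread pool. It carries the main flow for reading local logs.
  3. ExecRunnable maintains a scheduled thread pool, timedFlushService, that checks the Event list periodically (every 3 s; the interval is batchTimeout) and flushes the events as a batch if the batching conditions are met.

run() executes the shell command and reads the command's output as an input stream into reader. InputStreamReader is the bridge from byte streams to character streams: it reads bytes and decodes them into characters using the specified charset, and each call to one of its read methods may read one or more bytes from the underlying input stream.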
public void run() {
  do {
    String exitCode = "unknown";
    BufferedReader reader = null;
    String line = null;
    final List<Event> eventList = new ArrayList<Event>();
    timedFlushService = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat(
            "timedFlushExecService" +
            Thread.currentThread().getId() + "-%d").build());
    try {
      if(shell != null) {
        String[] commandArgs = formulateShellCommand(shell, command);
        process = Runtime.getRuntime().exec(commandArgs);
      }  else {
        String[] commandArgs = command.split("\\s+");
        process = new ProcessBuilder(commandArgs).start();
      }
      reader = new BufferedReader(
          new InputStreamReader(process.getInputStream(), charset));
      StderrReader stderrReader = new StderrReader(new BufferedReader(
          new InputStreamReader(process.getErrorStream(), charset)), logStderr);
      stderrReader.setName("StderrReader-[" + command + "]");
      stderrReader.setDaemon(true);
      stderrReader.start();

      future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
          @Override
          public void run() {
            try {
              synchronized (eventList) {
                if(!eventList.isEmpty() && timeout()) {
                  flushEventBatch(eventList);
                }
              }
            } catch (Exception e) {
              logger.error("Exception occured when processing event batch", e);
              if(e instanceof InterruptedException) {
                  Thread.currentThread().interrupt();
              }
            }
          }
      },
      batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
      while ((line = reader.readLine()) != null) {
        synchronized (eventList) {
          sourceCounter.incrementEventReceivedCount();
          eventList.add(EventBuilder.withBody(line.getBytes(charset)));
          if(eventList.size() >= bufferCount || timeout()) {
            flushEventBatch(eventList);
          }
        }
      }
      synchronized (eventList) {
          if(!eventList.isEmpty()) {
            flushEventBatch(eventList);   // sends the remaining events to the channel
          }
      }
    } catch (Exception e) {
      logger.error("Failed while running command: " + command, e);
      if(e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
    } finally {
      if (reader != null) {
        try {
          reader.close();
        } catch (IOException ex) {
          logger.error("Failed to close reader for exec source", ex);
        }
      }
      exitCode = String.valueOf(kill());
    }
    if(restart) {
      logger.info("Restarting in {}ms, exit code {}", restartThrottle,
          exitCode);
      try {
        Thread.sleep(restartThrottle);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else {
      logger.info("Command [" + command + "] exited with " + exitCode);
    }
  } while(restart);
}
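  4. ExecRunnable uses Runtime.getRuntime().exec and java.lang.ProcessBuilder to run an operating-system shell command from Java, and reads the output stream of the process that command creates through a Java stream, so that Java code can get at the data in the local log file. The shell command here is tail -F.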
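
As a rough, standalone illustration of reading a child process's output line by line, here is a minimal sketch. ProcessLineReader, readLines, and run are hypothetical names, not part of Flume. Unlike ExecSource, which drains stderr on a separate StderrReader thread, this sketch merges stderr into stdout with redirectErrorStream(true) to keep it short.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: runs a command without a shell and collects its output lines.
final class ProcessLineReader {

    // Decodes the byte stream with the given charset and returns its lines.
    static List<String> readLines(InputStream in, Charset charset) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(in, charset))) {
            String line;
            // readLine blocks until a full line is available and
            // returns null once the stream is closed.
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    // Splits the command on whitespace, as the shell == null branch above does.
    static List<String> run(String command) throws IOException, InterruptedException {
        String[] commandArgs = command.trim().split("\\s+");
        Process process = new ProcessBuilder(commandArgs)
            .redirectErrorStream(true) // simplification: stderr is merged into stdout
            .start();
        try {
            return readLines(process.getInputStream(), StandardCharsets.UTF_8);
        } finally {
            process.waitFor();
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumes a Unix-like system where "echo" is an executable on the PATH.
        System.out.println(run("echo hello from a child process"));
    }
}
```

With a command such as tail -F the loop in readLines would never see the end of the stream, which is why ExecSource pushes events to the channel in batches while it is still reading instead of collecting everything first.
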

The key step is using a shell command from Java to read the data in the local log file. The main code is as follows:

try {
  if(shell != null) {
    String[] commandArgs = formulateShellCommand(shell, command);
    process = Runtime.getRuntime().exec(commandArgs);
  }  else {
    String[] commandArgs = command.split("\\s+");
    process = new ProcessBuilder(commandArgs).start();
  }
  reader = new BufferedReader(
      new InputStreamReader(process.getInputStream(), charset));
  // When tail -F has no data, reader.readLine blocks until data arrives
  while ((line = reader.readLine()) != null) {
    synchronized (eventList) {
      sourceCounter.incrementEventReceivedCount();
      eventList.add(EventBuilder.withBody(line.getBytes(charset))); // an Event is created here
      if(eventList.size() >= bufferCount || timeout()) {
        flushEventBatch(eventList);
      }
    }
  }
  // ... the rest of run() (final flush, catch and finally) is in the full listing above

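The output stream of the local process, represented by java.lang.Process, is read through a Java input stream. When tail -F has no data, reader.readLine blocks until new data arrives. Each new line is first wrapped in an Event. If the list has reached the batch limit (bufferCount) or the batch timeout has elapsed, flushEventBatch is called, and flushEventBatch hands the Event list to the ChannelProcessor for batch processing.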
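
The "flush when the batch is full or the timeout has elapsed" rule can be sketched on its own. This is a minimal sketch under stated assumptions: LineBatcher is a hypothetical class, it batches plain strings instead of Flume Events, and it takes the clock and the sink as parameters so the behavior is easy to check. The field names bufferCount and batchTimeoutMs mirror the ones in the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical batcher: buffers lines and flushes on size or on elapsed time.
final class LineBatcher {
    private final int bufferCount;
    private final long batchTimeoutMs;
    private final LongSupplier clock;            // returns the current time in ms
    private final Consumer<List<String>> sink;   // stands in for the channel
    private final List<String> buffer = new ArrayList<>();
    private long lastPush;

    LineBatcher(int bufferCount, long batchTimeoutMs,
                LongSupplier clock, Consumer<List<String>> sink) {
        this.bufferCount = bufferCount;
        this.batchTimeoutMs = batchTimeoutMs;
        this.clock = clock;
        this.sink = sink;
        this.lastPush = clock.getAsLong();
    }

    // Adds one line, then flushes if the batch is full or overdue.
    synchronized void add(String line) {
        buffer.add(line);
        if (buffer.size() >= bufferCount || timedOut()) {
            flush();
        }
    }

    // Pushes a copy of the buffer to the sink and records the push time.
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
        lastPush = clock.getAsLong();
    }

    private boolean timedOut() {
        return clock.getAsLong() - lastPush >= batchTimeoutMs;
    }

    public static void main(String[] args) {
        LineBatcher batcher = new LineBatcher(2, 3000L, System::currentTimeMillis,
            batch -> System.out.println("flushed " + batch));
        batcher.add("first");
        batcher.add("second"); // size reaches bufferCount, so the batch is flushed
        batcher.add("third");
        batcher.flush();       // final flush of whatever is left
    }
}
```

Note that the timeout is only checked when a new line arrives. If the input goes quiet, a partial batch stays in the buffer, which is the gap the timed flush task in run() is there to cover.
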

// EventBuilder.withBody
public static Event withBody(byte[] body, Map<String, String> headers) {
  Event event = new SimpleEvent();
  if(body == null) {
    body = new byte[0];
  }
  event.setBody(body);
  if (headers != null) {
    event.setHeaders(new HashMap<String, String>(headers));
  }
  return event;
}
// ExecSource.flushEventBatch
private void flushEventBatch(List<Event> eventList){
  channelProcessor.processEventBatch(eventList);
  sourceCounter.addToEventAcceptedCount(eventList.size());
  eventList.clear();
  lastPushToChannel = systemClock.currentTimeMillis();
}
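
Because readLine blocks while tail -F is idle, a partially filled batch would never be pushed by the reading loop alone. The timed flush described earlier can be sketched as follows. This is a simplified, hypothetical TimedFlusher, not Flume code: it flushes any non-empty buffer on every tick, whereas the task in run() also checks timeout() first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: a background task that periodically flushes a shared buffer.
final class TimedFlusher implements AutoCloseable {
    private final List<String> buffer = new ArrayList<>();
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor();
    private final long periodMs;
    private final Consumer<List<String>> sink; // stands in for the channel

    TimedFlusher(long periodMs, Consumer<List<String>> sink) {
        this.periodMs = periodMs;
        this.sink = sink;
    }

    // Schedules the flush task with the same initial delay and period.
    void start() {
        timer.scheduleWithFixedDelay(
            this::flushIfNotEmpty, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    // Called by the reading thread; the lock is shared with the timer thread.
    void add(String line) {
        synchronized (buffer) {
            buffer.add(line);
        }
    }

    private void flushIfNotEmpty() {
        try {
            synchronized (buffer) {
                if (!buffer.isEmpty()) {
                    sink.accept(new ArrayList<>(buffer));
                    buffer.clear();
                }
            }
        } catch (RuntimeException e) {
            // An uncaught exception would cancel all later runs of the task.
            System.err.println("flush failed: " + e);
        }
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        try (TimedFlusher flusher =
                 new TimedFlusher(100L, batch -> System.out.println("flushed " + batch))) {
            flusher.start();
            flusher.add("a line that never fills a batch");
            Thread.sleep(500); // give the timer a chance to run
        }
    }
}
```

Both threads synchronize on the same list, which mirrors the synchronized (eventList) blocks in run() and keeps a flush from interleaving with an add.
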

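ExecSource collects local logs asynchronously and does not guarantee reliability: for example, if the tail -F process started from Java runs into trouble, collection of the target log file is affected. Its advantages are that it performs better than the RPC approach, reduces network traffic, and is non-intrusive to the application, so it can be plugged in seamlessly.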
