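There are two common ways to collect logs. One uses local log files as an intermediary and ships their contents asynchronously to a remote log store. The other is synchronous, RPC-based collection that sends each log entry directly to the remote store. This post looks at how Flume NG collects logs from local log files.

ExecSource runs a local command and wraps the data read from a local log file into a stream of Events that flow through Flume NG. A typical configuration, shown below, sets the source type to exec, names the Channel downstream of the Source, and specifies the shell command to run. The most commonly used command is tail -F, which picks up lines as they are appended to a local log file.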
    producer.sources.s1.type = exec
    producer.sources.s1.channels = channel
    producer.sources.s1.command = tail -F /data/logs/test.log
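Each key above follows the pattern `<agent>.sources.<source>.<property>`. As an illustration only, the sketch below loads those three lines with java.util.Properties and pulls out the settings for source s1. This is not how Flume parses its configuration internally; `ExecConfigSketch` and `sourceSettings` are hypothetical names used just for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustration only: Flume uses its own configuration classes.
// ExecConfigSketch and sourceSettings are hypothetical names.
class ExecConfigSketch {

    // Parses properties-file text (like the sample config) into a Properties object.
    static Properties load(String text) {
        Properties all = new Properties();
        try {
            all.load(new StringReader(text));
        } catch (IOException e) {
            // StringReader does not fail in practice; rethrow unchecked just in case
            throw new UncheckedIOException(e);
        }
        return all;
    }

    // Returns the entries under "<agent>.sources.<source>." with that prefix
    // stripped, e.g. "producer.sources.s1.command" becomes "command".
    static Properties sourceSettings(Properties all, String agent, String source) {
        String prefix = agent + ".sources." + source + ".";
        Properties settings = new Properties();
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                settings.setProperty(key.substring(prefix.length()), all.getProperty(key));
            }
        }
        return settings;
    }
}
```

For the sample config, `sourceSettings(all, "producer", "s1")` yields the three entries `type`, `channels` and `command`.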
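Now let's look at how ExecSource is implemented. start() creates a single-threaded executor and submits an ExecRunnable to it. The runnable's run() method starts the command, reads its output, and optionally restarts the command when it exits: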
    public void start() {
      logger.info("Exec source starting with command:{}", command);

      executor = Executors.newSingleThreadExecutor();

      runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
          restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);

      // FIXME: Use a callback-like executor / future to signal us upon failure.
      runnerFuture = executor.submit(runner);

      sourceCounter.start();
      super.start();

      logger.debug("Exec source started");
    }
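start() keeps only the Future returned by submit(), and nothing ever waits on it. That is what the FIXME refers to: an exception thrown by a submitted task is captured inside the Future and goes unnoticed unless someone calls get(). The sketch below, with hypothetical names, reproduces the submit pattern and then, unlike start(), calls get() to show where such a failure surfaces. Note that ExecRunnable.run() catches and logs its own exceptions, so this illustrates the general pattern rather than a specific Flume failure.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of the submit pattern used by start(); class and method names are hypothetical.
class SingleRunnerSketch {

    // Runs the task on a single-thread executor, as start() does with ExecRunnable,
    // then (unlike start()) waits on the Future. Returns null on success, or the
    // message of the exception the task threw.
    static String runAndReportFailure(Runnable task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(task);
            future.get(); // the task's exception only surfaces here, wrapped in ExecutionException
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdownNow();
        }
    }
}
```

If get() were never called, the exception from a failing task would simply stay stored in the Future. A completion callback or a wrapper that logs failures, as the FIXME suggests, would avoid that.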
    // ExecSource.ExecRunnable.run()
    public void run() {
      do {
        String exitCode = "unknown";
        BufferedReader reader = null;
        String line = null;
        final List<Event> eventList = new ArrayList<Event>();

        timedFlushService = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat(
                "timedFlushExecService" +
                Thread.currentThread().getId() + "-%d").build());
        try {
          if(shell != null) {
            String[] commandArgs = formulateShellCommand(shell, command);
            process = Runtime.getRuntime().exec(commandArgs);
          } else {
            String[] commandArgs = command.split("\\s+");
            process = new ProcessBuilder(commandArgs).start();
          }
          reader = new BufferedReader(
              new InputStreamReader(process.getInputStream(), charset));

          // Drain the process's stderr on a daemon thread (logged if logStderr is set)
          StderrReader stderrReader = new StderrReader(new BufferedReader(
              new InputStreamReader(process.getErrorStream(), charset)), logStderr);
          stderrReader.setName("StderrReader-[" + command + "]");
          stderrReader.setDaemon(true);
          stderrReader.start();

          // Periodically flush a pending batch even when no new lines arrive
          future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
              @Override
              public void run() {
                try {
                  synchronized (eventList) {
                    if(!eventList.isEmpty() && timeout()) {
                      flushEventBatch(eventList);
                    }
                  }
                } catch (Exception e) {
                  logger.error("Exception occured when processing event batch", e);
                  if(e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                  }
                }
              }
          },
          batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);

          while ((line = reader.readLine()) != null) {
            synchronized (eventList) {
              sourceCounter.incrementEventReceivedCount();
              eventList.add(EventBuilder.withBody(line.getBytes(charset)));
              if(eventList.size() >= bufferCount || timeout()) {
                flushEventBatch(eventList);
              }
            }
          }

          synchronized (eventList) {
            if(!eventList.isEmpty()) {
              flushEventBatch(eventList); // send the remaining events to the channel
            }
          }
        } catch (Exception e) {
          logger.error("Failed while running command: " + command, e);
          if(e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
          }
        } finally {
          if (reader != null) {
            try {
              reader.close();
            } catch (IOException ex) {
              logger.error("Failed to close reader for exec source", ex);
            }
          }
          exitCode = String.valueOf(kill());
        }
        if(restart) {
          logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode);
          try {
            Thread.sleep(restartThrottle);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        } else {
          logger.info("Command [" + command + "] exited with " + exitCode);
        }
      } while(restart);
    }
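run() builds the process arguments in one of two ways. Without a `shell` setting, the command is split on whitespace and executed directly, so pipes, wildcards, or quotes in it are not interpreted. With a `shell` setting (for example `/bin/sh -c`), the shell does the parsing. The sketch below assumes that formulateShellCommand splits the shell setting on whitespace and appends the whole command as one final argument. `buildArgs` is a hypothetical helper, not Flume code.

```java
import java.util.Arrays;

// Sketch of how the command becomes a process argv; buildArgs is hypothetical,
// and the shell branch is an assumption about what formulateShellCommand does.
class CommandArgsSketch {

    static String[] buildArgs(String shell, String command) {
        if (shell == null) {
            // No shell: split on whitespace and exec the program directly,
            // so "|", "*" and quotes are passed through as literal arguments.
            return command.split("\\s+");
        }
        // With a shell such as "/bin/sh -c": shell tokens first, then the
        // entire command string as a single argument for the shell to parse.
        String[] shellArgs = shell.split("\\s+");
        String[] args = Arrays.copyOf(shellArgs, shellArgs.length + 1);
        args[shellArgs.length] = command;
        return args;
    }
}
```

For the sample config (no shell), `tail -F /data/logs/test.log` becomes the three arguments `tail`, `-F`, `/data/logs/test.log`. A command such as `tail -F /data/logs/test.log | grep ERROR` only works as intended in the shell form.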
The key step in run() is running a shell command from Java to read the data in the local log file. The relevant code is:
    try {
      if(shell != null) {
        String[] commandArgs = formulateShellCommand(shell, command);
        process = Runtime.getRuntime().exec(commandArgs);
      } else {
        String[] commandArgs = command.split("\\s+");
        process = new ProcessBuilder(commandArgs).start();
      }
      reader = new BufferedReader(
          new InputStreamReader(process.getInputStream(), charset));

      // When tail -F has no new data, reader.readLine() blocks until data arrives
      while ((line = reader.readLine()) != null) {
        synchronized (eventList) {
          sourceCounter.incrementEventReceivedCount();
          eventList.add(EventBuilder.withBody(line.getBytes(charset))); // create the Event
          if(eventList.size() >= bufferCount || timeout()) {
            flushEventBatch(eventList);
          }
        }
      }
      // ... (rest of the try block, catch and finally as in run() above)
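The standard output of the local process, represented by java.lang.Process, is exposed to Java as an input stream (process.getInputStream()). When tail -F has no new data, reader.readLine() on that stream blocks until new data arrives. Each line read is first wrapped in an Event; once the batch reaches bufferCount events, or batchTimeout milliseconds have passed since the last push to the Channel, flushEventBatch is called. flushEventBatch hands the Event list to the ChannelProcessor for batch processing and then clears it. Because readLine() can block indefinitely, run() also schedules a timedFlushService task that flushes a non-empty batch once the timeout has expired.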
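The batching rule can be sketched in a simplified, single-threaded form. The sketch assumes, as a reading of the code above, that timeout() means "at least batchTimeout ms since the last flush"; the class name, the injected clock (used only to make the behaviour deterministic), and plain strings in place of Events are illustrative choices, not Flume's API. In ExecSource the reader thread and the timer thread share eventList, which is why every access there is wrapped in synchronized (eventList); this sketch has a single thread and needs no locking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Single-threaded sketch of ExecSource-style batching; all names are hypothetical.
class LineBatcher {
    private final int bufferCount;
    private final long batchTimeout;
    private final LongSupplier clock;             // current time in ms (injected)
    private final Consumer<List<String>> sink;    // stands in for the ChannelProcessor
    private final List<String> batch = new ArrayList<>();
    private long lastPush;

    LineBatcher(int bufferCount, long batchTimeout, LongSupplier clock, Consumer<List<String>> sink) {
        this.bufferCount = bufferCount;
        this.batchTimeout = batchTimeout;
        this.clock = clock;
        this.sink = sink;
        this.lastPush = clock.getAsLong();
    }

    private boolean timeout() {
        return clock.getAsLong() - lastPush >= batchTimeout;
    }

    // Mirrors the body of the readLine() loop: buffer, flush on size or timeout.
    void add(String line) {
        batch.add(line);
        if (batch.size() >= bufferCount || timeout()) {
            flush();
        }
    }

    // Mirrors the scheduled timedFlushService task.
    void tick() {
        if (!batch.isEmpty() && timeout()) {
            flush();
        }
    }

    // Mirrors the final flush after readLine() returns null.
    void finish() {
        if (!batch.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        sink.accept(new ArrayList<>(batch)); // hand over a copy, since the buffer is reused
        batch.clear();
        lastPush = clock.getAsLong();
    }
}
```

For example, with bufferCount 2 and batchTimeout 3000, adding "a" and "b" flushes [a, b] at once, while a lone "c" waits for a tick() at least 3000 ms after the last flush.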
    // EventBuilder.withBody (two-argument overload)
    public static Event withBody(byte[] body, Map<String, String> headers) {
      Event event = new SimpleEvent();
      if(body == null) {
        body = new byte[0];
      }
      event.setBody(body);
      if (headers != null) {
        event.setHeaders(new HashMap<String, String>(headers));
      }
      return event;
    }

    // ExecSource.flushEventBatch
    private void flushEventBatch(List<Event> eventList){
      channelProcessor.processEventBatch(eventList);
      sourceCounter.addToEventAcceptedCount(eventList.size());
      eventList.clear();
      lastPushToChannel = systemClock.currentTimeMillis();
    }
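The withBody code above does two small but deliberate things: a null body becomes an empty byte array, and the headers map is copied rather than stored. The sketch below is a minimal stand-in written from that code, not Flume's SimpleEvent or EventBuilder. The single-argument overload, which ExecSource calls, is assumed here to delegate with null headers.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for SimpleEvent + EventBuilder.withBody; EventSketch is hypothetical.
class EventSketch {
    final byte[] body;
    final Map<String, String> headers;

    private EventSketch(byte[] body, Map<String, String> headers) {
        this.body = body;
        this.headers = headers;
    }

    static EventSketch withBody(byte[] body, Map<String, String> headers) {
        // A null body becomes an empty body; a non-null body array is used as-is (not copied)
        byte[] safeBody = (body == null) ? new byte[0] : body;
        // Copy the headers so later changes to the caller's map do not leak into the event
        Map<String, String> copy = new HashMap<>();
        if (headers != null) {
            copy.putAll(headers);
        }
        return new EventSketch(safeBody, copy);
    }

    // Assumed single-argument form: body only, no headers
    static EventSketch withBody(byte[] body) {
        return withBody(body, null);
    }
}
```

Copying the headers matters because a caller may reuse one map for many events; copying the body would cost an extra allocation per line, which this code avoids.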
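ExecSource collects local logs asynchronously and does not guarantee reliability. For example, if the tail -F process started from Java fails, collection of the target log file is affected; and because the command writing to stdout cannot learn whether an event was actually put into the Channel, events can be lost when that put fails. On the plus side, ExecSource performs better than the RPC approach, reduces network traffic, and is not intrusive to the application, so it can be added without changing application code.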
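One partial mitigation for a failed command is the restart flag visible in run() (configured through the exec source's restart and restartThrottle properties). When it is set, the loop sleeps restartThrottle milliseconds and starts the command again. Restarting does not recover lines appended while the command was down, and a fresh tail -F prints the last lines of the file again (10 by default), so both gaps and duplicates are possible. The sketch below mirrors the do/while(restart) loop with a stand-in for the process; `runWithRestart` and its `maxRuns` parameter are hypothetical, and `maxRuns` exists only so the sketch terminates.

```java
import java.util.function.IntSupplier;

// Sketch of the do { ... } while (restart) loop in ExecRunnable.run().
// runWithRestart and maxRuns are hypothetical; the real loop has no run limit.
class RestartLoopSketch {

    // runCommand stands in for "start the process, read until EOF, return the exit code".
    // Returns how many times the command was run.
    static int runWithRestart(IntSupplier runCommand, boolean restart,
                              long restartThrottleMs, int maxRuns) {
        int runs = 0;
        do {
            int exitCode = runCommand.getAsInt();
            runs++;
            if (restart && runs < maxRuns) {
                System.out.println("Restarting in " + restartThrottleMs + "ms, exit code " + exitCode);
                try {
                    Thread.sleep(restartThrottleMs);
                } catch (InterruptedException e) {
                    // Unlike the original loop, stop restarting once interrupted
                    Thread.currentThread().interrupt();
                    break;
                }
            } else {
                System.out.println("Command exited with " + exitCode);
            }
        } while (restart && runs < maxRuns);
        return runs;
    }
}
```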