中國民生銀行大數據團隊的Flume實踐

做者 | 文喬
編輯 | Vincent
AI前線出品| ID:ai-front

AI 前線導語:「中國民生銀行服務器的操做系統種類衆多,除 Linux 外,部分生產系統仍採用 AIX 和 HP-UNIX 操做系統,因爲在 AIX 和 HP-UNIX 沒法使用 Logstash 做爲日誌採集端,在大數據基礎平臺產品團隊通過一系列選型後,採用 Flume 做爲 AIX 和 HP-UNIX 操做系統上日誌採集端。本文做者中國民生銀行總行信息技術部大數據基礎產品平臺組,他將與咱們分享民生銀行在 Flume 上的實踐」。java


一. Flume 簡介node

Apache Flume 是 Cloudera 公司開源的一款分佈式、可靠、可用的服務,可用於從多種不一樣數據源收集、彙集、移動大量日誌數據到集中數據存儲中;它經過事務機制提供了可靠的消息傳輸支持,並自帶負載均衡機制來支撐水平擴展。尤爲近幾年隨着 Flume 的不斷被完善以及升級版本的逐一推出,特別是 flume-ng 的推出,以及 Flume 內部的各類組件不斷豐富,用戶在開發的過程當中使用的便利性獲得很大的改善,現已成爲 Apache 頂級社區項目之一。git


二. 中國民生銀行 Flume 實踐github

中國民生銀行服務器的操做系統種類衆多,除 Linux 外,部分生產系統仍採用 AIX 和 HP-UNIX 操做系統,因爲在 AIX 和 HP-UNIX 沒法使用 Logstash 做爲日誌採集端,在大數據基礎平臺產品團隊通過一系列選型後,採用 Flume 做爲 AIX 和 HP-UNIX 操做系統上日誌採集端。正則表達式

2016 年咱們在測試環境進行試驗,使用的版本是 Apache Flume 1.6,在使用 Taildir Source 組件和核心組件的過程當中,發現其沒法徹底知足咱們的需求,例如:apache

  1. 若 filegroup 路徑中包含正則表達式,則沒法獲取文件的完整路徑,在日誌入到 Elasticsearch 後沒法定位日誌的路徑;
  2. Taildir Source 不支持將多行合併爲一個 event,只能一行一行讀取文件;
  3. filegroup 配置中不支持目錄包含正則表達式,不便配置包含多個日期而且日期自動增加的目錄,例如 /app/logs/yyyymmdd/appLog.log;
  4. 在使用 Host Interceptor 時,發現只能保留主機名或者是 IP,兩者沒法同時保留。

在研究 Flume 源碼以後,咱們在源碼上擴展開發。截至目前,咱們爲開源社區貢獻了 4 個 Patch,其中 FLUME-2955 已被社區 Merge 並在 1.7 版本中發佈,另外咱們在 Github 上開放了一個版本,將 FLUME-2960/2961/3187 三個 Patch 合併到 Flume 1.7 上,歡迎你們下載使用,緩存

Github 地址:bash

https://github.com/tinawenqiao/flume,分支名 trunk-cmbc。
服務器

接下來本文將對每一個 Issue 進行詳細介紹:微信


三. FLUME-2955

3.1 問題和需求

爲了採集後綴爲 log 的日誌文件,filegroups 設置以下:

agent.sources.s1.type = org.apache.flume.source.taildir.TaildirSource
agent.sources.s1.filegroups = f1 
agent.sources.s1.filegroups.f1 = /app/logs/.*.log
複製代碼

注:安卓手機端讀者查看代碼時可左右滑動閱讀完整代碼

若 /app/logs 目錄中存在 a.log、b.log、c.log 三個文件,在 Flume 1.6 版本中,雖然能夠經過 headers.\.\在 event 的 header 裏放入自定義的 key 和 value,可是因爲正則表達式匹配上了目錄中多個文件,因此沒法經過該方法設置,這樣致使日誌數據入到 Elasticsearch 後,用戶從 Kibana 從查詢時沒法定位到數據所在的日誌文件路徑。

3.2 解決辦法

增長 fileHeader 和 fileHeaderKey 兩個參數,兩個參數含義分別是:

修改類 ReliableTaildirEventReader 中 readEvents() 方法,根據配置文件的值,選擇是否在 event 的 header 里加入文件的路徑,主要代碼以下:

Map<String, String> headers = currentFile.getHeaders();
if (annotateFileName || (headers != null && !headers.isEmpty())) {
  for (Event event : events) {
    if (headers != null && !headers.isEmpty()) {
      event.getHeaders().putAll(headers);
    }
    if (annotateFileName) {
      event.getHeaders().put(fileNameHeader, currentFile.getPath());
    }
  }
}
複製代碼

3.3 相關配置示例

agent.sources.s1.type = org.apache.flume.source.taildir.TaildirSource
agent.sources.s1.filegroups = f1 
agent.sources.s1.filegroups.f1 = /app/logs/.*.log
agent.sources.s1.fileHeader = true
agent.sources.s1.fileHeaderKey = path
複製代碼


四. FLUME-2960

4.1 問題和需求

在實際應用寫日誌時,不少系統是根據日期生成日期目錄,每一個日期目錄中包含一個或多個日誌文件,所以存在:

/app/logs/20170101/、/app/logs/20170102/、/app/logs/20170103/

等多個目錄,且 /app/logs/ 目錄下天天會自動生成新的日期目錄,可是根據 Taildir Source 中 filegroups.\的描述,只支持文件名帶正則,所以 1.6 版本的 Taildir Source 沒法知足該需求。

4.2 解決辦法

增長 filegroups.\.parentDir 和 filegroups.\.filePattern 兩個參數,兩個參數含義分別是:

修改類 TaildirMatcher 中匹配文件的方法,相關代碼以下:

private List<File> getMatchingFilesNoCache() {
  final List<File> result = Lists.newArrayList();
  try {
    Set options = EnumSet.of(FOLLOW_LINKS);
    Files.walkFileTree(Paths.get(parentDir.toString()), options, Integer.MAX_VALUE,
            new SimpleFileVisitor<Path>() {
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
          if (fileMatcher.matches(file.toAbsolutePath())) {
            result.add(file.toFile());
          }
          return FileVisitResult.CONTINUE;
        }
        @Override
        public FileVisitResult visitFileFailed(Path file, IOException exc) {
          return FileVisitResult.CONTINUE;
        }
        });
  } 
  ...
}
複製代碼

另外進行了配置參數的兼容性處理,用戶仍可保留之前的 filegroups 配置,不需單獨配置 parentDir 和 filePattern,程序會將 filegroups 中的文件的目錄賦值給 parentDir,文件名賦值給 filePattern。

須要注意的是:在 Taildir Source 中有個參數 cachePatternMatching,默認值是 true,其做用是緩存正則匹配的文件列表和消費文件的順序,若目錄中文件較多時,使用正則匹配比較耗時,設置該參數可提升性能,當發現文件的目錄修改後會刷新緩存列表。因爲 filePattern 中可包含目錄,若 cachePatternMatching 設爲 true,在 filePattern 的子目錄中新增文件,parentDir 的修改時間不變,此時新增的日誌文件不能被跟蹤到,所以,建議在 filePattern 包含目錄的狀況下,將 cachePatternMatching 設置爲 false

4.3 相關配置示例

agent.sources.s2.type = org.apache.flume.source.taildir.TaildirSource
agent.sources.s2.filegroups = f1 f2
agent.sources.s2.filegroups.f1.parentDir = /app/log/
agent.sources.s2.filegroups.f1.filePattern = /APP.log.\\d{8}
agent.sources.s2.filegroups.f2.parentDir = /app/log/
agent.sources.s2.filegroups.f2.filePattern = /\\w/.*log
agent.sources.s2.cachePatternMatching = false
複製代碼


五. FLUME-2961

5.1 問題和需求

Taildir Source 按行讀取日誌,把每一行做爲內容放入 flume event 的 body 中,對於如下這種每行就能夠結束的日誌處理沒有問題:

13 七月 2016 23:37:30,580 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start:62)  - Configuration provider starting
13 七月 2016 23:37:30,585 INFO  [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:134)  - Reloading configuration file:conf/taildir.conf
13 七月 2016 23:37:30,592 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1013)  - Processing:s1
複製代碼

可是對於相似 Java Stacktrace 的日誌,若是按上述處理,如下日誌被截斷成 9 個 flume event(一共 9 行)輸出,而咱們但願這樣的日誌記錄,要做爲 1 個 flume event,而不是 9 個輸出:

13 七月 2016 23:37:41,942 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kafka.KafkaSink.process:229)  - Failed to publish events
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000067 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:200)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000067 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
複製代碼

5.2 解決辦法

設計一個 buffer event 緩存多行內容,仿照 Logstash 的 codec/mulitline 插件配置,增長了以下參數:

主要修改了類 TailFile 裏的 readEvents() 方法,相關代碼以下:

if (this.multiline) {
  if (raf != null) { // when file has not closed yet
    boolean match = this.multilinePatternMatched;
    while (events.size() < numEvents) {
      LineResult line = readLine();
      if (line == null) {
        break;
      }
      Event event = null;
      logger.debug("TailFile.readEvents: Current line = " + new String(line.line) +
               ". Current time : " + new Timestamp(System.currentTimeMillis()) +
               ". Pos:" + pos +
               ". LineReadPos:" + lineReadPos + ",raf.getPointer:" + raf.getFilePointer());
      switch (this.multilinePatternBelong) {
        case "next":
          event = readMultilineEventNext(line, match);
          break;
        case "previous":
          event = readMultilineEventPre(line, match);
          break;
        default:
          break;
      }
      if (event != null) {
        events.add(event);
      }
      if (bufferEvent != null) {
        if (bufferEvent.getBody().length >= multilineMaxBytes
                || Integer.parseInt(bufferEvent.getHeaders().get("lineCount")) == multilineMaxLines) {
          flushBufferEvent(events);
        }
      }
    }
  }
  if (needFlushTimeoutEvent()) {
    flushBufferEvent(events);
  }
}
複製代碼

合併多行處理的方法代碼以下:

private Event readMultilineEventPre(LineResult line, boolean match)
          throws IOException {
  Event event = null;
  Matcher m = multilinePattern.matcher(new String(line.line));
  boolean find = m.find();
  match = (find && match) || (!find && !match);
  byte[] lineBytes = toOriginBytes(line);
  if (match) {
    /** If matched, merge it to the buffer event. */
    mergeEvent(line);
  } else {
    /**
     * If not matched, this line is not part of previous event when the buffer event is not null.
     * Then create a new event with buffer event's message and put the current line into the * cleared buffer event. */ if (bufferEvent != null) { event = EventBuilder.withBody(bufferEvent.getBody()); } bufferEvent = null; bufferEvent = EventBuilder.withBody(lineBytes); if (line.lineSepInclude) { bufferEvent.getHeaders().put("lineCount", "1"); } else { bufferEvent.getHeaders().put("lineCount", "0"); } long now = System.currentTimeMillis(); bufferEvent.getHeaders().put("time", Long.toString(now)); } return event; } private Event readMultilineEventNext(LineResult line, boolean match) throws IOException { Event event = null; Matcher m = multilinePattern.matcher(new String(line.line)); boolean find = m.find(); match = (find && match) || (!find && !match); if (match) { /** If matched, merge it to the buffer event. */ mergeEvent(line); } else { /** * If not matched, this line is not part of next event. Then merge the current line into the * buffer event and create a new event with the merged message. */ mergeEvent(line); event = EventBuilder.withBody(bufferEvent.getBody()); bufferEvent = null; } return event; } 複製代碼

3.3 相關配置示例

agent.sources.s3.multiline = true
agent.sources.s3.multilinePattern = ^AGENT_IP:
agent.sources.s3.multilinePatternBelong = previous
agent.sources.s3.multilineMatched = false
agent.sources.s3.multilineEventTimeoutSeconds = 120
agent.sources.s3.multilineMaxBytes = 3145728
agent.sources.s3.multilineMaxLines = 3000
複製代碼


六. FLUME-3187

6.1 問題和需求

爲了獲取 Flume agent 所在機器的主機名或 IP,咱們使用了主機名攔截器 (Host Interceptor),可是根據主機名攔截器的定義,只能保留主機名和 IP 中的一種,沒法同時保留主機名和 IP。

Host Interceptor
This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host or a configured key whose value is the hostname or IP address of the host, based on configuration.

6.2 解決辦法

將原來的 useIP 參數擴展,增長一個參數 useHostname,若同時設置爲 true,可同時保留主機名和 IP;另外支持自定義主機名和 IP 地址在 event header 裏的 key,參數以下:

修改了類 HostInterceptor 中的構造方法和攔截方法,相關代碼以下:

addr = InetAddress.getLocalHost();
if (useIP) {
  ip = addr.getHostAddress();
}
if (useHostname) {
  hostname = addr.getCanonicalHostName();
}
複製代碼

6.3 相關配置示例

agent.sources.s4.interceptors = i1
agent.sources.s4.interceptors.i1.type = host
agent.sources.s4.interceptors.i1.useIP = true
agent.sources.s4.interceptors.i1.useHostname = true
agent.sources.s4.interceptors.i1.ip = ip
agent.sources.s4.interceptors.i1.hostname = hostname
複製代碼


總結

目前上述 4 個 Patch 在我行 A 類和 B 類生產系統已實際運行使用,「擁抱開源,回饋開源」,咱們用的是開源軟件,咱們但願也能對開源軟件作出貢獻。後續咱們將分享我行 ELK 日誌平臺架構演進的詳細細節,敬請你們關注!

做者介紹:

文喬,工做於中國民生銀行總行信息技術部大數據基礎產品平臺組,負責行內大數據管控平臺的開發,天眼日誌平臺主要參與人。微信 tinawenqiao,郵箱 wenqiao@cmbc.com.cn。

相關文章
相關標籤/搜索