容器日誌採集利器:filebeat深度剖析與實踐

在雲原生時代和容器化浪潮中,容器的日誌採集是一個看起來不起眼卻又沒法忽視的重要議題。對於容器日誌採集咱們經常使用的工具備filebeat和fluentd,二者對比各有優劣,相比基於ruby的fluentd,考慮到可定製性,咱們通常默認選擇golang技術棧的filbeat做爲主力的日誌採集agent。
相比較傳統的日誌採集方式,容器化下單節點會運行更多的服務,負載也會有更短的生命週期,而這些更容易對日誌採集agent形成壓力,雖然filebeat足夠輕量級和高性能,但若是不瞭解filebeat的機制,不合理的配置filebeat,實際的生產環境使用中可能也會給咱們帶來意想不到的麻煩和難題。node

總體架構

日誌採集的功能看起來不復雜,主要功能無非就是找到配置的日誌文件,而後讀取並處理,發送至相應的後端如elasticsearch,kafka等。
filebeat官網有張示意圖,以下所示:
![](filebeat.png)
針對每一個日誌文件,filebeat都會啓動一個harvester協程,即一個goroutine,在該goroutine中不停的讀取日誌文件,直到文件的EOF末尾。一個最簡單的表示採集目錄的input配置大概以下所示:linux

filebeat.inputs:
- type: log
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/*.log

不一樣的harvester goroutine採集到的日誌數據都會發送至一個全局的隊列queue中,queue的實現有兩種:基於內存和基於磁盤的隊列,目前基於磁盤的隊列仍是處於alpha階段,filebeat默認啓用的是基於內存的緩存隊列。
每當隊列中的數據緩存到必定的大小或者超過了定時的時間(默認1s),會被註冊的client從隊列中消費,發送至配置的後端。目前能夠設置的client有kafka、elasticsearch、redis等。 golang

雖然這一切看着挺簡單,但在實際使用中,咱們仍是須要考慮更多的問題,例如:redis

  • 日誌文件是如何被filbebeat發現又是如何被採集的?
  • filebeat是如何確保日誌採集發送到遠程的存儲中,不丟失一條數據的?
  • 若是filebeat掛掉,下次採集如何確保從上次的狀態開始而不會從新採集全部日誌?
  • filebeat的內存或者cpu佔用過多,該如何分析解決?
  • filebeat如何支持docker和kubernetes,如何配置容器化下的日誌採集?
  • 想讓filebeat採集的日誌發送至的後端存儲,若是原生不支持,怎樣定製化開發?

這些均須要對filebeat有更深刻的理解,下面讓咱們跟隨filebeat的源碼一塊兒探究其中的實現機制。docker

一條日誌是如何被採集的

filebeat源碼歸屬於beats項目,而beats項目的設計初衷是爲了採集各種的數據,因此beats抽象出了一個libbeat庫,基於libbeat咱們能夠快速的開發實現一個採集的工具,除了filebeat,還有像metricbeat、packetbeat等官方的項目也是在beats工程中。
若是咱們大體看一下代碼就會發現,libbeat已經實現了內存緩存隊列memqueue、幾種output日誌發送客戶端,數據的過濾處理processor等通用功能,而filebeat只須要實現日誌文件的讀取等和日誌相關的邏輯便可。 json

從代碼的實現角度來看,filebeat大概能夠分如下幾個模塊:後端

  • input: 找到配置的日誌文件,啓動harvester
  • harvester: 讀取文件,發送至spooler
  • spooler: 緩存日誌數據,直到能夠發送至publisher
  • publisher: 發送日誌至後端,同時通知registrar
  • registrar: 記錄日誌文件被採集的狀態

1. 找到日誌文件

對於日誌文件的採集和生命週期管理,filebeat抽象出一個Crawler的結構體,
在filebeat啓動後,crawler會根據配置建立,而後遍歷並運行每一個input:緩存

for _, inputConfig := range c.inputConfigs {
        err := c.startInput(pipeline, inputConfig, r.GetStates())
    }

在每一個input運行的邏輯裏,首先會根據配置獲取匹配的日誌文件,須要注意的是,這裏的匹配方式並不是正則,而是採用linux glob的規則,和正則仍是有一些區別。ruby

matches, err := filepath.Glob(path)

獲取到了全部匹配的日誌文件以後,會通過一些複雜的過濾,例如若是配置了exclude_files則會忽略這類文件,同時還會查詢文件的狀態,若是文件的最近一次修改時間大於ignore_older的配置,也會不去採集該文件。架構

2. 讀取日誌文件

匹配到最終須要採集的日誌文件以後,filebeat會對每一個文件啓動harvester goroutine,在該goroutine中不停的讀取日誌,併發送給內存緩存隊列memqueue。
(h *Harvester) Run()方法中,咱們能夠看到這麼一個無限循環,省略了一些邏輯的代碼以下所示:

for {
        message, err := h.reader.Next()
        if err != nil {
            switch err {
            case ErrFileTruncate:
                logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source)
                h.state.Offset = 0
                filesTruncated.Add(1)
            case ErrRemoved:
                logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source)
            case ErrRenamed:
                logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source)
            case ErrClosed:
                logp.Info("Reader was closed: %s. Closing.", h.state.Source)
            case io.EOF:
                logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)
            case ErrInactive:
                logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
            default:
                logp.Err("Read line error: %v; File: %v", err, h.state.Source)
            }
            return nil
        }
        ...
        if !h.sendEvent(data, forwarder) {
            return nil
        }
}

能夠看到,reader.Next()方法會不停的讀取日誌,若是沒有返回異常,則發送日誌數據到緩存隊列中。
返回的異常有幾種類型,除了讀取到EOF外,還會有例如文件一段時間不活躍等狀況發生會使harvester goroutine退出,再也不採集該文件,並關閉文件句柄。
filebeat爲了防止佔據過多的採集日誌文件的文件句柄,默認的close_inactive參數爲5min,若是日誌文件5min內沒有被修改,上面代碼會進入ErrInactive的case,以後該harvester goroutine會被關閉。
這種場景下還須要注意的是,若是某個文件日誌採集中被移除了,可是因爲此時被filebeat保持着文件句柄,文件佔據的磁盤空間會被保留直到harvester goroutine結束。

3. 緩存隊列

在memqueue被初始化時,filebeat會根據配置min_event是否大於1建立BufferingEventLoop或者DirectEventLoop,通常默認都是BufferingEventLoop,即帶緩衝的隊列。

type bufferingEventLoop struct {
    broker *Broker

    buf        *batchBuffer
    flushList  flushList
    eventCount int

    minEvents    int
    maxEvents    int
    flushTimeout time.Duration

    // active broker API channels
    events    chan pushRequest
    get       chan getRequest
    pubCancel chan producerCancelRequest

    // ack handling
    acks        chan int      // ackloop -> eventloop : total number of events ACKed by outputs
    schedACKS   chan chanList // eventloop -> ackloop : active list of batches to be acked
    pendingACKs chanList      // ordered list of active batches to be send to the ackloop
    ackSeq      uint          // ack batch sequence number to validate ordering

    // buffer flush timer state
    timer *time.Timer
    idleC <-chan time.Time
}

BufferingEventLoop是一個實現了Broker、帶有各類channel的結構,主要用於將日誌發送至consumer消費。
BufferingEventLoop的run方法中,一樣是一個無限循環,這裏能夠認爲是一個日誌事件的調度中心。

for {
        select {
        case <-broker.done:
            return
        case req := <-l.events: // producer pushing new event
            l.handleInsert(&req)
        case req := <-l.get: // consumer asking for next batch
            l.handleConsumer(&req)
        case count := <-l.acks:
            l.handleACK(count)
        case <-l.idleC:
            l.idleC = nil
            l.timer.Stop()
            if l.buf.length() > 0 {
                l.flushBuffer()
            }
        }
    }

上文中harvester goroutine每次讀取到日誌數據以後,最終會被髮送至bufferingEventLoop中的events chan pushRequest channel,而後觸發上面req := <-l.events的case,handleInsert方法會把數據添加至bufferingEventLoop的buf中,buf即memqueue實際緩存日誌數據的隊列,若是buf長度超過配置的最大值或者bufferingEventLoop中的timer定時器觸發了case <-l.idleC,均會調用flushBuffer()方法。
flushBuffer()又會觸發req := <-l.get的case,而後運行handleConsumer方法,該方法中最重要的是這一句代碼:

req.resp <- getResponse{ackChan, events}

這裏獲取到了consumer消費者的response channel,而後發送數據給這個channel。真正到這,纔會觸發consumer對memqueue的消費。因此,其實memqueue並不是一直不停的在被consumer消費,而是在memqueue通知consumer的時候才被消費,咱們能夠理解爲一種脈衝式的發送。

4. 消費隊列

實際上,早在filebeat初始化的時候,就已經建立了一個eventConsumer並在loop無限循環方法裏試圖從Broker中獲取日誌數據。

for {
        if !paused && c.out != nil && consumer != nil && batch == nil {
            out = c.out.workQueue
            queueBatch, err := consumer.Get(c.out.batchSize)
            ...
            batch = newBatch(c.ctx, queueBatch, c.out.timeToLive)
        }
        ...
        select {
        case <-c.done:
            return
        case sig := <-c.sig:
            handleSignal(sig)
        case out <- batch:
            batch = nil
        }
    }

上面consumer.Get就是消費者consumer從Broker中獲取日誌數據,而後發送至out的channel中被output client發送,咱們看一下Get方法裏的核心代碼:

select {
    case c.broker.requests <- getRequest{sz: sz, resp: c.resp}:
    case <-c.done:
        return nil, io.EOF
    }

    // if request has been send, we do have to wait for a response
    resp := <-c.resp
    return &batch{
        consumer: c,
        events:   resp.buf,
        ack:      resp.ack,
        state:    batchActive,
    }, nil

getRequest的結構以下:

type getRequest struct {
    sz   int              // request sz events from the broker
    resp chan getResponse // channel to send response to
}

getResponse的結構:

type getResponse struct {
    ack *ackChan
    buf []publisher.Event
}

getResponse裏包含了日誌的數據,而getRequest包含了一個發送至消費者的channel。
在上文bufferingEventLoop緩衝隊列的handleConsumer方法裏接收到的參數爲getRequest,裏面包含了consumer請求的getResponse channel。
若是handleConsumer不發送數據,consumer.Get方法會一直阻塞在select中,直到flushBuffer,consumer的getResponse channel纔會接收到日誌數據。

5. 發送日誌

在建立beats時,會建立一個clientWorker,clientWorker的run方法中,會不停的從consumer發送的channel裏讀取日誌數據,而後調用client.Publish批量發送日誌。

func (w *clientWorker) run() {
    for !w.closed.Load() {
        for batch := range w.qu {
            if err := w.client.Publish(batch); err != nil {
                return
            }
        }
    }
}

libbeats庫中包含了kafka、elasticsearch、logstash等幾種client,它們均實現了client接口:

type Client interface {
    Close() error
    Publish(publisher.Batch) error
    String() string
}

固然最重要的是實現Publish接口,而後將日誌發送出去。

實際上,filebeat中日誌數據在各類channel裏流轉的設計仍是比較複雜和繁瑣的,筆者也是研究了很久、畫了很長的架構圖才理清楚其中的邏輯。
這裏抽出了一個簡化後的圖以供參考:
arch

如何保證at least once

filebeat維護了一個registry文件在本地的磁盤,該registry文件維護了全部已經採集的日誌文件的狀態。
實際上,每當日誌數據發送至後端成功後,會返回ack事件。filebeat啓動了一個獨立的registry協程負責監聽該事件,接收到ack事件後會將日誌文件的State狀態更新至registry文件中,State中的Offset表示讀取到的文件偏移量,因此filebeat會保證Offset記錄以前的日誌數據確定被後端的日誌存儲接收到。
State結構以下所示:

type State struct {
    Id          string            `json:"-"` // local unique id to make comparison more efficient
    Finished    bool              `json:"-"` // harvester state
    Fileinfo    os.FileInfo       `json:"-"` // the file info
    Source      string            `json:"source"`
    Offset      int64             `json:"offset"`
    Timestamp   time.Time         `json:"timestamp"`
    TTL         time.Duration     `json:"ttl"`
    Type        string            `json:"type"`
    Meta        map[string]string `json:"meta"`
    FileStateOS file.StateOS
}

記錄在registry文件中的數據大體以下所示:

[{"source":"/tmp/aa.log","offset":48,"timestamp":"2019-07-03T13:54:01.298995+08:00","ttl":-1,"type":"log","meta":null,"FileStateOS":{"inode":7048952,"device":16777220}}]

因爲文件可能會被更名或移動,filebeat會根據inode和設備號來標誌每一個日誌文件。
若是filebeat異常重啓,每次採集harvester啓動的時候都會讀取registry文件,從上次記錄的狀態繼續採集,確保不會從頭開始重複發送全部的日誌文件。
固然,若是日誌發送過程當中,還沒來得及返回ack,filebeat就掛掉,registry文件確定不會更新至最新的狀態,那麼下次採集的時候,這部分的日誌就會重複發送,因此這意味着filebeat只能保證at least once,沒法保證不重複發送。
還有一個比較異常的狀況是,linux下若是老文件被移除,新文件立刻建立,頗有可能它們有相同的inode,而因爲filebeat根據inode來標誌文件記錄採集的偏移,會致使registry裏記錄的實際上是被移除的文件State狀態,這樣新的文件採集卻從老的文件Offset開始,從而會遺漏日誌數據。
爲了儘可能避免inode被複用的狀況,同時防止registry文件隨着時間增加愈來愈大,建議使用clean_inactive和clean_remove配置將長時間未更新或者被刪除的文件State從registry中移除。

同時咱們能夠發如今harvester讀取日誌中,會更新registry的狀態處理一些異常場景。例如,若是一個日誌文件被清空,filebeat會在下一次Reader.Next方法中返回ErrFileTruncate異常,將inode標誌文件的Offset置爲0,結束此次harvester,從新啓動新的harvester,雖然文件不變,可是registry中的Offset爲0,採集會從頭開始。

特別注意的是,若是使用容器部署filebeat,須要將registry文件掛載到宿主機上,不然容器重啓後registry文件丟失,會使filebeat從頭開始重複採集日誌文件。

filebeat自動reload更新

目前filebeat支持reload input配置,module配置,但reload的機制只有定時更新。
在配置中打開reload.enable以後,還能夠配置reload.period表示自動reload配置的時間間隔。
filebeat在啓動時,會建立一個專門用於reload的協程。對於每一個正在運行的harvester,filebeat會將其加入一個全局的Runner列表,每次到了定時的間隔後,會觸發一次配置文件的diff判斷,若是是須要中止的加入stopRunner列表,而後逐個關閉,新的則加入startRunner列表,啓動新的Runner。

filebeat對kubernetes的支持

filebeat官方文檔提供了在kubernetes下基於daemonset的部署方式,最主要的一個配置以下所示:

- type: docker
      containers.ids:
      - "*"
      processors:
        - add_kubernetes_metadata:
            in_cluster: true

即設置輸入input爲docker類型。因爲全部的容器的標準輸出日誌默認都在節點的/var/lib/docker/containers/<containerId>/*-json.log路徑,因此本質上採集的是這類日誌文件。
和傳統的部署方式有所區別的是,若是服務部署在kubernetes上,咱們查看和檢索日誌的維度不能僅僅侷限於節點和服務,還須要有podName,containerName等,因此每條日誌咱們都須要打標增長kubernetes的元信息才發送至後端。
filebeat會在配置中增長了add_kubernetes_metadata的processor的狀況下,啓動監聽kubernetes的watch服務,監聽全部kubernetes pod的變動,而後將歸屬本節點的pod最新的事件同步至本地的緩存中。
節點上一旦發生容器的銷燬建立,/var/lib/docker/containers/下會有目錄的變更,filebeat根據路徑提取出containerId,再根據containerId從本地的緩存中找到pod信息,從而能夠獲取到podName、label等數據,並加到日誌的元信息fields中。
filebeat還有一個beta版的功能autodiscover,autodiscover的目的是把分散到不一樣節點上的filebeat配置文件集中管理。目前也支持kubernetes做爲provider,本質上仍是監聽kubernetes事件而後採集docker的標準輸出文件。
大體架構以下所示:
logagent
可是在實際生產環境使用中,僅採集容器的標準輸出日誌仍是遠遠不夠,咱們每每還須要採集容器掛載出來的自定義日誌目錄,還須要控制每一個服務的日誌採集方式以及更多的定製化功能。

在輕舟容器雲上,咱們自研了一個監聽kubernetes事件自動生成filebeat配置的agent,經過CRD的方式,支持自定義容器內部日誌目錄、支持自定義fields、支持多行讀取等功能。同時可在kubernetes上統一管理各類日誌配置,並且無需用戶感知pod的建立銷燬和遷移,自動完成各類場景下的日誌配置生成和更新。

性能分析與調優

雖然beats系列主打輕量級,雖然用golang寫的filebeat的內存佔用確實比較基於jvm的logstash等好太多,可是事實告訴咱們其實沒那麼簡單。
正常啓動filebeat,通常確實只會佔用三、40MB內存,可是在輕舟容器雲上偶發性的咱們也會發現某些節點上的filebeat容器內存佔用超過配置的pod limit限制(通常設置爲200MB),而且不停的觸發的OOM。
究其緣由,通常容器化環境中,特別是裸機上運行的容器個數可能會比較多,致使建立大量的harvester去採集日誌。若是沒有很好的配置filebeat,會有較大機率致使內存急劇上升。
固然,filebeat內存佔據較大的部分仍是memqueue,全部採集到的日誌都會先發送至memqueue彙集,再經過output發送出去。每條日誌的數據在filebeat中都被組裝爲event結構,filebeat默認配置的memqueue緩存的event個數爲4096,可經過queue.mem.events設置。默認最大的一條日誌的event大小限制爲10MB,可經過max_bytes設置。4096 * 10MB = 40GB,能夠想象,極端場景下,filebeat至少佔據40GB的內存。特別是配置了multiline多行模式的狀況下,若是multiline配置有誤,單個event誤採集爲上千條日誌的數據,極可能致使memqueue佔據了大量內存,導致內存爆炸。
因此,合理的配置日誌文件的匹配規則,限制單行日誌大小,根據實際狀況配置memqueue緩存的個數,才能在實際使用中規避filebeat的內存佔用過大的問題。

如何對filebeat進行擴展開發

通常狀況下filebeat可知足大部分的日誌採集需求,可是仍然避免不了一些特殊的場景須要咱們對filebeat進行定製化開發,固然filebeat自己的設計也提供了良好的擴展性。
beats目前只提供了像elasticsearch、kafka、logstash等幾類output客戶端,若是咱們想要filebeat直接發送至其餘後端,須要定製化開發本身的output。一樣,若是須要對日誌作過濾處理或者增長元信息,也能夠自制processor插件。
不管是增長output仍是寫個processor,filebeat提供的大致思路基本相同。通常來說有3種方式:

  1. 直接fork filebeat,在現有的源碼上開發。output或者processor都提供了相似Run、Stop等的接口,只須要實現該類接口,而後在init方法中註冊相應的插件初始化方法便可。固然,因爲golang中init方法是在import包時才被調用,因此須要在初始化filebeat的代碼中手動import。
  2. 複製一份filebeat的main.go,import咱們自研的插件庫,而後從新編譯。本質上和方式1區別不大。
  3. filebeat還提供了基於golang plugin的插件機制,須要把自研的插件編譯成.so共享連接庫,而後在filebeat啓動參數中經過-plugin指定庫所在路徑。不過實際上一方面golang plugin還不夠成熟穩定,一方面自研的插件依然須要依賴相同版本的libbeat庫,並且還須要相同的golang版本編譯,坑可能更多,不太推薦。
相關文章
相關標籤/搜索