在雲原生時代和容器化浪潮中,容器的日誌採集是一個看起來不起眼卻又沒法忽視的重要議題。對於容器日誌採集咱們經常使用的工具備filebeat和fluentd,二者對比各有優劣,相比基於ruby的fluentd,考慮到可定製性,咱們通常默認選擇golang技術棧的filbeat做爲主力的日誌採集agent。
相比較傳統的日誌採集方式,容器化下單節點會運行更多的服務,負載也會有更短的生命週期,而這些更容易對日誌採集agent形成壓力,雖然filebeat足夠輕量級和高性能,但若是不瞭解filebeat的機制,不合理的配置filebeat,實際的生產環境使用中可能也會給咱們帶來意想不到的麻煩和難題。node
日誌採集的功能看起來不復雜,主要功能無非就是找到配置的日誌文件,而後讀取並處理,發送至相應的後端如elasticsearch,kafka等。
filebeat官網有張示意圖,以下所示:
針對每一個日誌文件,filebeat都會啓動一個harvester協程,即一個goroutine,在該goroutine中不停的讀取日誌文件,直到文件的EOF末尾。一個最簡單的表示採集目錄的input配置大概以下所示:linux
filebeat.inputs: - type: log # Paths that should be crawled and fetched. Glob based paths. paths: - /var/log/*.log
不一樣的harvester goroutine採集到的日誌數據都會發送至一個全局的隊列queue中,queue的實現有兩種:基於內存和基於磁盤的隊列,目前基於磁盤的隊列仍是處於alpha階段,filebeat默認啓用的是基於內存的緩存隊列。
每當隊列中的數據緩存到必定的大小或者超過了定時的時間(默認1s),會被註冊的client從隊列中消費,發送至配置的後端。目前能夠設置的client有kafka、elasticsearch、redis等。 golang
雖然這一切看着挺簡單,但在實際使用中,咱們仍是須要考慮更多的問題,例如:redis
這些均須要對filebeat有更深刻的理解,下面讓咱們跟隨filebeat的源碼一塊兒探究其中的實現機制。docker
filebeat源碼歸屬於beats項目,而beats項目的設計初衷是爲了採集各種的數據,因此beats抽象出了一個libbeat庫,基於libbeat咱們能夠快速的開發實現一個採集的工具,除了filebeat,還有像metricbeat、packetbeat等官方的項目也是在beats工程中。
若是咱們大體看一下代碼就會發現,libbeat已經實現了內存緩存隊列memqueue、幾種output日誌發送客戶端,數據的過濾處理processor等通用功能,而filebeat只須要實現日誌文件的讀取等和日誌相關的邏輯便可。 json
從代碼的實現角度來看,filebeat大概能夠分如下幾個模塊:後端
對於日誌文件的採集和生命週期管理,filebeat抽象出一個Crawler的結構體,
在filebeat啓動後,crawler會根據配置建立,而後遍歷並運行每一個input:緩存
for _, inputConfig := range c.inputConfigs { err := c.startInput(pipeline, inputConfig, r.GetStates()) }
在每一個input運行的邏輯裏,首先會根據配置獲取匹配的日誌文件,須要注意的是,這裏的匹配方式並不是正則,而是採用linux glob的規則,和正則仍是有一些區別。ruby
matches, err := filepath.Glob(path)
獲取到了全部匹配的日誌文件以後,會通過一些複雜的過濾,例如若是配置了exclude_files
則會忽略這類文件,同時還會查詢文件的狀態,若是文件的最近一次修改時間大於ignore_older
的配置,也會不去採集該文件。架構
匹配到最終須要採集的日誌文件以後,filebeat會對每一個文件啓動harvester goroutine,在該goroutine中不停的讀取日誌,併發送給內存緩存隊列memqueue。
在(h *Harvester) Run()
方法中,咱們能夠看到這麼一個無限循環,省略了一些邏輯的代碼以下所示:
for { message, err := h.reader.Next() if err != nil { switch err { case ErrFileTruncate: logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source) h.state.Offset = 0 filesTruncated.Add(1) case ErrRemoved: logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source) case ErrRenamed: logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source) case ErrClosed: logp.Info("Reader was closed: %s. Closing.", h.state.Source) case io.EOF: logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source) case ErrInactive: logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive) default: logp.Err("Read line error: %v; File: %v", err, h.state.Source) } return nil } ... if !h.sendEvent(data, forwarder) { return nil } }
能夠看到,reader.Next()方法會不停的讀取日誌,若是沒有返回異常,則發送日誌數據到緩存隊列中。
返回的異常有幾種類型,除了讀取到EOF外,還會有例如文件一段時間不活躍等狀況發生會使harvester goroutine退出,再也不採集該文件,並關閉文件句柄。
filebeat爲了防止佔據過多的採集日誌文件的文件句柄,默認的close_inactive
參數爲5min,若是日誌文件5min內沒有被修改,上面代碼會進入ErrInactive的case,以後該harvester goroutine會被關閉。
這種場景下還須要注意的是,若是某個文件日誌採集中被移除了,可是因爲此時被filebeat保持着文件句柄,文件佔據的磁盤空間會被保留直到harvester goroutine結束。
在memqueue被初始化時,filebeat會根據配置min_event
是否大於1建立BufferingEventLoop或者DirectEventLoop,通常默認都是BufferingEventLoop,即帶緩衝的隊列。
type bufferingEventLoop struct { broker *Broker buf *batchBuffer flushList flushList eventCount int minEvents int maxEvents int flushTimeout time.Duration // active broker API channels events chan pushRequest get chan getRequest pubCancel chan producerCancelRequest // ack handling acks chan int // ackloop -> eventloop : total number of events ACKed by outputs schedACKS chan chanList // eventloop -> ackloop : active list of batches to be acked pendingACKs chanList // ordered list of active batches to be send to the ackloop ackSeq uint // ack batch sequence number to validate ordering // buffer flush timer state timer *time.Timer idleC <-chan time.Time }
BufferingEventLoop是一個實現了Broker、帶有各類channel的結構,主要用於將日誌發送至consumer消費。
BufferingEventLoop的run方法中,一樣是一個無限循環,這裏能夠認爲是一個日誌事件的調度中心。
for { select { case <-broker.done: return case req := <-l.events: // producer pushing new event l.handleInsert(&req) case req := <-l.get: // consumer asking for next batch l.handleConsumer(&req) case count := <-l.acks: l.handleACK(count) case <-l.idleC: l.idleC = nil l.timer.Stop() if l.buf.length() > 0 { l.flushBuffer() } } }
上文中harvester goroutine每次讀取到日誌數據以後,最終會被髮送至bufferingEventLoop中的events chan pushRequest
channel,而後觸發上面req := <-l.events
的case,handleInsert方法會把數據添加至bufferingEventLoop的buf中,buf即memqueue實際緩存日誌數據的隊列,若是buf長度超過配置的最大值或者bufferingEventLoop中的timer定時器觸發了case <-l.idleC
,均會調用flushBuffer()方法。
flushBuffer()又會觸發req := <-l.get
的case,而後運行handleConsumer方法,該方法中最重要的是這一句代碼:
req.resp <- getResponse{ackChan, events}
這裏獲取到了consumer消費者的response channel,而後發送數據給這個channel。真正到這,纔會觸發consumer對memqueue的消費。因此,其實memqueue並不是一直不停的在被consumer消費,而是在memqueue通知consumer的時候才被消費,咱們能夠理解爲一種脈衝式的發送。
實際上,早在filebeat初始化的時候,就已經建立了一個eventConsumer並在loop無限循環方法裏試圖從Broker中獲取日誌數據。
for { if !paused && c.out != nil && consumer != nil && batch == nil { out = c.out.workQueue queueBatch, err := consumer.Get(c.out.batchSize) ... batch = newBatch(c.ctx, queueBatch, c.out.timeToLive) } ... select { case <-c.done: return case sig := <-c.sig: handleSignal(sig) case out <- batch: batch = nil } }
上面consumer.Get就是消費者consumer從Broker中獲取日誌數據,而後發送至out的channel中被output client發送,咱們看一下Get方法裏的核心代碼:
select { case c.broker.requests <- getRequest{sz: sz, resp: c.resp}: case <-c.done: return nil, io.EOF } // if request has been send, we do have to wait for a response resp := <-c.resp return &batch{ consumer: c, events: resp.buf, ack: resp.ack, state: batchActive, }, nil
getRequest的結構以下:
type getRequest struct { sz int // request sz events from the broker resp chan getResponse // channel to send response to }
getResponse的結構:
type getResponse struct { ack *ackChan buf []publisher.Event }
getResponse裏包含了日誌的數據,而getRequest包含了一個發送至消費者的channel。
在上文bufferingEventLoop緩衝隊列的handleConsumer方法裏接收到的參數爲getRequest,裏面包含了consumer請求的getResponse channel。
若是handleConsumer不發送數據,consumer.Get方法會一直阻塞在select中,直到flushBuffer,consumer的getResponse channel纔會接收到日誌數據。
在建立beats時,會建立一個clientWorker,clientWorker的run方法中,會不停的從consumer發送的channel裏讀取日誌數據,而後調用client.Publish批量發送日誌。
func (w *clientWorker) run() { for !w.closed.Load() { for batch := range w.qu { if err := w.client.Publish(batch); err != nil { return } } } }
libbeats庫中包含了kafka、elasticsearch、logstash等幾種client,它們均實現了client接口:
type Client interface { Close() error Publish(publisher.Batch) error String() string }
固然最重要的是實現Publish接口,而後將日誌發送出去。
實際上,filebeat中日誌數據在各類channel裏流轉的設計仍是比較複雜和繁瑣的,筆者也是研究了很久、畫了很長的架構圖才理清楚其中的邏輯。
這裏抽出了一個簡化後的圖以供參考:
filebeat維護了一個registry文件在本地的磁盤,該registry文件維護了全部已經採集的日誌文件的狀態。
實際上,每當日誌數據發送至後端成功後,會返回ack事件。filebeat啓動了一個獨立的registry協程負責監聽該事件,接收到ack事件後會將日誌文件的State狀態更新至registry文件中,State中的Offset表示讀取到的文件偏移量,因此filebeat會保證Offset記錄以前的日誌數據確定被後端的日誌存儲接收到。
State結構以下所示:
type State struct { Id string `json:"-"` // local unique id to make comparison more efficient Finished bool `json:"-"` // harvester state Fileinfo os.FileInfo `json:"-"` // the file info Source string `json:"source"` Offset int64 `json:"offset"` Timestamp time.Time `json:"timestamp"` TTL time.Duration `json:"ttl"` Type string `json:"type"` Meta map[string]string `json:"meta"` FileStateOS file.StateOS }
記錄在registry文件中的數據大體以下所示:
[{"source":"/tmp/aa.log","offset":48,"timestamp":"2019-07-03T13:54:01.298995+08:00","ttl":-1,"type":"log","meta":null,"FileStateOS":{"inode":7048952,"device":16777220}}]
因爲文件可能會被更名或移動,filebeat會根據inode和設備號來標誌每一個日誌文件。
若是filebeat異常重啓,每次採集harvester啓動的時候都會讀取registry文件,從上次記錄的狀態繼續採集,確保不會從頭開始重複發送全部的日誌文件。
固然,若是日誌發送過程當中,還沒來得及返回ack,filebeat就掛掉,registry文件確定不會更新至最新的狀態,那麼下次採集的時候,這部分的日誌就會重複發送,因此這意味着filebeat只能保證at least once,沒法保證不重複發送。
還有一個比較異常的狀況是,linux下若是老文件被移除,新文件立刻建立,頗有可能它們有相同的inode,而因爲filebeat根據inode來標誌文件記錄採集的偏移,會致使registry裏記錄的實際上是被移除的文件State狀態,這樣新的文件採集卻從老的文件Offset開始,從而會遺漏日誌數據。
爲了儘可能避免inode被複用的狀況,同時防止registry文件隨着時間增加愈來愈大,建議使用clean_inactive和clean_remove配置將長時間未更新或者被刪除的文件State從registry中移除。
同時咱們能夠發如今harvester讀取日誌中,會更新registry的狀態處理一些異常場景。例如,若是一個日誌文件被清空,filebeat會在下一次Reader.Next方法中返回ErrFileTruncate異常,將inode標誌文件的Offset置爲0,結束此次harvester,從新啓動新的harvester,雖然文件不變,可是registry中的Offset爲0,採集會從頭開始。
特別注意的是,若是使用容器部署filebeat,須要將registry文件掛載到宿主機上,不然容器重啓後registry文件丟失,會使filebeat從頭開始重複採集日誌文件。
目前filebeat支持reload input配置,module配置,但reload的機制只有定時更新。
在配置中打開reload.enable以後,還能夠配置reload.period表示自動reload配置的時間間隔。
filebeat在啓動時,會建立一個專門用於reload的協程。對於每一個正在運行的harvester,filebeat會將其加入一個全局的Runner列表,每次到了定時的間隔後,會觸發一次配置文件的diff判斷,若是是須要中止的加入stopRunner列表,而後逐個關閉,新的則加入startRunner列表,啓動新的Runner。
filebeat官方文檔提供了在kubernetes下基於daemonset的部署方式,最主要的一個配置以下所示:
- type: docker containers.ids: - "*" processors: - add_kubernetes_metadata: in_cluster: true
即設置輸入input爲docker類型。因爲全部的容器的標準輸出日誌默認都在節點的/var/lib/docker/containers/<containerId>/*-json.log
路徑,因此本質上採集的是這類日誌文件。
和傳統的部署方式有所區別的是,若是服務部署在kubernetes上,咱們查看和檢索日誌的維度不能僅僅侷限於節點和服務,還須要有podName,containerName等,因此每條日誌咱們都須要打標增長kubernetes的元信息才發送至後端。
filebeat會在配置中增長了add_kubernetes_metadata的processor的狀況下,啓動監聽kubernetes的watch服務,監聽全部kubernetes pod的變動,而後將歸屬本節點的pod最新的事件同步至本地的緩存中。
節點上一旦發生容器的銷燬建立,/var/lib/docker/containers/下會有目錄的變更,filebeat根據路徑提取出containerId,再根據containerId從本地的緩存中找到pod信息,從而能夠獲取到podName、label等數據,並加到日誌的元信息fields中。
filebeat還有一個beta版的功能autodiscover,autodiscover的目的是把分散到不一樣節點上的filebeat配置文件集中管理。目前也支持kubernetes做爲provider,本質上仍是監聽kubernetes事件而後採集docker的標準輸出文件。
大體架構以下所示:
可是在實際生產環境使用中,僅採集容器的標準輸出日誌仍是遠遠不夠,咱們每每還須要採集容器掛載出來的自定義日誌目錄,還須要控制每一個服務的日誌採集方式以及更多的定製化功能。
在輕舟容器雲上,咱們自研了一個監聽kubernetes事件自動生成filebeat配置的agent,經過CRD的方式,支持自定義容器內部日誌目錄、支持自定義fields、支持多行讀取等功能。同時可在kubernetes上統一管理各類日誌配置,並且無需用戶感知pod的建立銷燬和遷移,自動完成各類場景下的日誌配置生成和更新。
雖然beats系列主打輕量級,雖然用golang寫的filebeat的內存佔用確實比較基於jvm的logstash等好太多,可是事實告訴咱們其實沒那麼簡單。
正常啓動filebeat,通常確實只會佔用三、40MB內存,可是在輕舟容器雲上偶發性的咱們也會發現某些節點上的filebeat容器內存佔用超過配置的pod limit限制(通常設置爲200MB),而且不停的觸發的OOM。
究其緣由,通常容器化環境中,特別是裸機上運行的容器個數可能會比較多,致使建立大量的harvester去採集日誌。若是沒有很好的配置filebeat,會有較大機率致使內存急劇上升。
固然,filebeat內存佔據較大的部分仍是memqueue,全部採集到的日誌都會先發送至memqueue彙集,再經過output發送出去。每條日誌的數據在filebeat中都被組裝爲event結構,filebeat默認配置的memqueue緩存的event個數爲4096,可經過queue.mem.events
設置。默認最大的一條日誌的event大小限制爲10MB,可經過max_bytes
設置。4096 * 10MB = 40GB
,能夠想象,極端場景下,filebeat至少佔據40GB的內存。特別是配置了multiline多行模式的狀況下,若是multiline配置有誤,單個event誤採集爲上千條日誌的數據,極可能致使memqueue佔據了大量內存,導致內存爆炸。
因此,合理的配置日誌文件的匹配規則,限制單行日誌大小,根據實際狀況配置memqueue緩存的個數,才能在實際使用中規避filebeat的內存佔用過大的問題。
通常狀況下filebeat可知足大部分的日誌採集需求,可是仍然避免不了一些特殊的場景須要咱們對filebeat進行定製化開發,固然filebeat自己的設計也提供了良好的擴展性。
beats目前只提供了像elasticsearch、kafka、logstash等幾類output客戶端,若是咱們想要filebeat直接發送至其餘後端,須要定製化開發本身的output。一樣,若是須要對日誌作過濾處理或者增長元信息,也能夠自制processor插件。
不管是增長output仍是寫個processor,filebeat提供的大致思路基本相同。通常來說有3種方式: