在雲原生時代和容器化浪潮中,容器的日誌採集是一個看起來不起眼卻又沒法忽視的重要議題。對於容器日誌採集咱們經常使用的工具備filebeat和fluentd,二者對比各有優劣,相比基於ruby的fluentd,考慮到可定製性,咱們通常默認選擇golang技術棧的filbeat做爲主力的日誌採集agent。
相比較傳統的日誌採集方式,容器化下單節點會運行更多的服務,負載也會有更短的生命週期,而這些更容易對日誌採集agent形成壓力,雖然filebeat足夠輕量級和高性能,但若是不瞭解filebeat的機制,不合理的配置filebeat,實際的生產環境使用中可能也會給咱們帶來意想不到的麻煩和難題。node
日誌採集的功能看起來不復雜,主要功能無非就是找到配置的日誌文件,而後讀取並處理,發送至相應的後端如elasticsearch,kafka等。
filebeat官網有張示意圖,以下所示: linux
filebeat.inputs:
- type: log
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /var/log/*.log
複製代碼
不一樣的harvester goroutine採集到的日誌數據都會發送至一個全局的隊列queue中,queue的實現有兩種:基於內存和基於磁盤的隊列,目前基於磁盤的隊列仍是處於alpha階段,filebeat默認啓用的是基於內存的緩存隊列。
每當隊列中的數據緩存到必定的大小或者超過了定時的時間(默認1s),會被註冊的client從隊列中消費,發送至配置的後端。目前能夠設置的client有kafka、elasticsearch、redis等。golang
雖然這一切看着挺簡單,但在實際使用中,咱們仍是須要考慮更多的問題,例如:redis
這些均須要對filebeat有更深刻的理解,下面讓咱們跟隨filebeat的源碼一塊兒探究其中的實現機制。docker
filebeat源碼歸屬於beats項目,而beats項目的設計初衷是爲了採集各種的數據,因此beats抽象出了一個libbeat庫,基於libbeat咱們能夠快速的開發實現一個採集的工具,除了filebeat,還有像metricbeat、packetbeat等官方的項目也是在beats工程中。
若是咱們大體看一下代碼就會發現,libbeat已經實現了內存緩存隊列memqueue、幾種output日誌發送客戶端,數據的過濾處理processor等通用功能,而filebeat只須要實現日誌文件的讀取等和日誌相關的邏輯便可。json
從代碼的實現角度來看,filebeat大概能夠分如下幾個模塊:後端
對於日誌文件的採集和生命週期管理,filebeat抽象出一個Crawler的結構體, 在filebeat啓動後,crawler會根據配置建立,而後遍歷並運行每一個input:緩存
for _, inputConfig := range c.inputConfigs {
err := c.startInput(pipeline, inputConfig, r.GetStates())
}
複製代碼
在每一個input運行的邏輯裏,首先會根據配置獲取匹配的日誌文件,須要注意的是,這裏的匹配方式並不是正則,而是採用linux glob的規則,和正則仍是有一些區別。ruby
matches, err := filepath.Glob(path)
複製代碼
獲取到了全部匹配的日誌文件以後,會通過一些複雜的過濾,例如若是配置了exclude_files
則會忽略這類文件,同時還會查詢文件的狀態,若是文件的最近一次修改時間大於ignore_older
的配置,也會不去採集該文件。bash
匹配到最終須要採集的日誌文件以後,filebeat會對每一個文件啓動harvester goroutine,在該goroutine中不停的讀取日誌,併發送給內存緩存隊列memqueue。
在(h *Harvester) Run()
方法中,咱們能夠看到這麼一個無限循環,省略了一些邏輯的代碼以下所示:
for {
message, err := h.reader.Next()
if err != nil {
switch err {
case ErrFileTruncate:
logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source)
h.state.Offset = 0
filesTruncated.Add(1)
case ErrRemoved:
logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source)
case ErrRenamed:
logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source)
case ErrClosed:
logp.Info("Reader was closed: %s. Closing.", h.state.Source)
case io.EOF:
logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)
case ErrInactive:
logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
default:
logp.Err("Read line error: %v; File: %v", err, h.state.Source)
}
return nil
}
...
if !h.sendEvent(data, forwarder) {
return nil
}
}
複製代碼
能夠看到,reader.Next()方法會不停的讀取日誌,若是沒有返回異常,則發送日誌數據到緩存隊列中。
返回的異常有幾種類型,除了讀取到EOF外,還會有例如文件一段時間不活躍等狀況發生會使harvester goroutine退出,再也不採集該文件,並關閉文件句柄。
filebeat爲了防止佔據過多的採集日誌文件的文件句柄,默認的close_inactive
參數爲5min,若是日誌文件5min內沒有被修改,上面代碼會進入ErrInactive的case,以後該harvester goroutine會被關閉。
這種場景下還須要注意的是,若是某個文件日誌採集中被移除了,可是因爲此時被filebeat保持着文件句柄,文件佔據的磁盤空間會被保留直到harvester goroutine結束。
在memqueue被初始化時,filebeat會根據配置min_event
是否大於1建立BufferingEventLoop或者DirectEventLoop,通常默認都是BufferingEventLoop,即帶緩衝的隊列。
type bufferingEventLoop struct {
broker *Broker
buf *batchBuffer
flushList flushList
eventCount int
minEvents int
maxEvents int
flushTimeout time.Duration
// active broker API channels
events chan pushRequest
get chan getRequest
pubCancel chan producerCancelRequest
// ack handling
acks chan int // ackloop -> eventloop : total number of events ACKed by outputs
schedACKS chan chanList // eventloop -> ackloop : active list of batches to be acked
pendingACKs chanList // ordered list of active batches to be send to the ackloop
ackSeq uint // ack batch sequence number to validate ordering
// buffer flush timer state
timer *time.Timer
idleC <-chan time.Time
}
複製代碼
BufferingEventLoop是一個實現了Broker、帶有各類channel的結構,主要用於將日誌發送至consumer消費。 BufferingEventLoop的run方法中,一樣是一個無限循環,這裏能夠認爲是一個日誌事件的調度中心。
for {
select {
case <-broker.done:
return
case req := <-l.events: // producer pushing new event
l.handleInsert(&req)
case req := <-l.get: // consumer asking for next batch
l.handleConsumer(&req)
case count := <-l.acks:
l.handleACK(count)
case <-l.idleC:
l.idleC = nil
l.timer.Stop()
if l.buf.length() > 0 {
l.flushBuffer()
}
}
}
複製代碼
上文中harvester goroutine每次讀取到日誌數據以後,最終會被髮送至bufferingEventLoop中的events chan pushRequest
channel,而後觸發上面req := <-l.events
的case,handleInsert方法會把數據添加至bufferingEventLoop的buf中,buf即memqueue實際緩存日誌數據的隊列,若是buf長度超過配置的最大值或者bufferingEventLoop中的timer定時器觸發了case <-l.idleC
,均會調用flushBuffer()方法。
flushBuffer()又會觸發req := <-l.get
的case,而後運行handleConsumer方法,該方法中最重要的是這一句代碼:
req.resp <- getResponse{ackChan, events}
複製代碼
這裏獲取到了consumer消費者的response channel,而後發送數據給這個channel。真正到這,纔會觸發consumer對memqueue的消費。因此,其實memqueue並不是一直不停的在被consumer消費,而是在memqueue通知consumer的時候才被消費,咱們能夠理解爲一種脈衝式的發送。
實際上,早在filebeat初始化的時候,就已經建立了一個eventConsumer並在loop無限循環方法裏試圖從Broker中獲取日誌數據。
for {
if !paused && c.out != nil && consumer != nil && batch == nil {
out = c.out.workQueue
queueBatch, err := consumer.Get(c.out.batchSize)
...
batch = newBatch(c.ctx, queueBatch, c.out.timeToLive)
}
...
select {
case <-c.done:
return
case sig := <-c.sig:
handleSignal(sig)
case out <- batch:
batch = nil
}
}
複製代碼
上面consumer.Get就是消費者consumer從Broker中獲取日誌數據,而後發送至out的channel中被output client發送,咱們看一下Get方法裏的核心代碼:
select {
case c.broker.requests <- getRequest{sz: sz, resp: c.resp}:
case <-c.done:
return nil, io.EOF
}
// if request has been send, we do have to wait for a response
resp := <-c.resp
return &batch{
consumer: c,
events: resp.buf,
ack: resp.ack,
state: batchActive,
}, nil
複製代碼
getRequest的結構以下:
type getRequest struct {
sz int // request sz events from the broker
resp chan getResponse // channel to send response to
}
複製代碼
getResponse的結構:
type getResponse struct {
ack *ackChan
buf []publisher.Event
}
複製代碼
getResponse裏包含了日誌的數據,而getRequest包含了一個發送至消費者的channel。
在上文bufferingEventLoop緩衝隊列的handleConsumer方法裏接收到的參數爲getRequest,裏面包含了consumer請求的getResponse channel。
若是handleConsumer不發送數據,consumer.Get方法會一直阻塞在select中,直到flushBuffer,consumer的getResponse channel纔會接收到日誌數據。
在建立beats時,會建立一個clientWorker,clientWorker的run方法中,會不停的從consumer發送的channel裏讀取日誌數據,而後調用client.Publish批量發送日誌。
func (w *clientWorker) run() {
for !w.closed.Load() {
for batch := range w.qu {
if err := w.client.Publish(batch); err != nil {
return
}
}
}
}
複製代碼
libbeats庫中包含了kafka、elasticsearch、logstash等幾種client,它們均實現了client接口:
type Client interface {
Close() error
Publish(publisher.Batch) error
String() string
}
複製代碼
固然最重要的是實現Publish接口,而後將日誌發送出去。
實際上,filebeat中日誌數據在各類channel裏流轉的設計仍是比較複雜和繁瑣的,筆者也是研究了很久、畫了很長的架構圖才理清楚其中的邏輯。 這裏抽出了一個簡化後的圖以供參考:
filebeat維護了一個registry文件在本地的磁盤,該registry文件維護了全部已經採集的日誌文件的狀態。 實際上,每當日誌數據發送至後端成功後,會返回ack事件。filebeat啓動了一個獨立的registry協程負責監聽該事件,接收到ack事件後會將日誌文件的State狀態更新至registry文件中,State中的Offset表示讀取到的文件偏移量,因此filebeat會保證Offset記錄以前的日誌數據確定被後端的日誌存儲接收到。
State結構以下所示:
type State struct {
Id string `json:"-"` // local unique id to make comparison more efficient
Finished bool `json:"-"` // harvester state
Fileinfo os.FileInfo `json:"-"` // the file info
Source string `json:"source"`
Offset int64 `json:"offset"`
Timestamp time.Time `json:"timestamp"`
TTL time.Duration `json:"ttl"`
Type string `json:"type"`
Meta map[string]string `json:"meta"`
FileStateOS file.StateOS
}
複製代碼
記錄在registry文件中的數據大體以下所示:
[{"source":"/tmp/aa.log","offset":48,"timestamp":"2019-07-03T13:54:01.298995+08:00","ttl":-1,"type":"log","meta":null,"FileStateOS":{"inode":7048952,"device":16777220}}]
複製代碼
因爲文件可能會被更名或移動,filebeat會根據inode和設備號來標誌每一個日誌文件。
若是filebeat異常重啓,每次採集harvester啓動的時候都會讀取registry文件,從上次記錄的狀態繼續採集,確保不會從頭開始重複發送全部的日誌文件。
固然,若是日誌發送過程當中,還沒來得及返回ack,filebeat就掛掉,registry文件確定不會更新至最新的狀態,那麼下次採集的時候,這部分的日誌就會重複發送,因此這意味着filebeat只能保證at least once,沒法保證不重複發送。
還有一個比較異常的狀況是,linux下若是老文件被移除,新文件立刻建立,頗有可能它們有相同的inode,而因爲filebeat根據inode來標誌文件記錄採集的偏移,會致使registry裏記錄的實際上是被移除的文件State狀態,這樣新的文件採集卻從老的文件Offset開始,從而會遺漏日誌數據。
爲了儘可能避免inode被複用的狀況,同時防止registry文件隨着時間增加愈來愈大,建議使用clean_inactive和clean_remove配置將長時間未更新或者被刪除的文件State從registry中移除。
同時咱們能夠發如今harvester讀取日誌中,會更新registry的狀態處理一些異常場景。例如,若是一個日誌文件被清空,filebeat會在下一次Reader.Next方法中返回ErrFileTruncate異常,將inode標誌文件的Offset置爲0,結束此次harvester,從新啓動新的harvester,雖然文件不變,可是registry中的Offset爲0,採集會從頭開始。
特別注意的是,若是使用容器部署filebeat,須要將registry文件掛載到宿主機上,不然容器重啓後registry文件丟失,會使filebeat從頭開始重複採集日誌文件。
目前filebeat支持reload input配置,module配置,但reload的機制只有定時更新。
在配置中打開reload.enable以後,還能夠配置reload.period表示自動reload配置的時間間隔。
filebeat在啓動時,會建立一個專門用於reload的協程。對於每一個正在運行的harvester,filebeat會將其加入一個全局的Runner列表,每次到了定時的間隔後,會觸發一次配置文件的diff判斷,若是是須要中止的加入stopRunner列表,而後逐個關閉,新的則加入startRunner列表,啓動新的Runner。
filebeat官方文檔提供了在kubernetes下基於daemonset的部署方式,最主要的一個配置以下所示:
- type: docker
containers.ids:
- "*"
processors:
- add_kubernetes_metadata:
in_cluster: true
複製代碼
即設置輸入input爲docker類型。因爲全部的容器的標準輸出日誌默認都在節點的/var/lib/docker/containers/<containerId>/*-json.log
路徑,因此本質上採集的是這類日誌文件。
和傳統的部署方式有所區別的是,若是服務部署在kubernetes上,咱們查看和檢索日誌的維度不能僅僅侷限於節點和服務,還須要有podName,containerName等,因此每條日誌咱們都須要打標增長kubernetes的元信息才發送至後端。
filebeat會在配置中增長了add_kubernetes_metadata的processor的狀況下,啓動監聽kubernetes的watch服務,監聽全部kubernetes pod的變動,而後將歸屬本節點的pod最新的事件同步至本地的緩存中。
節點上一旦發生容器的銷燬建立,/var/lib/docker/containers/下會有目錄的變更,filebeat根據路徑提取出containerId,再根據containerId從本地的緩存中找到pod信息,從而能夠獲取到podName、label等數據,並加到日誌的元信息fields中。
filebeat還有一個beta版的功能autodiscover,autodiscover的目的是把分散到不一樣節點上的filebeat配置文件集中管理。目前也支持kubernetes做爲provider,本質上仍是監聽kubernetes事件而後採集docker的標準輸出文件。
大體架構以下所示:
可是在實際生產環境使用中,僅採集容器的標準輸出日誌仍是遠遠不夠,咱們每每還須要採集容器掛載出來的自定義日誌目錄,還須要控制每一個服務的日誌採集方式以及更多的定製化功能。
在輕舟容器雲上,咱們自研了一個監聽kubernetes事件自動生成filebeat配置的agent,經過CRD的方式,支持自定義容器內部日誌目錄、支持自定義fields、支持多行讀取等功能。同時可在kubernetes上統一管理各類日誌配置,並且無需用戶感知pod的建立銷燬和遷移,自動完成各類場景下的日誌配置生成和更新。
雖然beats系列主打輕量級,雖然用golang寫的filebeat的內存佔用確實比較基於jvm的logstash等好太多,可是事實告訴咱們其實沒那麼簡單。
正常啓動filebeat,通常確實只會佔用三、40MB內存,可是在輕舟容器雲上偶發性的咱們也會發現某些節點上的filebeat容器內存佔用超過配置的pod limit限制(通常設置爲200MB),而且不停的觸發的OOM。
究其緣由,通常容器化環境中,特別是裸機上運行的容器個數可能會比較多,致使建立大量的harvester去採集日誌。若是沒有很好的配置filebeat,會有較大機率致使內存急劇上升。
固然,filebeat內存佔據較大的部分仍是memqueue,全部採集到的日誌都會先發送至memqueue彙集,再經過output發送出去。每條日誌的數據在filebeat中都被組裝爲event結構,filebeat默認配置的memqueue緩存的event個數爲4096,可經過queue.mem.events
設置。默認最大的一條日誌的event大小限制爲10MB,可經過max_bytes
設置。4096 * 10MB = 40GB
,能夠想象,極端場景下,filebeat至少佔據40GB的內存。特別是配置了multiline多行模式的狀況下,若是multiline配置有誤,單個event誤採集爲上千條日誌的數據,極可能致使memqueue佔據了大量內存,導致內存爆炸。
因此,合理的配置日誌文件的匹配規則,限制單行日誌大小,根據實際狀況配置memqueue緩存的個數,才能在實際使用中規避filebeat的內存佔用過大的問題。
通常狀況下filebeat可知足大部分的日誌採集需求,可是仍然避免不了一些特殊的場景須要咱們對filebeat進行定製化開發,固然filebeat自己的設計也提供了良好的擴展性。
beats目前只提供了像elasticsearch、kafka、logstash等幾類output客戶端,若是咱們想要filebeat直接發送至其餘後端,須要定製化開發本身的output。一樣,若是須要對日誌作過濾處理或者增長元信息,也能夠自制processor插件。
不管是增長output仍是寫個processor,filebeat提供的大致思路基本相同。通常來說有3種方式: