在基於elk的日誌系統中,filebeat幾乎是其中必不可少的一個組件,例外是使用性能較差的logstash file input插件或本身造個功能相似的輪子:)。html
在使用和了解filebeat的過程當中,筆者對其一些功能上的實現產生了疑問,諸如:node
爲了找到答案,筆者閱讀了filebeat和部分libbeat的源碼(read the fucking source code),本文便是對此過程的一次總結。一方面是方便往後回顧,另外一方面也但願能解答你們對filebeat的一些疑惑。nginx
本文主要內容包括filebeat基本介紹、源碼解析兩個部分,主要面向的是:想要了解filebeat實現、想改造或擴展filebeat功能或想參考filebeat開發自定義beats
的讀者。redis
filebeat是一個開源的日誌運輸程序,屬於beats家族中的一員,和其餘beats同樣都基於libbeat庫實現。其中,libbeat是一個提供公共功能的庫,功能包括: 配置解析、日誌打印、事件處理和發送等。算法
對於任一種beats來講,主要邏輯都包含兩個部分[2]
:docker
其中第二點已由libbeat實現,所以各個beats實際只須要關心如何收集數據並生成事件後發送給libbeat的Publisher。beats和libeat的交互以下圖所示:json
具體到filebeat,它能採集數據的類型包括: log文件、標準輸入、redis、udp和tcp包、容器日誌和syslog,其中最多見的是使用log類型採集文件日誌發送到Elasticsearch或Logstash。然後續的源碼解析,也主要基於這種使用場景。segmentfault
基於libbeat實現的filebeat,主要擁有如下幾個特性[3]
:後端
下圖是filebeat及使用libbeat的一些主要模塊,爲筆者根據源碼的理解所做。設計模式
1. filebeat主要模塊
2. libbeat主要模塊
├── autodiscover # 包含filebeat的autodiscover適配器(adapter),當autodiscover發現新容器時建立對應類型的輸入
├── beater # 包含與libbeat庫交互相關的文件
├── channel # 包含filebeat輸出到pipeline相關的文件
├── config # 包含filebeat配置結構和解析函數
├── crawler # 包含Crawler結構和相關函數
├── fileset # 包含module和fileset相關的結構
├── harvester # 包含Harvester接口定義、Reader接口及實現等
├── input # 包含全部輸入類型的實現(好比: log, stdin, syslog) ├── inputsource # 在syslog輸入類型中用於讀取tcp或udp syslog ├── module # 包含各module和fileset配置 ├── modules.d # 包含各module對應的日誌路徑配置文件,用於修改默認路徑 ├── processor # 用於從容器日誌的事件字段source中提取容器id ├── prospector # 包含舊版本的輸入結構Prospector,現已被Input取代 ├── registrar # 包含Registrar結構和方法 └── util # 包含beat事件和文件狀態的通用結構Data └── ...
除了以上目錄註釋外,如下將介紹一些我的認爲比較重要的文件的詳細內容,讀者可做爲閱讀源碼時的一個參考。
包含與libbeat庫交互相關的文件:
filebeat輸出(到pipeline)相關的文件
包含Input接口及各類輸入類型的Input和Harvester實現
包含Harvester接口定義、Reader接口及實現等
beats通用事件結構(libbeat/beat/event.go
):
type Event struct { Timestamp time.Time // 收集日誌時記錄的時間戳,對應es文檔中的@timestamp字段 Meta common.MapStr // meta信息,outpus可選的將其做爲事件字段輸出。好比輸出爲es且指定了pipeline時,其pipeline id就被包含在此字段中 Fields common.MapStr // 默認輸出字段定義在field.yml,其餘字段能夠在經過fields配置項指定 Private interface{} // for beats private use }
Crawler(filebeat/crawler/crawler.go
):
// Crawler 負責抓取日誌併發送到libbeat pipeline type Crawler struct { inputs map[uint64]*input.Runner // 包含全部輸入的runner inputConfigs []*common.Config out channel.Factory wg sync.WaitGroup InputsFactory cfgfile.RunnerFactory ModulesFactory cfgfile.RunnerFactory modulesReloader *cfgfile.Reloader inputReloader *cfgfile.Reloader once bool beatVersion string beatDone chan struct{} }
log類型Input(filebeat/input/log/input.go
)
// Input contains the input and its config type Input struct { cfg *common.Config config config states *file.States harvesters *harvester.Registry // 包含Input全部Harvester outlet channel.Outleter // Input共享的Publisher client stateOutlet channel.Outleter done chan struct{} numHarvesters atomic.Uint32 meta map[string]string }
log類型Harvester(filebeat/input/log/harvester.go
):
type Harvester struct { id uuid.UUID config config source harvester.Source // the source being watched // shutdown handling done chan struct{} stopOnce sync.Once stopWg *sync.WaitGroup stopLock sync.Mutex // internal harvester state state file.State states *file.States log *Log // file reader pipeline reader reader.Reader encodingFactory encoding.EncodingFactory encoding encoding.Encoding // event/state publishing outletFactory OutletFactory publishState func(*util.Data) bool onTerminate func() }
Registrar(filebeat/registrar/registrar.go
):
type Registrar struct { Channel chan []file.State out successLogger done chan struct{} registryFile string // Path to the Registry File fileMode os.FileMode // Permissions to apply on the Registry File wg sync.WaitGroup states *file.States // Map with all file paths inside and the corresponding state gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write gcEnabled bool // gcEnabled indictes the registry contains some state that can be gc'ed in the future flushTimeout time.Duration bufferedStateUpdates int }
libbeat Pipeline(libbeat/publisher/pipeline/pipeline.go
)
type Pipeline struct { beatInfo beat.Info logger *logp.Logger queue queue.Queue output *outputController observer observer eventer pipelineEventer // wait close support waitCloseMode WaitCloseMode waitCloseTimeout time.Duration waitCloser *waitCloser // pipeline ack ackMode pipelineACKMode ackActive atomic.Bool ackDone chan struct{} ackBuilder ackBuilder // pipelineEventsACK eventSema *sema processors pipelineProcessors }
filebeat啓動流程以下圖所示:
1. 執行root命令
在filebeat/main.go
文件中,main
函數調用了cmd.RootCmd.Execute()
,而RootCmd
則是在cmd/root.go
中被init
函數初始化,其中就註冊了filebeat.go:New
函數以建立實現了beater
接口的filebeat實例
對於任意一個beats來講,都須要有:1) 實現Beater
接口的具體Beater(如Filebeat); 2) 建立該具體Beater的(New)函數[4]
。
beater接口定義(beat/beat.go):
type Beater interface { // The main event loop. This method should block until signalled to stop by an // invocation of the Stop() method. Run(b *Beat) error // Stop is invoked to signal that the Run method should finish its execution. // It will be invoked at most once. Stop() }
2. 初始化和運行Filebeat
libbeat/cmd/instance/beat.go:Beat
結構(*Beat).launch
方法
(*Beat).Init()
初始化Beat:加載beats公共config(*Beat).createBeater
registerTemplateLoading
: 當輸出爲es時,註冊加載es模板的回調函數pipeline.Load
: 建立Pipeline:包含隊列、事件處理器、輸出等setupMetrics
: 安裝監控filebeat.New
: 解析配置(其中輸入配置包括配置文件中的Input和module Input)等loadDashboards
加載kibana dashboard(*Filebeat).Run
: 運行filebeat3. Filebeat運行
從收集日誌、到發送事件到publisher,其數據流以下圖所示:
以log類型爲例
Setup
方法建立一系列reader造成讀處理鏈關於log類型的reader處理鏈,以下圖所示:
opt表示根據配置決定是否建立該reader
Reader包括:
os.File
,用於從指定offset開始讀取日誌行。雖然位於處理鏈的最內部,但其Next函數中實際的處理邏輯(讀文件行)倒是最新被執行的。除了Line Reader外,這些reader都實現了Reader
接口:
type Reader interface { Next() (Message, error) }
Reader經過內部包含Reader
對象的方式,使Reader造成一個處理鏈,其實這就是設計模式中的責任鏈模式。
各Reader的Next方法的通用形式像是這樣:Next
方法調用內部Reader
對象的Next
方法獲取Message
,而後處理後返回。
func (r *SomeReader) Next() (Message, error) { message, err := r.reader.Next() if err != nil { return message, err } // do some processing... return message, nil }
在Crawler收集日誌並轉換成事件後,其就會經過調用Publisher對應client的Publish接口將事件送到Publisher,後續的處理流程也都將由libbeat完成,事件的流轉以下圖所示:
在harvester調用client.Publish
接口時,其內部會使用配置中定義的processors對事件進行處理,而後纔將事件發送到Publisher隊列。
經過官方文檔瞭解到,processor包含兩種:在Input內定義做爲局部(Input獨享)的processor,其只對該Input產生的事件生效;在頂層配置中定義做爲全局processor,其對所有事件生效。
其對應的代碼實現方式是: filebeat在使用libbeat pipeline的ConnectWith
接口建立client時(factory.go
中(*OutletFactory)Create
函數),會將Input內部的定義processor做爲參數傳遞給ConnectWith
接口。而在ConnectWith
實現中,會將參數中的processor和全局processor(在建立pipeline時生成)合併。從這裏讀者也能夠發現,實際上每一個Input都獨享一個client,其包含一些Input自身的配置定義邏輯。
任一Processor都實現了Processor接口:Run函數包含處理邏輯,String返回Processor名。
type Processor interface { Run(event *beat.Event) (*beat.Event, error) String() string }
關於支持的processors及其使用,讀者能夠參考官方文檔Filter and enhance the exported data這一小節
在事件通過處理器處理後,下一步將被髮往Publisher的隊列。在client.go
在(*client) publish
方法中咱們能夠看到,事件是經過調用c.producer.Publish(pubEvent)
被實際發送的,而producer則經過具體Queue的Producer
方法生成。
隊列對象被包含在pipeline.go:Pipeline
結構中,其接口的定義以下:
type Queue interface { io.Closer BufferConfig() BufferConfig Producer(cfg ProducerConfig) Producer Consumer() Consumer }
主要的,Producer方法生成Producer對象,用於向隊列中push事件;Consumer方法生成Consumer對象,用於從隊列中取出事件。Producer
和Consumer
接口定義以下:
type Producer interface { Publish(event publisher.Event) bool TryPublish(event publisher.Event) bool Cancel() int } type Consumer interface { Get(sz int) (Batch, error) Close() error }
在配置中沒有指定隊列配置時,默認使用了memqueue
做爲隊列實現,下面咱們來看看memqueue及其對應producer和consumer定義:
Broker結構(memqueue在代碼中實際對應的結構名是Broker):
type Broker struct { done chan struct{} logger logger bufSize int // buf brokerBuffer // minEvents int // idleTimeout time.Duration // api channels events chan pushRequest requests chan getRequest pubCancel chan producerCancelRequest // internal channels acks chan int scheduledACKs chan chanList eventer queue.Eventer // wait group for worker shutdown wg sync.WaitGroup waitOnClose bool }
根據是否須要ack分爲forgetfullProducer和ackProducer兩種producer:
type forgetfullProducer struct { broker *Broker openState openState } type ackProducer struct { broker *Broker cancel bool seq uint32 state produceState openState openState }
consumer結構:
type consumer struct { broker *Broker resp chan getResponse done chan struct{} closed atomic.Bool }
三者的運做方式以下圖所示:
Producer
經過Publish
或TryPublish
事件放入Broker
的隊列,即結構中的channel對象evetns
Broker
的主事件循環EventLoop將(請求)事件從events channel取出,放入自身結構體對象ringBuffer中。
directEventLoop
:收到事件後儘量快的轉發;2)帶buffer事件循環結構bufferingEventLoop
:當buffer滿或刷新超時時轉發。具體使用哪種取決於memqueue配置項flush.min_events,大於1時使用後者,不然使用前者。eventConsumer
調用Consumer的Get
方法獲取事件:1)首先將獲取事件請求(包括請求事件數和用於存放其響應事件的channel resp
)放入Broker的請求隊列requests中,等待主事件循環EventLoop處理後將事件放入resp;2)獲取resp的事件,組裝成batch結構後返回eventConsumer
將事件放入output對應隊列中這部分關於事件在隊列中各類channel間的流轉,筆者認爲是比較消耗性能的,但不清楚設計者這樣設計的考量是什麼。 另外值得思考的是,在多個go routine使用隊列交互的場景下,libbeat中都使用了go語言channel做爲其底層的隊列,它是否能夠徹底替代加鎖隊列的使用呢?
在隊列消費者將事件放入output工做隊列後,事件將在pipeline/output.go:netClientWorker
的run()
方法中被取出,而後使用具體output client將事件發送到指定輸出(好比:es、logstash等)。
其中,netClientWorker
的數目取決於具體輸出client的數目(好比es做爲輸出時,client數目爲host數目),它們共享相同的output工做隊列。
此時若是發送失敗會發生什麼呢? 在outputs/elasticsearch/client.go:Client
的Publish
方法能夠看到:發送失敗會重試失敗的事件,直到所有事件都發送成功後才調用ACK確認。
在事件發送成功後, 其ack的數據流以下圖所示:
pipeline_ack.go:pipelineEventsACK
的事件隊列events
中pipelineEventsACK
在worker中將事件取出,調用 acker.go:(*eventACKer).ackEvents
,將ack(文件狀態)放入registrar的隊列Channel中。此回調函數在filebeat.go:(*Filebeat)Run
方法中經過Publisher.SetACKHandler
設置。Run()
方法中取出隊列中的文件狀態,刷新registry文件經過ack機制和registrar模塊,filebeat實現了對已發送成功事件對應文件狀態的記錄,這使它即便在程序crash後重啓的狀況下也能從以前的文件位置恢復並繼續處理,保證了日誌數據(事件)被至少發送一次。
至此,本篇文章關於filebeat源碼解析的內容已經結束。
從總體看,filebeat的代碼沒有包含複雜的算法邏輯或底層實現,但其總體代碼結構仍是比較清晰的,即便對於不須要參考filebeat特性實現去開發自定義beats的讀者來講,仍屬於值得一讀的源碼。