轉-filebeat 源碼分析

背景

在基於elk的日誌系統中,filebeat幾乎是其中必不可少的一個組件,例外是使用性能較差的logstash file input插件或本身造個功能相似的輪子:)。html

在使用和了解filebeat的過程當中,筆者對其一些功能上的實現產生了疑問,諸如:node

  1. 爲何libbeat能如此容易的進行擴展,衍生出多個應用普遍的beat運輸程序?
  2. 爲何它的性能比logstash好? (https://logz.io/blog/filebeat-vs-logstash/
  3. 是如何實現‘保證至少發送一次’這個feature的呢?
  4. 代碼模塊是如何劃分、如何組織、如何運行的呢?
  5. ...

爲了找到答案,筆者閱讀了filebeat和部分libbeat的源碼(read the fucking source code),本文便是對此過程的一次總結。一方面是方便往後回顧,另外一方面也但願能解答你們對filebeat的一些疑惑。nginx

本文主要內容包括filebeat基本介紹、源碼解析兩個部分,主要面向的是:想要了解filebeat實現、想改造或擴展filebeat功能或想參考filebeat開發自定義beats的讀者。redis

filebeat基本介紹

filebeat是一個開源的日誌運輸程序,屬於beats家族中的一員,和其餘beats同樣都基於libbeat庫實現。其中,libbeat是一個提供公共功能的庫,功能包括: 配置解析、日誌打印、事件處理和發送等。算法

對於任一種beats來講,主要邏輯都包含兩個部分[2]docker

  1. 收集數據並轉換成事件
  2. 發送事件到指定的輸出

其中第二點已由libbeat實現,所以各個beats實際只須要關心如何收集數據並生成事件後發送給libbeat的Publisher。beats和libeat的交互以下圖所示:json

beats和libeat的交互

具體到filebeat,它能採集數據的類型包括: log文件、標準輸入、redis、udp和tcp包、容器日誌和syslog,其中最多見的是使用log類型採集文件日誌發送到Elasticsearch或Logstash。然後續的源碼解析,也主要基於這種使用場景。segmentfault

基於libbeat實現的filebeat,主要擁有如下幾個特性[3]後端

  1. 在運輸日誌內容方面它擁有健壯性:正常狀況下,filebeat讀取並運輸日誌行,但若是期間程序因某些緣由被中斷了,它會記住中斷前已處理成功的讀取位置,在程序再次啓動時恢復。
  2. 能夠解析多種常見的日誌格式,簡化用戶操做: filebeta內置多個模塊(module):auditd、Apache、NGINX、System、MySQL等,它們將常見日誌格式的收集、解析和可視化簡化成了一個單獨命令,模塊的實現方式:基於操做系統定義各個模塊對應日誌的默認路徑、使用ingest node的pipeline解析特定的日誌格式、結合kibana dashboard可視化解析後特定格式的日誌。
  3. 支持容器應用的日誌收集,而且能經過libbeat的autodiscover特性檢測新加入的容器並使用對應的模塊(module)或輸入
  4. 不會使pipeline超過負載:使用backpressure-sensitive 協議感知後端(好比logstash、elasticsesarch等)壓力,若是後端忙於處理數據,則下降讀日誌的速度;一旦阻塞被解決,則恢復。
  5. 能夠將運輸日誌到elasticsearch或logstash中,在kibana進行可視化

filebeat源碼解析

模塊結構

下圖是filebeat及使用libbeat的一些主要模塊,爲筆者根據源碼的理解所做。設計模式

filebeat模塊結構

1. filebeat主要模塊

  • Crawler: 管理全部Input收集數據併發送事件到libbeat的Publisher
  • Input: 對應可配置的一種輸入類型,每種類型都有具體的Input和Harvester實現。 配置項
    • Harvester: 對應一個輸入源,是收集數據的實際工做者。配置中,一個具體的Input能夠包含多個輸入源(Harvester)
  • module: 簡化了一些常見程序日誌(好比nginx日誌)收集、解析、可視化(kibana dashboard)配置項
    • fileset: module下具體的一種Input定義(好比nginx包括access和error log),包含:1)輸入配置;2)es ingest node pipeline定義;3)事件字段定義;4)示例kibana dashboard
  • Registrar:用於在事件發送成功後記錄文件狀態

2. libbeat主要模塊

  • Publisher:
    • client: 提供Publish接口讓filebeat將事件發送到Publisher。在發送到隊列以前,內部會先調用processors(包括input 內部的processors和全局processors)進行處理。
    • processor: 事件處理器,可對事件按照配置中的條件進行各類處理(好比刪除事件、保留指定字段等)。配置項
    • queue: 事件隊列,有memqueue(基於內存)和spool(基於磁盤文件)兩種實現。配置項
    • outputs: 事件的輸出端,好比ES、Logstash、kafka等。配置項
    • acker: 事件確認回調,在事件發送成功後進行回調
  • autodiscover:用於自動發現容器並將其做爲輸入源

filebeat目錄組織

├── autodiscover        # 包含filebeat的autodiscover適配器(adapter),當autodiscover發現新容器時建立對應類型的輸入
├── beater              # 包含與libbeat庫交互相關的文件
├── channel             # 包含filebeat輸出到pipeline相關的文件
├── config              # 包含filebeat配置結構和解析函數
├── crawler             # 包含Crawler結構和相關函數
├── fileset             # 包含module和fileset相關的結構
├── harvester           # 包含Harvester接口定義、Reader接口及實現等
├── input               # 包含全部輸入類型的實現(好比: log, stdin, syslog) ├── inputsource # 在syslog輸入類型中用於讀取tcp或udp syslog ├── module # 包含各module和fileset配置 ├── modules.d # 包含各module對應的日誌路徑配置文件,用於修改默認路徑 ├── processor # 用於從容器日誌的事件字段source中提取容器id ├── prospector # 包含舊版本的輸入結構Prospector,現已被Input取代 ├── registrar # 包含Registrar結構和方法 └── util # 包含beat事件和文件狀態的通用結構Data └── ...

除了以上目錄註釋外,如下將介紹一些我的認爲比較重要的文件的詳細內容,讀者可做爲閱讀源碼時的一個參考。

/beater

包含與libbeat庫交互相關的文件:

  • acker.go: 包含在libbeat設置的ack回調函數,事件成功發送後被調用
  • channels.go: 包含在ack回調函數中被調用的記錄者(logger),包括:
    1. registrarLogger: 將已確認事件寫入registrar運行隊列
    2. finishedLogger: 統計已確認事件數量
  • filebeat.go: 包含實現了beater接口的filebeat結構,接口函數包括:
    1. New:建立了filebeat實例
    2. Run:運行filebeat
    3. Stop: 中止filebeat運行
  • signalwait.go:基於channel實現的等待函數,在filebeat中用於:
    1. 等待fileebat結束
    2. 等待確認事件被寫入registry文件

/channel

filebeat輸出(到pipeline)相關的文件

  • factory.go: 包含OutletFactory,用於建立輸出器Outleter對象
  • interface.go: 定義輸出接口Outleter
  • outlet.go: 實現Outleter,封裝了libbeat的pipeline client,其在harvester中被調用用於將事件發送給pipeline
  • util.go: 定義ack回調的參數結構data,包含beat事件和文件狀態

/input

包含Input接口及各類輸入類型的Input和Harvester實現

  • Input:對應配置中的一個Input項,同個Input下可包含多個輸入源(好比文件)
  • Harvester:每一個輸入源對應一個Harvester,負責實際收集數據、併發送事件到pipeline

/harvester

包含Harvester接口定義、Reader接口及實現等

  • forwarder.go: Forwarder結構(包含outlet)定義,用於轉發事件
  • harvester.go: Harvester接口定義,具體實現則在/input目錄下
  • registry.go: Registry結構,用於在Input中管理多個Harvester(輸入源)的啓動和中止
  • source.go: Source接口定義,表示輸入源。目前僅有Pipe一種實現(包含os.File),用在log、stdin和docker輸入類型中。btw,這三種輸入類型都是用的log input的實現。
  • /reader目錄: Reader接口定義和各類Reader實現

重要數據結構

beats通用事件結構(libbeat/beat/event.go):

type Event struct { Timestamp time.Time // 收集日誌時記錄的時間戳,對應es文檔中的@timestamp字段 Meta common.MapStr // meta信息,outpus可選的將其做爲事件字段輸出。好比輸出爲es且指定了pipeline時,其pipeline id就被包含在此字段中 Fields common.MapStr // 默認輸出字段定義在field.yml,其餘字段能夠在經過fields配置項指定 Private interface{} // for beats private use }

Crawler(filebeat/crawler/crawler.go):

// Crawler 負責抓取日誌併發送到libbeat pipeline type Crawler struct { inputs map[uint64]*input.Runner // 包含全部輸入的runner inputConfigs []*common.Config out channel.Factory wg sync.WaitGroup InputsFactory cfgfile.RunnerFactory ModulesFactory cfgfile.RunnerFactory modulesReloader *cfgfile.Reloader inputReloader *cfgfile.Reloader once bool beatVersion string beatDone chan struct{} }

log類型Input(filebeat/input/log/input.go)

// Input contains the input and its config type Input struct { cfg *common.Config config config states *file.States harvesters *harvester.Registry // 包含Input全部Harvester outlet channel.Outleter // Input共享的Publisher client stateOutlet channel.Outleter done chan struct{} numHarvesters atomic.Uint32 meta map[string]string }

log類型Harvester(filebeat/input/log/harvester.go):

type Harvester struct { id uuid.UUID config config source harvester.Source // the source being watched // shutdown handling done chan struct{} stopOnce sync.Once stopWg *sync.WaitGroup stopLock sync.Mutex // internal harvester state state file.State states *file.States log *Log // file reader pipeline reader reader.Reader encodingFactory encoding.EncodingFactory encoding encoding.Encoding // event/state publishing outletFactory OutletFactory publishState func(*util.Data) bool onTerminate func() }

Registrar(filebeat/registrar/registrar.go):

type Registrar struct { Channel chan []file.State out successLogger done chan struct{} registryFile string // Path to the Registry File fileMode os.FileMode // Permissions to apply on the Registry File wg sync.WaitGroup states *file.States // Map with all file paths inside and the corresponding state gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write gcEnabled bool // gcEnabled indictes the registry contains some state that can be gc'ed in the future flushTimeout time.Duration bufferedStateUpdates int }

libbeat Pipeline(libbeat/publisher/pipeline/pipeline.go)

type Pipeline struct { beatInfo beat.Info logger *logp.Logger queue queue.Queue output *outputController observer observer eventer pipelineEventer // wait close support waitCloseMode WaitCloseMode waitCloseTimeout time.Duration waitCloser *waitCloser // pipeline ack ackMode pipelineACKMode ackActive atomic.Bool ackDone chan struct{} ackBuilder ackBuilder // pipelineEventsACK eventSema *sema processors pipelineProcessors }

執行邏輯

filebeat啓動

filebeat啓動流程以下圖所示:

filebeat啓動流程

1. 執行root命令

filebeat/main.go文件中,main函數調用了cmd.RootCmd.Execute(),而RootCmd則是在cmd/root.go中被init函數初始化,其中就註冊了filebeat.go:New函數以建立實現了beater接口的filebeat實例

對於任意一個beats來講,都須要有:1) 實現Beater接口的具體Beater(如Filebeat); 2) 建立該具體Beater的(New)函數[4]

beater接口定義(beat/beat.go):

type Beater interface { // The main event loop. This method should block until signalled to stop by an // invocation of the Stop() method. Run(b *Beat) error // Stop is invoked to signal that the Run method should finish its execution. // It will be invoked at most once. Stop() }

2. 初始化和運行Filebeat

  • 建立libbeat/cmd/instance/beat.go:Beat結構
  • 執行(*Beat).launch方法
    • (*Beat).Init() 初始化Beat:加載beats公共config
    • (*Beat).createBeater
      • registerTemplateLoading: 當輸出爲es時,註冊加載es模板的回調函數
      • pipeline.Load: 建立Pipeline:包含隊列、事件處理器、輸出等
      • setupMetrics: 安裝監控
      • filebeat.New: 解析配置(其中輸入配置包括配置文件中的Input和module Input)等
    • loadDashboards 加載kibana dashboard
    • (*Filebeat).Run: 運行filebeat

3. Filebeat運行

  • 設置加載es pipeline的回調函數
  • 初始化registrar和crawler
  • 設置事件完成的回調函數
  • 啓動Registrar、啓動Crawler、啓動Autodiscover
  • 等待filebeat運行結束

日誌收集

從收集日誌、到發送事件到publisher,其數據流以下圖所示:

日誌收集數據流
  • Crawler根據Input配置建立並啓動具體Input對象

以log類型爲例

  • Log input對象建立時會從registry讀取文件狀態(主要是offset),而後爲input配置中的文件路徑建立harvester並運行
    • harvester啓動時會經過Setup方法建立一系列reader造成讀處理鏈
  • harvester從registry記錄的文件位置開始讀取,組裝成事件(beat.Event)後發給Publisher

reader

關於log類型的reader處理鏈,以下圖所示:

reader處理鏈

opt表示根據配置決定是否建立該reader

Reader包括:

  • Line: 包含os.File,用於從指定offset開始讀取日誌行。雖然位於處理鏈的最內部,但其Next函數中實際的處理邏輯(讀文件行)倒是最新被執行的。
  • Encode: 包含Line Reader,將其讀取到的行生成Message結構後返回
  • JSON, DockerJSON: 將json形式的日誌內容decode成字段
  • StripNewLine:去除日誌行尾部的空白符
  • Multiline: 用於讀取多行日誌
  • Limit: 限制單行日誌字節數

除了Line Reader外,這些reader都實現了Reader接口:

type Reader interface { Next() (Message, error) }

Reader經過內部包含Reader對象的方式,使Reader造成一個處理鏈,其實這就是設計模式中的責任鏈模式。

各Reader的Next方法的通用形式像是這樣:Next方法調用內部Reader對象的Next方法獲取Message,而後處理後返回。

func (r *SomeReader) Next() (Message, error) { message, err := r.reader.Next() if err != nil { return message, err } // do some processing... return message, nil }

事件處理和隊列

在Crawler收集日誌並轉換成事件後,其就會經過調用Publisher對應client的Publish接口將事件送到Publisher,後續的處理流程也都將由libbeat完成,事件的流轉以下圖所示:

事件處理、進入隊列及輸出過程

事件處理器processor

在harvester調用client.Publish接口時,其內部會使用配置中定義的processors對事件進行處理,而後纔將事件發送到Publisher隊列。

經過官方文檔瞭解到,processor包含兩種:在Input內定義做爲局部(Input獨享)的processor,其只對該Input產生的事件生效;在頂層配置中定義做爲全局processor,其對所有事件生效。

其對應的代碼實現方式是: filebeat在使用libbeat pipeline的ConnectWith接口建立client時(factory.go(*OutletFactory)Create函數),會將Input內部的定義processor做爲參數傳遞給ConnectWith接口。而在ConnectWith實現中,會將參數中的processor和全局processor(在建立pipeline時生成)合併。從這裏讀者也能夠發現,實際上每一個Input都獨享一個client,其包含一些Input自身的配置定義邏輯。

任一Processor都實現了Processor接口:Run函數包含處理邏輯,String返回Processor名。

type Processor interface { Run(event *beat.Event) (*beat.Event, error) String() string }

關於支持的processors及其使用,讀者能夠參考官方文檔Filter and enhance the exported data這一小節

隊列queue

在事件通過處理器處理後,下一步將被髮往Publisher的隊列。在client.go(*client) publish方法中咱們能夠看到,事件是經過調用c.producer.Publish(pubEvent)被實際發送的,而producer則經過具體Queue的Producer方法生成。

隊列對象被包含在pipeline.go:Pipeline結構中,其接口的定義以下:

type Queue interface { io.Closer BufferConfig() BufferConfig Producer(cfg ProducerConfig) Producer Consumer() Consumer }

主要的,Producer方法生成Producer對象,用於向隊列中push事件;Consumer方法生成Consumer對象,用於從隊列中取出事件。ProducerConsumer接口定義以下:

type Producer interface { Publish(event publisher.Event) bool TryPublish(event publisher.Event) bool Cancel() int } type Consumer interface { Get(sz int) (Batch, error) Close() error }

在配置中沒有指定隊列配置時,默認使用了memqueue做爲隊列實現,下面咱們來看看memqueue及其對應producer和consumer定義:

Broker結構(memqueue在代碼中實際對應的結構名是Broker):

type Broker struct { done chan struct{} logger logger bufSize int // buf brokerBuffer // minEvents int // idleTimeout time.Duration // api channels events chan pushRequest requests chan getRequest pubCancel chan producerCancelRequest // internal channels acks chan int scheduledACKs chan chanList eventer queue.Eventer // wait group for worker shutdown wg sync.WaitGroup waitOnClose bool }

根據是否須要ack分爲forgetfullProducer和ackProducer兩種producer:

type forgetfullProducer struct { broker *Broker openState openState } type ackProducer struct { broker *Broker cancel bool seq uint32 state produceState openState openState }

consumer結構:

type consumer struct { broker *Broker resp chan getResponse done chan struct{} closed atomic.Bool }

三者的運做方式以下圖所示:

queue、producer、consumer關係
  • Producer經過PublishTryPublish事件放入Broker的隊列,即結構中的channel對象evetns
  • Broker的主事件循環EventLoop將(請求)事件從events channel取出,放入自身結構體對象ringBuffer中。
    • 主事件循環有兩種類型:1)直接(不帶buffer)事件循環結構directEventLoop:收到事件後儘量快的轉發;2)帶buffer事件循環結構bufferingEventLoop:當buffer滿或刷新超時時轉發。具體使用哪種取決於memqueue配置項flush.min_events,大於1時使用後者,不然使用前者。
  • eventConsumer調用Consumer的Get方法獲取事件:1)首先將獲取事件請求(包括請求事件數和用於存放其響應事件的channel resp)放入Broker的請求隊列requests中,等待主事件循環EventLoop處理後將事件放入resp;2)獲取resp的事件,組裝成batch結構後返回
  • eventConsumer將事件放入output對應隊列中

這部分關於事件在隊列中各類channel間的流轉,筆者認爲是比較消耗性能的,但不清楚設計者這樣設計的考量是什麼。 另外值得思考的是,在多個go routine使用隊列交互的場景下,libbeat中都使用了go語言channel做爲其底層的隊列,它是否能夠徹底替代加鎖隊列的使用呢?

事件發送

在隊列消費者將事件放入output工做隊列後,事件將在pipeline/output.go:netClientWorkerrun()方法中被取出,而後使用具體output client將事件發送到指定輸出(好比:es、logstash等)。

其中,netClientWorker的數目取決於具體輸出client的數目(好比es做爲輸出時,client數目爲host數目),它們共享相同的output工做隊列。

此時若是發送失敗會發生什麼呢? 在outputs/elasticsearch/client.go:ClientPublish方法能夠看到:發送失敗會重試失敗的事件,直到所有事件都發送成功後才調用ACK確認。

ack機制和registrar記錄文件狀態

在事件發送成功後, 其ack的數據流以下圖所示:

registrar記錄文件狀態過程
  • 在事件發送成功後,其被放入pipeline_ack.go:pipelineEventsACK的事件隊列events
  • pipelineEventsACK在worker中將事件取出,調用 acker.go:(*eventACKer).ackEvents,將ack(文件狀態)放入registrar的隊列Channel中。此回調函數在filebeat.go:(*Filebeat)Run方法中經過Publisher.SetACKHandler設置。
  • 在Registrar的Run()方法中取出隊列中的文件狀態,刷新registry文件

經過ack機制和registrar模塊,filebeat實現了對已發送成功事件對應文件狀態的記錄,這使它即便在程序crash後重啓的狀況下也能從以前的文件位置恢復並繼續處理,保證了日誌數據(事件)被至少發送一次。

總結

至此,本篇文章關於filebeat源碼解析的內容已經結束。

從總體看,filebeat的代碼沒有包含複雜的算法邏輯或底層實現,但其總體代碼結構仍是比較清晰的,即便對於不須要參考filebeat特性實現去開發自定義beats的讀者來講,仍屬於值得一讀的源碼。

參考

  1. filebeat官方文檔: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-getting-started.html
  2. Creating a New Beat: https://www.elastic.co/guide/en/beats/devguide/6.5/newbeat-overview.html
  3. filebeat主頁: https://www.elastic.co/products/beats/filebeat
  4. The Beater Interface: https://www.elastic.co/guide/en/beats/devguide/current/beater-interface.html#beater-interface
  5. filebeat源碼分析: http://www.javashuo.com/article/p-rjqqshvn-kp.html
相關文章
相關標籤/搜索