filebeat 源碼分析

因爲業務須要,咱們要對 beats 進行二次開發。因此最近我在看它的實現。這篇文章就是對此的一段總結。node

beats是知名的ELK日誌分析套件的一部分。它的前身是logstash-forwarder,用於收集日誌並轉發給後端(logstash、elasticsearch、redis、kafka等等)。filebeat是beats項目中的一種beats,負責收集日誌文件的新增內容。 雖然標題是《Filebeat源碼分析》,不過因爲filebeat依賴於公共庫 libbeat,本文會花一半的篇幅跟它打交道。libbeat 集合了各個 beat 會用到的內容,包括公共的配置,輸出的管理等等。每一個beat專一於本身的收集工做,而後轉發給libbeat進一步處理和輸出。git

每一個 beat 的構建是獨立的。從 filebeat 的入口文件filebeat/main.go能夠看到,它向libbeat傳遞了名字、版本和構造函數來構造自身。跟着走到libbeat/beater/beater.go,咱們能夠看到程序的啓動時的主要工做都是在這裏完成的,包括命令行參數的處理、通用配置項的解析,以及最爲重要的:調用象徵一個beat的生命週期的若干方法。github

每一個 beat 都實現了Beater接口,其中包含SetupConfigRunCleanupStop五個接口方法。redis

type Beater interface {
        Config(*Beat) error
        Setup(*Beat) error
        Run(*Beat) error
        Cleanup(*Beat) error
        Stop()
}

當libbeat用傳進來的構造函數構造了一個beater時,它會依Config -> Setup -> Run -> Cleanup的順序調用這幾個方法,並將Stop註冊到信號處理函數中。json

Beat是在libbeat的構造入口處定義的結構體,其功能就是存儲構造過程當中的中間產物,好比解析出的配置會存儲在Beat.Config,根據配置中建立的Publisher也會存在Beat.Publisher裏面。後端

咱們能夠跳回filebeat/,在beat/filebeat.go看到接口的具體實現。Config方法裏面,filebeat只會讀取跟本身名字相關的配置。Setup方法則基本沒幹什麼。CleanupStop方法天然只是作些收尾工做。重頭戲在於Run方法。api

Run方法中,filebeat逐個建立了spooler
因爲業務須要,咱們要對 beats 進行二次開發。因此最近我在看它的實現(對應版本是v5.0.0-alpha4)。本文是對此的一段總結。async

beats是知名的ELK日誌分析套件的一部分。它的前身是logstash-forwarder,用於收集日誌並轉發給後端(logstash、elasticsearch、redis、kafka等等)。filebeat是beats項目中的一種beat,負責收集日誌文件的新增內容。 雖然標題是《filebeat源碼分析》,不過因爲filebeat依賴於公共庫libbeat,本文會花一半的篇幅跟它打交道。libbeat集合了各個beat會用到的內容,包括公共的配置,輸出的管理等等。每一個beat專一於本身的收集工做,而後轉發給libbeat進一步處理和輸出。elasticsearch

beat的構造

每一個 beat 的構建是獨立的。從 filebeat 的入口文件filebeat/main.go能夠看到,它向libbeat傳遞了名字、版本和構造函數來構造自身。跟着走到libbeat/beater/beater.go,咱們能夠看到程序的啓動時的主要工做都是在這裏完成的,包括命令行參數的處理、通用配置項的解析,以及最爲重要的:調用象徵一個beat的生命週期的若干方法。函數

每一個 beat 都實現了Beater接口,其中包含SetupConfigRunCleanupStop五個接口方法。

type Beater interface {
        Config(*Beat) error
        Setup(*Beat) error
        Run(*Beat) error
        Cleanup(*Beat) error
        Stop()
}

當libbeat用傳進來的構造函數構造了一個beater時,它會依Config -> Setup -> Run -> Cleanup的順序調用這幾個方法,並將Stop註冊到信號處理函數中。CleanupStop其實都是作收尾工做的,只是前者是在beat退出時調用,後者是在收到SIGINT和SIGTERM時調用。

Beat是在libbeat的構造入口處定義的結構體,其功能就是存儲構造過程當中的中間產物,好比解析出的配置會存儲在Beat.Config,根據配置中建立的Publisher也會存在Beat.Publisher裏面。

日誌收集邏輯

咱們能夠跳回filebeat/,在beat/filebeat.go看到接口的具體實現。Config方法裏面,filebeat只會讀取跟本身名字相關的配置。Setup方法則基本沒幹什麼。CleanupStop方法天然只是作些收尾工做。重頭戲在於Run方法。

Run方法中,filebeat逐個建立了registrarpublisherspoolercrawler四個組件。收集到的數據在它們間的流動方向,恰好跟建立順序相反。

crawler負責具體的日誌採集工做。它會根據配置文件啓動多個prospector,每一個prospector處理一類日誌文件類型,有着一組獨立的配置。prospector會啓動一個prospectorer幹主要的活。根據默認配置,這個prospectorer會是ProspectorLog類型的。ProspectorLog類型的prospectorer會掃描目標路徑下匹配的文件,根據registry裏存儲的狀態判斷每一個文件。若是以前處理過,調用harvestExisingFile;不然調用harvestNewFile。前者會判斷對應的harvester是否還在運行。這兩個函數都涉及到一個Harvester的建立。隨便一提,registry是registrar建立以後傳遞給crawler的,裏面是文件的處理狀態記錄。

對於ProspectorLog來講,它在建立Harvester時調用的是harvester/log.go中的Harvest方法。該方法首先建立一個LineReader。其次從LogFileReader開始,根據配置,一層層套上LineEncoder、JSONProcessor、Multiline等裝飾器。每一個裝飾器負責本身相關的文本處理。而後,不停地調用readline從該reader中讀取內容,根據讀到的內容填充FileEvent的值,把FileEvent發送給spooler。注意FileEvent並不必定含有具體的日誌內容,它也有可能只包含文件相關的狀態信息。FileEvent也不只僅有具體的日誌內容,它還包含讀取事件以及其餘配置相關的值。當readline時發生錯誤或者EOF,harvester會隨之退出。prospector會每隔scan_frequency(默認10秒)以後重啓對應的prospectorer,而後繼續建立出harvester,繼續收集的工做。本來我覺得filebeat會使用inotify這樣的API,只有在文件發生變更時纔去讀新數據。看來它只是按期去讀取上一次offset以後的數據。

以上三部分的代碼分別位於crawlerprospectorharvester目錄下。三者關係總結以下:

crawler-prospector-harvester

另外,harvester裏面有三個目錄,reader、processor、encoding,顧名思義,主要是處理一些文本層級上的細節。

如今咱們隨着FileEvent(如下簡稱爲事件)來到spooler裏面。一進來,就看到事件們在這裏排起了長龍。spooler接到事件後,不急着發出去,而是排進隊列中。若是隊列(長度取決於spool_size,默認2048)已滿,調用flush方法把事件刷到publisher裏面。此外配置的flush時間idle_timeout(默認5s)到時後,也會調用flush方法。

大熱天的排得很焦急,終於到publisher了。publisher從spooler中接收事件,轉換其類型。@timestamp和其餘跟日誌內容無關的tag就是在這一步打入到發送數據中的,詳情看input/event.go。它接着調用PublishEvents把數據發送出去,並註冊了通知函數和標誌位Guaranteed。正是因爲這個Guaranteed標誌位,filebeat的數據會在發送時一直重試到成功爲此。當數據被髮送時,發送方調用通知函數,publisher知道能夠把這個事件劃入到「已確認」的隊列中。又是一個隊列!事件們領了確認表格,此次它們要排隊進入registrar。

publisher的已確認隊列每秒刷一次,就像遊樂園的柵欄同樣,一打開就有一批新的事件擠進registrar。registrar根據每一個事件的文件狀態更新記錄,並在處理了最後一個事件後以json格式寫入到文件registry中。這個文件的內容以下:

{"/root/fnlek/logs/error.log":{"source":"/root/fnlek/logs/error.log","offset":0,"FileStateOS":{"inode":798434,"device":2049}},"/root/fnlek/test.log":{"source":"/root/fnlek/test.log","offset":211670,"FileStateOS":{"inode":798467,"device":2049}},"/root/fnlek/test1.log":{"source":"/root/fnlek/test1.log","offset":0,"FileStateOS":{"inode":798464,"device":2049}},"/root/fnlek/test2.log":{"source":"/root/fnlek/test2.log","offset":0,"FileStateOS":{"inode":798611,"device":2049}}...

正是靠這個文件作持久化,filebeat啓動時才能繼續前一次的工做(記得討論prospector時提到的registry嗎)。雖然不能保證每條日誌僅被髮送一次,但至少保證每條日誌都有機會被髮送。

filebeat

filebeat的內容到此結束,接下來又轉到libbeat了。

日誌發送邏輯

前情提要:filebeat/publisher把數據經過PublishEvents發送出去。

如今輪到libbeat/publisher幹活了。每一個PublishEvents背後,都有一個默默無聞的Client進行具體的發送工做。這個Client由libbeat/publisher/publish.goConnect函數建立的,讓咱們進入libbeat/publisher/client.go瞧瞧。咱們接着能夠看到,這些事件(已經被打包成消息了)會被同一目錄下的async.gopublish方法所發送。

真正負責發送的類型是outputWorker。不過這些worker會被makeAsyncOutput函數根據flush_intervalmax_bulk_size配置包裝成BulkWorker。而後待發送的消息會被丟到一個chan message隊列裏面去,待flush_interval(默認1s)時間到或者消息數超過maxBulkSize(默認50)才被髮出去。以後會有一次根據消息內部的events長度,對發送的消息數的切割,使得每次發送都不超過max_bulk_size。還記得嗎,因爲spool_size默認值是2048,不考慮超時的狀況下,filebeat的一次PublishEvents至關於發送端的41次發送。

中間省略若干轉發不提……

歷經艱難險阻,最終會調用到outputs/mode/mode.go的PublishEvents方法,若是不涉及load banlancer,會進入同一目錄下的single/single.go。如今轉給實現了mode.ConnectionMode的具體Client去調用PublishEvents方法。「具體」是哪個Client呢?

各輸出插件會在init的時候向outputsPlugins表註冊本身的插件名和對應的初始化函數。libbeat/publisher初始化時會經過InitOutputs讀取outputsPlugins表,具體讀取到那個取決於output配置的值。讀取到的值會用來建立具體的outputWorker,每一個outputWorker會有一個實現了mode.ConnectionMode的Client。

若是當時調用的PublishEvent,走的流程大概也是這樣,不過中間就不須要有隊列了。

讓咱們挑一個輸出插件——elasticsearch看看。代碼位於outputs/elasticsearch,第一眼看上去就跟elasticsearch的客戶端同樣。

elasticsearch/
├── api.go
├── api_integration_test.go
├── api_mock_test.go
├── api_test.go
├── bulkapi.go
├── bulkapi_integration_test.go
├── bulkapi_mock_test.go
├── client.go
├── client_integration_test.go
├── client_test.go
├── config.go
├── enc.go
├── json_read.go
├── output.go
├── output_test.go
├── topology.go
├── url.go
└── url_test.go

elasticsearch插件的PublishEvents方法定義於outputs/elasticsearch/client.go。很簡單,先判斷是否處於已鏈接狀態,若是不是,調用Connect方法。接着拼接數據,以POST /_bulk請求發送。若是配置了gzip壓縮,可能會用GzipEncoder處理下。具體的HTTP/HTTPS發送邏輯由go內置的http包完成。

有趣的是,elasticsearch客戶端實現的Connect方法會發送一個HEAD /的請求。儘管HTTP/HTTPS確實不須要提早創建鏈接,不過用HEAD /來探活意義也不大吧?

發送了請求並不表明成功——還得收到確認無誤的響應才行。libbeat在outputs/mode/single/single.gopublish方法實現中實現了重傳的機制。若是沒有使用Guaranteed標誌位,發送重試的次數取決於max_retries的設置(默認爲3)。另外,若是部分數據發送成功,會將重試計數器重置爲0。舉個例子,若是POST /_bulk請求發送的數據有一部分得到了確認,下一次重試時將不會包含這些數據,並且重試的次數會從零算起。一旦發送成功——抑或重試屢次均告失敗,通知函數會被調用,告知數據處理完畢。轉了一圈,如今咱們又回到filebeat/publisher了。

閱讀代碼,我得出一個有趣的結論,爲了不頻繁輸出對被收集者的影響,filebeat裏面許多地方都用到了隊列來實現批處理,結果致使其輸出更像是脈衝式的:一段時間內可能會進行連續多個輸出。若是你想避免該行爲,能夠調小隊列長度和刷隊列的時間。固然這麼一來,輸出會更加頻繁(但更平滑),整體消耗的資源會更多。

相關文章
相關標籤/搜索