劉偉 360雲計算 node
女主宣言golang
最近由於雲原生日誌收集的須要,咱們打算使用Filebeat做爲容器日誌收集工具,並對其進行二次開發,所以筆者將談談 Filebeat 收集日誌的那些事兒。本文不涉及過具體的源碼分析,但願經過閱讀您能夠了解filebeat的基本使用方法和原理,姑且算是filebeat的入門吧。json
PS:豐富的一線技術、多元化的表現形式,盡在「360雲計算」,點關注哦!網絡
1多線程
前言併發
開源日誌收集組件衆多,之因此選擇Filebeat,主要基於如下幾點:app
功能上能知足咱們的需求:收集磁盤日誌文件,發送到Kafka集羣;支持多行收集和自定義字段等;jvm
性能上相比運行於jvm上的logstash和flume優點明顯;ide
Filebeat基於golang 技術棧,二次開發對於咱們來講有必定的技術積累;工具
部署方便,沒有第三方依賴;
2
Filebeat 能作什麼
簡單來講Filebeat就是數據的搬運工,只不過除了搬運還能夠對數據做一些深加工,爲業務增長一些附加值。
Filebeat能夠從多種不一樣的上游input 中接受須要收集的數據,其中咱們最經常使用的就是 log input,即從日誌中收集數據;
Filebeat對收集來的數據進行加工,好比:多行合併,增長業務自定義字段,json等格式的encode;
Filebeat將加工好的數據發送到被稱爲output的下游,其中咱們最經常使用的就是 Elasticsearch 和 Kafka;
Filebeat具備ACK反饋確認機制,即成功發送到output後,會將當前進度反饋給input, 這樣在進程重啓後能夠斷點續傳;
Filebeat在發送output失敗後,會啓動retry機制,和上一次ACK反饋確認機制一塊兒,保證了每次消息至少發送一次的語義;
Filebeat在發送output時,因爲網絡等緣由發生阻塞,則在input上游端會減慢收集,自適應匹配下游output的狀態。
一圖以蔽之。
3
Filebeat 背後的「老大」
說到Filebeat,它其實只是 beats 家族衆多成員中的一個。除了Filebeat, 還有不少其餘的beat小夥伴:
beat |
功能 |
Filebeat | 收集日誌文件 |
Metricbeat | 收集各類指標數據 |
Packetbeat | 收集網絡數據包 |
Auditbeat | 收集審計數據 |
Heartbeat | 收集服務運行狀態監測數據 |
... |
... |
若是你願意的話,你也能夠按照beat的規範來寫本身的beat。
能實現以上這些beat,都離不開beats家族真正的「老大」—— libbeat, 它是beat體系的核心庫。咱們接下來看一下libbeat到底都作了些什麼:
libbeat提供了publisher組件,用於對接input;
收集到的數據在進入到libbeat後,首先會通過各類 processor的加工處理,好比過濾添加字段,多行合併等等;
input組件經過publisher組件將收集到的數據推送到publisher內部的隊列;
libbeat自己實現了前面介紹過的多種output, 所以它負責將處理好的數據經過output組件發送出去;
libbeat自己封裝了retry的邏輯;
libbeat負責將ACK反饋經過到input組件 ;
因而可知,大部分活兒都是libbeat來作,當「老大」不容易啊~。
input僅須要作兩件事:
從不一樣的介質中收集數據後投遞給libbeat;
接收libbeat反饋回來的ACK, 做相應的持久化;
4
Filebeat 的簡單使用示例
Filebeat自己的使用很簡單,咱們只須要按需寫好相應的input和output配置就行了。下面咱們以一個收集磁盤日誌文件到Kafka集羣的例子來說一下。
1. 配置inputs.d目錄
在filebeat.yml添加以下配置,這樣咱們能夠將每一種等收集的路徑寫在單獨的配置文件裏,而後將這些配置文件統一放到inputs.d目錄,方便管理
filebeat.config.inputs:enabled: truepath: inputs.d/*.yml
2. 在inputs.d目錄下建立test1.yml,內容以下
- type: log # Change to true to enable t enabled: true # Paths that should be crawl paths: - /home/lw/test/filebeat/*.log fields: log_topic: lw_filebeat_t_2
這個配置說明會收集全部匹配/home/lw/test/filebeat/*.log的文件內容,而且咱們添加了一個自定義的filed: log_topic: lw_filebeat_t_2, 這個咱們後面會講到。
3. 在filebeat.yml中配置kafka output:
output.kafka: hosts: ["xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092"] version: 0.9.0.1 topic: '%{[fields.log_topic]}' partition.round_robin: reachable_only: true compression: none required_acks: 1 max_message_bytes: 1000000 codec.format: string: '%{[host.name]}-%{[message]}'
其中:
hosts是kafka集羣的broker list;
topic: '%{[fields.log_topic]}' : 這項指定了咱們要寫入kafka集羣哪一個topic, 能夠看到它實現上是引用了上面test.yml配置中咱們自定義的filed字段,經過這種方式咱們就能夠將收集的不一樣路徑的數據寫入到不一樣的topic中,可是這個有個限制就是隻能寫到一個kafka集羣,由於當前版本的filebeat不容許同時配置多個output。
codec.format: 指定了寫入kafka集羣的消息格式,咱們在從日誌文件中讀取的每行內容前面加上了當前機器的hostname。
啓動就很簡單了,filebeat和filebeat.yml, inputs.d都在同一目錄下,而後 ./filebeat run就行了。
filebeat自己有不少全局的配置,每種input和output又有不少各自的配置,關乎日誌收集的內存使用,是否是會丟失日誌等方方面面,你們在使用時還須要仔細閱讀,這裏不贅述。
5
Log input 是如何從日誌文件中收集日誌的
input的建立:
根據配置文件內容建立相應的Processors, 用於前面提到的對從文件中讀取到的內容的加工處理;
建立Acker, 用於持久化libbeat反饋回來的收集發送進度;
使用libbeat提供的Pipeline.queue.Producer
建立producer
,用於將處理好的文件內容投遞到libbeat的內部隊列;
收集文件內容:
input會根據配置文件中的收集路徑(正則匹配)來輪詢是否有新文件產生,文件是否已通過期,文件是否被刪除或移動;
針對每個文件建立一個Harvester來逐行讀取文件內容;
將文件內容封裝後經過producer發送到libbeat的內部隊列;
處理文件重命名,刪除,截斷:
獲取文件信息時會獲取文件的device id + indoe做爲文件的惟一標識;
前面咱們提過文件收集進度會被持久化,這樣當建立Harvester時,首先會對文件做openFile, 以 device id + inode爲key在持久化文件中查看當前文件是否被收集過,收集到了什麼位置,而後斷點續傳;
在讀取過程當中,若是文件被截斷,認爲文件已經被同名覆蓋,將從頭開始讀取文件;
若是文件被刪除,由於原文件已被打開,不影響繼續收集,但若是設置了CloseRemoved, 則不會再繼續收集;
若是文件被重命名,由於原文件已被打開,不影響繼續收集,但若是設置了CloseRenamed , 則不會再繼續收集;
6
日誌如何被髮送
發送流程簡述:
input將日誌內容寫入libbeat的內部隊列後,剩下的事件就都交由libbeat來作了;
libbeat會建立consumer, 復現做libbeat的隊列裏消費日誌event, 封裝成Batch對象;
針對每一個Batch對象,還會建立ack Channel, 用來將ACK反饋信息寫入這個channel;
Batch對象會被源源不斷地寫入一個叫workQueue的channel中;
以kafka output爲例,在創kafka output時首先會建立一個outputs.Group,它內部封裝了一組kafka client, 同時啓動一組goroutine;
上面建立的每一個goroutine都從workQueue隊列裏讀取Batch對象,而後經過kafka client發送出去,這裏至關於多線程併發讀隊列後發送;
若kafka client發送成功,寫入信息到ack channel, 最終會經過到input中;
若kafka client發送失敗,啓動重試機制;
重試機制:
以kafka output爲例,若是msg發送失敗,經過讀取 ch <-chan *sarama.ProducerError能夠獲取到全部發送失敗的msg;
針對ErrInvalidMessage, ErrMessageSizeTooLarge 和 ErrInvalidMessageSize這三種錯誤,無需重發;
被髮送的 event都會封裝成 Batch, 這裏重發的時候也是調用Batch.RetryEevnts;
最後會調用到retryer.retry將須要從新的events再次寫入到上圖中黃色所示的 workQueue中,從新進入發送流程;
關於重發次數,能夠設置max retries, 但從代碼中看這個max retries不起做用,目前會一直重試,只不過在重發次數減小到爲0時,會挑選出設置了Guaranteed屬性的event來發送;
若是重發的events數量過多,會暫時阻塞住從正常發送流程向workQueue中寫入數據,優先發送須要重發的數據;
7
後記
在本文裏,咱們沒有深刻到源碼層次,爲了講清filebeat運做的原理,咱們也忽略了一些實現細節,後續將會從源碼層面做進一步剖析。