B站視頻地址:Logstash如何成爲鎮得住場面的數據管道nginx
公衆號視頻地址:Logstash如何成爲鎮得住場面的數據管道正則表達式
知乎視頻地址:Logstash如何成爲鎮得住場面的數據管道json
首先咱們延續上一期視頻中日誌採集架構的案例,Filebeat採集日誌並推送Kafka消息隊列進行分發,再由Logstash消費日誌消息,並將日誌數據最終落地在Elasticsearch集羣索引當中,Kafka做爲消息隊列分發服務須要將收集到的日誌消息繼續分發下去,最終數據落地在Elasticsearch集羣索引當中。ruby
那麼鏈接整個過程的主角Logstash是如何工做的,就是咱們今天講解的重點。架構
Logstash工做過程分爲三個部分:Input輸入、Filter過濾、Output輸出,它們共同協做造成了完整的Logstash數據管道傳輸機制elasticsearch
咱們先從一個最簡單的例子演示開始,看看Logstash是怎麼輸入和輸出的,這一次先跳過filter過濾環節。分佈式
下面查看已經預置好的一個配置文件01-kafka-elastic-nginx.confide
首先是input輸入配置點,從Kafka訂閱消息,Kafka集羣地址與filebeat中都指向了一個地址,其餘配置咱們先略過,後續Kafka專題再說大數據
下來看到要訂閱的Topic主題TestT3,咱們先不用json格式解碼消息,默認就是純文本的方式spa
同樣的,這一步先略過過濾環節,直接看看output輸出配置點,目標是給Elasticsearch輸出數據,並指定了elasticsearch集羣的三個節點
輸出環節建立須要寫入的elasticsearch日誌索引,咱們先按照默認的filebeat採集時間,進行日期格式化,按照每一個小時創建一個索引,這塊會有時間問題,一下子再說。
讓數據輸出到終端,方便咱們調試結果。
經過演示中最簡單的配置方式,這時候的Logstash已經成爲鏈接Kafka和Elastisearch之間的數據管道了!
好,接下來咱們將全部系統運行起來,並生成一條nginx請求日誌,看看管道各個階段的數據變化。
首先nginx日誌數據被filebeat採集,是一條典型的無結構的文本日誌數據,你們注意紅色標註的時間是2021年2月21日13時
接着這條日誌數據經過Kafka進入到了Logstash管道的輸入階段,
Logstash爲這條日誌生成了更爲很是龐大的Json數據,裏面包括了全部被採集主機的信息,以及nginx日誌,實際上這些原始信息並無被良好的進行數據清洗與結構化
最後數據被寫入到Elastisearch一個按小時劃分的索引當中,對應時間爲2021年2月21日5時
咱們發現Logstash對原始數據在沒有任何處理的狀況下,會很不方便未來數據的使用;
此次咱們利用Logstash json解碼器讓管道從新再來一次,
接下來咱們進入Logstash中對應的配置文件,並找到input輸入點的codec配置,刪掉註釋,打開Logstash對輸入數據的json解碼方式·。
咱們看看再次進入管道中的日誌數據,Logstash首先對原始日誌數據進行Json解析
這時候咱們再看Json解析後的數據,是否是就清晰多了,filebeat採集到的本地機器數據、以及紅色框中Nginx HTTP日誌數據、以及其餘標籤數據都進行了字段分離
作到這一步其實仍是不夠好,爲何呢?一方面由於咱們依然但願將Nginx HTTP的日誌數據也進行結構化處理,
另外一個方面,Filebeat傳遞給Logstash的系統時間是慢了8個小時的UTC時間標準,反而Nginx日誌中的時間是咱們本地的北京時間標準,所以咱們但願用Nginx日誌時間做爲建立Elasticsearch日誌索引的惟一依據
這時候咱們就要使用Logstash的過濾機制了,咱們繼續進入Logstash對應的配置中,刪掉過濾配置中的註釋,讓Logstash過濾最經常使用插件grok、date、ruby、mutate起做用
grok插件是專業處理非結構化數據的能手,經過自定義的Nginx日誌正則表達式,就能實現Nginx日誌的結構化解析
date插件用於處理時間問題,咱們經過date插件將nginx日誌中的時間轉換成Logstash時間對象,並賦給一個新的臨時時間字段indextime
ruby就是在過濾過程當中能夠插入ruby腳本語言來進行程序級處理,咱們經過ruby語言對indextime時間格式化,生成一個精確到小時的字符串字段index.date,用於elasticsearch索引名稱
mutate是最經常使用的能夠對管道中數據字段進行操做的插件了,咱們的目的是刪除臨時時間字段indextime
最後咱們還須要將output輸出中的索引生成方式修改一下,註釋掉原來用filebeat生默認時間生成的索引,改爲nginx日誌時間生成的索引。
咱們從新運行Logstash,數據通過了Input解碼、日誌grok結構化處理、本地時間對象建立,並進行日期格式化,爲了生成新的Elasticsearch索引字段,並對臨時字段進行刪除,最終通過Output輸出階段,建立Elasticsearch索引或寫入日誌數據
讓咱們看看Elasticsearch最終保存的數據效果,index索引對應的時間來自過濾器建立的index.date字段,index.date字段又來自nginx日誌中分離出的本地時間。這樣咱們就不用再去修改Logstash的系統時間了
咱們看到菱形標註的字段數據就是由過濾器對nginx http日誌進行結構化抽取的結果,
一樣elasticsearch依然保存着nginx日誌的原始數據以備不時之需
前往讀字節的知乎——瞭解更多關於大數據的知識公衆號「讀字節」 分佈式,大數據,軟件架構的深度,專業解讀