在上一篇中咱們介紹了Logstash快速入門,本文主要介紹的是ELK日誌系統中的Logstash的實戰使用。實戰使用我打算從如下的幾個場景來進行講解。html
在咱們使用logstash將採集的數據傳輸到ES中的時候,會發現採集的時間@timestamp
的時間和咱們本地的不一致,這個主要是由於時區的問題致使的,咱們在計算時間的時候須要將這個時間增長8小時,可是這樣會很不方便。爲了永久解決這個問題,咱們能夠在logstash中的filter中對該字段進行轉換,增長8小時。java
添加的配置以下:json
ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" } ruby { code => "event.set('@timestamp',event.get('timestamp'))" } mutate { remove_field => ["timestamp"] }
本來示例圖:
ruby
添加配置以後的示例圖:
能夠看到添加配置以後@timestamp
時間已經和本地時間基本一致了。app
咱們在進行採集日誌到ES中的時候,有時須要對日誌內容進行切割。好比獲得日誌內容的時間以及日誌級別等等。這時咱們就能夠經過grok來對日誌內容進行切分,好比將制定好的日誌內容切割爲日誌時間、線程名稱、日誌級別、類名以及詳細內容等等。咱們只須要在logstash的filter中使用grok語法便可完成日誌內容切割。
這裏咱們使用JAVA的Logback來制定日誌輸出格式,而後經過日誌的格式編寫grok語法,最後將grok配置添加到logstash的filter中。elasticsearch
Logback輸出配置:測試
|%d{yyyy-MM-dd HH:mm:ss.SSS}|[%thread]|%-5level|%logger{50}|-%msg%n.net
日誌樣例數據:插件
|2020-07-24 17:08:33.159|[Thread-5]|INFO|com.pancm.Application|-測試示例三: All things in their being are good for something. 天生我才必有用3線程
grok模式:
|%{DATA:log_time}|%{DATA:thread}|%{DATA:log_level}|%{DATA:class_name}|-%{GREEDYDATA:content}
使用grok分析
能夠看到以及分析匹配成功了。
而後咱們在filter中添加以下配置:
grok {
match => { "message" =>"|%{DATA:log_time}|%{DATA:thread}|%{DATA:log_level}|%{DATA:class_name}|-%{GREEDYDATA:content}"
}
}
最終輸出的日誌到ES的示例圖:
咱們在使用Logstash採集日誌的時候,若是沒有指定索引庫或模板,則會使用ElasticSearch默認自帶的名字爲」logstash」的模板,默認應用於Logstash寫入數據到ElasticSearch使用。可是咱們但願使用自定義的索引模板,將採集的日誌按照咱們自身的想法來寫入,此時咱們就須要用到自定義模板了。
主要有兩種方式,一種是在logstash的output插件中使用template指定本機器上的一個模板json路徑, 例如 template => "/home/logstash.json"
,json裏面的內容爲咱們自定的索引mapping,雖然這種方式簡單,可是分散在Logstash機器上,維護起來比較麻煩。還有一種是在elasticsearc服務端自定義配置模板,事先將模板設置好,而後在logstash的output輸出中指定該模板便可,這種方式比較靈活方便,可動態更改,全局生效。
這裏咱們仍是經過一個示例來進行說明,咱們首先建立一個template_mylog的模板,配置這幾個字段:
log_time、thread、log_level、class_name、content。
語句以下:
PUT _template/template_mylog { "index_patterns" : [ "mylog-*" ], "order" : 10, "settings": { "index.number_of_shards": 3, "number_of_replicas": 1 }, "mappings" : { "properties" : { "log_level" : { "type" : "keyword" }, "thread" : { "type" : "keyword" }, "class_name" : { "type" : "keyword" }, "content" : { "type" : "keyword" }, "log_time" : { "type" : "date","format" : "yyyy-MM-dd HH:mm:ss.SSS"} } } }
示例圖:
注:上述的配置比其餘mapping而言多了兩個新配置,一個是index_patterns,該配置代表自動建立的索引開頭以mylog-
的索引庫都會採用該模板;而order表示順序級別,在有相同的索引模板中,該值越大,優先級越高。
建立成功以後,咱們只需在output中的添加以下配置便可。
elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM.dd}" }
而後咱們啓動logstash進行日誌的採集。
效果圖:
咱們在使用logstash採集日誌的時候,有時有多種不一樣的日誌而且須要採集到不一樣的索引庫中,這時咱們就能夠經過標記來進行寫入。好比採集/home/logs目錄下的日誌我定義一個標記爲java,採集/home/logs2目錄下的日誌我定義一個標記爲java2,那麼在寫入ElasticSearch的時候只須要根據該標記區分寫入便可。
logstash input配置示例:
file { path => ["/home/logs/mylog-2020-08-13.0.txt"] type => "java" start_position => "beginning" sincedb_path => "/dev/null" } file { path => ["/home/logs2/*.txt"] type => "java2" start_position => "beginning" sincedb_path => "/dev/null" }
logstash output配置示例:
if [type] == "java"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM.dd}" } } if [type] == "java2"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM}" } }
示例圖在多行內容合併場景中。
咱們在採集日誌的時候,常常會遇到異常日誌,而且異常日誌並不是爲一行內容,若是咱們按照原有的方式採集,在ElasticSearch中顯示的是一行一行的內容,這樣的話咱們排查問題會很頭疼。幸虧Logstash中支持多行日誌合併,使用multiline.pattern、multiline.negate和multiline.what來實現配置實現。
下面的配置中,咱們經過制定匹配規則將以空格開頭的全部行合併到上一行,並把以Caused by開頭的也追加到上一行。
在Logstash的input配置中添加以下配置:
codec => multiline { pattern => "\s*\[" negate => "true" what => "previous" }
異常日誌:
原異常日誌在ElasticSearch中示例圖:
多行合併以後的效果圖:
logstash-test.conf 配置
input{ file { path => ["/home/logs/mylog-2020-08-13.0.txt"] type => "java" start_position => "beginning" sincedb_path => "/dev/null" } file { path => ["/home/logs2/*.txt"] type => "java2" codec => multiline { pattern => "\s*\[" negate => "true" what => "previous" } start_position => "beginning" sincedb_path => "/dev/null" } } filter { grok { match => { "message" =>"\|%{DATA:log_time}\|%{DATA:thread}\|%{DATA:log_level}\|%{DATA:class_name}\|-%{GREEDYDATA:content}" } } ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" } ruby { code => "event.set('@timestamp',event.get('timestamp'))" } mutate { remove_field => ["timestamp"] } } output { stdout { codec => rubydebug } if [type] == "java"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM.dd}" } } if [type] == "java2"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM}" } } }
1.logstash: Could not execute action: PipelineAction::Create
解決辦法: 斜杆採用「/」
2, logstash: object mapping for [host] tried to parse field [host] as object, but found a concrete value
解決辦法: 在filter裏面添加以下配置:
mutate { rename => { "host" => "host.name" } }
原創不易,若是感受不錯,但願給個推薦!您的支持是我寫做的最大動力!
版權聲明:
做者:虛無境
博客園出處:http://www.cnblogs.com/xuwujing
CSDN出處:http://blog.csdn.net/qazwsxpcm
我的博客出處:http://www.panchengming.com