Demo 跑起來以後,就須要根據具體的負載和日誌進行優化了,本次主要是優化在 Kibana 界面中 [Table] 展開的 Patterns,過多的 Patterns 有幾個負面做用:
1)、干擾查看信息
2)、增大索引佔用空間
3)、下降 es 的寫入性能nginx
ELK各組件版本:(Windows Server, Linux 下其餘大同小異)
1)、Filebeat - 7.3.1
2)、Logstash - 7.3.1
3)、Elasticsearch: 7.3.1
4)、Kibana - 7.3.1web
如下是優化前的內容,實際須要的僅僅是6個字段,但此處卻有35個字段之多,主要來源是 filebeat 和 logstash 處理時應用了 es 默認模板產生的大量冗餘字段。
這裏的結果是已經在 logstash 中初步配置了 remove_field,將 "@version"、"message" 、"log_create_time" 移除了,否則整個 Table 展開後更加的臃腫。
@version 和 message 是 es 的動態模板自動建立的,log_create_time 是自定義的字段,用於替換默認模板的 @timestamp json
Kibana -> Setting -> Index Patternswindows
日誌樣例:api
2019-11-21 08:47:39.012 | *Error|SQLServer||ThreadID: 35|Database connection refused.
2019-11-21 09:42:50.156 | Info|Kafka||ThreadID: 35|Reviced Message:
ID: 1
Mode: Delay
Message Id: 1
Map: len=45
Binary Version: 3.4.6
以 「|」 爲分隔符,將日誌分爲 6 個 field,能夠看到有的只有5段,有的不在一行:
field1 - logtime
field2 - level
field3 - comp // 此字段可能爲空
field4 - blank // 此處都是 null
field5 - threadId
field6 - logbody服務器
一、先看一下優化後的配置文件app
Filebeat 的配置文件,若是要收集一臺服務器上的不一樣程序的日誌,能夠新建多個配置文件並配置不一樣的 logstash 端口,再啓動多個進程收集。elasticsearch
若是是輸出到 kafka,則能夠在一份配置裏配置多個 intput 利用 filed 字段在 output 裏送往不一樣的 topic,而沒必要啓動多個進程。
性能
- filebeat.yml
# log files input filebeat.inputs: - type: enabled: true paths: - F:\payserverlog\*.log multiline.pattern: '[0-9]{4}-[0-9]{2}-[0-9]{2]' // 將不在一行的日誌,拼接到以日期開頭的行後 multiline.negate: true multiline.match: after fields: nginx: payserver scan_frequency: 10s max_bytes: 1048576 tail_files: false backoff: 1s backoff_factor: 2 # output to logstash output.logstash: hosts: ["172.16.0.11:5146"] processors: - drop_fields: fields: ["input_type", "log.offset", "host.name", "input.type", "agent.hostname", "agent.type", "ecs.version", "agent.ephemeral_id", "agent.id", "agent.version", "fields.ics", "log.file.path", "log.flags" ] monitoring: enabled: true
elasticsearch: ["http://172.16.0.11:9200"]
# logging logging.level: info logging.to_file: true logging.files: path: E:\ELK\filebeat-7.3.1-windows-x86_64\logs name: filebeat-5146.log interval: 1h keepfiles: 7 logging.json: false
Logstash 的配置文件優化
當單機有多個程序日誌須要收集而且輸出端是 Logstash 時,有幾種方式:
1)、使用 if [filed] = 'xxx' 來區分 grok 和 output,此種方式 Logstash 須要作大量的 if 判斷,官方稱之爲 Conditional Hell (條件地獄),會嚴重下降 grok 效率 - 不推薦
2)、啓動多個 Logstash 實例,須要配置和管理多個 JVM - 不推薦
3)、使用 Logstash 的 Pipeline , 沒必要管理多個 JVM,也沒必要作大量 if 判斷,如下的配置使用的就是該種方式
- Config\logstash.yml
pipeline.worker: 10 pipeline.batch.size: 3000 pipeline.batch.delay: 10 http.host: "172.16.0.11" xpack.monitoring.enabled: true xpack.monitoring.elasticsearch.hosts: ["http://172.16.0.11:9200"] log.level: info
- Config\pipeline.yml
- pipeline: main
pipeline.workers: 8
pipeline.batch.size: 3000
pipeline.delay: 200 path.config: E:\\ELK\\logstash-7.3.1\\pipeline\\5044-server01-payapi.conf - pipeline: server-01-error
pipeline.workers: 8
pipeline.batch.size: 3000
pipeline.delay: 200 path.config: E:\\ELK\\logstash-7.3.1\\pipeline\\5045-server01-payweb.conf - pipeline: server-02-access
pipeline.workers: 8
pipeline.batch.size: 3000
pipeline.delay: 200 path.config: E:\\ELK\\logstash-7.3.1\\pipeline\\5046-server02-payapi.conf - pipeline: server-02-error
pipeline.workers: 8
pipeline.batch.size: 3000
pipeline.delay: 200
path.config: E:\\ELK\\logstash-7.3.1\\pipeline\\5047-server02-payweb.conf
- Pipeline\5044-server01-payapi.conf
input { beats { port => 5044 // 當配置了Pipeline時,不一樣的 pipeline 配置文件,此端口不可衝突 client_inactivity_timeout => 600 } } filter { grok { match => { "message" => "%{DATA:logtime}\|%{DATA:level}\|%{DATA:comp}\|%{DATA:blank}\|%{DATA:threadId}\|%{GREEDYDATA:logdoby}" } } if "_grokparsefailure" in [tags] { // 某些行按 "|" 分割只有5段,按前面的 grok 會解析失敗,並生成一個值爲 "_grokparsefailure" 的 tag,此處從新解析失敗的行 grok { match => { "message" => "%{DATA:logtime}\|%{DATA:level}\|%{DATA:blank}\|%{DATA:threadId}\|%{GREEDYDATA:logdoby}" } } }
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:log_create_time}" // 將日誌的時間按照 TIMESTAMP_ISO8601 解析給臨時變量 log_create_time
}
}
date {
match => ["log_create_time", "yyyy-MM-dd HH:mm:ss.SSS"] // 按時間格式匹配一下
target => "@timestamp" // 將 log_create_time 寫入 @timestamp
}
mutate {
remove_field => "@version"
remove_field => "message"
remove_field => "log_create_time"
remove-field => "tags"
gsub => ["level", "\s", ""] // 移除字段中的空格
gsub => ["comp", "\s", ""]
}
}
output {
elasticsearch {
hosts => ["http://172.16.0.11:9200"]
index => "payapi-server01-%{+yyyy.MM.dd}"
manage_template => false // 取消 logstash 自動管理模板功能
template_name => template_payapi // 映射自定義模板的名字,自定義模板的建立在下方
}
}
其餘的幾個配置文件相似,此處注意幾點:
1)、@timestamp 默認是 logstash 處理日誌時的時間,當日志的生成時間和 logstash 的處理時間較爲接近時問題不大;但假如你要索引幾個月前的文檔或者日誌,此時這個時間差幾乎就不能接受;因此此處新建了一個臨時變量 log_create_time,再使用 date 插件,將其寫入 @timestamp,最後@timestamp 就等於日誌的生成時間了,在這個例子裏,應該明白 @timestamp、logtime、log_create_time 三者是相同的。
2)、沒有使用 logstash 默認的索引模板,使用的是自定義的索引模板,在 Kibana 的 Console 中新建模板:
PUT /_template/template_payapi {
"index_patherns" : "[payapi-*]", // 以 payapi- 開頭的索引均會應用此模板
"order" : 99, // 模板的優先級
"settings" : {
"number_of_shards" : 1, // 索引使用的分片數量
"number_of_replicas" : 0, // 索引的副本數,當你須要導入大量數據,第一次創建索引時,能夠設置爲0,提升寫入速度,導入完成後能夠動態修改
"refresh_interval" : "15s" // 向臨時空間刷寫的頻率,相似於硬盤的 fsync
},
"mapping" : {
"dynamic" : "false", // 看下方解釋
"properties" : {
"@timestamp" : {
"type" : "text"
},
"logtime" : {
"type" : "text", // 這裏注意,不是全部時間都是 date
"index" : "false" // true:字段可用於搜索, false: 不能用於搜索
},
"level" : {
"type" : "text",
"index" : "true"
},
"comp" : {
"type" : "text",
"index" : "true"
},
"blank" : {
"type" : "text",
"index" : "false"
},
"threadId" : {
"type" : "text",
"index" : "true"
},
"logbody" : {
"type" : "text",
"index" : "ture"
}
}
}
}
dynamic 值 |
說明 |
true | 在索引一個文檔時,若是文檔中有 field 不在 mapping 中,會自動映射類型,添加到 mapping,並索引該字段; |
false | 在索引一個文檔時,若是文檔中有 field 不在 mapping 中,不會添加到 mapping,也不會索引該字段, 可是會保存下來,在 _source 能夠看到該字段,但該字段不能被搜索; |
strict | 在索引一個文檔時,若是文檔中有 field 不在 mapping 中,logstash 會直接拋出異常,拒絕索引; |
3)、查看模板,在 Kibana 的 Console 中執行
GET /_template/template_payapi
二、最後看下優化以後的效果,僅保留須要的字段
Kibana -> Setting -> Index Patterns
僅保留不能被刪除的保留字段和本身須要的字段,同時 stroage 佔用和寫入速度也更加的 stable。
參考連接:
https://blog.csdn.net/shumoyin/article/details/84137178
https://www.jianshu.com/p/dc73ec69c9f7