ELK - 優化 index patterns 和 Kibana 中顯示的多餘字段

Demo 跑起來以後,就須要根據具體的負載和日誌進行優化了,本次主要是優化在 Kibana 界面中 [Table] 展開的 Patterns,過多的 Patterns 有幾個負面做用:
1)、干擾查看信息
2)、增大索引佔用空間
3)、下降 es 的寫入性能nginx

ELK各組件版本:(Windows Server, Linux 下其餘大同小異)
1)、Filebeat - 7.3.1
2)、Logstash - 7.3.1
3)、Elasticsearch: 7.3.1
4)、Kibana - 7.3.1web

 

如下是優化前的內容,實際須要的僅僅是6個字段,但此處卻有35個字段之多,主要來源是 filebeat 和 logstash 處理時應用了 es 默認模板產生的大量冗餘字段。
這裏的結果是已經在 logstash 中初步配置了 remove_field,將 "@version"、"message" 、"log_create_time"  移除了,否則整個 Table 展開後更加的臃腫。
@version 和 message 是 es 的動態模板自動建立的,log_create_time 是自定義的字段,用於替換默認模板的 @timestamp json

           

 

Kibana -> Setting -> Index Patternswindows

 

日誌樣例:api

2019-11-21 08:45:39.656 | ^Warn||ThreadID: 35|Audilog not defined in config.
2019-11-21 08:47:39.012 | *Error|SQLServer||ThreadID: 35|Database connection refused.
2019-11-21 09:42:50.156 | Info|Kafka||ThreadID: 35|Reviced Message:
ID:                                  1
Mode:                                Delay
Message Id:                          1
Map:                                 len=45
Binary Version:                      3.4.6

 以 「|」 爲分隔符,將日誌分爲 6 個 field,能夠看到有的只有5段,有的不在一行:
field1 - logtime
field2 - level
field3 - comp // 此字段可能爲空
field4 - blank // 此處都是 null
field5 - threadId
field6 - logbody服務器

 

一、先看一下優化後的配置文件app

Filebeat 的配置文件,若是要收集一臺服務器上的不一樣程序的日誌,能夠新建多個配置文件並配置不一樣的 logstash 端口,再啓動多個進程收集。elasticsearch

若是是輸出到 kafka,則能夠在一份配置裏配置多個 intput 利用 filed 字段在 output 裏送往不一樣的 topic,而沒必要啓動多個進程。
性能

  • filebeat.yml
# log files input filebeat.inputs: - type: enabled: true paths: - F:\payserverlog\*.log multiline.pattern: '[0-9]{4}-[0-9]{2}-[0-9]{2]' // 將不在一行的日誌,拼接到以日期開頭的行後 multiline.negate: true multiline.match: after fields: nginx: payserver scan_frequency: 10s max_bytes: 1048576 tail_files: false backoff: 1s backoff_factor: 2 # output to logstash output.logstash: hosts: ["172.16.0.11:5146"] processors: - drop_fields: fields: ["input_type", "log.offset", "host.name", "input.type", "agent.hostname", "agent.type", "ecs.version", "agent.ephemeral_id", "agent.id", "agent.version", "fields.ics", "log.file.path", "log.flags" ] monitoring: enabled: true
elasticsearch: ["http://172.16.0.11:9200"]
# logging logging.level: info logging.to_file: true logging.files: path: E:\ELK\filebeat-7.3.1-windows-x86_64\logs name: filebeat-5146.log interval: 1h keepfiles: 7 logging.json: false

 

Logstash 的配置文件優化

當單機有多個程序日誌須要收集而且輸出端是 Logstash 時,有幾種方式:
1)、使用 if [filed] = 'xxx' 來區分 grok 和 output,此種方式 Logstash 須要作大量的 if 判斷,官方稱之爲 Conditional Hell (條件地獄),會嚴重下降 grok 效率 - 不推薦
2)、啓動多個 Logstash 實例,須要配置和管理多個 JVM - 不推薦
3)、使用 Logstash 的 Pipeline , 沒必要管理多個 JVM,也沒必要作大量 if 判斷,如下的配置使用的就是該種方式

  •  Config\logstash.yml
pipeline.worker: 10 pipeline.batch.size: 3000 pipeline.batch.delay: 10 http.host: "172.16.0.11" xpack.monitoring.enabled: true xpack.monitoring.elasticsearch.hosts: ["http://172.16.0.11:9200"] log.level: info

 

  • Config\pipeline.yml
- pipeline: main
pipeline.workers: 8
pipeline.batch.size: 3000
pipeline.delay: 200 path.config: E:\\ELK\\logstash
-7.3.1\\pipeline\\5044-server01-payapi.conf - pipeline: server-01-error
pipeline.workers: 8
pipeline.batch.size: 3000
pipeline.delay: 200
path.config: E:\\ELK\\logstash-7.3.1\\pipeline\\5045-server01-payweb.conf - pipeline: server-02-access
pipeline.workers: 8
pipeline.batch.size: 3000
  pipeline.delay: 200
path.config: E:\\ELK\\logstash-7.3.1\\pipeline\\5046-server02-payapi.conf - pipeline: server-02-error
pipeline.workers: 8
pipeline.batch.size: 3000
pipeline.delay: 200
path.config: E:\\ELK\\logstash
-7.3.1\\pipeline\\5047-server02-payweb.conf

 

  • Pipeline\5044-server01-payapi.conf
input {   beats {   port => 5044 // 當配置了Pipeline時,不一樣的 pipeline 配置文件,此端口不可衝突 client_inactivity_timeout => 600 } } filter { grok {   match => {   "message" => "%{DATA:logtime}\|%{DATA:level}\|%{DATA:comp}\|%{DATA:blank}\|%{DATA:threadId}\|%{GREEDYDATA:logdoby}" } } if "_grokparsefailure" in [tags] { // 某些行按 "|" 分割只有5段,按前面的 grok 會解析失敗,並生成一個值爲 "_grokparsefailure" 的 tag,此處從新解析失敗的行   grok {   match => {   "message" => "%{DATA:logtime}\|%{DATA:level}\|%{DATA:blank}\|%{DATA:threadId}\|%{GREEDYDATA:logdoby}" } } }

grok {
  match => {
"message" => "%{TIMESTAMP_ISO8601:log_create_time}" // 將日誌的時間按照 TIMESTAMP_ISO8601 解析給臨時變量 log_create_time
}
}

date {
match => ["log_create_time", "yyyy-MM-dd HH:mm:ss.SSS"] // 按時間格式匹配一下
target => "@timestamp" // 將 log_create_time 寫入 @timestamp
}
mutate {
remove_field => "@version"
remove_field => "message"
remove_field => "log_create_time"
remove-field => "tags"
gsub => ["level", "\s", ""] // 移除字段中的空格
gsub => ["comp", "\s", ""]  
}
}

output {
  elasticsearch {
    hosts => ["http://172.16.0.11:9200"]
    index => "payapi-server01-%{+yyyy.MM.dd}"
    manage_template => false // 取消 logstash 自動管理模板功能
    template_name => template_payapi // 映射自定義模板的名字,自定義模板的建立在下方
  }
}

其餘的幾個配置文件相似,此處注意幾點:

1)、@timestamp 默認是 logstash 處理日誌時的時間,當日志的生成時間和 logstash 的處理時間較爲接近時問題不大;但假如你要索引幾個月前的文檔或者日誌,此時這個時間差幾乎就不能接受;因此此處新建了一個臨時變量 log_create_time,再使用 date 插件,將其寫入 @timestamp,最後@timestamp 就等於日誌的生成時間了,在這個例子裏,應該明白 @timestamp、logtime、log_create_time 三者是相同的。

2)、沒有使用 logstash 默認的索引模板,使用的是自定義的索引模板,在 Kibana 的 Console 中新建模板:

PUT /_template/template_payapi {
  "index_patherns" : "[payapi-*]", // 以 payapi- 開頭的索引均會應用此模板
  "order" : 99, // 模板的優先級
  "settings" : {
    "number_of_shards" : 1, // 索引使用的分片數量
    "number_of_replicas" : 0, // 索引的副本數,當你須要導入大量數據,第一次創建索引時,能夠設置爲0,提升寫入速度,導入完成後能夠動態修改
    "refresh_interval" : "15s" // 向臨時空間刷寫的頻率,相似於硬盤的 fsync
  },
  "mapping" : {
    "dynamic" : "false", // 看下方解釋
    "properties" : {
      "@timestamp" : {
        "type" : "text"
      },
      "logtime" : {
        "type" : "text", // 這裏注意,不是全部時間都是 date
        "index" : "false" // true:字段可用於搜索, false: 不能用於搜索
      },
      "level" : {
        "type" : "text",
        "index" : "true"
      },

      "comp" : {
        "type" : "text",
        "index" : "true"
      },

      "blank" : {
        "type" : "text",
        "index" : "false"
      },

      "threadId" : {
        "type" : "text",
        "index" : "true"
      },

      "logbody" : {
        "type" : "text",
        "index" : "ture"
      }

    }
  }
}

 

dynamic 值
說明
true 在索引一個文檔時,若是文檔中有 field 不在 mapping 中,會自動映射類型,添加到 mapping,並索引該字段;
false

在索引一個文檔時,若是文檔中有 field 不在 mapping 中,不會添加到 mapping,也不會索引該字段,

可是會保存下來,在 _source 能夠看到該字段,但該字段不能被搜索;

strict 在索引一個文檔時,若是文檔中有 field 不在 mapping 中,logstash 會直接拋出異常,拒絕索引;




 

 

 

 

3)、查看模板,在 Kibana 的 Console 中執行

GET /_template/template_payapi

 

二、最後看下優化以後的效果,僅保留須要的字段

   

 

Kibana -> Setting -> Index Patterns

 僅保留不能被刪除的保留字段和本身須要的字段,同時 stroage 佔用和寫入速度也更加的 stable。

 

參考連接:
https://blog.csdn.net/shumoyin/article/details/84137178
https://www.jianshu.com/p/dc73ec69c9f7

相關文章
相關標籤/搜索