1、logstash淺析
nginx
logstash就是一個過濾器,收集器。他是一個重量級的日誌收集工具。使用logstash會消耗大量的服務器資源,因此一般需求高一點的配置。redis
過濾器:它能夠統一過濾來自不一樣源的數據,並按照開發者的制定的規範輸出到目的地。數據庫
收集器:它能夠採集指定地址的日誌,並傳輸到elasticsearch、redis等目的地。安全
2、logstash的處理展現:ruby
3、插件服務器
input(輸入插件):日誌來源。網絡
a:讀取指定地址的日誌app
input { elasticsearch
file {tcp
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
}
b:接收filebeat的日誌
input {
beats {
port => 5044
}
}
c:讀取redis中的數據
input {
redis {
batch_count => 1
# EVAL命令返回的事件數目,設置爲5表示一次請求返回5條日誌信息
data_type => "list"
# logstash redis插件工做方式
key => "logstash-test"
# 監聽的鍵值
host => "127.0.0.1"
# redis端口號
password => "123456"
# 若是有安全認證,此項爲認證密碼
db => 0
# 若是應用使用了不一樣的數據庫,此爲redis數據庫的編號,默認爲0。
threads => 1
# 啓用線程數量
}
}
d:讀取syslog、rsyslog的數據
input {
syslog {
port => "514"
}
}
e:讀取網絡數據tcp (舊數據的處理)
input {
tcp {
port => 8888
mode => "server"
ssl_enable => false
}
}
一般與nc命令配合使用
# nc 127.0.0.1 8888 < olddata
f:標準輸入stdin
input {
stdin {
add_field => {"key" => "value"}
codec => "plain"
tags => ["add"]
type => "std"
}
}
# bin/logstash -f stdin.conf 後面輸入字段,就會有輸出(不經常使用)
2.filter
個人理解filter模塊就是對日誌的一個處理過程。簡單介紹我使用過的幾個插件。
a:grok 插件 :拆分日誌,篩選須要的值,並進行排序。配合這個字段 remove_field => ["message"] 能夠刪除不須要的值。nginx日誌爲例:
grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601:time_iso8601}\] \[%{NUMBER:pid}\] \[%{IPORHOST:remote_addr}\] \[%{IPORHOST:http_host}(:%{NUMBER:http_host_port})?\] \[%{IPORHOST:upstream_addr}(:%{NUMBER:upstream_port})?\] \[%{WORD:verb} %{URIPATH:request_uri}(?:%{URIPARAM:request_parameter})? HTTP/%{NUMBER:httpversion}\] \[%{NUMBER:status}\] \[%{BASE10NUM:upstream_response_time} s\] \[%{BASE10NUM:request_time} s\] \[(?:%{NUMBER:bytes_sent}|-) bytes\] \[(?:%{NUMBER:body_bytes_sent}|-) bytes\] \[%{GREEDYDATA:http_user_agent}\]" }
remove_field => ["message"] #一個不想要的字段
}
b:mutate插件:處理數據的格式,特別是時間
mutate {
convert => {
"upstream_response_time" => "float"
"request_time" => "float"
"status" => "integer"
}
}
c:ruby插件 處理事件event,能夠使用各類ruby語法
if [logtype] == "Feed" {
ruby {
code => '
request_uri = event.get("request_uri")
if request_uri.nil?
return
end
}
解釋一下(定義request_uri字段,假如字段爲空,結束操做)
d:date插件 定義時間
date {
locale => "en"
match => ["time_iso8601", "ISO8601"]
}
解釋一下(當前日誌的時間是美國時間,輸出爲北京時間)
f:drop的使用
if [message] !~ "^error" {
drop { }
}
解釋一下(假如信息匹配到error,就過濾掉)
3.output 輸出到
a:elasticsearch 最經常使用的插件
output {
elasticsearch {
hosts => "localhost:9200"
manage_template => false
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}
b:固然也能夠輸入到redis中 進行進一步處理(參數選擇性添加)
output {
redis{
batch => true
batch_events => 50
batch_timeout => 5
codec => plain
congestion_interval => 1
congestion_threshold => 5
data_type => list
db => 0
host => ["127.0.0.1:6379"]
key => xxx
# list或channel的名字
password => xxx
# redis的密碼,默認不使用
port => 6379
}
}