Commonly used Logstash parsing plugins

Official reference: https://www.elastic.co/guide/en/logstash-versioned-plugins/current/index.html

A configuration file has the following layout:
# log ingestion: input {}
# log filtering and processing: filter {}
# log output: output {}
# A parsing configuration is divided into three sections — input, filter, and output. Each is covered in turn below; each section can contain a number of different plugins.

The input module

Example 1

# file is a commonly used file-input plugin; it has many options, pick what fits your needs
input {
    file {
        path => "/var/lib/mysql/slow.log"    # file to ingest; wildcards work, e.g. /var/log/nginx/*.log
        exclude => "*.gz"                    # files to skip
        start_position => "beginning"        # read from the start of the file; "end" reads from the tail
        ignore_older => 0                    # skip files not modified within this many seconds; 0 means no limit
        sincedb_path => "/dev/null"          # where the last-read position is recorded; /dev/null means parse from the first line every time
        type => "mysql-slow"                 # type field, marks the kind of log being ingested
    }
}
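Before pointing the file input at a real log, a minimal stdin-to-stdout pipeline is a convenient way to smoke-test that logstash itself is wired up correctly. This sketch is not from the original post, just a generic test configuration:

```conf
# read lines typed on stdin, print each parsed event to the terminal
input {
    stdin {}
}
output {
    stdout { codec => rubydebug }
}
```

Save it as test.conf and run /opt/logstash/bin/logstash -f test.conf; whatever you type comes back as a structured event with @timestamp, host, and message fields.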

Example 2

# redis is another commonly used input plugin; again, choose options as needed
input {
    redis {
        batch_count => 1              # number of events returned per EVAL call; 5 would fetch 5 log entries per request
        data_type => "list"           # how the plugin reads from redis (e.g. a list)
        key => "logstash-test-list"   # the key to watch
        host => "127.0.0.1"           # redis address
        port => 6379                  # redis port
        password => "123123"          # auth password, if authentication is enabled
        db => 0                       # redis database number if the application uses a non-default one; defaults to 0
        threads => 1                  # number of threads to run
    }
}
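To feed the input above a hand-made event, you can push a JSON string onto the watched list with redis-cli. This is a hypothetical smoke test using the key and password from the example config; since no redis server is assumed here, the block only builds and prints the command for copy-pasting:

```shell
# hypothetical event payload; the fields are made up for illustration
payload='{"message":"hello from redis","type":"redis-test"}'
# LPUSH onto the key the redis input is watching (password from the example config)
cmd="redis-cli -a 123123 -n 0 LPUSH logstash-test-list '$payload'"
echo "$cmd"
```

With `data_type => "list"` the plugin pops entries off that list, so the event shows up in the pipeline as soon as it is pushed.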
There are many more commonly used input plugins than the two shown here — kafka, tcp, and so on.

The filter module

Example

filter {                                  # there are many filter plugins; these are the ones I have used
    # =~ and !~ are regex matches, in / not in test membership, == / != compare strings, etc.
    # Any action can follow the condition; here matching lines are dropped, but an if block
    # can do anything — even apply a separate regex split to just the matched lines.
    if ([message] =~ "some regex") { drop {} }

    # Multi-line merge: some log entries span several lines. This plugin merges them via a
    # regex; here, lines that precede a match are folded into a single event.
    multiline {
        pattern => "some regex"
        negate => true
        what => "previous"
    }

    grok {
        match => { "message" => "some regex" }   # regex-match the log line, extracting the fields and values you want to keep
        remove_field => ["message"]              # delete fields you do not need to store
    }

    date {
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]   # set @timestamp from a time field in the log; one can be generated if the log has none
        target => "@timestamp"                            # field to put the parsed time into; defaults to @timestamp
    }

    ruby {
        code => "event.timestamp.time.localtime"          # pin the timestamp to the local time zone
    }
}
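The multiline merge can be illustrated outside logstash. The sketch below reproduces the same idea in plain awk, assuming (for illustration only) that events start with a date: with `negate => true` and `what => "previous"`, every line that does not match the pattern is folded into the event before it:

```shell
# lines NOT starting with a date get appended to the previous event,
# mimicking multiline { negate => true  what => "previous" }
merged=$(printf '%s\n' \
  '2016-01-01 ERROR something failed' \
  '    at frame one' \
  '    at frame two' \
  '2016-01-01 INFO all good' |
awk '/^[0-9]{4}-[0-9]{2}-[0-9]{2}/ { if (buf != "") print buf; buf = $0; next }
     { buf = buf " | " $0 }
     END { if (buf != "") print buf }')
echo "$merged"
```

The four input lines collapse into two events: the ERROR line absorbs its two continuation lines, and the INFO line stands alone.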

The output module

Example 1

output {
    # stdout { codec => "rubydebug" }             # print the filtered events to the terminal
    elasticsearch {                               # export to ES, the most commonly used output plugin
        codec => "json"                           # export as JSON
        hosts => ["127.0.0.1:9200"]               # ES address + port
        index => "logstash-slow-%{+YYYY.MM.dd}"   # target index; time variables may be used
        user => "admin"                           # if ES has authentication, supply user/password; otherwise omit both
        password => "xxxxxx"
        flush_size => 500                         # default 500: logstash batches up 500 events before sending to ES
        idle_flush_time => 1                      # default 1s: if 500 events have not accumulated within 1s, send what we have anyway
    }
}
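The `%{+YYYY.MM.dd}` in the index name is a date-format placeholder, so events are routed into one index per day (based on each event's @timestamp). A quick shell approximation of what the expanded name looks like for "today":

```shell
# approximate the expansion of logstash-slow-%{+YYYY.MM.dd} for the current date
index="logstash-slow-$(date +%Y.%m.%d)"
echo "$index"
```

Daily indices make retention easy: dropping old data is just deleting whole indices rather than deleting documents.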

Example 2

output {
    redis {                            # redis output plugin; use the options below as needed
        batch => true                  # false sends one event per RPUSH; true sends a batch
        batch_events => 50             # how many events per RPUSH
        batch_timeout => 5             # maximum time to spend on one RPUSH
        codec => plain                 # codec applied to the output, avoiding a separate logstash filter
        congestion_interval => 1       # how often to run the congestion check
        congestion_threshold => 5      # cap on items allowed in the list; once reached, block until consumers drain it
        data_type => list              # use list or publish
        db => 0                        # which redis database to use; defaults to 0
        host => ["127.0.0.1:6379"]     # redis address and port; overrides the global port
        key => xxx                     # name of the list or channel
        password => xxx                # redis password; not used by default
        port => 6379                   # global port, default 6379; ignored if host already specifies one
        reconnect_interval => 1        # retry interval after failure, default 1s
        timeout => 5                   # connection timeout
        workers => 1                   # number of worker processes
    }
}
There are many more commonly used plugins; see the official documentation for the rest.
From the above we can see logstash's overall processing flow:
input => filter => output
Next, some complete working examples.
Complete examples:
Elasticsearch slow log

input {
    file {
        path => ["/var/log/elasticsearch/private_test_index_search_slowlog.log"]
        start_position => "beginning"
        ignore_older => 0
        # sincedb_path => "/dev/null"
        type => "elasticsearch_slow"
    }
}
filter {
    grok {
        # NB: the (?...) groups lost their capture names, e.g. (?<level>...), when this
        # post was published; fill them back in before using this pattern.
        match => { "message" => "^\[(\d\d){1,2}-(?:0[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])\s+(?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)\]\[(TRACE|DEBUG|WARN\s|INFO\s)\]\[(?[a-z\.]+)\]\s\[(?[a-z0-9\-\.]+)\]\s\[(?[A-Za-z0-9\.\_\-]+)\]\[\d+\]\s+took\[(?[\.\d]+(ms|s|m))\]\,\s+took_millis\[(\d)+\]\,\s+types\[(?([A-Za-z\_]+|[A-Za-z\_]*))\]\,\s+stats\[\]\,\s+search_type\[(?[A-Z\_]+)\]\,\s+total_shards\[\d+\]\,\s+source\[(?[\s\S]+)\]\,\s+extra_source\[[\s\S]*\]\,\s*$" }
        remove_field => ["message"]
    }
    date {
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
    }
    ruby {
        code => "event.timestamp.time.localtime"
    }
}

output {
    elasticsearch {
        codec => "json"
        hosts => ["127.0.0.1:9200"]
        index => "logstash-elasticsearch-slow-%{+YYYY.MM.dd}"
        user => "admin"
        password => "xxxx"
    }
}

MySQL slow log

input {
    file {
        path => "/var/lib/mysql/slow.log"
        start_position => "beginning"
        ignore_older => 0
        # sincedb_path => "/dev/null"
        type => "mysql-slow"
    }
}
filter {
    if ([message] =~ "^(\/usr\/local|Tcp|Time)[\s\S]*")
    { drop {} }
    multiline {
        pattern => "^\#\s+Time\:\s+\d+\s+(0[1-9]|[12][0-9]|3[01]|[1-9])"
        negate => true
        what => "previous"
    }
    grok {
        # NB: the grok capture names (?<field>...) were lost when this post was published,
        # and the "pcidata@xxx.com" tokens are email-obfuscation damage (the original
        # slow-log header is "# User@Host:"); restore both before using this pattern.
        match => { "message" => "^\#\sTime\:\s+\d+\s+(?%{TIME})\n+\#\pcidata@xxx.com\:\s+[A-Za-z0-9\_]+\[(?[A-Za-z0-9\_]+)\]\pcidta@xxx.com\s+(?[A-Za-z0-9\_]+)\s+\[\]\n+\#\s+Query\_time\:\s+(?[0-9\.]+)\s+Lock\_time\:\s+(?[0-9\.]+)\s+Rows\_sent\:\s+(?\d+)\s+Rows\_examined\:\s+(?\d+)(\n+|\n+use\s+(?[A-Za-z0-9\_]+)\;\n+)SET\s+timestamp\=\d+\;\n+(?[\s\S]+)$" }
        remove_field => ["message"]
    }
    date {
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
    }
    ruby {
        code => "event.timestamp.time.localtime"
    }
}
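The `if ... { drop {} }` guard above throws away MySQL slow-log header noise: the mysqld banner path, the "Tcp port:" line, and the column-header "Time" line. The same idea expressed with plain grep on fabricated sample lines, as a sketch:

```shell
# drop header lines the way the regex guard ^(\/usr\/local|Tcp|Time) does;
# only the real "# Time:" event marker (which starts with '#') survives
kept=$(printf '%s\n' \
  '/usr/local/mysql/bin/mysqld, Version: 5.6.30-log' \
  'Tcp port: 3306  Unix socket: /var/lib/mysql/mysql.sock' \
  'Time                 Id Command    Argument' \
  '# Time: 160101  1:02:03' |
grep -Ev '^(/usr/local|Tcp|Time)')
echo "$kept"
```

Note the anchor: `^Time` drops the bare column header but not `# Time:`, which begins with `#` and is needed by the multiline and grok stages.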
output {
    elasticsearch {
        codec => "json"
        hosts => ["127.0.0.1:9200"]
        index => "logstash-mysql-slow-%{+YYYY.MM.dd}"
        user => "admin"
        password => "xxxxx"
    }
}

Nginx access.log
logstash ships with built-in nginx patterns; a small tweak is all we need.
Append the following to the /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns/grok-patterns file:

# NB: one (?...) group below lost its capture name in publishing; restore it before use.
X_FOR (%{IPV4}|-)
NGINXACCESS %{COMBINEDAPACHELOG} \"%{X_FOR:http_x_forwarded_for}\"
ERRORDATE %{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME}
NGINXERROR_ERROR %{ERRORDATE:timestamp}\s{1,}\[%{DATA:err_severity}\]\s{1,}(%{NUMBER:pid:int}#%{NUMBER}:\s{1,}\*%{NUMBER}|\*%{NUMBER}) %{DATA:err_message}(?:,\s{1,}client:\s{1,}(?%{IP}|%{HOSTNAME}))(?:,\s{1,}server:\s{1,}%{IPORHOST:server})(?:, request: %{QS:request})?(?:, host: %{QS:server_ip})?(?:, referrer:\"%{URI:referrer})?
NGINXERROR_OTHER %{ERRORDATE:timestamp}\s{1,}\[%{DATA:err_severity}\]\s{1,}%{GREEDYDATA:err_message}
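As a quick sanity check of what the X_FOR pattern is meant to capture (an IPv4 address or a literal `-`, quoted at the end of the access-log line), plain grep can test a sample line. The log line below is fabricated for illustration:

```shell
# a made-up access-log line whose last field is the X-Forwarded-For value "10.0.0.1"
line='1.2.3.4 - - [01/Jan/2016:00:00:00 +0800] "GET / HTTP/1.1" 200 612 "-" "curl/7.0" "10.0.0.1"'
# same shape as X_FOR: an IPv4 address or "-", wrapped in quotes at end of line
echo "$line" | grep -Eq '"([0-9]{1,3}(\.[0-9]{1,3}){3}|-)"$' && echo matched
```

If your nginx log_format does not append the X-Forwarded-For field in quotes, the NGINXACCESS pattern will not match and events come through tagged _grokparsefailure.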

The log configuration file is then as follows:

input {
    file {
        path => [ "/var/log/nginx/www-access.log" ]
        start_position => "beginning"
        # sincedb_path => "/dev/null"
        type => "nginx_access"
    }
}
filter {
    grok {
        match => { "message" => "%{NGINXACCESS}"}
    }
    mutate {
        convert => [ "response","integer" ]
        convert => [ "bytes","integer" ]
    }
    date {
        match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
    }
    ruby {
        code => "event.timestamp.time.localtime"
    }
}
output {
    elasticsearch {
        codec => "json"
        hosts => ["127.0.0.1:9200"]
        index => "logstash-nginx-access-%{+YYYY.MM.dd}"
        user => "admin"
        password => "xxxx"
    }
}

Nginx error.log

input {
    file {
        path => [ "/var/log/nginx/www-error.log" ]
        start_position => "beginning"
        # sincedb_path => "/dev/null"
        type => "nginx_error"
    }
}
filter {
    grok {
        match => [
            "message","%{NGINXERROR_ERROR}",
            "message","%{NGINXERROR_OTHER}"
        ]
    }
    ruby {
        code => "event.timestamp.time.localtime"
    }
    date {
        match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss"]
    }
}
output {
    elasticsearch {
        codec => "json"
        hosts => ["127.0.0.1:9200"]
        index => "logstash-nginx-error-%{+YYYY.MM.dd}"
        user => "admin"
        password => "xxxx"
    }
}

PHP error.log

input {
    file {
        path => ["/var/log/php/error.log"]
        start_position => "beginning"
        # sincedb_path => "/dev/null"
        type => "php-fpm_error"
    }
}
filter {
    multiline {
        pattern => "^\[(0[1-9]|[12][0-9]|3[01]|[1-9])\-%{MONTH}-%{YEAR}[\s\S]+"
        negate => true
        what => "previous"
    }
    grok {
        match => { "message" => "^\[(?(0[1-9]|[12][0-9]|3[01]|[1-9])\-%{MONTH}-%{YEAR}\s+%{TIME}?)\s+[A-Za-z]+\/[A-Za-z]+\]\s+(?(?:[A-Z]{3}\s+[A-Z]{1}[a-z]{5,7}|[A-Z]{3}\s+[A-Z]{1}[a-z\s]{9,11}))\:\s+(?[\s\S]+$)" }
        remove_field => ["message"]
    }
    date {
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
    }
    ruby {
        code => "event.timestamp.time.localtime"
    }
}
output {
    elasticsearch {
        codec => "json"
        hosts => ["127.0.0.1:9200"]
        index => "logstash-php-error-%{+YYYY.MM.dd}"
        user => "admin"
        password => "xxxxx"
    }
}

PHP-FPM slow log

input {
    file {
        path => ["/var/log/php-fpm/www.slow.log"]
        start_position => "beginning"
        # sincedb_path => "/dev/null"
        type => "php-fpm_slow"
    }
}
filter {
    multiline {
        pattern => "^$"
        negate => true
        what => "previous"
    }
    grok {
        match => { "message" => "^\[(?(0[1-9]|[12][0-9]|3[01]|[1-9])\-%{MONTH}\-%{YEAR}\s+%{TIME})\]\s+\[[a-z]{4}\s+(?[A-Za-z0-9]{1,8})\]\s+[a-z]{3}\s+(?\d{1,7})\n(?[\s\S]+$)" }
        remove_field => ["message"]
    }
    date {
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
    }
    ruby {
        code => "event.timestamp.time.localtime"
    }
}
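The `pattern => "^$"` multiline setup means records are separated by blank lines: every non-blank line is glued to the previous one until an empty line starts a new event. awk's paragraph mode (`RS=""`) behaves the same way; here is a sketch on fabricated php-fpm-style records:

```shell
# blank-line-separated records collapse to one line each, like the multiline filter
records=$(printf '%s\n' \
  '[01-Jan-2016 00:00:01]  [pool www] pid 12345' \
  'script_filename = /srv/app/index.php' \
  '' \
  '[01-Jan-2016 00:00:05]  [pool www] pid 12346' \
  'sleep() /srv/app/slow.php:10' |
awk 'BEGIN { RS = "" } { gsub(/\n/, " | "); print }')
echo "$records"
```

Five input lines become two events, one per blank-line-separated slow-log entry, which is exactly the shape the grok stage above expects to receive.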
output {
    elasticsearch {
        codec => "json"
        hosts => ["127.0.0.1:9200"]
        index => "logstash-php-fpm-slow-%{+YYYY.MM.dd}"
        user => "admin"
        password => "xxxx"
    }
}

Parsing configuration files conventionally live under /etc/logstash/conf.d, though they can go anywhere; keeping them in one place is best.
With multiple configuration files, do not start logstash like this:
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/ (or with a *)
That command concatenates all the configuration files into one pipeline instead of running them individually, which causes errors.
With multiple configuration files, start them one at a time:
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/nginx_error.conf &
That gets tedious with many files, and startup is slow, so I wrote a small helper script
for convenience; treat it as a starting point:

#!/bin/bash
# directory holding the configuration files; change to suit your setup
conf_path=/etc/logstash/conf.d
conf_name=$( ls ${conf_path} )
case $1 in
start)
    echo "-----------please wait.----------"
    echo "The start-up process is too slow."
    for cf in ${conf_name}
    do
        /opt/logstash/bin/logstash -f $conf_path/$cf > /dev/null 2>&1 &
        if [ $? -ne 0 ];then
            echo 'The '${cf}' start-up failed.'
        fi
        sleep 20
    done
    echo "start-up success."
;;
stop)
    ps -ef |grep logstash |grep -v grep > /dev/null 2>&1
    if [ $? -eq 0 ];then
        ps -ef|grep logstash |grep -v grep |awk '{print $2}'|xargs kill -9 > /dev/null 2>&1
        sleep 2
        echo "Stop success."
    fi
;;
restart)
    ps -ef |grep logstash |grep -v grep > /dev/null 2>&1
    if [ $? -eq 0 ];then
        ps -ef|grep logstash |grep -v grep |awk '{print $2}'|xargs kill -9 > /dev/null 2>&1
        sleep 3
        echo "Stop success."
    fi
    echo "-----------please wait.----------"
    echo "The start-up process is too slow."
    for cf in ${conf_name}
    do
        /opt/logstash/bin/logstash -f $conf_path/$cf > /dev/null 2>&1 &
        if [ $? -ne 0 ];then
            echo 'The '${cf}' start-up failed.'
        fi
        sleep 10
    done
    echo "start-up success."
;;
*)
    echo "Usage: "$0" {start|stop|restart}"
    exit 1;
;;
esac

Do not include "logstash" in the script's file name (the stop/restart greps would match the script itself); save it as something like log_stash.sh and run it with ./log_stash.sh (start|stop|restart).
