logstash使用一個名爲 filewatch的 ruby gem庫來監聽文件變化, 這個庫記錄一個 .sincedb的數據文件跟蹤監聽日誌文件的當前位置html
input { file { path => ["/var/log/*.log", "/var/log/message"] type => "system" start_position => "beginning" } }
output {
stdout{
codec=>rubydebug } }
其餘配置java
discover_interval: 每隔多久檢查path下是否有新文件, 默認15s
exclude: 不行唄監聽的文件排除
close_older: 被監聽的文件多久沒更新就關閉監聽, 默認3600s
ignore_older: 檢查文件列表時, 若是最後修改時間超過這個值, 就虎烈
logstash最簡單最基本的輸入方式nginx
在 {LH}/下, 新建 stdin.conf, 並輸入如下內容: git
input{ stdin{ add_field=>{"key"=>"value"} codec=>"plain" tags=>["add"] type=>"std" } } output { stdout{ codec=>rubydebug } }
使用命令運行github
./bin/logstash -f ./stdin.conf
啓動後輸入 helloworld, 能夠看到以下輸出redis
這兒的 type 和tags 是logstash的兩個特俗字段, 一般會在輸入區域經過type標記事件類型, tags則在數據處理階段, 由具體的插件來添加或刪除的json
從設備上收集日誌的時候可用 ruby
input { syslog { port => "514" } }
output {
stdout{
codec=>rubydebug } }
此時, 系統的日誌都會到logstash中來, 建議使用使用LogStash::Inputs::TCP
和 LogStash::Filters::Grok
配合實現一樣的 syslog 功能! 具體可見: https://kibana.logstash.es/content/logstash/plugins/input/syslog.html網絡
input { tcp { port => "8514" } } filter { grok { match => ["message", "%{SYSLOGLINE}" ] } syslog_pri { } }
可被redis,等替代做爲 logstash broker 的角色, 但logstash又本身的tcp插件 app
input { tcp { port => 8888 mode => "server" ssl_enable => false } }
最佳使用是: 配合 nc 命令導入就數據
# nc 127.0.0.1 8888 < olddata
導入完畢後, nc命令會結束, 若是使用file會一直監聽新數據
使用codec能夠處理不一樣類型的數據, 使得logstash 造成 input | decode | filter | encode | output 的數據流, codec就是用來 encode 和 decode的
將nginx的日誌導入爲json格式: nginx須要配置 conf, 在 http{} 下進行配置, 全部server共享
logformat json '{"@timestamp":"$time_iso8601",' '"@version":"1",' '"host":"$server_addr",' '"client":"$remote_addr",' '"size":$body_bytes_sent,' '"responsetime":$request_time,' '"domain":"$host",' '"url":"$uri",' '"status":"$status"}'; access_log /var/log/nginx/access.log_json json;
修改stdin.conf
input { file { path => "/var/log/nginx/access.log_json" codec => "json" } }
而後訪問本地nginx, 能夠看到logstash輸出:
一個事件打印多行內容, 很難經過命令行解析分析, 所以須要:
input { stdin { codec => multiline { pattern => "^\[" negate => true what => "previous" } } }
將當前的數據添加到下一行後面, 知道新匹配 ^[ 位置
擴展了進入過濾器的原始數據,進行復雜的邏輯處理,甚至能夠無中生有的添加新的 logstash 事件到後續的流程中去!
logstash內部使用了java的 joda 時間庫來處理時間
filter { grok { match => ["message", "%{HTTPDATE:logdate}"] } date { match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"] } }
能夠將輸入的文本匹配到字段中去:
input {stdin{}} filter { grok { match => { "message" => "\s+(?<request_time>\d+(?:\.\d+)?)\s+" } } } output {stdout{codec => rubydebug}}
而後輸入 begin 123.456 end
grok支持預約義的grok表達式: (本身的變量)
%{PATTERN_NAME:capture_name:data_type}
因此上例可改爲:
filter { grok { match => { "message" => "%{WORD} %{NUMBER:request_time:float} %{WORD}" } } }
從新運行後, request_time的值變爲float類型的,
實際使用中: 建議把全部的fork表達式統一寫在一個地方, 而後patterns_dir指明. 若是將message中的全部信息都grok到不通字段了, 數據就存儲重複了, 所以能夠用remove_filed或者 overwrite來重寫message
filter { grok { patterns_dir => ["/path/to/your/own/patterns"] match => { "message" => "%{SYSLOGBASE} %{DATA:message}" }
data => {
"match" => ["date1", "YYYY-MM-dd HH:mm:ss.SSS" ]
} overwrite => ["message"] } }
冒號(:) 能夠從新命名
附: grok 正則變量類型: https://github.com/wenbronk/elasticsearch-elasticsearch-learn/blob/master/grok%E5%86%85%E9%83%A8%E5%8F%98%E9%87%8F.txt
跟grok相似, 但資源消耗較小. 當日志格式有比較簡明的分隔標誌位,並且重複性較大的時候,咱們可使用 dissect 插件更快的完成解析工做
filter { dissect { mapping => { "message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}" } convert_datatype => { pid => "int" } } }
好比配置: http://rizhiyi.com/index.do?id=123
http://%{domain}/%{?url}?%{?arg1}=%{&arg1} 匹配後 { domain => "rizhiyi.com", id => "123" }
解釋
%{+key} 這個 + 表示,前面已經捕獲到一個 key 字段了,而此次捕獲的內容,自動添補到以前 key 字段內容的後面。 %{+key/2} 這個 /2 表示,在有屢次捕獲內容都填到 key 字段裏的時候,拼接字符串的順序誰前誰後。/2 表示排第 2 位。 %{?string} 這個 ? 表示,這塊只是一個佔位,並不會實際生成捕獲字段存到 Event 裏面。 %{?string} %{&string} 當一樣捕獲名稱都是 string,可是一個 ? 一個 & 的時候,表示這是一個鍵值對。
input {stdin{}} filter { geoip { source => "message" } } output {stdout{codec => rubydebug}}
運行結果
若是隻想要其中某些字段, 能夠經過fileds來指定
geoip { fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"] }
最近一分鐘 504 請求的個數超過 100 個就報警:
filter { metrics { timer => {"rt" => "%{request_time}"} percentiles => [25, 75] add_tag => "percentile" } if "percentile" in [tags] { ruby { code => "l=event.get('[rt][p75]')-event.get('[rt][p25]');event.set('[rt][low]', event.get('[rt][p25]')-l);event.set('[rt][high]',event.get('[rt][p75]')+l)" } } } output { if "percentile" in [tags] and ([rt][last] > [rt][high] or [rt][last] < [rt][low]) { exec { command => "echo \"Anomaly: %{[rt][last]}\"" } } }
可轉換的類型包括 integer, float, string
filter { mutate { convert => ["request_time", "float"] } }
字符串處理: , sub
gsub => ["urlparams", "[\\?#]", "_"]
split:
filter { mutate { split => ["message", "|"] } }
join, 將split切分的在join回去
filter { mutate { split => ["message", "|"] } mutate { join => ["message", ","] } }
rename: 字段重命名:
filter { mutate { rename => ["syslog_host", "host"] } }
是multiline插件的反向, 將一行數據切分到多個事件中去
filter { split { field => "message" terminator => "#" } }
而後輸入 "test1#test2", 能夠看到被輸出到2個事件中
output { stdout { codec => rubydebug workers => 2 } }
output { elasticsearch { hosts => ["192.168.0.2:9200"] # 有多個用逗號隔開 index => "logstash-%{type}-%{+YYYY.MM.dd}" document_type => "%{type}" flush_size => 20000 idle_flush_time => 10 sniffing => true template_overwrite => true } }
注意索引名中不能有大寫字母,不然 ES 在日誌中會報 InvalidIndexNameException,可是 Logstash 不會報錯,這個錯誤比較隱晦,也容易掉進這個坑中。
3), email
126郵箱發送到 qq郵箱的示例
output { email { port => "25" address => "smtp.126.com" username => "test@126.com" password => "" authentication => "plain" use_tls => true from => "test@126.com" subject => "Warning: %{title}" to => "test@qq.com" via => "smtp" body => "%{message}" } }