logstash-2-插件配置

配置語法:  Logstash必須有一個 input 和一個 output

1, 處理輸入的input 

1), 從文件錄入

logstash使用一個名爲 filewatch的 ruby gem庫來監聽文件變化, 這個庫記錄一個 .sincedb的數據文件跟蹤監聽日誌文件的當前位置html

input {
    file {
        path => ["/var/log/*.log", "/var/log/message"]
        type => "system"
        start_position => "beginning"
    }
}
output {
    stdout{
        codec=>rubydebug } }
 

其餘配置java

discover_interval: 每隔多久檢查path下是否有新文件, 默認15s
exclude: 不行唄監聽的文件排除
close_older: 被監聽的文件多久沒更新就關閉監聽, 默認3600s
ignore_older: 檢查文件列表時, 若是最後修改時間超過這個值, 就虎烈

2) 標準輸入: (Stdin)

logstash最簡單最基本的輸入方式nginx

在 {LH}/下, 新建  stdin.conf, 並輸入如下內容: git

input{
    stdin{
        add_field=>{"key"=>"value"}
        codec=>"plain"
        tags=>["add"]
        type=>"std"
    }
}
output {
    stdout{
        codec=>rubydebug    
    }
}

使用命令運行github

./bin/logstash -f ./stdin.conf

啓動後輸入 helloworld, 能夠看到以下輸出redis

這兒的 type 和tags 是logstash的兩個特俗字段, 一般會在輸入區域經過type標記事件類型, tags則在數據處理階段, 由具體的插件來添加或刪除的json

 3) syslog

 從設備上收集日誌的時候可用 ruby

input {
  syslog {
    port => "514"
  }
}
output {
    stdout{
        codec=>rubydebug } }

此時, 系統的日誌都會到logstash中來, 建議使用使用LogStash::Inputs::TCP和 LogStash::Filters::Grok 配合實現一樣的 syslog 功能! 具體可見: https://kibana.logstash.es/content/logstash/plugins/input/syslog.html網絡

input {
  tcp {
    port => "8514"
  }
}
filter {
  grok {
    match => ["message", "%{SYSLOGLINE}" ]
  }
  syslog_pri { }
}

4) 網絡數據讀取, tcp

可被redis,等替代做爲 logstash broker 的角色, 但logstash又本身的tcp插件 app

input {
    tcp {
        port => 8888
        mode => "server"
        ssl_enable => false
    }
}

最佳使用是: 配合  nc 命令導入就數據

# nc 127.0.0.1 8888 < olddata

導入完畢後, nc命令會結束, 若是使用file會一直監聽新數據

 

2 編碼插件 codec

使用codec能夠處理不一樣類型的數據,  使得logstash 造成 input | decode | filter | encode | output 的數據流, codec就是用來 encode 和 decode的

1), json格式

將nginx的日誌導入爲json格式: nginx須要配置 conf, 在 http{} 下進行配置, 全部server共享

logformat json '{"@timestamp":"$time_iso8601",'
               '"@version":"1",'
               '"host":"$server_addr",'
               '"client":"$remote_addr",'
               '"size":$body_bytes_sent,'
               '"responsetime":$request_time,'
               '"domain":"$host",'
               '"url":"$uri",'
               '"status":"$status"}';
access_log /var/log/nginx/access.log_json json;

修改stdin.conf

input {
    file {
        path => "/var/log/nginx/access.log_json"
        codec => "json"
    }
}

而後訪問本地nginx, 能夠看到logstash輸出: 

 

2), multiline 合併多行數據

 一個事件打印多行內容, 很難經過命令行解析分析, 所以須要: 

input {
    stdin {
        codec => multiline {
            pattern => "^\["
            negate => true
            what => "previous"
        }
    }
}

將當前的數據添加到下一行後面, 知道新匹配 ^[ 位置

3, filter插件: 

擴展了進入過濾器的原始數據,進行復雜的邏輯處理,甚至能夠無中生有的添加新的 logstash 事件到後續的流程中去!

1) 時間處理

logstash內部使用了java的 joda 時間庫來處理時間 

filter {
    grok {
        match => ["message", "%{HTTPDATE:logdate}"]
    }
    date {
        match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
}

 

2) grok, 正則捕獲

 能夠將輸入的文本匹配到字段中去: 

input {stdin{}}
filter {
    grok {
        match => {
            "message" => "\s+(?<request_time>\d+(?:\.\d+)?)\s+"
        }
    }
}
output {stdout{codec => rubydebug}}

而後輸入  begin 123.456 end

 

 

grok支持預約義的grok表達式: (本身的變量)

%{PATTERN_NAME:capture_name:data_type}

 因此上例可改爲: 

filter {
    grok {
        match => {
            "message" => "%{WORD} %{NUMBER:request_time:float} %{WORD}"
        }
    }
}

從新運行後, request_time的值變爲float類型的, 

實際使用中: 建議把全部的fork表達式統一寫在一個地方, 而後patterns_dir指明. 若是將message中的全部信息都grok到不通字段了, 數據就存儲重複了, 所以能夠用remove_filed或者 overwrite來重寫message

filter {
    grok {
        patterns_dir => ["/path/to/your/own/patterns"]
        match => {
            "message" => "%{SYSLOGBASE} %{DATA:message}"
        }
     data => {
        "match" => ["date1", "YYYY-MM-dd HH:mm:ss.SSS" ]
     } overwrite
=> ["message"] } }

冒號(:) 能夠從新命名

附: grok 正則變量類型: https://github.com/wenbronk/elasticsearch-elasticsearch-learn/blob/master/grok%E5%86%85%E9%83%A8%E5%8F%98%E9%87%8F.txt

 

3) dissect

跟grok相似, 但資源消耗較小. 當日志格式有比較簡明的分隔標誌位,並且重複性較大的時候,咱們可使用 dissect 插件更快的完成解析工做 

filter {
    dissect {
        mapping => {
            "message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}"
        }
        convert_datatype => {
            pid => "int"
        }
    }
}

好比配置: http://rizhiyi.com/index.do?id=123

http://%{domain}/%{?url}?%{?arg1}=%{&arg1}

匹配後
{
  domain => "rizhiyi.com",
  id => "123"
}

解釋

%{+key} 這個 + 表示,前面已經捕獲到一個 key 字段了,而此次捕獲的內容,自動添補到以前 key 字段內容的後面。
%{+key/2} 這個 /2 表示,在有屢次捕獲內容都填到 key 字段裏的時候,拼接字符串的順序誰前誰後。/2 表示排第 2 位。
%{?string} 這個 ? 表示,這塊只是一個佔位,並不會實際生成捕獲字段存到 Event 裏面。
%{?string} %{&string} 當一樣捕獲名稱都是 string,可是一個 ? 一個 & 的時候,表示這是一個鍵值對。

4) geoip: 免費的ip地址歸類查詢庫, 可根據ip提供對應的低於信息, 包括省,市,經緯度,, 可視化地圖統計等

input {stdin{}}
filter {
    geoip {
        source => "message"
    }
}
output {stdout{codec => rubydebug}}

運行結果

若是隻想要其中某些字段, 能夠經過fileds來指定

 

geoip {
        fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
    }

5, metrics, filters/metrics 插件是使用 Ruby 的 Metriks 模塊來實如今內存裏實時的計數和採樣分析

 最近一分鐘 504 請求的個數超過 100 個就報警:

 
 
filter {
    metrics {
        timer => {"rt" => "%{request_time}"}
        percentiles => [25, 75]
        add_tag => "percentile"
    }
    if "percentile" in [tags] {
        ruby {
            code => "l=event.get('[rt][p75]')-event.get('[rt][p25]');event.set('[rt][low]', event.get('[rt][p25]')-l);event.set('[rt][high]',event.get('[rt][p75]')+l)"
        }
    }
}
output {
    if "percentile" in [tags] and ([rt][last] > [rt][high] or [rt][last] < [rt][low]) {
        exec {
            command => "echo \"Anomaly: %{[rt][last]}\""
        }
    }
}
 

 

 6, mutate, 類型轉換

可轉換的類型包括  integer, float, string

filter {
    mutate {
        convert => ["request_time", "float"]
    }
}

字符串處理: , sub

gsub => ["urlparams", "[\\?#]", "_"]

split: 

filter {
    mutate {
        split => ["message", "|"]
    }
}

join, 將split切分的在join回去

filter {
    mutate {
        split => ["message", "|"]
    }
    mutate {
        join => ["message", ","]
    }
}

rename: 字段重命名:

filter {
    mutate {
        rename => ["syslog_host", "host"]
    }
}

7, split切分

是multiline插件的反向, 將一行數據切分到多個事件中去

filter {
    split {
        field => "message"
        terminator => "#"
    }
}

而後輸入  "test1#test2", 能夠看到被輸出到2個事件中

4, output

 1, 標準輸出 (Stdout)

output {
    stdout {
        codec => rubydebug
        workers => 2
    }
}

2, 輸出到es

output {
    elasticsearch {
        hosts => ["192.168.0.2:9200"] # 有多個用逗號隔開
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        document_type => "%{type}"
        flush_size => 20000
        idle_flush_time => 10
        sniffing => true
        template_overwrite => true
    }
}

 

注意索引名中不能有大寫字母,不然 ES 在日誌中會報 InvalidIndexNameException,可是 Logstash 不會報錯,這個錯誤比較隱晦,也容易掉進這個坑中。

 3), email

126郵箱發送到 qq郵箱的示例

output {
    email {
        port           =>    "25"
        address        =>    "smtp.126.com"
        username       =>    "test@126.com"
        password       =>    ""
        authentication =>    "plain"
        use_tls        =>    true
        from           =>    "test@126.com"
        subject        =>    "Warning: %{title}"
        to             =>    "test@qq.com"
        via            =>    "smtp"
        body           =>    "%{message}"
    }
}
相關文章
相關標籤/搜索