使用Logstash的grok過濾日誌文件

能夠使用Logstash的grok模塊對任意文本解析並結構化輸出。Logstash默認帶有120中匹配模式。
php

能夠參見源代碼html

logstash/patterns/grok-patternslinux

logstash/lib/logstash/filters/grok.rbgit


grok的語法格式爲 %{SYNTAX:SEMANTIC}github

SYNTAX是文本要匹配的模式,例如3.14匹配 NUMBER 模式,127.0.0.1 匹配 IP 模式。
正則表達式


SEMANTIC 是匹配到的文本片斷的標識。例如 「3.14」 能夠是一個時間的持續時間,因此能夠簡單地叫作"duration" ,字符串"55.3.244.1"能夠被標識爲「client」apache

因此,grok過濾器表達式能夠寫成:
ruby


%{NUMBER:duration} %{IP:client}ssh


默認狀況下,全部的SEMANTIC是以字符串的方式保存,若是想要轉換一個SEMANTIC的數據類型,例如轉換一個字符串爲×××,能夠寫成以下的方式:curl


%{NUMBER:num:int}


例如日誌

55.3.244.1 GET /index.html 15824 0.043


能夠寫成以下的grok過濾表達式


%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}


再舉一個實際的案例

常規的Apache日誌

127.0.0.1 - - [13/Apr/2015:17:22:03 +0800] "GET /router.php HTTP/1.1" 404 285 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2"
127.0.0.1 - - [13/Apr/2015:17:22:03 +0800] "GET /router.php HTTP/1.1" 404 285 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2"

使用Logstash收集

 input{
 
  file {
    type => "apache"
    path => "/var/log/httpd/access_log"
    exclude => ["*.gz"]
    sincedb_path => "/dev/null"
 
       }
 
      }

output {
   stdout {
      codec => rubydebug
          }
        }

顯示:

{
       "message" => "127.0.0.1 - - [13/Apr/2015:17:22:03 +0800] \"GET /router.php HTTP/1.1\" 404 285 \"-\" \"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2\"",
      "@version" => "1",
    "@timestamp" => "2015-04-13T09:22:03.844Z",
          "type" => "apache",
          "host" => "xxxxxx",
          "path" => "/var/log/httpd/access_log"
}
{
       "message" => "127.0.0.1 - - [13/Apr/2015:17:22:03 +0800] \"GET /router.php HTTP/1.1\" 404 285 \"-\" \"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2\"",
      "@version" => "1",
    "@timestamp" => "2015-04-13T09:22:03.844Z",
          "type" => "apache",
          "host" => "xxxxxx",
          "path" => "/var/log/httpd/access_log"
}


修改配置以下:



input {

  file {
    type => "apache"
    path => "/var/log/httpd/access_log"
    exclude => ["*.gz"]
    sincedb_path => "/dev/null"
    
       }

      }
filter {
  if [type] == "apache" {
     grok {
          match => ["message",  "%{COMBINEDAPACHELOG}"]
          }
                         }
       }

output {
   stdout {
      codec => rubydebug
          }
       }


顯示:

{
        "message" => "127.0.0.1 - - [14/Apr/2015:09:53:40 +0800] \"GET /router.php HTTP/1.1\" 404 285 \"-\" \"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2\"",
       "@version" => "1",
     "@timestamp" => "2015-04-14T01:53:57.182Z",
           "type" => "apache",
           "host" => "xxxxxxxx",
           "path" => "/var/log/httpd/access_log",
       "clientip" => "127.0.0.1",
          "ident" => "-",
           "auth" => "-",
      "timestamp" => "14/Apr/2015:09:53:40 +0800",
           "verb" => "GET",
        "request" => "/router.php",
    "httpversion" => "1.1",
       "response" => "404",
          "bytes" => "285",
       "referrer" => "\"-\"",
          "agent" => "\"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2\""
}
{
        "message" => "127.0.0.1 - - [14/Apr/2015:09:53:40 +0800] \"GET /router.php HTTP/1.1\" 404 285 \"-\" \"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2\"",
       "@version" => "1",
     "@timestamp" => "2015-04-14T01:53:57.187Z",
           "type" => "apache",
           "host" => "xxxxxxx",
           "path" => "/var/log/httpd/access_log",
       "clientip" => "127.0.0.1",
          "ident" => "-",
           "auth" => "-",
      "timestamp" => "14/Apr/2015:09:53:40 +0800",
           "verb" => "GET",
        "request" => "/router.php",
    "httpversion" => "1.1",
       "response" => "404",
          "bytes" => "285",
       "referrer" => "\"-\"",
          "agent" => "\"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2\""
}


這裏的%{COMBINEDAPACHELOG} 是logstash自帶的匹配模式


patterns/grok-patterns 

COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:req
uest}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}


grok能夠支持任意正則表達式

因此支持的正則表達式的語法能夠參見

http://www.geocities.jp/kosako3/oniguruma/doc/RE.txt


在有些狀況下自帶的匹配模式沒法知足需求,能夠自定義一些匹配模式

首先能夠根據正則表達式匹配文本片斷


(?<field_name>the pattern here)


例如,postfix日誌有一個字段表示 queue id,能夠使用如下表達式進行匹配:


(?<queue_id>[0-9A-F]{10,11}


能夠手動建立一個匹配文件

#     # contents of ./patterns/postfix:

    POSTFIX_QUEUEID [0-9A-F]{10,11}

    Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

     filter {
       grok {
         patterns_dir => "./patterns"
         match => [ "message", "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" ]
       }
     }

 The above will match and result in the following fields:

 * timestamp: Jan  1 06:25:43
 * logsource: mailserver14
 * program: postfix/cleanup
 * pid: 21403
 * queue_id: BEF25A72965
 * syslog_message: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

 The `timestamp`, `logsource`, `program`, and `pid` fields come from the
 SYSLOGBASE pattern which itself is defined by other patterns.


能夠使用重寫

The fields to overwrite.
 
 This allows you to overwrite a value in a field that already exists.
 
   For example, if you have a syslog line in the 'message' field, you can
   overwrite the 'message' field with part of the match like so:
  
       filter {
         grok {
           match => [
             "message",
             "%{SYSLOGBASE} %{DATA:message}"
           ]
           overwrite => [ "message" ]
         }
       }
  
    In this case, a line like "May 29 16:37:11 sadness logger: hello world"
    will be parsed and 'hello world' will overwrite the original message.









參考文檔:

http://logstash.net/docs/1.4.2/filters/grok

https://github.com/logstash/logstash/tree/v1.4.2/patterns

相關文章
相關標籤/搜索