Logstash 參考指南(Logstash配置示例)

Logstash配置示例

下面的示例演示如何配置Logstash來過濾事件,處理Apache日誌和syslog消息,並使用條件來控制哪些事件由過濾器或輸出處理。php

若是你須要幫助構建grok模式,請嘗試grok調試器,Grok調試器是基本許可證下的X-Pack特性,所以能夠 無償使用

配置過濾器

過濾器是一種在線處理機制,它提供了根據須要對數據進行切片和切割的靈活性,讓咱們看一下活動中的一些過濾器,下面的配置文件設置了grokdate過濾器。css

input { stdin { } }

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

使用此配置運行Logstash:ios

bin/logstash -f logstash-filter.conf

如今,將下面的行粘貼到你的終端並按Enter鍵,這樣它就會被stdin輸入處理:git

127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"

你應該會看到返回到stdout的是這樣的:github

{
        "message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"",
     "@timestamp" => "2013-12-11T08:01:45.000Z",
       "@version" => "1",
           "host" => "cadenza",
       "clientip" => "127.0.0.1",
          "ident" => "-",
           "auth" => "-",
      "timestamp" => "11/Dec/2013:00:01:45 -0800",
           "verb" => "GET",
        "request" => "/xampp/status.php",
    "httpversion" => "1.1",
       "response" => "200",
          "bytes" => "3891",
       "referrer" => "\"http://cadenza/xampp/navi.php\"",
          "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\""
}

如你所見,Logstash(在grok過濾器的幫助下)可以解析日誌行(碰巧是Apache的「組合日誌」格式),並將其分解爲許多不一樣的離散信息,一旦開始查詢和分析日誌數據,這就很是有用。例如,你將可以輕鬆地在HTTP響應碼、IP地址、referrers等上運行報表。Logstash有不少現成的grok模式,所以若是你須要解析通用的日誌格式,極可能已經有人爲你完成了這項工做,有關更多信息,請參閱GitHub上Logstash grok模式的列表。web

本例中使用的另外一個過濾器是date過濾器,這個過濾器會解析一個時間戳,並將其用做事件的時間戳(無論你何時使用日誌數據)。你將注意到,本例中的@timestamp字段設置爲2013年12月11日,儘管Logstash在隨後的某個時間攝取了該事件,這在備份日誌時很是方便,它使你可以告訴Logstash「使用此值做爲此事件的時間戳」。shell

處理Apache日誌

讓咱們作一些有用的事情:處理apache2訪問日誌文件!咱們將從本地主機上的文件中讀取輸入,並根據須要使用條件處理事件。首先,建立一個名爲logstash-apache.conf的文件包含如下內容(你能夠根據須要更改日誌文件路徑):apache

input {
  file {
    path => "/tmp/access_log"
    start_position => "beginning"
  }
}

filter {
  if [path] =~ "access" {
    mutate { replace => { "type" => "apache_access" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
  stdout { codec => rubydebug }
}

而後,使用如下日誌條目(或使用你本身的webserver中的一些日誌條目)建立上面配置的輸入文件(在本例中爲「/tmp/access_log」):ruby

71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3"
134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)"
98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"

如今,使用-f標誌運行Logstash以將其傳遞到配置文件:服務器

bin/logstash -f logstash-apache.conf

如今你應該在Elasticsearch中看到你的apache日誌數據了,Logstash打開並讀取指定的輸入文件,處理遇到的每一個事件。記錄到此文件的任何追加行也將被捕獲,由Logstash做爲事件處理,並存儲在Elasticsearch中。還有一個額外的好處,他們儲藏的字段「type」設置爲「apache_access」(這是由輸入配置中的type ⇒ "apache_access"行)。

在這個配置中,Logstash只查看apache access_log,可是經過更改上面配置中的一行就能夠同時查看access_log和error_log(其實是任何文件匹配*log):

input {
  file {
    path => "/tmp/*_log"
...

當你從新啓動Logstash時,它將同時處理error和access日誌,可是,若是你檢查數據(可能使用elasticsearch-kopf),你會看到access_log被分解爲離散字段,而error_log則不是。這是由於咱們使用了grok過濾器來匹配標準的組合apache日誌格式,並自動將數據分割爲單獨的字段。若是咱們能根據它的格式來控制行是如何被解析的,不是很好嗎?嗯,咱們能夠…

注意,Logstash不會從新處理access_log文件中已經查看的事件,從文件中讀取數據時,Logstash保存其位置,並只在添加新行時處理它們。

使用條件

你可使用條件來控制哪些事件由過濾器或輸出處理,例如,你能夠根據出如今哪一個文件(access_log、error_log以及以「log」結尾的其餘隨機文件)中來標記每一個事件。

input {
  file {
    path => "/tmp/*_log"
  }
}

filter {
  if [path] =~ "access" {
    mutate { replace => { type => "apache_access" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  } else if [path] =~ "error" {
    mutate { replace => { type => "apache_error" } }
  } else {
    mutate { replace => { type => "random_logs" } }
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

這個示例使用type字段標記全部事件,但實際上不解析errorrandom文件,有不少類型的錯誤日誌,它們應該如何標記取決於你使用的日誌。

相似地,你可使用條件將事件定向到特定的輸出,例如,你能夠:

  • 警告nagios任何狀態爲5xx的apache事件
  • 將任何4xx狀態記錄到Elasticsearch
  • 經過statsd記錄全部的狀態代碼

要告訴nagios任何具備5xx狀態碼的http事件,首先須要檢查type字段的值,若是是apache,那麼你能夠檢查status字段是否包含5xx錯誤,若是是,發送到nagios。若是不是5xx錯誤,檢查status字段是否包含4xx錯誤,若是是,發送到Elasticsearch。最後,將全部apache狀態碼發送到statsd,不管狀態字段包含什麼:

output {
  if [type] == "apache" {
    if [status] =~ /^5\d\d/ {
      nagios { ...  }
    } else if [status] =~ /^4\d\d/ {
      elasticsearch { ... }
    }
    statsd { increment => "apache.%{status}" }
  }
}

處理Syslog消息

Syslog是Logstash最多見的用例之一,並且它處理得很是好(只要日誌行大體符合RFC3164),Syslog其實是UNIX網絡日誌記錄標準,它將消息從客戶端發送到本地文件,或經過rsyslog發送到集中式日誌服務器。對於本例,你不須要一個功能正常的syslog實例;咱們將從命令行中僞造它,這樣你就能夠了解發生了什麼。

首先,讓咱們爲Logstash + syslog建立一個簡單的配置文件,名爲logstash-syslog.conf

input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

使用這個新配置運行Logstash:

bin/logstash -f logstash-syslog.conf

一般,客戶端將鏈接到端口5000上的Logstash實例併發送消息,對於本例,咱們將telnet到Logstash並輸入一條日誌行(相似於前面在STDIN中輸入日誌行),打開另外一個shell窗口與Logstash syslog輸入進行交互並輸入如下命令:

telnet localhost 5000

複製粘貼如下行做爲示例(你能夠本身嘗試一些,可是要記住,若是grok過濾器對你的數據不正確,它們可能沒法解析)。

Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied
Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)
Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.

如今,當你的原始shell處理和解析消息時,你應該會看到Logstash的輸出!

{
                 "message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
              "@timestamp" => "2013-12-23T22:30:01.000Z",
                "@version" => "1",
                    "type" => "syslog",
                    "host" => "0:0:0:0:0:0:0:1:52617",
        "syslog_timestamp" => "Dec 23 14:30:01",
         "syslog_hostname" => "louis",
          "syslog_program" => "CRON",
              "syslog_pid" => "619",
          "syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
             "received_at" => "2013-12-23 22:49:22 UTC",
           "received_from" => "0:0:0:0:0:0:0:1:52617",
    "syslog_severity_code" => 5,
    "syslog_facility_code" => 1,
         "syslog_facility" => "user-level",
         "syslog_severity" => "notice"
}
相關文章
相關標籤/搜索