logstash-grok解析nginx+php日誌

ELK的基礎架構搭建相對來講仍是比較容易的,架構搭建完成後一個重要任務就是如何把生產中的服務器日誌給解析好後php

存進Elasticsearch了,這個過程當中間最關鍵的一步就是如何使用filter插件格式化日誌數據的問題。筆者在此過程當中html

也是被grok的正規表達式磨到想哭的地步,因此你們也不要遇到一點困難就選擇放棄了!nginx


logstash格式化日誌的插件比較多,比較經常使用的就是grok、date、geoipweb

若是日誌自己沒有作成json格式,咱們就須要使用grok格式先把日誌解析成json格式,才能方便ES存儲正則表達式

grok解析日誌時實際使用的是正則表達式來匹配相應字段後並給其字段命名json

但若是每次匹配字段時都要直接使用正則表達式的元字符寫表達式,是一件很痛苦的事情,因此官方在開發grok插件時,bootstrap

就已經爲用戶提早寫好了不少的現成的模式(也即便用元字符寫好的一個表達式模塊),咱們調用grok插件解析日誌,ruby

大多均可以直接使用,但若是咱們的日誌是本身訂製過的格式的話,就須要自行寫grok表達式的模式了。服務器

grok表達式是須要邊寫邊調試的,好在ELK官方在5.5版本之後直接把grok調試工具集成在了kibana的web頁面上了。架構

因此咱們徹底能夠把整個ELK的環境搭建起來以後再去寫grok表達式,這樣就可使用kibana自己集成的grok調試工具了。


下面咱們簡單展現一下grok格式化nginx錯誤日誌(如下操做都是使用kibana集成的grok debug toos進行的)


源日誌內容正文以下:

2018/08/29 21:34:53 [error] 1195#1195: *11 open() "/usr/share/nginx/html/favicon.ico" failed (2: No such file or directory), client: 192.168.10.1, server: localhost, request: "GET /favicon.ico HTTP/1.1", host: "192.168.10.150", referrer: "http://192.168.10.150/index0.html"


粗略格式化grok表達式

(?<timestamp>\d{4}/\d{2}/\d{2}\s{1,}\d{2}:\d{2}:\d{2})\s{1,}\[%{DATA:log_level}\]\s{1,}%{GREEDYDATA:error_message}


格式化後的效果

{

  "error_message": "1195#1195: *11 open() \"/usr/share/nginx/html/favicon.ico\" failed (2: No such file or directory), client: 192.168.10.1, server: localhost, request: \"GET /favicon.ico HTTP/1.1\", host: \"192.168.10.150\", referrer: \"http://192.168.10.150/index0.html\"",

  "log_level": "error",

  "timestamp": "2018/08/29 21:34:53"

}


精細格式化grok表達式

(?<timestamp>\d{4}/\d{2}/\d{2}\s{1,}\d{2}:\d{2}:\d{2})\s{1,}\[%{DATA:log_level}\]\s{1,}(%{NUMBER:pid:int}#%{NUMBER}:\s{1,}\*%{NUMBER}|\*%{NUMBER}) %{DATA:error_message}(?:,\s{1,}client:\s{1,}(?<client>%{IP}|%{HOSTNAME}))(?:,\s{1,}server:\s{1,}%{IPORHOST:server})(?:, request: %{QS:request})?(?:, upstream: %{QS:upstream})?(?:, host: %{QS:host})?(?:, referrer: \"%{URI:referrer})?


{

  "error_message": "open() \"/usr/share/nginx/html/favicon.ico\" failed (2: No such file or directory)",

  "server": "localhost",

  "request": "\"GET /favicon.ico HTTP/1.1\"",

  "log_level": "error",

  "pid": 1195,

  "referrer": "http://192.168.10.150/index0.html",

  "host": "\"192.168.10.150\"",

  "client": "192.168.10.1",

  "timestamp": "2018/08/29 21:34:53"

}

由此能夠對比看出,日誌能夠格式化的比較粗略,也能夠格式化的比較精細,這個須要根據後期往後分析的需求來決定了,若是格式的比較粗略,

後期若是想對比較長的字段裏面的一些內容作過濾分析的話,就會比較麻煩一些。


logstash關於nginx訪問日誌解析的具體配置

/etc/logstash/conf.d/nginx-access.conf

input {

  kafka {

    bootstrap_servers => "172.16.100.7:9092"

    topics => ["nginx-access"]

    consumer_threads => 5

    type => "nginx-access-log"

  }

}


filter {

  if [type] == "nginx-access-log" {

    grok {

      match => { "message" => "%{HOSTNAME:logserver} %{PATH:logpath} %{NGINX_ACCESS_LOG}" }

      remove_field  => "message"

    }

    date {

      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z", "yyyy/MM/dd HH:mm:ss" ]

    }

    geoip {

      source => "remote_addr"

    }

  }

}


output {

  if [type] == "nginx-access-log" {

    elasticsearch {

      hosts => ["localhost:9200"]

      index => "nginx-%{+YYYY.MM.dd}"

      template => "/etc/logstash/conf.d/nginx.index"

      template_name => "nginx"

      template_overwrite => true

    }

  }

}


input部分咱們配置logstash從kafka中指定的隊列中去相應日誌

consumer_threads配置用幾個線程去kafka隊列中去取日誌

type字段是咱們爲了讓filter可以進行提取對應隊列中的數據本身添加了一個新的字段,

自定義的字段的值爲string格式,因此須要用雙引號括起來


filter部分在使用grok插件進行解析時必須匹配指定隊列中的日誌數據,

因此須要經過if語句進行判斷,只有符合指定條件日誌數據纔會使用下方的grok表達式進行解析。

message正文內容使用了三串模式就寫完了整條表達式,%{HOSTNAME:logserver}和%{PATH:logpath}這兩段

是調用了默認pattern文件中預先定義好的pattern模塊,logstash默認pattern文件路徑:

/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns

%{NGINX_ACCESS_LOG}是用默認patter模塊自行拼裝的一個pattern的表達式,在我這裏,我把和nginx相關的pattern都寫在了

一個獨立產pattern配置文件裏面了,須要放置在/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns此路徑下,若是新增或修改了pattern文件內容,須要重啓logstash服務


output插件也須要首先判斷自定義的type類型後再進行輸出處理,這裏輸出到ES,而且使用了自定義的映射模版。


本身編寫的nginx模式集合文件

cat /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/nginx

NGINX_ALL %{IP:remote_addr} %{DATA:remote_user} \[%{HTTPDATE:timestamp}\] %{QS:request} %{NUMBER:status_code} %{NUMBER:body_bytes} %{QS:request_body} %{QS:http_referer} %{QS:user_agent} %{QS:x_forward_for}%{DATA:upstream_addr} %{DATA:upstream_response_time} %{NUMBER:request_time}


NGINX_NO_FORWARD_NO_UPSTREAM_ADDR %{IP:remote_addr} %{DATA:remote_user} \[%{HTTPDATE:timestamp}\] %{QS:request} %{NUMBER:status_code} %{NUMBER:body_bytes} %{QS:request_body} %{QS:http_referer} %{QS:user_agent} %{DATA:upstream_response_time} %{NUMBER:request_time}


NGINX_NO_UPSTREAM_ADDR %{IP:remote_addr} %{DATA:remote_user} \[%{HTTPDATE:timestamp}\] %{QS:request} %{NUMBER:status_code} %{NUMBER:body_bytes} %{QS:request_body} %{QS:http_referer} %{QS:user_agent} %{QS:x_forward_for} %{DATA:upstream_response_time} %{NUMBER:request_time}


NGINX_NO_REQUEST_BODY %{IP:remote_addr} %{DATA:remote_user} \[%{HTTPDATE:timestamp}\] %{QS:request} %{NUMBER:status_code} %{NUMBER:body_bytes} %{QS:http_referer} %{QS:user_agent} %{QS:x_forward_for}%{DATA:upstream_addr} %{DATA:upstream_response_time} %{NUMBER:request_time}


NGINX_NO HTTP_REFERER %{IP:remote_addr} %{DATA:remote_user} \[%{HTTPDATE:timestamp}\] %{QS:request} %{NUMBER:status_code} %{NUMBER:body_bytes} %{QS:request_body} %{QS:user_agent} %{QS:x_forward_for}%{DATA:upstream_addr} %{DATA:upstream_response_time} %{NUMBER:request_time}


NGINX_SIMPLE %{IP:remote_addr} %{DATA:remote_user} \[%{HTTPDATE:timestamp}\] %{QS:request} %{NUMBER:status_code} %{NUMBER:body_bytes} %{QS:http_referer} %{QS:user_agent} %{QS:x_forward_for}


NGINX_ACCESS_LOG %{NGINX_ALL}|%{NGINX_NO_FORWARD_NO_UPSTREAM_ADDR}|%{NGINX_NO_UPSTREAM_ADDR}|%{NGINX_NO_REQUEST_BODY}|%{NGINX_NO HTTP_REFERER}|%{NGINX_SIMPLE}


NGINX_ERROR_ALL (?<timestamp>\d{4}/\d{2}/\d{2}\s{1,}\d{2}:\d{2}:\d{2})\s{1,}\[%{DATA:log_level}\]\s{1,}(%{NUMBER:pid:int}#%{NUMBER}:\s{1,}\*%{NUMBER}|\*%{NUMBER}) %{DATA:error_message}(?:,\s{1,}client:\s{1,}(?<client>%{IP}|%{HOSTNAME}))(?:,\s{1,}server:\s{1,}%{IPORHOST:server})(?:, request: %{QS:request})?(?:, upstream: %{QS:upstream})?(?:, host: %{QS:host})?(?:, referrer: \"%{URI:referrer})?


NGINX_ERROR_SIMPLE (?<timestamp>\d{4}/\d{2}/\d{2}\s{1,}\d{2}:\d{2}:\d{2})\s{1,}\[%{DATA:log_level}\]\s{1,}%{GREEDYDATA:error_message}


NGINX_ERROR_LOG %{NGINX_ERROR_ALL}|%{NGINX_ERROR_SIMPLE}


本身編寫的php模式集合文件

cat /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/php 

DATETIME_PHP %{MONTHDAY}[./-]%{MONTH}[./-]%{YEAR} %{TIME}


PHP_ERROR_ALL \[%{DATETIME_PHP:timestamp}\]\s+%{LOGLEVEL:loglevel}:\s+%{GREEDYDATA:error_message}


#PHP_POOL_ERROR_ALL \[%{DATETIME_PHP:timestamp}\s+Asia/Shanghai\]\s+PHP\s+%{LOGLEVEL:loglevel}:\s+%{GREEDYDATA:error_message}

PHP_POOL_ERROR_ALL \[%{DATETIME_PHP:timestamp} %{DATA:timezone}\] %{WORD:mode} %{DATA:error_type}\: %{GREEDYDATA:log_content}\n(?m)%{GREEDYDATA:stack_trace}


PHP_ERROR_SIMPLE \[%{DATETIME_PHP:timestamp}\s+Asia/Shanghai\]\s+PHP\s+%{GREEDYDATA:error_message}


PHP_ERROR_LOG %{PHP_POOL_ERROR_ALL}|%{PHP_ERROR_ALL}|%{PHP_ERROR_SIMPLE}


#PHP_SLOW_LOG (?m)^\[%{DATETIME_PHP:timestamp}\]\s+\[pool\s+%{USERNAME:poolname}\]\s+pid\s+%{NUMBER:pid}\n%{USERNAME} = %{PATH:script_filename}\n%{GREEDYDATA:detail}^$


#PHP_SLOW_LOG \[%{DATETIME_PHP:timestamp}\]\s+\[pool\s+%{USERNAME:poolname}\]\s+pid\s+%{NUMBER:pid}\n%{USERNAME:source_name} = %{PATH:script_filename}\n%{GREEDYDATA:detail}$


PHP_SLOW_LOG \[%{DATETIME_PHP:timestamp}\]\s+\[pool\s+%{USERNAME}\]\s+pid\s+%{NUMBER}\n%{USERNAME} = %{PATH:script_filename}\n%{GREEDYDATA:slow_detail}$



注意、注意、請注意:

首先是把nginx日誌的各類組合形式都用基礎pattern或者正則元字符組成的單條pattern,

而後把各類組合好的單條pattern語句使用「或者」的邏輯判斷組成一個可以解析多種日誌組

合格式的完整pattern,在這裏須要提醒你們的是,使用「或者」進行拼裝時,須要把匹配條

件較精準的放在前面,不然很容易出現使用調試工具時徹底正確,但放進logstash服務下

正式運行時就會報出許多解析失敗或者解析超時的錯誤信息並將logstash卡死。


解析規則開啓多行匹配模式時,若是日誌的結尾沒有明確的標識符,會把後續的行也匹配成當前

這一條日誌的內容。或許有人會說,個人多行日誌是以空行作分隔的,沒錯,使用filebeat從源

上收集日誌傳送到kafka隊列中時,是沒有問題的,可是logstash從kafka隊列中拉取日誌進行

解析時就會出現麻煩了,由於原來以空行作分隔的消息進入隊列後空行被清除了,因此此時解析

使用多行匹配就須要有明確的結束符,才能正確的匹配到原來屬於一條日誌的內容。


logstash完整的小配置文件,解析nginx錯誤日誌


/etc/logstash/conf.d/nginx-error.conf

input {

  kafka {

    bootstrap_servers => "172.16.100.7:9092"

    topics => ["nginx-error"]

    consumer_threads => 3

    type => "nginx-error-log"

  }

}


filter {

  if [type] == "nginx-error-log" {

    grok {

      match => { "message" => "%{HOSTNAME:logserver} %{PATH:logpath} %{NGINX_ERROR_LOG}" }

      remove_field  => "message"

    }

    date {

      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z", "yyyy/MM/dd HH:mm:ss" ]

    }

    geoip {

      source => "remote_addr"

    }

  }

}


output {

  if [type] == "nginx-error-log" {

    elasticsearch {

      hosts => ["localhost:9200"]

      index => "nginx-%{+YYYY.MM.dd}"

      template => "/etc/logstash/conf.d/nginx.index"

      template_name => "nginx"

      template_overwrite => true

    }

  }

}


logstash完整的小配置文件,解析nginx訪問日誌

cat /etc/logstash/conf.d/nginx-access.conf 

input {

  kafka {

    bootstrap_servers => "172.16.100.7:9092"

    topics => ["nginx-access"]

    consumer_threads => 5

    type => "nginx-access-log"

  }

}


filter {

  if [type] == "nginx-access-log" {

    grok {

      match => { "message" => "%{HOSTNAME:logserver} %{PATH:logpath} %{NGINX_ACCESS_LOG}" }

      remove_field  => "message"

    }

    date {

      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z", "yyyy/MM/dd HH:mm:ss" ]

    }

    geoip {

      source => "remote_addr"

    }

  }

}


output {

  if [type] == "nginx-access-log" {

    elasticsearch {

      hosts => ["localhost:9200"]

      index => "nginx-%{+YYYY.MM.dd}"

      template => "/etc/logstash/conf.d/nginx.index"

      template_name => "nginx"

      template_overwrite => true

    }

  }

}


logstash完整的小配置文件,解析php慢日誌

cat /etc/logstash/conf.d/php-slow.conf 

input {

  kafka {

    bootstrap_servers => "172.16.100.7:9092"

    topics => ["php-slow"]

    consumer_threads => 5

    type => "php-slow-log"

  }

}


filter {

  if [type] == "php-slow-log" {

    grok {

      match => { "message" => "^%{HOSTNAME:logserver} %{PATH:logpath} %{PHP_SLOW_LOG}" }

      remove_field => "message"

    }

    date {

      match => [ "timestamp", "dd-MMM-yyyy HH:mm:ss" ]

    }

  }

}


output {

  if [type] == "php-slow-log" {

    elasticsearch {

      hosts => ["localhost:9200"]

      index => "php-%{+YYYY.MM.dd}"

    }

  }

}


logstash完整的小配置文件,解析php錯誤日誌

cat /etc/logstash/conf.d/php-error.conf 

input {

  kafka {

    bootstrap_servers => "172.16.100.7:9092"

    topics => ["php-error"]

    consumer_threads => 5

    type => "php-error-log"

  }

}


filter {

  if [type] == "php-error-log" {

    grok {

      match => { "message" => "^%{HOSTNAME:logserver} %{PATH:logpath} %{PHP_ERROR_LOG}" }

      remove_field => "message"

    }

    date {

      match => [ "timestamp", "dd-MMM-yyyy HH:mm:ss" ]

    }

  }

}


output {

  if [type] == "php-error-log" {

    elasticsearch {

      hosts => ["localhost:9200"]

      index => "php-%{+YYYY.MM.dd}"

    }

  }

}


對了,關於filter插件中使用到的date插件和geoip插件,以及nginx配置文件中使用的自定義的映射模版文件

都會放在後續的文章中進行單獨的介紹。本文分享的核心在於幫助你們理解如何使用grok的模式式寫出解析本身

生產環境中的服務器日誌的表達式。筆者在搭好ELK架構以後在寫這個表達式時也頭疼的好一陣子,查閱了好多相關

的文章,因此你們也不要以爲太痛苦。只要你深信技術都是這樣的過程當中沉澱下來的。

相關文章
相關標籤/搜索