elasticsearch download: https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.2.0/elasticsearch-2.2.0.tar.gz
logstash download: https://download.elastic.co/logstash/logstash/logstash-2.2.1.tar.gz
kibana download: https://download.elastic.co/kibana/kibana/kibana-4.4.1-linux-x64.tar.gz
grok debugger (validate grok expressions): https://grokdebug.herokuapp.com/
es Chinese documentation: http://wiki.jikexueyuan.com/project/elasticsearch-definitive-guide-cn/
elk Chinese documentation: http://kibana.logstash.es/
kibana search syntax: http://www.javashuo.com/article/p-ogjmtkhn-cs.html
es configuration: https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration.html
curl localhost:9200/_nodes/stats/process?pretty
1. Modify JAVA_OPTS to set the JVM heap size
2. Raise the open-file limit: ulimit -n 65535 or higher
3. sysctl -w vm.max_map_count=262144
4. Disable swap, or set swappiness to 0
5. Set bootstrap.mlockall: true, and specify a tmp directory at startup: ./bin/elasticsearch -Djna.tmpdir=/path/to/new/dir (a combined sketch of all five steps follows below)
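A minimal shell sketch applying the five steps above on one Linux host; the 2g heap value and the paths are assumptions to adapt (the ES 2.x startup script reads ES_HEAP_SIZE):

# 1. heap size: ES 2.x honors ES_HEAP_SIZE (2g is an assumed value)
export ES_HEAP_SIZE=2g
# 2. open-file limit for this shell; persist it via /etc/security/limits.conf
ulimit -n 65536
# 3. mmap areas required by elasticsearch
sudo sysctl -w vm.max_map_count=262144
# 4. disable swap entirely (alternatively set vm.swappiness=0)
sudo swapoff -a
# 5. with bootstrap.mlockall: true in elasticsearch.yml, start with a custom JNA tmp dir
./bin/elasticsearch -Djna.tmpdir=/path/to/new/dir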
filebeat:
  registry_file: "/usr/local/filebeat/.filebeat"
  prospectors:
    - paths:
        - /usr/local/nginx/logs/access.log
      document_type: type1
    - paths:
        - /usr/local/nginx/logs/error.log
      document_type: type2
    - paths:
        - /usr/local/nginx/logs/other.log
      document_type: type3
output:
  logstash:
    hosts: ["192.168.241.130:5044"]
    worker: 1
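With the config above saved as filebeat.yml, filebeat 1.x can be run in the foreground for a quick test (-e logs to stderr; the config path is an assumption):

./filebeat -c /usr/local/filebeat/filebeat.yml -e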
input {
  beats {
    port => 5044
    # ssl => false
    # ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    # ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }
}
filter {
  if [type] == "type1" {
    grok {
      patterns_dir => "./patterns"
      match => { "message" => "%{NGINXLOG}" }
    }
    geoip {
      source => "remote_addr"
      target => "geoip"
      database => "/usr/local/logstash/geoip/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    # The part below uses the log's own time as the index time field; the default is the import time.
    date {
      locale => "en"
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    mutate {
      remove_field => "timestamp"
    }
  } else if [type] == "type2" {
    # To parse the JSON, first split the non-JSON prefix out of the line.
    grok {
      patterns_dir => "./patterns/"
      match => { "message" => "%{DATETIME:timestamp} \[%{WORD:level}\] %{GREEDYDATA:jsonmessage}" }
    }
    json {
      source => "jsonmessage"
      remove_field => ["jsonmessage"]
    }
  } else if [type] == "type3" {
    # Handle multi-line logs
    multiline {
      pattern => "^\d{4}\/\d{2}\/\d{2} \d{2}\:\d{2}\:\d{2}"
      negate => true
      what => "previous"
    }
    grok {
      patterns_dir => "./patterns/"
      match => { "message" => "%{LUALOG}" }
    }
    # If the match above failed, fall through to the patterns below
    if "_grokparsefailure" in [tags] {
      grok {
        patterns_dir => "./patterns"
        match => { "message" => "%{NGINX_ERROR_LOG}" }
      }
      date {
        locale => "en"
        match => [ "timestamp", "yyyy/MM/dd HH:mm:ss", "yyyy/MMM/dd HH:mm:ss" ] # match multiple formats
      }
      mutate {
        remove_field => "timestamp"
      }
      geoip {
        source => "client"
        target => "geoip"
        database => "/usr/local/logstash/geoip/GeoLiteCity.dat"
        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
      }
    }
  }
}
output {
  elasticsearch {
    hosts => "192.168.241.130"
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
    document_type => "%{type}"
    workers => 2
    flush_size => 20000
    idle_flush_time => 10
  }
}
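Logstash 2.x can syntax-check the file before starting it (the logstash.conf filename is an assumption):

./bin/logstash -f logstash.conf --configtest
./bin/logstash -f logstash.conf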
filebeat tags each prospector with a type; logstash branches on it to tell the log streams apart.
If the log format is complex, you can chain multiple parsing layers in the filter, such as the grok + json combination above (a worked sample follows these notes).
An if can nest further ifs; use if "_grokparsefailure" in [tags] to catch lines the previous grok failed to match.
The date filter uses the log's own timestamp as the index time field; by default the import time is used.
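A hypothetical type2 line (all content invented) and what each stage of the grok + json chain would extract:

2016-02-24 12:00:00 [error] {"uid": 123, "action": "pay"}
# grok -> timestamp: "2016-02-24 12:00:00", level: "error", jsonmessage: {"uid": 123, "action": "pay"}
# json -> uid: 123, action: "pay" (jsonmessage itself is then removed)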
# COMMON
DATETIME (?<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})

# FOR NGINX
NGINX_ERROR_DATE_TIME %{YEAR}\/%{MONTHNUM}\/%{MONTHDAY} %{TIME}
NGINXLOG \[%{HTTPDATE:timestamp}\] %{NUMBER:request_time} %{IPORHOST:remote_addr} %{INT:status} %{INT:body_bytes_sent} "%{WORD:method} %{URIPATH:path}(?:%{URIPARAM:param})? HTTP/%{NUMBER:httpversion}" %{QS:http_referer} %{QS:http_user_agent} %{QS:cookie}
NGINX_ERROR_LOG (?<timestamp>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[%{DATA:err_severity}\] (%{NUMBER:pid:int}#%{NUMBER}: \*%{NUMBER}|\*%{NUMBER}) %{DATA:err_message}(?:, client: (?<client_ip>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:server})(?:, request: %{QS:request})?(?:, host: %{QS:client_ip})?(?:, referrer: \"%{URI:referrer})?

# FOR JAVA
JAVACLASS (?:[a-zA-Z0-9-]+\.)+[A-Za-z0-9$]+
JAVALOGMESSAGE (.*)
CATALINA_EXEC %{WORD}-%{WORD}-%{WORD}
TOMCAT_DATESTAMP %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:?%{MINUTE}(?::?%{SECOND})
TOMCAT_ERROR %{TOMCAT_DATESTAMP:timestamp} \[%{CATALINA_EXEC:catalina_exec}\] %{LOGLEVEL:level} %{JAVACLASS:class} - %{JAVALOGMESSAGE_NONGREEDY:logmessage}\n%{JAVALOGMESSAGE_NONGREEDY:logmessage}\n\t%{GREEDYDATA:fulllogmessage}

# FOR openresty lua-related logs
LUALOG (?<timestamp>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER}: %{GREEDYDATA:errormessage}\n(?<traceback>\S+ \S+):\n(?<coroutine>\S+ \d):\s+(?<function>\[C]: in function 'require')?\s+?(?<func-errmsg>.*>), (client: (?<client_ip>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:server})(?:, request: %{QS:request})(?:, host: %{QS:client_ip})
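A made-up access log line shaped the way NGINXLOG expects, handy for pasting into the grok debugger linked above (every value is invented):

[24/Feb/2016:12:00:00 +0800] 0.123 192.168.241.1 200 512 "GET /index.html HTTP/1.1" "-" "curl/7.29.0" "-"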
The error log format is not fixed and is hard to handle; the approach above only covers some of the error log variants.
cluster.name: my-application
bootstrap.mlockall: true
network.host: 192.168.241.130
http.port: 9200
security.manager.enabled: false # must be set to false when connecting to hadoop
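After a restart, the nodes API on ES 2.x reports whether mlockall actually took effect:

curl 'http://192.168.241.130:9200/_nodes/process?pretty' | grep mlockall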
# kibana.yml: point kibana at elasticsearch
elasticsearch.url: "http://192.168.241.130:9200"
Clean up expired elasticsearch data:
curl -XDELETE 'http://192.168.241.130:9200/logstash-2016.02.24'
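A minimal retention sketch built on the same command, assuming GNU date and the logstash-%{type}-YYYY.MM.dd index names produced above; the 30-day window and the type1 name are assumptions (run it from cron to automate cleanup):

# delete the type1 index from 30 days ago; adjust the type and window as needed
OLD=$(date -d "30 days ago" +%Y.%m.%d)
curl -XDELETE "http://192.168.241.130:9200/logstash-type1-${OLD}"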
1. Do not edit a watched file with vim: vim replaces the file with a new inode on save, so after saving even the pre-existing content gets shipped to logstash again.
2. filebeat's SSL verification currently seems to be broken.