Logstash in the ELK Stack

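Introduction:

Logstash is a tool for receiving, processing, and forwarding logs. It supports system logs, web server logs, error logs, and application logs; in short, any kind of log that can be emitted. In a typical ELK deployment, Elasticsearch stores the data on the back end and Kibana presents reports on the front end. Logstash is the carrier between them: it provides a powerful pipeline for data storage, report queries, and log parsing. Logstash ships with a wide range of input, filter, codec, and output plugins, which makes it easy to build powerful processing.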

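Installation:

(A JDK is required; installing it is not covered here. This article uses JDK 1.8.)
Version 2.4.1 is used here to match the Elasticsearch 2.x cluster at my company; choose whichever version suits you.
Note: the ELK stack performs a version check, so the major version numbers of the components must match.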

yum -y install https://download.elastic.co/logstash/logstash/packages/centos/logstash-2.4.1.noarch.rpm

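Installation creates two main directories and one configuration file:
Program directory: /opt/logstash
Log-parsing configuration directory: /etc/logstash/conf.d
Runtime configuration file: /etc/sysconfig/logstash
First, test that the installation works: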

[root@~]#/opt/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
Settings: Default pipeline workers: 4
Pipeline main started
hello world! # type a test string
{
"message" => "hello world!",  # echoed back successfully
"@version" => "1",
"@timestamp" => "2017-08-07T07:47:35.938Z",
"host" => "iZbp13lsytivlvvks4ulatZ"
}

To run Logstash with a specific configuration file:

/opt/logstash/bin/logstash -w 2 -f /etc/logstash/conf.d/test.conf

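Options

-w # number of pipeline worker threads; defaults to the number of CPU cores
-f # configuration file to use
-t # test whether the configuration file is valid
-b # maximum number of events collected before the filter stage runs; larger values improve throughput but use more memory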
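The options combine naturally: a configuration can be checked with -t before it is started for real. The following is a minimal shell sketch of that idea, not part of the original setup. The function name run_checked and the LOGSTASH_BIN override are assumptions made for illustration, and the worker and batch defaults are arbitrary values chosen for the sketch.

```shell
#!/bin/bash
# Sketch: validate a config with -t, then start Logstash with it.
# LOGSTASH_BIN is an assumed override; the default is the path used in this article.
LOGSTASH_BIN="${LOGSTASH_BIN:-/opt/logstash/bin/logstash}"

# run_checked <config-file> [workers] [batch-size]
run_checked() {
    local conf="$1"
    local workers="${2:-2}"     # arbitrary default for this sketch
    local batch="${3:-125}"     # arbitrary default for this sketch

    if [ ! -f "$conf" ]; then
        echo "config not found: $conf" >&2
        return 2
    fi
    # -t only checks the configuration and exits
    if ! "$LOGSTASH_BIN" -t -f "$conf"; then
        echo "config test failed: $conf" >&2
        return 1
    fi
    # the config passed the check, so start the real pipeline
    "$LOGSTASH_BIN" -w "$workers" -b "$batch" -f "$conf"
}
```

For example, run_checked /etc/logstash/conf.d/test.conf 2 500 would refuse to start if the file does not pass the check.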

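Configuration file layout:

# log input
input {
}
# log filtering, matching, and processing
filter {
}
# log output
output {
}

A log-parsing configuration file is built from three sections: input, filter, and output. Each is covered below, and each section has its own set of plugins.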
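To make the three sections concrete, here is a small sketch that writes a complete, minimal pipeline to a file: stdin as input, one mutate filter, and stdout as output. The function name write_minimal_conf and the field name pipeline_stage are made up for illustration.

```shell
#!/bin/bash
# write_minimal_conf <path>: write a tiny three-section pipeline to <path>.
write_minimal_conf() {
    cat > "$1" <<'EOF'
input {
    stdin { }
}
filter {
    # add a field so the effect of the filter section is visible in the output
    mutate {
        add_field => { "pipeline_stage" => "filtered" }
    }
}
output {
    stdout { codec => rubydebug }
}
EOF
}
```

The resulting file can be passed to logstash with -f, for example after write_minimal_conf /tmp/minimal.conf.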

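input module

Example 1

input {
    # file is a commonly used plugin; it has many options, choose the ones you need
    file {
        path => "/var/lib/mysql/slow.log"
        # location of the file(s) to read; wildcards are allowed, e.g. /var/log/nginx/*.log
        exclude => "*.gz"
        # files to exclude
        start_position => "beginning"
        # read from the start of the file; "end" reads from the end
        ignore_older => 0
        # skip files not modified within this many seconds; 0 means no limit
        sincedb_path => "/dev/null"
        # where the last read position is recorded; /dev/null means every run parses from the first line
        type => "mysql-slow"
        # type field, marks the kind of log being imported
    }
}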

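Example 2

input {

    # redis is a commonly used plugin; it has many options, choose the ones you need
    redis {
        batch_count => 1
        # number of events returned per EVAL call; 5 would return five log events per request
        data_type => "list"
        # how the logstash redis plugin operates
        key => "logstash-test-list"
        # the key to listen on
        host => "127.0.0.1"
        # redis address
        port => 6379
        # redis port
        password => "123qwe"
        # authentication password, if redis requires one
        db => 0
        # redis database number, if the application uses a non-default database; defaults to 0
        threads => 1
        # number of threads to use
      }
}

There are many commonly used input plugins; only two are shown here. Others include kafka, tcp, and so on.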

filter module

Example
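No example survives for this section. As a stand-in, the sketch below (an assumption, not the author's original) shows a typical filter section built from grok, mutate, and date, the same plugins used in the complete examples further down. COMBINEDAPACHELOG is a pattern bundled with Logstash; the function name write_filter_example is hypothetical.

```shell
#!/bin/bash
# write_filter_example <path>: write an illustrative filter section to <path>.
write_filter_example() {
    cat > "$1" <<'EOF'
filter {
    grok {
        # parse a combined-format access line into named fields
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    mutate {
        # store the HTTP status code as a number rather than a string
        convert => { "response" => "integer" }
    }
    date {
        # use the time from the log line as the event's @timestamp
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
}
EOF
}
```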

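output module

Example 1

output {
    # stdout { codec => "rubydebug" }
    # print the filtered events to the terminal

    elasticsearch {  # export to ES; the most commonly used plugin
        codec => "json"
        # export format is JSON
        hosts => ["127.0.0.1:9200"]
        # ES address and port
        index => "logstash-slow-%{+YYYY.MM.dd}"
        # target index; date variables may be used
        user => "admin"
        password => "xxxxxx"
        # user and password are only needed when ES has security enabled
        flush_size => 500
        # default 500; logstash buffers 500 events before sending them to ES
        idle_flush_time => 1
        # default 1s; if 500 events have not accumulated within 1s, whatever is buffered is sent anyway
    }
}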

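Example 2

output {
     redis{  # output to redis; use the options below as needed
         batch => true
         # false sends one event per RPUSH; true sends a batch
         batch_events => 50
         # number of events sent per RPUSH
         batch_timeout => 5
         # maximum time between RPUSH commands while events are pending
         codec => plain
         # codec applied to the output data, which avoids a separate filter in the pipeline
         congestion_interval => 1
         # how often to check for congestion
         congestion_threshold => 5
         # maximum number of items allowed in a list; once reached, output blocks until a consumer drains the list
         data_type => list
         # list (RPUSH) or channel (PUBLISH)
         db => 0
         # redis database number to use; defaults to 0
         host => ["127.0.0.1:6379"]
         # redis address and port; a port given here overrides the global port
         key => xxx
         # name of the list or channel
         password => xxx
         # redis password; none is used by default
         port => 6379
         # global port, default 6379; ignored when host already specifies a port
         reconnect_interval => 1
         # interval between reconnection attempts, default 1s
         timeout => 5
         # connection timeout
         workers => 1
         # number of workers
     }
}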

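There are many more commonly used plugins; see the official documentation for how to use them.
From the above, the overall Logstash processing flow is:
input => filter => output
Next, some complete examples.

Complete examples: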

Elasticsearch slow-log

input {
    file {
        path => ["/var/log/elasticsearch/private_test_index_search_slowlog.log"]
        start_position => "beginning"
        ignore_older => 0
        # sincedb_path => "/dev/null"
        type => "elasticsearch_slow"
        }   
}

filter {
    grok {
        match =>  { "message" => "^\[(\d\d){1,2}-(?:0[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])\s+(?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)\]\[(TRACE|DEBUG|WARN\s|INFO\s)\]\[(?<io_type>[a-z\.]+)\]\s\[(?<node>[a-z0-9\-\.]+)\]\s\[(?<index>[A-Za-z0-9\.\_\-]+)\]\[\d+\]\s+took\[(?<took_time>[\.\d]+(ms|s|m))\]\,\s+took_millis\[(\d)+\]\,\s+types\[(?<types>([A-Za-z\_]+|[A-Za-z\_]*))\]\,\s+stats\[\]\,\s+search_type\[(?<search_type>[A-Z\_]+)\]\,\s+total_shards\[\d+\]\,\s+source\[(?<source>[\s\S]+)\]\,\s+extra_source\[[\s\S]*\]\,\s*$" }
        remove_field => ["message"]
        }   

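    date {
        # note: the grok above captures no field named "timestamp", so this filter has no effect as written
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
        }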
    ruby {
        code => "event.timestamp.time.localtime"
        }   
    }

output {
     elasticsearch {
         codec => "json"
         hosts => ["127.0.0.1:9200"]
         index => "logstash-elasticsearch-slow-%{+YYYY.MM.dd}"
         user => "admin"
         password => "xxxx"
    }   

}

Mysql-slow log

input {
    file {
        path => "/var/lib/mysql/slow.log"
        start_position => "beginning"
        ignore_older => 0
        # sincedb_path => "/dev/null"
        type => "mysql-slow"
    }   
}
filter {
    if ([message] =~ "^(\/usr\/local|Tcp|Time)[\s\S]*") { drop {} }
    multiline {
        pattern => "^\#\s+Time\:\s+\d+\s+(0[1-9]|[12][0-9]|3[01]|[1-9])"
        negate => true
        what => "previous"
    }   
    grok {
        match => { "message" => "^\#\sTime\:\s+\d+\s+(?<datetime>%{TIME})\n+\#\s+User@Host\:\s+[A-Za-z0-9\_]+\[(?<mysql_user>[A-Za-z0-9\_]+)\]\s+@\s+(?<mysql_host>[A-Za-z0-9\_]+)\s+\[\]\n+\#\s+Query\_time\:\s+(?<query_time>[0-9\.]+)\s+Lock\_time\:\s+(?<lock_time>[0-9\.]+)\s+Rows\_sent\:\s+(?<rows_sent>\d+)\s+Rows\_examined\:\s+(?<rows_examined>\d+)(\n+|\n+use\s+(?<dbname>[A-Za-z0-9\_]+)\;\n+)SET\s+timestamp\=\d+\;\n+(?<slow_message>[\s\S]+)$"
        }
        remove_field => ["message"]
    }
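    date {
        # note: the grok above captures "datetime", not "timestamp", so this filter has no effect as written
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
    }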
    ruby {
        code => "event.timestamp.time.localtime"
    }   
}
output { 
    elasticsearch {
        codec => "json"
        hosts => ["127.0.0.1:9200"]
        index => "logstash-mysql-slow-%{+YYYY.MM.dd}"
        user => "admin"
        password => "xxxxx"
    }   
}

Nginx access.log

Logstash ships with built-in patterns that cover nginx; they can be used with small changes.
Append the following to /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns/grok-patterns:

X_FOR (%{IPV4}|-)

NGINXACCESS %{COMBINEDAPACHELOG} \"%{X_FOR:http_x_forwarded_for}\"

ERRORDATE %{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME}

NGINXERROR_ERROR %{ERRORDATE:timestamp}\s{1,}\[%{DATA:err_severity}\]\s{1,}(%{NUMBER:pid:int}#%{NUMBER}:\s{1,}\*%{NUMBER}|\*%{NUMBER}) %{DATA:err_message}(?:,\s{1,}client:\s{1,}(?<client_ip>%{IP}|%{HOSTNAME}))(?:,\s{1,}server:\s{1,}%{IPORHOST:server})(?:, request: %{QS:request})?(?:, host: %{QS:server_ip})?(?:, referrer:\"%{URI:referrer})?

NGINXERROR_OTHER %{ERRORDATE:timestamp}\s{1,}\[%{DATA:err_severity}\]\s{1,}%{GREEDYDATA:err_message}

The log configuration file then looks like this:

input {
    file {
        path => [ "/var/log/nginx/www-access.log" ]
        start_position => "beginning"
        # sincedb_path => "/dev/null"
        type => "nginx_access"
    }   
}
filter {
    grok {
         match => { "message" => "%{NGINXACCESS}"}
    }
    mutate {
        convert => [ "response","integer" ]
        convert => [ "bytes","integer" ]
    }
    date {
        match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
    }   
    ruby {
        code => "event.timestamp.time.localtime"
    }   
}
output {
    elasticsearch {
        codec => "json"
        hosts => ["127.0.0.1:9200"]
        index => "logstash-nginx-access-%{+YYYY.MM.dd}"
        user => "admin"
        password => "xxxx"
    }
}
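Editing the bundled grok-patterns file works, but the change lives inside the installed gem. An alternative, sketched here as an assumption rather than the author's method, is to keep the custom patterns in a separate directory and point the grok filter at it with its patterns_dir option. The directory /etc/logstash/patterns, the file name nginx, and the function name write_nginx_patterns are hypothetical.

```shell
#!/bin/bash
# write_nginx_patterns <dir>: store the custom patterns in their own file under <dir>.
write_nginx_patterns() {
    mkdir -p "$1"
    cat > "$1/nginx" <<'EOF'
X_FOR (%{IPV4}|-)
NGINXACCESS %{COMBINEDAPACHELOG} \"%{X_FOR:http_x_forwarded_for}\"
ERRORDATE %{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME}
EOF
}

# In the grok filter, point patterns_dir at that directory:
#   grok {
#       patterns_dir => ["/etc/logstash/patterns"]
#       match => { "message" => "%{NGINXACCESS}" }
#   }
```

The NGINXERROR_ERROR and NGINXERROR_OTHER patterns can be added to the same file in the same way.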

Nginx error.log

input {
    file {
        path => [ "/var/log/nginx/www-error.log" ]
        start_position => "beginning"
        # sincedb_path => "/dev/null"
        type => "nginx_error"
    }
}
filter {
    grok {
        match => [
                   "message","%{NGINXERROR_ERROR}",
                   "message","%{NGINXERROR_OTHER}"
                 ]
    }   
    ruby {
        code => "event.timestamp.time.localtime"
    }   
     date {
         # ERRORDATE captures year/month/day followed by the time, so the format must follow that layout
         match => [ "timestamp","yyyy/MM/dd HH:mm:ss"]
     }

}

output {
    elasticsearch {
        codec => "json"
        hosts => ["127.0.0.1:9200"]
        index => "logstash-nginx-error-%{+YYYY.MM.dd}"
        user => "admin"
        password => "xxxx"
    }   
}

PHP error.log

input {
    file {
        path => ["/var/log/php/error.log"]
        start_position => "beginning"
        # sincedb_path => "/dev/null"
        type => "php-fpm_error"
    }   
}

filter {
    multiline {
        pattern => "^\[(0[1-9]|[12][0-9]|3[01]|[1-9])\-%{MONTH}-%{YEAR}[\s\S]+"
        negate => true
        what => "previous"
    }   
    grok {
        match => { "message" => "^\[(?<timestamp>(0[1-9]|[12][0-9]|3[01]|[1-9])\-%{MONTH}-%{YEAR}\s+%{TIME}?)\s+[A-Za-z]+\/[A-Za-z]+\]\s+(?<category>(?:[A-Z]{3}\s+[A-Z]{1}[a-z]{5,7}|[A-Z]{3}\s+[A-Z]{1}[a-z\s]{9,11}))\:\s+(?<error_message>[\s\S]+$)" }

        remove_field => ["message"]
    }   

    date {
        # the grok above captures day-month-year followed by the time, so the format must follow that layout
        match => ["timestamp","dd-MMM-yyyy HH:mm:ss"]
    }

    ruby {
        code => "event.timestamp.time.localtime"
    }   

}

output {
    elasticsearch {
        codec => "json"
        hosts => ["127.0.0.1:9200"]
        index => "logstash-php-error-%{+YYYY.MM.dd}"
        user => "admin"
        password => "xxxxx"
    }   
}

Php-fpm slow-log

input {
    file {
        path => ["/var/log/php-fpm/www.slow.log"]
        start_position => "beginning"
        # sincedb_path => "/dev/null"
        type => "php-fpm_slow"
    }   
}

filter {
    multiline {
        pattern => "^$"
        negate => true
        what => "previous"
    }   
    grok {
        match => { "message" => "^\[(?<timestamp>(0[1-9]|[12][0-9]|3[01]|[1-9])\-%{MONTH}-%{YEAR}\s+%{TIME})\]\s+\[[a-z]{4}\s+(?<pool>[A-Za-z0-9]{1,8})\]\s+[a-z]{3}\s+(?<pid>\d{1,7})\n(?<slow_message>[\s\S]+$)" }

        remove_field => ["message"]
    }   

    date {
        # the grok above captures day-month-year followed by the time, with no zone
        match => ["timestamp","dd-MMM-yyyy HH:mm:ss"]
    }

    ruby {
        code => "event.timestamp.time.localtime"
    }   

}

output {

    elasticsearch {
        codec => "json"
        hosts => ["127.0.0.1:9200"]
        index => "logstash-php-fpm-slow-%{+YYYY.MM.dd}"
        user => "admin"
        password => "xxxx"
    }   
}

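Log-parsing configuration files are kept together under /etc/logstash/conf.d. They can live anywhere, but a single location is best.
When there are several configuration files, Logstash cannot be started like this:

/opt/logstash/bin/logstash -f /etc/logstash/conf.d/ (or with a trailing *)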

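This command concatenates the configuration files into a single pipeline rather than using each one on its own, and it reports an error.
With several configuration files, start them one at a time:

/opt/logstash/bin/logstash -f /etc/logstash/conf.d/nginx_error.conf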
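Because a directory passed to -f is merged into one pipeline, every event passes through every filter and output in that directory. Another option, not used in this article, is to wrap each filter or output body in a conditional on the type field that the input sets. The helper below is only a sketch of that idea; the name guard_by_type is made up.

```shell
#!/bin/bash
# guard_by_type <type>: read a filter or output body on stdin and print it
# wrapped in a conditional that only applies to events of that type.
guard_by_type() {
    printf 'if [type] == "%s" {\n' "$1"
    sed 's/^/    /'   # indent the wrapped body by four spaces
    printf '}\n'
}
```

Piping an elasticsearch { ... } block through guard_by_type nginx_error prints the same block nested inside if [type] == "nginx_error" { ... }, which can then be placed in the output section.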

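Starting each file by hand is tedious when there are many configuration files, and startup is slow, so I wrote a test script to make this easier. It is provided for reference only: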

#!/bin/bash
conf_path=/etc/logstash/conf.d
# directory holding the configuration files; change as needed

# start_all <seconds>: start one logstash per file, pausing between launches
start_all() {
    echo "-----------please wait.----------"
    echo "The start-up process is too slow."
    for cf in "${conf_path}"/*
    do
        /opt/logstash/bin/logstash -f "$cf" > /dev/null 2>&1 &
        pid=$!
        sleep "$1"
        # $? after "&" is always 0, so check that the process is still running instead
        if ! kill -0 "$pid" 2> /dev/null; then
            echo "The ${cf} start-up failed."
        fi
    done
    echo "start-up finished."
}

# stop_all: kill every process whose command line contains "logstash"
stop_all() {
    if pkill -9 -f logstash > /dev/null 2>&1; then
        sleep 2
        echo "Stop success."
    fi
}

case $1 in
start)
    start_all 20
;;
stop)
    stop_all
;;
restart)
    stop_all
    start_all 10
;;
*)
    echo "Usage: $0 {start|stop|restart}"
    exit 1
esac

The script's file name must not contain the string logstash; here it is saved as log_stash.sh. Run it with ./log_stash.sh (start|stop|restart).
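The naming restriction exists because the script finds processes by matching the word logstash. A possible alternative, sketched here under assumptions and not tested against a real deployment, is to record each process ID in a PID file at start-up and signal only those IDs on stop. The variable names LOGSTASH_BIN, CONF_DIR, and PID_DIR and the directory /var/run/logstash-multi are hypothetical.

```shell
#!/bin/bash
# Sketch: track each Logstash process by PID file instead of matching on its name.
# All three variables are assumed names; adjust them to your layout.
LOGSTASH_BIN="${LOGSTASH_BIN:-/opt/logstash/bin/logstash}"
CONF_DIR="${CONF_DIR:-/etc/logstash/conf.d}"
PID_DIR="${PID_DIR:-/var/run/logstash-multi}"

# start_all: launch one process per *.conf file and record its PID
start_all() {
    mkdir -p "$PID_DIR"
    local cf name
    for cf in "$CONF_DIR"/*.conf; do
        [ -e "$cf" ] || continue
        name=$(basename "$cf" .conf)
        "$LOGSTASH_BIN" -f "$cf" > /dev/null 2>&1 &
        echo $! > "$PID_DIR/$name.pid"
    done
}

# stop_all: send TERM to the recorded PIDs only, then remove the PID files
stop_all() {
    local pf pid
    for pf in "$PID_DIR"/*.pid; do
        [ -e "$pf" ] || continue
        pid=$(cat "$pf")
        kill "$pid" 2> /dev/null
        rm -f "$pf"
    done
}
```

With this approach the script can have any name, and unrelated processes that happen to mention logstash on their command line are left alone.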
