1、Logstash 介紹php
Logstash 是一款強大的數據處理工具,它能夠實現數據傳輸,格式處理,格式化輸出,還有強大的插件功能,經常使用於日誌處理。
官網地址:https://www.elastic.co/products/logstash laravel
Logstash 工做的三個階段:redis
input 數據輸入端,能夠接收來自任何地方的源數據。mongodb
redis:從redis-server list 中獲取
Filter 數據中轉層,主要進行格式處理,數據類型轉換、數據過濾、字段添加,修改等,經常使用的過濾器以下。
grok: 經過正則解析和結構化任何文本。Grok 目前是logstash最好的方式對非結構化日誌數據解析成結構化和可查詢化。logstash內置了120個匹配模式,知足大部分需求。
mutate: 在事件字段執行通常的轉換。能夠重命名、刪除、替換和修改事件字段。
drop: 徹底丟棄事件,如debug事件。
clone: 複製事件,可能添加或者刪除字段。
geoip: 添加有關IP地址地理位置信息。
output 是logstash工做的最後一個階段,負責將數據輸出到指定位置,兼容大多數應用,經常使用的有:
elasticsearch: 發送事件數據到 Elasticsearch,便於查詢,分析,繪圖。
file: 將事件數據寫入到磁盤文件上。
mongodb:將事件數據發送至高性能NoSQL mongodb,便於永久存儲,查詢,分析,大數據分片。
graphite: 發送事件數據到graphite。http://graphite.wikidot.com/
statsd: 發送事件數據到 statsd。
# 配置yum源,新建yum文件 vim /etc/yum.repos.d/es.repo # 加入如下內容 [kibana-4.5] name=Kibana repository for 4.5.x packages baseurl=http://packages.elastic.co/kibana/4.5/centos gpgcheck=1 gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch enabled=1 [beats] name=Elastic Beats Repository baseurl=https://packages.elastic.co/beats/yum/el/$basearch enabled=1 gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch gpgcheck=1 # 執行安裝指令 yum install logstash -y
logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}' # 輸入 abc 輸出以下 { "message" => "abc", "@version" => "1", "@timestamp" => "2016-08-20T03:33:00.769Z", "host" => "iZ23tzjZ" }
logstash使用{ }來定義配置區域,區域內又能夠包含其插件的區域配置。
# 最基本的配置文件定義,必須包含input 和 output。 input{ stdin{ } } output{ stdout{ codec=>rubydebug } } # 若是須要對數據進操做,則須要加上filter段 input{ stdin{ } } filter{ # 裏面能夠包含各類數據處理的插件,如文本格式處理 grok、鍵值定義 kv、字段添加、 # geoip 獲取地理位置信息等等... } output{ stdout{ codec=>rubydebug } } # 能夠定義多個輸入源與多個輸出位置 input{ stdin{ } file{ path => ["/var/log/message"] type => "system" start_position => "beginning" } } output{ stdout{ codec=>rubydebug } file { path => "/var/datalog/mysystem.log.gz" gzip => true } }
# 經過手動指定配置文件啓動 /bin/logstash -f /etc/logstash/conf.d/nginx_logstash.conf # 以daemon方式運行,則在指令後面加一個 & 符號 /bin/logstash -f /etc/logstash/conf.d/nginx_logstash.conf & # 若是是經過rpm包安裝的logstash則可使用自帶的腳本啓動 /etc/init.d/logstash start # 經過這種方式啓動,logstash會自動加載 /etc/logstash/conf.d/ 下的配置文件
filebeat是基於原先 logstash-forwarder 的源碼開發而來,無需JAVA環境,運行起來更輕便,無疑是業務服務器端的日誌收集工具。
配 置
# 配置文件路徑 "/etc/filebeat/filebeat.yml" # 一個配置文件能夠包含多個prospectors,一個prospectors能夠包含多個path。 filebeat: # List of prospectors to fetch data. prospectors: # Each - is a prospector. Below are the prospector specific configurations - paths: - /var/log/messages input_type: log document_type: messages - paths: - /alidata/log/nginx/access/access.log.json input_type: log document_type: nginxacclog - paths: - /alidata/www/storage/logs/laravel.log input_type: log document_type: larlog - paths: - /alidata/www/500_error/500_error.log input_type: log document_type: error500 - paths: - /alidata/www/deposit/deposit.log input_type: log document_type: deposit - paths: - /alidata/www/call_error.log input_type: log document_type: call_error - paths: - /alidata/www/weixin_deposit.log input_type: log document_type: weixin_deposit - paths: - /alidata/log/php/php-fpm.log.slow input_type: log document_type: phpslowlog # 多行處理 multiline: pattern: '^[[:space:]]' negate: true match: after # Additional prospector registry_file: /var/lib/filebeat/registry ############################# Libbeat Config ################################## # Base config file used by all other beats for using libbeat features ############################# Output ########################################## # 輸出數據到 redis output: redis: host: "" port: 6379 password: "123456" # 輸出數據到 logstash ,通常二者選用其一 logstash: hosts: [""] ############################# Shipper ######################################### shipper: # 打上服務器tag name: "host_2" ############################# Logging ######################################### logging: files: rotateeverybytes: 10485760 # = 10MB
filebeat主要配置就是這個配置文件了,設定好以後啓動服務就會自動從源拉取數據發送到指定位置,當數據爲普通行數據時,filebeat會自動爲其添加字段信息,其中一項字段 @timestamp 爲filebeat讀取到這條數據的時間,默認格式爲UTC時間,比中國大陸時間早8小時。
nginx 日誌格式配置
log_format json '{"@timestamp":"$time_iso8601",' '"slbip":"$remote_addr",' '"clientip":"$http_x_forwarded_for",' '"serverip":"$server_addr",' '"size":$body_bytes_sent,' '"responsetime":$request_time,' '"domain":"$host",' '"method":"$request_method",' '"requesturi":"$request_uri",' '"url":"$uri",' '"appversion":"$HTTP_APP_VERSION",' '"referer":"$http_referer",' '"agent":"$http_user_agent",' '"status":"$status"}';
filebeat 配置
filebeat: # List of prospectors to fetch data. prospectors: # Each - is a prospector. Below are the prospector specific configurations - paths: - /alidata/log/nginx/access/access.log.json input_type: log document_type: nginxacclog ############################# Output ########################################## output: logstash: hosts: [""] # 其餘部分配置省略。
logstash 配置 (此處logstash用於接收filebeat的數據,而後轉存redis)
input { beats { port => 5044 codec => "json" } } filter { if [type] == "nginxacclog" { geoip { source => "clientip" target => "geoip" database => "/u01/elk/logstash/GeoLiteCity.dat" add_field => [ "[geoip][coordinates]","%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]","%{[geoip][latitude]}" ] } mutate { convert => [ "[geoip][coordinates]","float" ] } } } output{ if [type] == "nginxacclog" { redis { data_type => "list" key => "nginxacclog" host => "" port => "26379" password => "123456" db => "0" } } if [type] == "messages" { redis { data_type => "list" key => "messages" host => "" port => "26379" password => "123456" db => "0" } } }
logstash 配置 (此處logstash用於讀取redis list中的數據,而後轉存elasticsearch)
input{ redis { host => "" port => "26379" db => "0" key => "nginxacclog" threads => 300 password => "123456" data_type => "list" codec => "json" } redis { host => "" port => "26379" db => "0" key => "messages" password => "123456" threads => 50 data_type => "list" codec => "json" } } output { if [type] == "nginxacclog" { elasticsearch { hosts => [""] index => "logstash-nginxacclog-%{+YYYY.MM.dd}" manage_template => true flush_size => 50000 idle_flush_time => 10 workers => 2 } } if [type] == "messages" { elasticsearch { hosts => [""] index => "logstash-messages-%{+YYYY.MM.dd}" manage_template => true flush_size => 50000 idle_flush_time => 30 workers => 1 } } }
threads 開啓多少個線程讀取redis數據,也就是從redis輸入到logstash的速度,線程越多讀取速度越快,可是根據接收節點的接收速度來設置,若是輸入過快,接收速度不夠,則會出現丟數據的狀況,設置一個最佳的threads值須要和接收節點作反覆測試才能得出。
flush_size 控制logstash向Elasticsearch批量發送數據,上面的配置表示,logstash會努力贊到50000條數據一次發送給Elasticsearch。
idle_flush_time 控制logstash多長時間向Elasticsearch發送一次數據,默認爲1秒,根據以上配置,logstash積攢數據未到flush_size 10秒後也會向Elasticsearch發送一次數據。
workers 建議設置爲1或2,若是機器性能不錯能夠設置爲2. 不建議設置的更高。
filebeat 配置(從日誌文件讀取到的數據直接緩存至redis隊列)
filebeat: # List of prospectors to fetch data. prospectors: # Each - is a prospector. Below are the prospector specific configurations - paths: - /alidata/log/nginx/access/access.log.json input_type: log document_type: nginxacclog ############################# Output ########################################## output: redis: host: "" port: 26379 password: "123456"
document_type 自定義日誌類型,在logstash中可經過type判斷作不一樣的處理。
logstash 配置 (此處logstash用於讀取redis list中的數據,而後轉存mongodb)
input { redis { host => "" port => "26379" key => "filebeat" data_type => "list" password => "123456" threads => 50 } redis { host => "" port => "26379" key => "mycat" data_type => "list" password => "123456" threads => 50 type => "mycat" } } output { if [type] == "mycat" { mongodb{ collection => "mycat%{+yyyyMMdd}" isodate => true database => "logdb" uri => "mongodb://log_user:123456@" } } if [type_xi09wnk] == "nginxacclog" { mongodb{ collection => "nginx_accress%{years_dik3k}%{months_dik3k}%{days_dik3k}" isodate => true database => "logdb" uri => "mongodb://log_user:123456@" } } }