Installation is straightforward; just follow the official documentation: https://www.elastic.co/guide/en/logstash/current/installing-logstash.html
# rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
# vim /etc/yum.repos.d/logstash.repo
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
# sudo yum install logstash
# ln -s /usr/share/logstash/bin/logstash /usr/bin/logstash    # create a symlink to the logstash executable so the logstash command can be used directly
logstash.yml is the main configuration file.
# cat /etc/logstash/logstash.yml | grep -v ^#
path.data: /data/logstash                   # data storage path
path.config: /etc/logstash/conf.d/*.conf    # configuration file directory
path.logs: /var/log/logstash                # log output path
# mkdir -p /data/logstash                   # create the data directory
# chown logstash:logstash /data/logstash    # grant ownership to the logstash user
jvm.options holds the JVM settings, such as the minimum and maximum runtime heap size and garbage collection options:
-Xms256m    # set the heap size
-Xmx256m
startup.options contains parameters related to how Logstash is launched.
Pipeline configuration files are written under /etc/logstash/conf.d/ and end in .conf.
A Logstash pipeline contains two required elements, input and output, and one optional element, filter.
Events are read from the input, (parsed and processed by the filter), and then written by the output to the destination store (Elasticsearch or something else).
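A minimal sketch of what such a .conf file looks like, with all three sections; the stdin and stdout plugins are only placeholders:

input {
    stdin { }        # event source
}
filter {
    # optional parsing / processing goes here
}
output {
    stdout { }       # destination store
}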
Run a most basic Logstash pipeline to test it:
# logstash -e 'input { stdin {} } output { stdout {} }'
Once you see the message - Successfully started Logstash API endpoint {:port=>9600}, Logstash has started successfully; at that point type the content you want to test.
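For example, typing hello world should print an event roughly like the one below (the stdout plugin defaults to the rubydebug codec; the host and timestamp values here are only illustrative):

hello world
{
    "@timestamp" => 2018-06-27T06:00:00.000Z,
      "@version" => "1",
          "host" => "localhost",
       "message" => "hello world"
}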
This is only a test event; when using Logstash in production, the configuration is usually written to a file and Logstash is then started with it.
For example, to process nginx logs, first create a configuration file named nginx_access.conf under /etc/logstash/conf.d.
# cat nginx_access.conf
input {
    file {
        path => "/var/log/nginx/access.log"
        start_position => "beginning"
        type => "nginx_access_log"
    }
}

filter {
    grok {
        match => {"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"(?:-|%{DATA:referrer})\" \"%{DATA:user_agent}\" (?:%{IP:proxy}|-) %{DATA:upstream_addr} %{NUMBER:upstream_request_time:float} %{NUMBER:upstream_response_time:float}"}
        match => {"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"%{DATA:referrer}\" \"%{DATA:user_agent}\" \"%{DATA:proxy}\""}
    }
    if [request] {
        urldecode {
            field => "request"
        }
        ruby {
            init => "@kname = ['url_path','url_arg']"
            code => "
                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))])
                event.append(new_event)"
        }
        if [url_arg] {
            ruby {
                init => "@kname = ['key', 'value']"
                code => "event.set('url_args', event.get('url_arg').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"
            }
        }
    }
    geoip {
        source => "clientip"
    }
    useragent {
        source => "user_agent"
        target => "ua"
        remove_field => "user_agent"
    }
    date {
        match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
        locale => "en"
    }
    mutate {
        remove_field => ["message","timestamp","request","url_arg"]
    }
}

output {
    elasticsearch {
        hosts => "localhost:9200"
        index => "nginx-access-log-%{+YYYY.MM.dd}"
    }
#    stdout {
#        codec => rubydebug
#    }
}
To check whether the configuration file is written correctly, start Logstash in test mode like this:
# /usr/share/logstash/bin/logstash -t -f /etc/logstash/conf.d/nginx_access.conf    # test the configuration file
Configuration OK
# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx_access.conf       # start logstash with this configuration
Start Logstash:
# systemctl start logstash
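To confirm it came up, check the service status and tail the Logstash log (paths follow the logstash.yml settings above; the plain-text log file name may vary with the logging configuration):

# systemctl status logstash
# tail -f /var/log/logstash/logstash-plain.log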
Input plugins let Logstash read from a particular event source.
Official docs: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
The event source can be stdin (typed on the screen), a file specified through the file plugin, or Elasticsearch, Filebeat, Kafka, Redis and so on.
file {
    path => ['/var/log/nginx/access.log']    # path of the file(s) to read
    type => 'nginx_access_log'
    start_position => "beginning"
}
# path accepts globs such as /var/log/*.log or /var/log/**/*.log; a bare /var/log is treated as /var/log/*.log
# type is a common option, used to activate filters for this source
# start_position chooses where Logstash starts reading the file: beginning or end
# other common options such as discover_interval, exclude, sincedb_path and sincedb_write_interval are described in the official docs
syslog {
    port => "514"
    type => "syslog"
}
# port specifies the listening port (both a TCP and a UDP listener are created on port 514)
# reading from syslog requires configuring rsyslog beforehand:
# cat /etc/rsyslog.conf       add the line:
*.* @172.17.128.200:514       # send logs to this address/port; Logstash listens there and reads new entries as they arrive
# service rsyslog restart     # restart the syslog service
beats {
    port => 5044    # port to listen on
}
# there are further options such as host
# reading from Beats requires configuring the Beat side first so it outputs to Logstash:
# vim /etc/filebeat/filebeat.yml
..........
output.logstash:
  hosts: ["localhost:5044"]
kafka {
    bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
    topics => ["access_log"]
    group_id => "logstash-file"
    codec => "json"
}
kafka {
    bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
    topics => ["weixin_log","user_log"]
    codec => "json"
}
# bootstrap_servers: list of Kafka broker URLs used to establish the initial connection to the cluster
# topics: list of Kafka topics to subscribe to
# group_id: identifier of the consumer group this consumer belongs to, defaults to logstash; messages of a topic are shared out among all Logstash consumers with the same group_id
# codec: common option, the codec used to decode the incoming data
There are many more input plugin types; refer to the official documentation to configure them.
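Redis, mentioned above as another possible source, can be read in a similar way; a minimal sketch, where the host, key and list data type are assumptions made for the example:

redis {
    host => "127.0.0.1"        # Redis server address (assumed for this example)
    port => 6379
    data_type => "list"        # read events from a Redis list
    key => "logstash_queue"    # hypothetical list key that producers push log lines onto
    codec => "json"
}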
grok parses log lines with regular-expression patterns. Matching an nginx log:
grok {
    match => {"message" => "^%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}$"}
}
# matches nginx log lines such as:
# 203.202.254.16 - - [22/Jun/2018:16:12:54 +0800] "GET / HTTP/1.1" 200 3700 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/601.7.7 (KHTML, like Gecko) Version/9.1.2 Safari/601.7.7"
# 220.181.18.96 - - [13/Jun/2015:21:14:28 +0000] "GET /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
grok {
    match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]
    match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URI:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]
}
grok syntax: %{SYNTAX:SEMANTIC}, i.e. %{pattern:custom_field_name}
Many ready-made grok patterns are provided officially and can be used directly: https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns
grok debugging tool: http://grokdebug.herokuapp.com
Regular-expression debugging tool: https://www.debuggex.com/
Regular-expression tutorial: https://www.jb51.net/tools/zhengze.html
Custom pattern: (?<field_name>the pattern)
For example, to match 2018/06/27 14:00:54:
(?<datetime>\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d)
Result: "datetime": "2018/06/27 14:00:54"
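Custom patterns can also be kept in a file and loaded with the grok patterns_dir option; a minimal sketch, where the directory and the MYDATETIME pattern name are made up for this example:

# cat /etc/logstash/patterns/custom        # hypothetical patterns file
MYDATETIME \d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}

grok {
    patterns_dir => ["/etc/logstash/patterns"]
    match => {"message" => "%{MYDATETIME:datetime}"}
}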
# [2018-07-04 17:43:35,503]
grok {
    match => {"message" => "%{DATA:raw_datetime}"}
}
date {
    match => ["raw_datetime","YYYY-MM-dd HH:mm:ss,SSS"]
    remove_field => ["raw_datetime"]
}
# store raw_datetime into @timestamp and then delete raw_datetime

# 24/Jul/2018:18:15:05 +0800
date {
    match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
}

# 1565931281
date {
    match => ["sql_timestamp","UNIX"]
    locale => "en"
}
filter {
    mutate {
        # convert => ["response","integer","bytes","float"]    # array form of the type conversion
        convert => {"message" => "integer"}
    }
}
# test ------->
{
          "host" => "localhost",
       "message" => 123,    # no quotes, integer type
    "@timestamp" => 2018-06-26T02:51:08.651Z,
      "@version" => "1"
}
mutate {
    split => {"message" => ","}
}
# ----------> aaa,bbb
{
    "@timestamp" => 2018-06-26T02:40:19.678Z,
      "@version" => "1",
          "host" => "localhost",
       "message" => [
        [0] "aaa",
        [1] "bbb"
    ]
}
# 192,128,1,100
{
          "host" => "localhost",
       "message" => [
        [0] "192",
        [1] "128",
        [2] "1",
        [3] "100"
    ],
    "@timestamp" => 2018-06-26T02:45:17.877Z,
      "@version" => "1"
}
filter {
    mutate {
        add_field => {"field1" => "value1"}
    }
    mutate {
        split => {"message" => "."}         # split the message field on "."
    }
    mutate {
        merge => {"message" => "field1"}    # append the field1 field to message
    }
}
# --------------->
# abc
{
       "message" => [
        [0] "abc",
        [1] "value1"
    ],
    "@timestamp" => 2018-06-26T03:38:57.114Z,
        "field1" => "value1",
      "@version" => "1",
          "host" => "localhost"
}
# abc,.123
{
       "message" => [
        [0] "abc,",
        [1] "123",
        [2] "value1"
    ],
    "@timestamp" => 2018-06-26T03:38:57.114Z,
        "field1" => "value1",
      "@version" => "1",
          "host" => "localhost"
}
filter {
    mutate {
        rename => {"message" => "info"}
    }
}
# --------> 123
{
    "@timestamp" => 2018-06-26T02:56:00.189Z,
          "info" => "123",
      "@version" => "1",
          "host" => "localhost"
}
mutate { remove_field => ["message","datetime"] }
mutate {
    split => {"message" => ":"}
}
mutate {
    join => {"message" => ","}
}
# ------> abc:123
{
    "@timestamp" => 2018-06-26T03:55:41.426Z,
       "message" => "abc,123",
          "host" => "localhost",
      "@version" => "1"
}
# aa:cc
{
    "@timestamp" => 2018-06-26T03:55:47.501Z,
       "message" => "aa,cc",
          "host" => "localhost",
      "@version" => "1"
}
mutate {
    gsub => ["message","/","_"]    # replace / with _
}
# ------> a/b/c/
{
      "@version" => "1",
       "message" => "a_b_c_",
          "host" => "localhost",
    "@timestamp" => 2018-06-26T06:20:10.811Z
}
mutate {
    add_field => {"field1" => "value1"}
}
mutate {
    update => {"field1" => "v1"}
    update => {"field2" => "v2"}    # field2 does not exist, so nothing happens
}
# ---------------->
{
    "@timestamp" => 2018-06-26T06:26:28.870Z,
        "field1" => "v1",
          "host" => "localhost",
      "@version" => "1",
       "message" => "a"
}
mutate {
    add_field => {"field1" => "value1"}
}
mutate {
    replace => {"field1" => "v1"}
    replace => {"field2" => "v2"}
}
# ---------------------->
{
       "message" => "1",
          "host" => "localhost",
    "@timestamp" => 2018-06-26T06:28:09.915Z,
        "field2" => "v2",    # field2 did not exist, so it is created
      "@version" => "1",
        "field1" => "v1"
}
geoip {
    source => "clientip"
    database => "/tmp/GeoLiteCity.dat"
}
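For a public client IP, geoip adds a geoip object to the event; the exact fields depend on the GeoIP database in use, and the values below are only illustrative:

"geoip" => {
    "country_name" => "United States",
       "city_name" => "Mountain View",
     "region_name" => "California",
        "location" => {
        "lat" => 37.386,
        "lon" => -122.0838
    },
              "ip" => "8.8.8.8"
}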
filter {
    urldecode {
        field => "message"
    }
    ruby {
        init => "@kname = ['url_path','url_arg']"
        code => "
            new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('?'))])
            event.append(new_event)"
    }
    if [url_arg] {
        kv {
            source => "url_arg"
            field_split => "&"
            target => "url_args"
            remove_field => ["url_arg","message"]
        }
    }
}
# ruby plugin
# split the message field on "?" into url_path and url_arg
# -------------------->
# www.test.com?test
{
       "url_arg" => "test",
          "host" => "localhost",
      "url_path" => "www.test.com",
       "message" => "www.test.com?test",
      "@version" => "1",
    "@timestamp" => 2018-06-26T07:31:04.887Z
}
# www.test.com?title=elk&content=學習elk
{
      "url_args" => {
          "title" => "elk",
        "content" => "學習elk"
    },
          "host" => "localhost",
      "url_path" => "www.test.com",
      "@version" => "1",
    "@timestamp" => 2018-06-26T07:33:54.507Z
}
urldecode {
    field => "message"
}
# field: the field this filter should URL-decode; the default is "message"
# charset (optional): the character encoding used by the filter; default UTF-8
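A quick sketch of the effect, with a made-up percent-encoded value:

# before: "message" => "/search?q=elk%20stack&page=1"
# after:  "message" => "/search?q=elk stack&page=1"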
kv {
    prefix => "url_"            # prefix added to each key after splitting
    target => "url_ags"         # put the resulting key-value pairs into this field
    source => "message"         # field to split
    field_split => "&"          # separator
    remove_field => "message"
}
# -------------------------->
# a=1&b=2&c=3
{
          "host" => "localhost",
       "url_ags" => {
        "url_c" => "3",
        "url_a" => "1",
        "url_b" => "2"
    },
      "@version" => "1",
    "@timestamp" => 2018-06-26T07:07:24.557Z
}
if [agent] != "-" {
    useragent {
        source => "agent"
        target => "ua"
        remove_field => "agent"
    }
}
# the if statement means the plugin is only applied when the agent field is not "-"
# source is a required setting: the field to parse
# target puts the useragent information into the ua field; if not specified it is stored at the root of the event
Equality: ==, !=, <, >, <=, >=
Regexp: =~, !~ (checks a pattern on the right against a string value on the left)
Inclusion: in, not in
Supported boolean operators: and, or, nand, xor
Supported unary operator: !
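A small sketch combining a few of these operators in a filter block; the field names follow the nginx example above, and the tag/drop choices are only illustrative:

filter {
    if [type] == "nginx_access_log" and [response] >= 400 {
        mutate { add_tag => ["http_error"] }    # tag 4xx/5xx responses
    }
    if [clientip] =~ /^10\./ or [clientip] in ["127.0.0.1", "::1"] {
        drop { }                                # discard internal traffic
    }
}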
output{ stdout{ codec => "rubydebug" } }
file {
    path => "/data/logstash/%{host}/%{application}"
    codec => line { format => "%{message}" }
}
kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "test_topic"    # required setting; the topic to produce messages to
}
elasticsearch {
    hosts => "localhost:9200"
    index => "nginx-access-log-%{+YYYY.MM.dd}"
}
# index: the index events are written to; creating an index per day makes it easier to delete old data and to search logs by time
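When one pipeline handles several log types, the type field set in the inputs above can be used to route events to different indices; a sketch:

output {
    if [type] == "nginx_access_log" {
        elasticsearch {
            hosts => "localhost:9200"
            index => "nginx-access-log-%{+YYYY.MM.dd}"
        }
    } else if [type] == "syslog" {
        elasticsearch {
            hosts => "localhost:9200"
            index => "syslog-%{+YYYY.MM.dd}"
        }
    }
}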
A codec is essentially a stream filter and can run as part of an input or output plugin, as used for example in the stdout output plugin above.
input {
    stdin {
        codec => multiline {
            pattern => "pattern, a regexp"    # regular expression; matching lines are handled according to the two options below
            negate => "true" or "false"       # defaults to false: act on lines that match the pattern; if true, act on lines that do NOT match
            what => "previous" or "next"      # where to merge the selected lines: into the previous line or the next line
        }
    }
}

codec => multiline {
    pattern => "^\s"
    what => "previous"
}
# lines starting with whitespace are merged into the previous line

codec => multiline {
    # Grok pattern names are valid! :)
    pattern => "^%{TIMESTAMP_ISO8601} "
    negate => true
    what => "previous"
}
# any line that does not start with this timestamp format is merged with the previous line

codec => multiline {
    pattern => "\\$"
    what => "next"
}
# lines ending with a backslash are merged with the next line
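Similarly, the json codec used in the Kafka input above decodes each message into event fields; a minimal sketch for trying it from stdin (the sample JSON line is made up):

input {
    stdin {
        codec => json { }
    }
}
# typing {"user":"alice","action":"login"} produces an event with user and action as separate fields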