This post records things to watch out for in Logstash configuration.
The configuration file has three sections: input, filter, and output. See the introduction at https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html
1. On Windows, use / rather than \ as the separator in file paths. If you use \, glob patterns with * will fail to match.
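For example (the path below is hypothetical, purely for illustration):

input {
    file {
        # Forward slashes work on Windows; "C:\logs\access_log*" would make
        # the trailing * glob fail to match.
        path => "C:/logs/access_log*"
    }
}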
2. The default @timestamp is expressed in UTC, so it is 8 hours off from Beijing time. Since we often use the date as the type to distinguish logs, one workaround is to add a ruby filter that does the conversion; in effect it adds a new field carrying the date.
Another option is the timezone setting on the date filter, but if you set it to UTC, every timestamp in the log becomes UTC, which is not ideal.
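A minimal sketch of that ruby workaround, the same technique used in the configurations below: it derives a daytag field from @timestamp converted to local time, while @timestamp itself stays in UTC.

filter {
    ruby {
        # Add a daytag field holding the local date, e.g. "2016-05-05".
        code => "event['daytag'] = event.timestamp.time.localtime.strftime('%Y-%m-%d')"
    }
}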
Below is an annotated configuration.
Updated 2016-05-05:
# The whole configuration file has three sections: input, filter, output.
# See https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html
input {
    # file can be used multiple times; alternatively, one file block can list
    # several files in its path setting to monitor them all.
    file {
        # type adds a field named "type" with the given value to each event. It maps
        # to the type in the ES index: if no type is specified when writing to ES,
        # this value becomes the document type of the index.
        type => "apache-access"
        path => "/apphome/ptc/Windchill_10.0/Apache/logs/access_log*"
        # start_position is beginning or end: beginning reads the file from the
        # start, end reads only new data. Use it together with ignore_older.
        start_position => beginning
        # sincedb_path records read progress, one line per file with two numbers:
        # the file's inode and the byte offset read so far.
        # Defaults to $HOME/.sincedb*
        sincedb_path => "/opt/logstash-2.3.1/sincedb_path/access_progress"
        # ignore_older controls how recently a file must have been modified to be
        # watched, in seconds (default one day); 604800 here means one week.
        ignore_older => 604800
        # add_field adds a field. ${HOSTNAME} is this machine's environment
        # variable; to use environment variables, start Logstash with --allow-env.
        add_field => {"log_hostname" => "${HOSTNAME}"}
        # delimiter defaults to \n (newline); if set to the empty string "",
        # every single character becomes its own event.
        delimiter => ""
        # close_older closes a file that has not been read for (by default) 3600
        # seconds. This is especially useful with multiline, and relates to the
        # two ways Logstash can read files, read and tail.
        close_older => 3600
        codec => multiline {
            pattern => "^\s"
            # negate inverts the pattern, i.e. act on lines that do NOT match it.
            # negate => ""
            # what takes previous or next. Example: a Java exception continues on
            # lines starting with whitespace, so pattern matches a leading space and
            # what => "previous" attaches those lines to the previous event.
            # Conversely, a long command ending in \ belongs with the next line;
            # there the pattern matches the trailing backslash and what => "next"
            # joins the line with the one that follows.
            what => "previous"
            auto_flush_interval => 60
        }
    }
    file {
        type => "methodserver-log"
        path => "/apphome/ptc/Windchill_10.0/Windchill/logs/MethodServer-1604221021-32380.log"
        start_position => beginning
        sincedb_path => "/opt/logstash-2.3.1/sincedb_path/methodserver_process"
        # ignore_older => 604800
    }
}
filter {
    # Run Ruby code; this example formats the event date as a string in daytag.
    ruby {
        code => "event['daytag'] = event.timestamp.time.localtime.strftime('%Y-%m-%d')"
    }
    # Note the conditional structure:
    # if [path] =~ "access" {} else if [path] =~ "methodserver" {} else if [path] =~ "servermanager" {} else {}
    if [path] =~ "MethodServer" {    # =~ performs a regular-expression match
        grok {
            patterns_dir => ["/opt/logstash-2.3.1/patterns"]    # custom pattern definitions
            # Tue 4/12/16 14:24:17: TP-Processor2: hirecode---->77LS
            match => { "message" => "%{DAY:log_weekday} %{DATE_US:log_date} %{TIME:log_time}: %{GREEDYDATA:log_data}" }
        }
        # mutate performs transformations
        mutate {
            replace => { "type" => "apache" }    # replace a field's value
            convert => {    # type conversion; float etc. are also available
                "bytes" => "integer"
                "duration" => "integer"
                "state" => "integer"
            }
        }
        # date parses dates out of the log content: the value read is a string, and
        # date converts it into @timestamp. See
        # https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match
        # date {
        #     match => [ "logTime" , "dd/MMM/yyyy:HH:mm:ss Z" ]
        # }
    } else if [type] in ['tbg_qas','mbg_pre'] {
        # if ... else if ... else if ... else structure
    } else {
        drop {}    # discard the event
    }
}
output {
    stdout { codec => rubydebug }    # print events directly; handy for debugging
    # output to Redis
    redis {
        host => '10.120.20.208'
        data_type => 'list'
        key => '10.99.201.34:access_log_2016-04'
    }
    # output to ES
    elasticsearch {
        hosts => "192.168.0.15:9200"
        index => "%{sysid}_%{type}"
        document_type => "%{daytag}"
    }
}
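For the what => "next" case mentioned in the multiline comments above, a minimal sketch (mirroring the example in the multiline codec documentation): any line ending in a backslash is combined with the line that follows it.

input {
    stdin {
        codec => multiline {
            # A line ending in "\" is incomplete; join it with the next line.
            pattern => "\\$"
            what => "next"
        }
    }
}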
Below are two real-world examples: the first ships from the application to Redis, the second from Redis to ES.
input {
    file {
        type => "log_raw_data"
        path => "/apphome/ptc/Windchill_10.0/Windchill/logs/gc/*GC.log"
        start_position => end
        sincedb_path => "/opt/logstash-2.3.1/sincedb_path/log_progress"
        # ignore_older => 604800
        add_field => {"sysid" => "tbg_qas"}
    }
    file {
        type => "log_raw_data"
        path => ["/apphome/ptc/Windchill_10.0/Windchill/logs/*MethodServer*.log","/apphome/ptc/Windchill_10.0/Windchill/logs/ServerManager-*.log"]
        start_position => end
        sincedb_path => "/opt/logstash-2.3.1/sincedb_path/log_progress"
        # ignore_older => 604800
        add_field => {"sysid" => "tbg_qas"}
        close_older => 60
        codec => multiline {
            # patterns_dir => ["D:/app/logstash-2.3.1/patterns"]
            pattern => "^%{DAY} %{DATESTAMP}:"
            negate => true
            what => "previous"
            # auto_flush_interval => 20
        }
    }
}
output {
    # stdout { codec => rubydebug }
    redis {
        host => '10.120.20.208'
        data_type => 'list'
        key => 'log_raw_data'
    }
    redis {
        host => '10.120.31.142'
        data_type => 'list'
        key => 'log_raw_data'
    }
}
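With negate => true, any line that does not begin with a weekday and timestamp is appended to the previous event, so for instance a stack trace stays attached to the log line that produced it. A sketch using the sample line quoted earlier (the continuation lines are hypothetical):

Tue 4/12/16 14:24:17: TP-Processor2: hirecode---->77LS
com.example.SomeException: ...          <- no leading timestamp, joined to the event above
    at com.example.Foo.bar(Foo.java:42) <- likewise joined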
input {
    redis {
        host => "localhost"
        data_type => "list"
        port => "6379"
        key => "log_raw_data"
        type => "redis-input"
    }
}
filter {
    ruby {
        code => "event['daytag'] = event.timestamp.time.localtime.strftime('%Y-%m-%d')"
    }
    if [path] =~ "access" {
        grok {
            match => { "message" => "%{IPORHOST:clientip} %{HTTPDUSER:ident} %{USER:username} \[%{HTTPDATE:logtime}\] \"%{WORD:verb} %{NOTSPACE:request} (?:%{NOTSPACE:httpversion}|)\" (?:%{NUMBER:state}|-) (?:%{NUMBER:bytes}|-) %{NUMBER:duration}" }
        }
        mutate {
            replace => { "type" => "apache" }
            convert => {
                "bytes" => "integer"
                "duration" => "integer"
                "state" => "integer"
            }
        }
        date {
            match => [ "logtime" , "dd/MMM/yyyy:HH:mm:ss Z" ]
        }
    } else if [path] =~ ".*ServerManager.*GC\.log" {
        if [message] =~ "\[Full GC" {
            grok {
                match => { "message" => "%{TIMESTAMP_ISO8601:logtime}: %{GREEDYDATA:gcdetail} \[Times: user=%{BASE10NUM:usertime} sys=%{BASE10NUM:systime}, real=%{BASE10NUM:realtime} secs\]" }
            }
            date {
                match => [ "logtime" , "yyyy-MM-dd'T'HH:mm:ss.SSS'+0800'" ]
            }
        } else if [message] =~ "\[GC" {
            grok {
                match => { "message" => "%{TIMESTAMP_ISO8601:logtime}: %{GREEDYDATA:gcdetail} \[Times: user=%{BASE10NUM:usertime} sys=%{BASE10NUM:systime}, real=%{BASE10NUM:realtime} secs\]" }
            }
            date {
                match => [ "logtime" , "yyyy-MM-dd'T'HH:mm:ss.SSS'+0800'" ]
            }
        } else {
            drop {}
        }
        mutate {
            replace => { "type" => "smgc" }
            convert => {
                "usertime" => "float"
                "systime" => "float"
                "realtime" => "float"
            }
        }
    } else if [path] =~ ".*MethodServer.*GC\.log" {
        if [message] =~ "\[Full GC" {
            grok {
                match => { "message" => "%{TIMESTAMP_ISO8601:logtime}: %{GREEDYDATA:gcdetail} \[Times: user=%{BASE10NUM:usertime} sys=%{BASE10NUM:systime}, real=%{BASE10NUM:realtime} secs\]" }
            }
            date {
                match => [ "logtime" , "yyyy-MM-dd'T'HH:mm:ss.SSS'+0800'" ]
            }
        } else if [message] =~ "\[GC" {
            grok {
                match => { "message" => "%{TIMESTAMP_ISO8601:logtime}: %{GREEDYDATA:gcdetail} \[Times: user=%{BASE10NUM:usertime} sys=%{BASE10NUM:systime}, real=%{BASE10NUM:realtime} secs\]" }
            }
            date {
                match => [ "logtime" , "yyyy-MM-dd'T'HH:mm:ss.SSS'+0800'" ]
            }
        } else {
            drop {}
        }
        mutate {
            replace => { "type" => "msgc" }
            convert => {
                "usertime" => "float"
                "systime" => "float"
                "realtime" => "float"
            }
        }
    } else if [path] =~ "MethodServer" {
        grok {
            match => { "message" => "%{DAY:weekday} %{DATESTAMP:logtime}: %{GREEDYDATA:logdata}" }
        }
        date {
            match => [ "logtime" , "M/d/yy HH:mm:ss" ]
        }
        mutate {
            replace => { "type" => "ms" }
        }
    } else if [path] =~ "ServerManager" {
        grok {
            match => { "message" => "%{DAY:weekday} %{DATESTAMP:logtime}: %{GREEDYDATA:logdata}" }
        }
        date {
            match => [ "logtime" , "M/d/yy HH:mm:ss" ]
        }
        mutate {
            replace => { "type" => "sm" }
        }
    } else if [path] =~ "Process_Archive" {
        grok {
            patterns_dir => ["/opt/logstash-2.3.1/patterns"]
            match => { "message" => "%{PROCESS_DATETIME:logtime} %{GREEDYDATA:logdata}" }
        }
        date {
            match => [ "logtime" , "yyyy MMM dd HH:mm:ss:SSS 'GMT +8'" ]
        }
        mutate {
            replace => { "type" => "prc_arc" }
        }
    } else if [path] =~ "ESISAPAdapterConfiguration" {
        grok {
            patterns_dir => ["/opt/logstash-2.3.1/patterns"]
            match => { "message" => "%{PROCESS_DATETIME:logtime} %{GREEDYDATA:logdata}" }
        }
        date {
            match => [ "logtime" , "yyyy MMM dd HH:mm:ss:SSS 'GMT +8'" ]
        }
        mutate {
            replace => { "type" => "esi_adp" }
        }
    } else if [path] =~ "LenovoAdapterConfiguration" {
        grok {
            patterns_dir => ["/opt/logstash-2.3.1/patterns"]
            match => { "message" => "%{PROCESS_DATETIME:logtime} %{GREEDYDATA:logdata}" }
        }
        date {
            match => [ "logtime" , "yyyy MMM dd HH:mm:ss:SSS 'GMT +8'" ]
        }
        mutate {
            replace => { "type" => "le_adp" }
        }
    } else {
        mutate {
            replace => { "type" => "other" }
        }
        # drop {}
    }
    # extractnumbers {
    #     source => "duration"
    # }
}
output {
    # stdout { codec => rubydebug }
    elasticsearch {
        hosts => "192.168.0.15:9200"
        index => "%{sysid}_%{type}"
        document_type => "%{daytag}"
    }
}