Logstash配置總結和實例

這裏記錄Logstash配置中注意的事項:html

整個配置文件分爲三部分:input,filter,output。參考這裏的介紹 https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.htmljava

1 在Windows中,文件路徑中分隔符要使用/而不是\。若是使用了\,那麼*匹配將會失敗。node

2 默認的@timestamp是使用UTC時間表示的,因此對於北京時間會有8小時差,咱們不少狀況會使用日期來作type區分日誌,這時有個辦法是在filter中增長ruby來作轉換。實際上是新加了一個屬性來表示日期。正則表達式

另外有個選項是在date中有timezone的屬性,可是若是設置它爲UTC,那麼整個日誌中的時間就是UTC時間了,不是一個理想的作法。redis


下面是一個配置解釋shell

2016-5-5更新:apache

#整個配置文件分爲三部分:input,filter,output
#參考這裏的介紹 https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html
input {
  #file能夠屢次使用,也能夠只寫一個file而設置它的path屬性配置多個文件實現多文件監控
  file {
    #type是給結果增長了一個屬性叫type值爲"<xxx>"的條目。這裏的type,對應了ES中index中的type,即若是輸入ES時,沒有指定type,那麼這裏的type將做爲ES中index的type。
    type => "apache-access" 
    path => "/apphome/ptc/Windchill_10.0/Apache/logs/access_log*"
    #start_position能夠設置爲beginning或者end,beginning表示從頭開始讀取文件,end表示讀取最新的,這個也要和ignore_older一塊兒使用。
    start_position => beginning
    #sincedb_path表示文件讀取進度的記錄,每行表示一個文件,每行有兩個數字,第一個表示文件的inode,第二個表示文件讀取到的位置(byteoffset)。默認爲$HOME/.sincedb*
    sincedb_path => "/opt/logstash-2.3.1/sincedb_path/access_progress"
    #ignore_older表示了針對多久的文件進行監控,默認一天,單位爲秒,能夠本身定製,好比默認只讀取一天內被修改的文件。
    ignore_older => 604800
    #add_field增長屬性。這裏使用了${HOSTNAME},即本機的環境變量,若是要使用本機的環境變量,那麼須要在啓動命令上加--alow-env。
    add_field => {"log_hostname"=>"${HOSTNAME}"}
    #這個值默認是\n 換行符,若是設置爲空"",那麼後果是每一個字符表明一個event
    delimiter => ""
    #這個表示關閉超過(默認)3600秒後追蹤文件。這個對於multiline來講特別有用。... 這個參數和logstash對文件的讀取方式有關,兩種方式read tail,若是是read
    close_older => 3600
    coodec => multiline {
      pattern => "^\s"
      #這個negate是否認的意思,意思跟pattern相反,也就是不知足patter的意思。
#      negate => ""
      #what有兩個值可選 previous和next,舉例說明,java的異常從第二行以空格開始,這裏就能夠pattern匹配空格開始,what設置爲previous意思是空格開頭這行跟上一行屬於同一event。另外一個例子,有時候一條命令太長,當以\結尾時表示這行屬於跟下一行屬於同一event,這時須要使用negate=>true,what=>'next'。
      what => "previous"
      auto_flush_interval => 60
    }
  }
  file { 
    type => "methodserver-log" 
    path => "/apphome/ptc/Windchill_10.0/Windchill/logs/MethodServer-1604221021-32380.log" 
    start_position => beginning 
    sincedb_path => "/opt/logstash-2.3.1/sincedb_path/methodserver_process"
#    ignore_older => 604800
  }
}
filter{
  #執行ruby程序,下面例子是將日期轉化爲字符串賦予daytag
  ruby {
    code => "event['daytag'] = event.timestamp.time.localtime.strftime('%Y-%m-%d')"
  }
  # if [path] =~ "access" {} else if [path] =~ "methodserver" {} else if [path] =~ "servermanager" {} else {} 注意語句結構
  if [path] =~ "MethodServer" { #z這裏的=~是匹配正則表達式
    grok {
      patterns_dir => ["/opt/logstash-2.3.1/patterns"] #自定義正則匹配
#      Tue 4/12/16 14:24:17: TP-Processor2: hirecode---->77LS
      match => { "message" => "%{DAY:log_weekday} %{DATE_US:log_date} %{TIME:log_time}: %{GREEDYDATA:log_data}"}
    }
    #mutage是作轉換用的
    mutate { 
      replace => { "type" => "apache" } #替換屬性值
      convert => { #類型轉換
        "bytes" => "integer" #例如還有float
        "duration" => "integer"
        "state" => "integer"
      }
    #date主要是用來處理文件內容中的日期的。內容中讀取的是字符串,經過date將它轉換爲@timestamp。參考https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match
#    date {
#      match => [ "logTime" , "dd/MMM/yyyy:HH:mm:ss Z" ]
#    }
  }else if [type] in ['tbg_qas','mbg_pre'] { # if ... else if ... else if ... else結構
  }else {
    drop{} # 將event丟棄
  }
}
output {
  stdout{ codec=>rubydebug} # 直接輸出,調試用起來方便
  # 輸出到redis
  redis {
    host => '10.120.20.208'
    data_type => 'list'
    key => '10.99.201.34:access_log_2016-04'
  }
  # 輸出到ES
  elasticsearch {
    hosts =>"192.168.0.15:9200"
    index => "%{sysid}_%{type}"
    document_type => "%{daytag}"
  }
}

下面是兩個真實的例子,第一個是從應用到redis,第二個是從redis到ES。ruby

input {
  file {
    type => "log_raw_data"
    path => "/apphome/ptc/Windchill_10.0/Windchill/logs/gc/*GC.log"
    start_position => end
    sincedb_path => "/opt/logstash-2.3.1/sincedb_path/log_progress"
#    ignore_older => 604800
    add_field => {"sysid"=>"tbg_qas"}
  }
  file {
    type => "log_raw_data"
    path => ["/apphome/ptc/Windchill_10.0/Windchill/logs/*MethodServer*.log","/apphome/ptc/Windchill_10.0/Windchill/logs/ServerManager-*.log"]
    start_position => end
    sincedb_path => "/opt/logstash-2.3.1/sincedb_path/log_progress"
#    ignore_older => 604800
    add_field => {"sysid"=>"tbg_qas"}
    close_older => 60
    codec => multiline {
#      patterns_dir => ["D:/app/logstash-2.3.1/patterns"]
      pattern => "^%{DAY} %{DATESTAMP}:"
      negate => true
      what => "previous"
#      auto_flush_interval => 20
    }
  }
}
output {
#  stdout{ codec=>rubydebug}
  redis {
    host => '10.120.20.208' 
    data_type => 'list' 
    key => 'log_raw_data' 
  } 
  redis {
    host => '10.120.31.142' 
    data_type => 'list' 
    key => 'log_raw_data' 
  } 
}
input {
  redis {
    host => "localhost"
    data_type => "list"
    port => "6379"
    key => "log_raw_data"
    type => "redis-input"
  }
}
filter{
  ruby {
    code => "event['daytag'] = event.timestamp.time.localtime.strftime('%Y-%m-%d')"
  }
  if [path] =~ "access" {
    grok {
      match => { "message" => "%{IPORHOST:clientip} %{HTTPDUSER:ident} %{USER:username} \[%{HTTPDATE:logtime}\] \"%{WORD:verb} %{NOTSPACE:request} (?:%{NOTSPACE:httpversion}|)\" (?:%{NUMBER:state}|-) (?:%{NUMBER:bytes}|-) %{NUMBER:duration}"}
    }
    mutate { 
      replace => { "type" => "apache" } 
      convert => {
        "bytes" => "integer"
        "duration" => "integer"
        "state" => "integer"
      }
    }
    date {
      match => [ "logtime" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  }else if [path] =~ ".*ServerManager.*GC\.log" {
    if [message] =~ "\[Full GC" {
      grok {
        match => {"message" => "%{TIMESTAMP_ISO8601:logtime}: %{GREEDYDATA:gcdetail} \[Times: user=%{BASE10NUM:usertime} sys=%{BASE10NUM:systime}, real=%{BASE10NUM:realtime} secs\]"}
      }
      date {
        match => ["logtime" , "yyyy-MM-dd'T'HH:mm:ss.SSS'+0800'"]
      }
    }else if [message] =~ "\[GC" {
      grok {
        match => {"message" => "%{TIMESTAMP_ISO8601:logtime}: %{GREEDYDATA:gcdetail} \[Times: user=%{BASE10NUM:usertime} sys=%{BASE10NUM:systime}, real=%{BASE10NUM:realtime} secs\]"}
      }
      date {
        match => ["logtime" , "yyyy-MM-dd'T'HH:mm:ss.SSS'+0800'"]
      }
    }else{
      drop {}
    }
    mutate {
      replace => {"type" => "smgc" }
      convert => {
        "usertime" => "float"
        "systime" => "float"
        "realtime" => "float"
      }
    }
  }else if [path] =~ ".*MethodServer.*GC\.log" {
    if [message] =~ "\[Full GC" {
      grok {
        match => {"message" => "%{TIMESTAMP_ISO8601:logtime}: %{GREEDYDATA:gcdetail} \[Times: user=%{BASE10NUM:usertime} sys=%{BASE10NUM:systime}, real=%{BASE10NUM:realtime} secs\]"}
      }
      date {
        match => ["logtime" , "yyyy-MM-dd'T'HH:mm:ss.SSS'+0800'"]
      }
    }else if [message] =~ "\[GC" {
      grok {
        match => {"message" => "%{TIMESTAMP_ISO8601:logtime}: %{GREEDYDATA:gcdetail} \[Times: user=%{BASE10NUM:usertime} sys=%{BASE10NUM:systime}, real=%{BASE10NUM:realtime} secs\]"}
      }
      date {
        match => ["logtime" , "yyyy-MM-dd'T'HH:mm:ss.SSS'+0800'"]
      }
    }else{
      drop {}
    }
    mutate {
      replace => {"type" => "msgc" }
      convert => {
        "usertime" => "float"
        "systime" => "float"
        "realtime" => "float"
      }
    }
  }else if [path] =~ "MethodServer" {
    grok {
      match => { "message" => "%{DAY:weekday} %{DATESTAMP:logtime}: %{GREEDYDATA:logdata}"}
    }
    date {
      match => [ "logtime" , "M/d/yy HH:mm:ss" ]
    }
    mutate { replace => { "type" => "ms" } }
  }else if [path] =~ "ServerManager" {
    grok {
      match => { "message" => "%{DAY:weekday} %{DATESTAMP:logtime}: %{GREEDYDATA:logdata}"}
    }
    date {
      match => [ "logtime" , "M/d/yy HH:mm:ss" ]
    }
    mutate { replace => { "type" => "sm" } }
  }else if [path] =~ "Process_Archive" {
    grok {
      patterns_dir => ["/opt/logstash-2.3.1/patterns"]
      match => { "message" => "%{PROCESS_DATETIME:logtime} %{GREEDYDATA:logdata}"}
    }
    date {
      match => [ "logtime" , "yyyy MMM dd HH:mm:ss:SSS 'GMT +8'" ]
    }
    mutate { replace => { "type" => "prc_arc" } }
  }else if [path] =~ "ESISAPAdapterConfiguration" {
    grok {
      patterns_dir => ["/opt/logstash-2.3.1/patterns"]
      match => { "message" => "%{PROCESS_DATETIME:logtime} %{GREEDYDATA:logdata}"}
    }
    date {
      match => [ "logtime" , "yyyy MMM dd HH:mm:ss:SSS 'GMT +8'" ]
    }
    mutate { replace => { "type" => "esi_adp" } }
  }else if [path] =~ "LenovoAdapterConfiguration" {
    grok {
      patterns_dir => ["/opt/logstash-2.3.1/patterns"]
      match => { "message" => "%{PROCESS_DATETIME:logtime} %{GREEDYDATA:logdata}"}
    }
    date {
      match => [ "logtime" , "yyyy MMM dd HH:mm:ss:SSS 'GMT +8'" ]
    }
    mutate { replace => { "type" => "le_adp" } }
  }else {
    mutate { replace => { "type" => "other" } }
#    drop {}
  }
#  extractnumbers {
#    source => "duration"
#  }
}
output {
#  stdout{ codec=>rubydebug}
  elasticsearch {
    hosts =>"192.168.0.15:9200"
    index => "%{sysid}_%{type}"
    document_type => "%{daytag}"
  }
}
相關文章
相關標籤/搜索