Parsing nested JSON data with Logstash
I. Source files
1. The original log file is:
2019-10-28 09:49:44:947 [http-nio-8080-exec-23] INFO [siftLog][qewrw123ffwer2323fdsafd] - logTime:2019-10-28 09:49:25.833-receiveTime:2019-10-28 09:49:44.044-{"area":"","frontInitTime":0,"initiatePaymentMode":"plugin_manual","network":"電信","os":"Microsoft Windows 7","payStatus":"1","reqs":[{"curlCode":"0","end":"2019-10-28 09:49:25.233","errorCode":"","errorDesc":"","totalTime":2153}],"settleAccountsTime":0}
First we need to write grok patterns for the segment that precedes the JSON. Since that data has little practical value in production, it is not broken down into many separate fields:
```
DATETIME %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?
ACCESSLOG %{DATETIME:logTime} \[%{DATA:threadName}\] %{DATA:loglevel} \[%{DATA:logType}\]\[%{DATA:appId}\] - logTime:%{DATETIME:logTime2}-receiveTime:%{DATETIME:receiveTime}-%{GREEDYDATA:jsonMsg}
```
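As a sanity check, the two grok patterns can be approximated with a plain Python regex and run against the sample line, without starting Logstash. This is a rough translation, not grok itself: the named groups mirror the grok capture names, and the timestamp part also allows the ":"-separated milliseconds seen in the sample ("09:49:44:947").

```python
import re

# Rough Python equivalent of the DATETIME / ACCESSLOG grok patterns above.
DATETIME = r"\d{4}-\d{2}-\d{2}[T ]\d{1,2}:?\d{2}(?::?\d{1,2}(?:[.:,]\d+)?)?"
ACCESSLOG = (
    rf"(?P<logTime>{DATETIME}) \[(?P<threadName>.*?)\] (?P<loglevel>\S+) "
    rf"\[(?P<logType>.*?)\]\[(?P<appId>.*?)\] - "
    rf"logTime:(?P<logTime2>{DATETIME})-receiveTime:(?P<receiveTime>{DATETIME})-"
    rf"(?P<jsonMsg>.*)"
)

# Shortened version of the sample line from the source log.
sample = (
    '2019-10-28 09:49:44:947 [http-nio-8080-exec-23] INFO '
    '[siftLog][qewrw123ffwer2323fdsafd] - '
    'logTime:2019-10-28 09:49:25.833-receiveTime:2019-10-28 09:49:44.044-'
    '{"payStatus":"1","reqs":[{"curlCode":"0","totalTime":2153}]}'
)

m = re.match(ACCESSLOG, sample)
print(m.group("threadName"))   # http-nio-8080-exec-23
print(m.group("jsonMsg"))      # the trailing JSON payload
```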
This JSON has another JSON nested inside it, so the inner JSON must be pulled out and parsed separately. The Logstash config file should therefore be:
```
input {
  kafka {
    #bootstrap_servers => "kafka-service.ops:9092"
    bootstrap_servers => "172.27.27.220:9092,172.27.27.221:9092,172.27.27.222:9092"
    topics => ["test-grok"]
    codec => "json"
    type => "test-grok"
  }
}
filter {
  if [type] == "test-grok" {
    grok {
      patterns_dir => [ "/opt/appl/logstash/patterns" ]
      match => { "message" => "%{ACCESSLOG}" }
    }
    mutate {
      gsub => [ "jsonMsg","\[","" ]
      gsub => [ "jsonMsg","\]","" ]
    }
    json {
      source => "jsonMsg"
    }
    mutate {
      add_field => { "reqs_json" => "%{reqs}" }
    }
    json {
      source => "reqs_json"
      remove_field => ["reqs","reqs_json","message","jsonMsg"]
    }
  }
  ruby {
    code => "event.timestamp.time.localtime"
  }
}
output {
  elasticsearch {
    hosts => ["172.27.27.220:9200","172.27.27.221:9200","172.27.27.222:9200"]
    index => "logstash-test-grok-%{+YYYY.MM.dd}"
    template_overwrite => true
  }
}
```
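The effect of the gsub + double-json chain on jsonMsg can be sketched in a few lines of Python. The two gsub rules strip "[" and "]", which collapses the one-element "reqs" array into a plain JSON object, and the second json filter then promotes its keys to the top level. Note this trick only works cleanly when "reqs" holds exactly one element and no other field value contains brackets.

```python
import json

# Trimmed copy of the jsonMsg payload from the sample log line.
json_msg = ('{"area":"","frontInitTime":0,"payStatus":"1",'
            '"reqs":[{"curlCode":"0","totalTime":2153}],'
            '"settleAccountsTime":0}')

json_msg = json_msg.replace("[", "").replace("]", "")  # the two mutate/gsub rules
event = json.loads(json_msg)                           # first json filter
reqs = event.pop("reqs")                               # second json filter + remove_field
event.update(reqs)                                     # nested keys become top-level fields

print(event["curlCode"], event["totalTime"])  # 0 2153
```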
2. The original log file is:
[2019-10-28 10:01:01.169] [Thread-13086] INFO [192.168.2.1, 192.168.1.1, 192.168.1.2_1572_smallTrade] [INTERFACE] - [HTTP] [request] - {"latitude":"","cardCode":"","memberCouponNo":"","transAmount":"900","hbFqNum":"","confirmCode":"9357","couponAmount":"","lastCost":"2360","memberMobile":"","timestamp":"1572228060000","longitude":""}
Only log lines containing the keyword lastCost are needed, so the Filebeat config should be:
```
- type: log
  enabled: true
  paths:
    - /opt/appl/tomcat/logs/test/test.log
  include_lines: ['.*lastCost.*']
  tail_files: true
  fields:
    type: interface
    log_module: test-interface

output.kafka:
  enabled: true
  hosts: ["172.27.27.220:9092,172.27.27.221:9092,172.27.27.222:9092"]
  topic: '%{[fields][type]}'
```
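Filebeat's include_lines behaves like a whitelist of regexes: a line is shipped only if it matches at least one of them. A minimal Python sketch of that filtering, using made-up sample lines:

```python
import re

# Same whitelist as in the Filebeat config above.
include_lines = [r'.*lastCost.*']
patterns = [re.compile(p) for p in include_lines]

# Hypothetical log lines: one with lastCost, one without.
lines = [
    '[HTTP] [request] - {"transAmount":"900","lastCost":"2360"}',
    '[HTTP] [request] - {"transAmount":"500"}',
]

kept = [line for line in lines if any(p.search(line) for p in patterns)]
print(len(kept))  # 1
```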
Since the developers added the client IP as the first IP in the fourth field, that IP needs to be extracted on its own for analysis:
```
DATETIME %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?
```
```
input {
  kafka {
    bootstrap_servers => "172.27.27.220:9092,172.27.27.221:9092,172.27.27.222:9092"
    topics => ["interface"]
    codec => "json"
    type => "test-interface"
  }
}
filter {
  if [type] == "test-interface" {
    grok {
      patterns_dir => [ "/opt/logstash/patters" ]
      match => { "message" => "\[%{DATETIME:log_timestamp}\] \[%{DATA:ThreadName}\] %{LOGLEVEL:logLevel} \[%{DATA:IP}\] \[%{DATA:InterfaceTag}\] - \[%{DATA:Protocol}\] \[%{DATA:LogType}\] - %{GREEDYDATA:jsonMsg2}" }
    }
    json {
      source => "jsonMsg2"
      remove_field => [ "jsonMsg2","message" ]
    }
    mutate {
      convert => [ "lastCost","float" ]
      split => ["IP",", "]
      add_field => { "clientIp" => "%{[IP][0]}" }
      add_field => { "proxyIp" => "%{[IP][1]}" }
      add_field => { "time" => "%{[IP][2]}" }
    }
    geoip {
      source => "clientIp"
      #database => "/opt/logstash-interface/Geoip/GeoLite2-City_20191022/GeoLite2-City.mmdb"
    }
  }
  ruby {
    code => "event.timestamp.time.localtime"
  }
}
output {
  elasticsearch {
    hosts => ["172.27.27.220:9200","172.27.27.221:9200","172.27.27.222:9200"]
    index => "logstash-test-interface-%{+YYYY.MM.dd}"
    template_overwrite => true
  }
}
```
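The mutate split/add_field steps can be sketched in Python against the fourth bracketed field of the sample line: splitting on ", " makes the client IP element 0, so %{[IP][0]} becomes clientIp.

```python
# Value of the grok-captured IP field from the sample log line.
ip_field = "192.168.2.1, 192.168.1.1, 192.168.1.2_1572_smallTrade"

parts = ip_field.split(", ")   # mutate { split => ["IP",", "] }
client_ip = parts[0]           # "%{[IP][0]}" -> clientIp
proxy_ip = parts[1]            # "%{[IP][1]}" -> proxyIp
time_tag = parts[2]            # "%{[IP][2]}" -> time

print(client_ip)  # 192.168.2.1
```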