Here is an example.

Raw data:

{"countnum":2,"checktime":"2017-05-23 16:59:32"}

1. No field-type conversion needed: the following Logstash filter configuration is enough

if [type] == "onlinecount" {

       json{

    source => "message"

     }

  }
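
For the sample line above, the parsed event should end up with countnum and checktime merged in at the top level, roughly like this (a trimmed rubydebug sketch, not verbatim output):

{
      "message" => "{\"countnum\":2,\"checktime\":\"2017-05-23 16:59:32\"}",
     "countnum" => 2,
    "checktime" => "2017-05-23 16:59:32"
}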

 

 

2. With field-type conversion

Logstash filter:

if [type] == "onlinecount" {

mutate{

split=>["message",","]

add_field => {

"coutnum" => "%{[message][0]}"

}

add_field => {

"checktime" => "%{[message][1]}"

}

remove_field => ["message"]

}

json{

source => "coutnum"

source => "checktime"

#convert => { "coutnum" => "integer" }

target => "coutnum"

target => "checktime"

}

}
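
For instance, if the source ever emits countnum as a string ({"countnum":"2",...}), the json filter alone would keep it a string; the convert step is what makes it an integer (a sketch, not verbatim output):

"countnum" => "2"   # after json, before mutate/convert
"countnum" => 2     # after mutate/convert: ready for numeric queries in ES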

 

 


 

Kafka data: {
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_32"}\n
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_33"}\n
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_34"}\n
....
}
 

For performance reasons, the Kafka team merges multiple raw log lines into a single message (one record per line, separated by newlines), so on my side each Kafka message has to be split back into individual records before being written to ES, otherwise the data is wrong. How should this requirement be handled?

Solved. I went down a wrong path at first; the method below still leaves everything in a single event:
filter {
      mutate {
              split => ["message", "
"]
      }
}


The correct solution:
filter {
        split {
                field => "message"
        }
}
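
Putting the pieces together for the Kafka batches above, a minimal sketch (assuming every line of a batched message is one complete JSON record): split clones the event once per line, then json expands each record into fields:

filter {
  # one event whose "message" holds N newline-separated records
  # becomes N events, each holding a single record in "message"
  split {
    field => "message"    # the default terminator is a real newline
  }
  # expand the single record into top-level fields
  json {
    source => "message"
  }
}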


 
One more small question: the split filter's terminator defaults to \n, but why does the spelling below fail to split, while leaving terminator out works?
filter {
        split {
                field => "message"
                terminator => "\\n"
        }
}

The reason is that Logstash config strings do not process escape sequences by default, so the quoted "\\n" reaches the plugin as literal backslash characters plus an n, which never occurs in the data; the built-in default, by contrast, is an actual newline character.
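
If you do want to spell the terminator out, one option (an untested sketch) is to enable escape processing in logstash.yml via the config.support_escapes setting, so that "\n" in a config string is parsed as a real newline:

# in logstash.yml, turn on escape processing for quoted config strings:
#   config.support_escapes: true

filter {
  split {
    field => "message"
    terminator => "\n"    # parsed as a real newline once escapes are enabled
  }
}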
 


 

Given the following JSON:

{
    "name":"zhangsan",
    "friends":
    {
        "friend1":"lisi",
        "friend2":"wangwu",
        "msg":["haha","yaya"]
    }
}
we want to parse it into:

{
    "name":"zhangsan",
    "friend1":"lisi",
    "friend2":"wangwu",
    "msg":["haha","yaya"]
}
logstash.conf

input
{
    stdin
    {
        codec => json
    }
}

filter
{
    mutate
    {
        # first create a new field and assign the friends hash to it
        add_field => { "@friends" => "%{friends}" }
    }
    json
    {
        # then parse the serialized copy
        source => "@friends"
        # drop the now-redundant fields; this statement is optional
        remove_field => [ "@friends","friends" ]
    }
}

output
{
    stdout { }
}
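
A quick way to try it, assuming a local Logstash install: pipe the nested document into stdin and the flattened event should appear on stdout:

echo '{"name":"zhangsan","friends":{"friend1":"lisi","friend2":"wangwu","msg":["haha","yaya"]}}' \
  | bin/logstash -f logstash.conf

# expected (trimmed) event:
# { "name" => "zhangsan", "friend1" => "lisi", "friend2" => "wangwu", "msg" => ["haha", "yaya"] }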
---------------------
Author: 姚賢賢
Source: CSDN, https://blog.csdn.net/u011311291/article/details/86743642


 

Because our tracking logs are nested JSON, the nested part has to be expanded before every field can be used for statistics and analysis.

1. The log format is as follows:
2019-01-22 19:25:58 172.17.12.177 /statistics/EventAgent appkey=yiche&enc=0&ltype=view&yc_log={"uuid":"73B333EB-EC87-4F9F-867B-A9BF38CBEBB2","mac":"02:00:00:00:00:00","uid":-1,"idfa":"2BFD67CF-ED60-4CF6-BA6E-FC0B18FDDDF8","osv":"iOS11.4.1","fac":"apple","mdl":"iPhone SE","req_id":"360C8C43-73AC-4429-9E43-2C08F4C1C425","itime":1548156351820,"os":"2","sn_id":"6B937D83-BFB2-4C22-85A8-5B3E82D9D0F1","dvid":"3676b52dc155e1eec3ca514f38736fd6","aptkn":"4fb9b2bffb808515aa0e9a5f5b17d826769e432f63d5cf87f7fb5ce4d67ef9f1","cha":"App Store","idfv":"B1EAD56F-E456-4FF2-A3C2-9A8FA0693C22","nt":4,"lg_vl":{"pfrom":"shouye","ptitle":"shouye"},"av":"10.3.3"} 218.15.255.124 200 
2. The initial Logstash config file was:
input {
  file {
    path => ["/data/test_logstash.log"]
    type => ["nginx_log"]
    start_position => "beginning"
  }
}
filter {
  if [type] =~ "nginx_log" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:create_time} %{IP:server_ip} %{URIPATH:uri} %{GREEDYDATA:args} %{IP:client_ip} %{NUMBER:status}" }
    }
    urldecode {
      field => "args"
    }
    kv {
      source => "args"
      field_split => "&"
      remove_field => [ "args","@timestamp","message","path","@version","host" ]
    }
    json {
      source => "yc_log"
      remove_field => [ "yc_log" ]
    }
  }
}
output {
  stdout { codec => rubydebug }
}

Running Logstash with the config above produces:

{
      "server_ip" => "172.17.12.177",
            "cha" => "App Store",
            "mdl" => "iPhone SE",
           "type" => "nginx_log",
            "mac" => "02:00:00:00:00:00",
         "appkey" => "yiche",
           "idfv" => "B1EAD56F-E456-4FF2-A3C2-9A8FA0693C22",
          "sn_id" => "6B937D83-BFB2-4C22-85A8-5B3E82D9D0F1",
          "aptkn" => "4fb9b2bffb808515aa0e9a5f5b17d826769e432f63d5cf87f7fb5ce4d67ef9f1",
             "av" => "10.3.3",
             "os" => "2",
           "idfa" => "2BFD67CF-ED60-4CF6-BA6E-FC0B18FDDDF8",
            "uid" => -1,
           "uuid" => "73B333EB-EC87-4F9F-867B-A9BF38CBEBB2",
         "req_id" => "360C8C43-73AC-4429-9E43-2C08F4C1C425",
         "status" => "200",
            "uri" => "/statistics/EventAgent",
            "enc" => "0",
          "ltype" => "view",
          "lg_vl" => {
        "ptitle" => "shouye",
         "pfrom" => "shouye"
    },
             "nt" => 4,
          "itime" => 1548156351820,
      "client_ip" => "218.15.255.124",
    "create_time" => "2019-01-22 19:25:58",
           "dvid" => "3676b52dc155e1eec3ca514f38736fd6",
            "fac" => "apple",
            "osv" => "iOS11.4.1"
}

As you can see, the lg_vl field is still JSON and has not been parsed out. If you simply add

json { source => "lg_vl" } 

to the config, it throws a JsonParseException: after the first json filter runs, lg_vl is already a parsed hash rather than a JSON string.

3. The correct approach:
input {
  file {
    path => ["/data/test_logstash.log"]
    type => ["nginx_log"]
    start_position => "beginning"
  }
}
filter {
  if [type] =~ "nginx_log" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:create_time} %{IP:server_ip} %{URIPATH:uri} %{GREEDYDATA:args} %{IP:client_ip} %{NUMBER:status}" }
    }
    urldecode {
      field => "args"
    }
    kv {
      source => "args"
      field_split => "&"
      remove_field => [ "args","@timestamp","message","path","@version","host" ]
    }
    json {
      source => "yc_log"
      remove_field => [ "yc_log" ]
    }
    mutate {
      add_field => { "lg_value" => "%{lg_vl}" }
    }
    json {
      source => "lg_value"
      remove_field => [ "lg_vl","lg_value" ]
    }
  }
}
output {
  stdout { codec => rubydebug }
}

After the outer JSON is parsed, add a field lg_value and assign the content of lg_vl to it; then run a separate json parse on lg_value and you are done. The parsed result looks like this:

{
           "type" => "nginx_log",
             "nt" => 4,
           "dvid" => "3676b52dc155e1eec3ca514f38736fd6",
             "os" => "2",
            "fac" => "apple",
          "ltype" => "view",
      "client_ip" => "218.15.255.124",
          "itime" => 1548156351820,
            "mac" => "02:00:00:00:00:00",
           "idfa" => "2BFD67CF-ED60-4CF6-BA6E-FC0B18FDDDF8",
            "uri" => "/statistics/EventAgent",
          "aptkn" => "4fb9b2bffb808515aa0e9a5f5b17d826769e432f63d5cf87f7fb5ce4d67ef9f1",
          "sn_id" => "6B937D83-BFB2-4C22-85A8-5B3E82D9D0F1",
    "create_time" => "2019-01-22 19:25:58",
            "osv" => "iOS11.4.1",
         "req_id" => "360C8C43-73AC-4429-9E43-2C08F4C1C425",
         "ptitle" => "shouye",
             "av" => "10.3.3",
      "server_ip" => "172.17.12.177",
          "pfrom" => "shouye",
            "enc" => "0",
            "mdl" => "iPhone SE",
            "cha" => "App Store",
           "idfv" => "B1EAD56F-E456-4FF2-A3C2-9A8FA0693C22",
            "uid" => -1,
           "uuid" => "73B333EB-EC87-4F9F-867B-A9BF38CBEBB2",
         "appkey" => "yiche",
         "status" => "200"
}

Perfect. Works like a charm!!!

 


Author: 神祕的寇先森
Source: Jianshu, https://www.jianshu.com/p/de06284e1484

Logstash: replacing strings, parsing JSON, converting field types, and extracting the log time

 

In some cases a log file is JSON-like but single-quoted, in the format below. We need to extract the right fields and field types from this data.

{'usdCnyRate': '6.728', 'futureIndex': '463.36', 'timestamp': '1532933162361'}
{'usdCnyRate': '6.728', 'futureIndex': '463.378', 'timestamp': '1532933222335'}
{'usdCnyRate': '6.728', 'futureIndex': '463.38', 'timestamp': '1532933348347'}
{'usdCnyRate': '6.728', 'futureIndex': '463.252', 'timestamp': '1532933366866'}
{'usdCnyRate': '6.728', 'futureIndex': '463.31', 'timestamp': '1532933372350'}
{'usdCnyRate': '6.728', 'futureIndex': '463.046', 'timestamp': '1532933426899'}
{'usdCnyRate': '6.728', 'futureIndex': '462.806', 'timestamp': '1532933432346'}
{'usdCnyRate': '6.728', 'futureIndex': '462.956', 'timestamp': '1532933438353'}
{'usdCnyRate': '6.728', 'futureIndex': '462.954', 'timestamp': '1532933456796'}
{'usdCnyRate': '6.728', 'futureIndex': '462.856', 'timestamp': '1532933492411'}
{'usdCnyRate': '6.728', 'futureIndex': '462.776', 'timestamp': '1532933564378'}
{'usdCnyRate': '6.728', 'futureIndex': '462.628', 'timestamp': '1532933576849'}
{'usdCnyRate': '6.728', 'futureIndex': '462.612', 'timestamp': '1532933588338'}
{'usdCnyRate': '6.728', 'futureIndex': '462.718', 'timestamp': '1532933636808'}

If we treat it as JSON and parse it directly with the Logstash json filter plugin, it fails like this:

[WARN ] 2018-07-31 10:20:12.708 [Ruby-0-Thread-5@[main]>worker1: :1] json - Error parsing json {:source=>"message", :raw=>"{'usdCnyRate': '6.728', 'futureIndex': '462.134', 'timestamp': '1532933714371'}", :exception=>#<LogStash::Json::ParserError: Unexpected character (''' (code 39)): was expecting double-quote to start field name at [Source: (byte[])"{'usdCnyRate': '6.728', 'futureIndex': '462.134', 'timestamp': '1532933714371'}"; line: 1, column: 3]>}

The simple fix here, I think, is to replace the single quotes with double quotes, which is what the mutate/gsub block in the config below does; pay close attention to how the gsub is written. The json block then parses the cleaned-up message. We also need to convert usdCnyRate and futureIndex to float (the mutate/convert block), and parse timestamp into a proper time stored in a new logdate field (the date block), which uses the
Logstash date filter plugin.

input {
    file {
        path => "/usr/share/logstash/wb.cond/test.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
filter {
    mutate {
        gsub => [ "message", "'", '"' ]
    }
    json {
        source => "message"
    }
    mutate {
        convert => {
            "usdCnyRate" => "float"
            "futureIndex" => "float"
        }
    }
    date {
        match => [ "timestamp", "UNIX_MS" ]
        target => "logdate"
    }
}
output {
    stdout { codec => rubydebug }
}

With this config, the fields and their types are parsed correctly:

{
        "message" => "{\"usdCnyRate\": \"6.728\", \"futureIndex\": \"463.378\", \"timestamp\": \"1532933222335\"}",
     "@timestamp" => 2018-07-31T10:48:48.600Z,
           "host" => "logstashvm0",
           "path" => "/usr/share/logstash/wb.cond/test.log",
       "@version" => "1",
        "logdate" => 2018-07-30T06:47:02.335Z,
     "usdCnyRate" => 6.728,
      "timestamp" => "1532933222335",
    "futureIndex" => 463.378
}
{
        "message" => "{\"usdCnyRate\": \"6.728\", \"futureIndex\": \"463.252\", \"timestamp\": \"1532933366866\"}",
     "@timestamp" => 2018-07-31T10:48:48.602Z,
           "host" => "logstashvm0",
           "path" => "/usr/share/logstash/wb.cond/test.log",
       "@version" => "1",
        "logdate" => 2018-07-30T06:49:26.866Z,
     "usdCnyRate" => 6.728,
      "timestamp" => "1532933366866",
    "futureIndex" => 463.252
}
{
        "message" => "{\"usdCnyRate\": \"6.728\", \"futureIndex\": \"463.31\", \"timestamp\": \"1532933372350\"}",
     "@timestamp" => 2018-07-31T10:48:48.602Z,
           "host" => "logstashvm0",
           "path" => "/usr/share/logstash/wb.cond/test.log",
       "@version" => "1",
        "logdate" => 2018-07-30T06:49:32.350Z,
     "usdCnyRate" => 6.728,
      "timestamp" => "1532933372350",
    "futureIndex" => 463.31
}
Reposting is welcome with attribution. Questions and suggestions are welcome in the comments, or by email: wenbya@outlook.com