logstash採集與清洗數據到elasticsearch案例實戰

原文地址:https://www.2cto.com/kf/201610/560348.htmlcss

Logstash的使用

logstash支持把配置寫入文件 xxx.conf,而後經過讀取配置文件來採集數據
./bin/logstash –f xxx.conf

logstash最終會把數據封裝成json類型,默認會添加@timestamp時間字段、host主機字段、type字段。原消息數據會整個封裝進message字段。若是數據處理過程當中,用戶解析添加了多個字段,則最終結果又會多出多個字段。也能夠在數據處理過程當中移除多個字段,總之,logstash最終輸出的數據格式是json格式。html

Logstash的結構

Logstash由 input,filter,output三個組件去完成採集數據
以下是一個logstash的配置實例:java

input {
    file {
        type => "log"
        path => "/log/*/*.log"
        discover_interval => 10
        start_position => "beginning"
    }
}
filter {
} 
output {
    elasticsearch {
    index => "log-%{+YYYY.MM.dd}"
    hosts => ["172.16.0.14:9200", "172.16.0.15:9200", "172.16.0.16:9200"]
    }
    stdout {codec => rubydebug}
}

input

input組件負責讀取數據,能夠採用file插件讀取本地文本文件,stdin插件讀取標準輸入數據,tcp插件讀取網絡數據,log4j插件讀取log4j發送過來的數據等等。web

filter

filter插件負責過濾解析input讀取的數據,能夠用grok插件正則解析數據,date插件解析日期,json插件解析json等等。正則表達式

output

output插件負責將filter處理過的數據輸出。能夠用elasticsearch插件輸出到es,rediss插件輸出到redis,stdout插件標準輸出,kafka插件輸出到kafka等等
trade.log日誌採集。redis

trade.log日誌採集

配置內容以下:json

input {
    file {
        type => "tradelog"
        path => "/home/elk/his/trade.log*"
        discover_interval => 5
        start_position => "beginning"
         
        sincedb_path => "/home/elk/myconf/sincedb_trade.txt"
        sincedb_write_interval => 15
         
        codec => plain { charset => "GB2312" }
    }    
}
 
filter {
    grok {
        match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|%{GREEDYDATA:exception}\|" }    
        match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|"  }
        match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|"  }    
        match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|"  }
        match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|" }
        remove_field  => "message"
    }    
    date {
        match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]
    }    
}
 
output {
    if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {
        stdout {codec => rubydebug}
         
        elasticsearch {
            index => "log4j-tradelog"
            hosts => ["168.7.1.67:9200"]
            manage_template => true
            template_overwrite => true
            template_name => "log4j-tradelog"
            template => "/home/elk/myconf/tradelog_template.json"
        }
    }    
}

input

1. start_position:設置beginning保證從文件開頭讀取數據。
2. path:填入文件路徑。
3. type:自定義類型爲tradelog,由用戶任意填寫。
4. codec:設置讀取文件的編碼爲GB2312,用戶也能夠設置爲UTF-8等等
5. discover_interval:每隔多久去檢查一次被監聽的 path 下是否有新文件,默認值是15秒
6. sincedb_path:設置記錄源文件讀取位置的文件,默認爲文件所在位置的隱藏文件。
7. sincedb_write_interval:每隔15秒記錄一下文件讀取位置瀏覽器

filter

日誌格式以下:ruby

2016-05-09 09:49:13,817 [] [ACTIVE] ExecuteThread: '1' for queue: 'weblogic.kernel.Default (self-tuning)' [INFO ] com.c.command.StartLogCommand.execute(StartLogCommand.java:46) - FrontPageFinProdListQry|IP: 192.168.1.105|MAC: A1345C05-26C1-4263-8845-01CFCA6EC4FD|
2016-05-09 09:49:13,928 [] [ACTIVE] ExecuteThread: '2' for queue: 'weblogic.kernel.Default (self-tuning)' [INFO ] com.c.command.EndLogCommand.execute(EndLogCommand.java:44) - FrontPageAdvertListQry|IP: 192.168.1.105|MAC: A1245C05-26C1-4263-8845-01CFCA6EC4FD|Success|

grok插件

由於該日誌中有5種格式以下,以最後幾個我須要的字段爲例說明:網絡

交易名|登陸名|編號|ip地址|mac地址|返回結果|異常信息
交易名|登陸名|編號|ip地址|mac地址|返回結果|
交易名|登陸名|編號|ip地址|mac地址|
交易名|ip地址|mac地址|返回結果|
交易名|ip地址|mac地址|

因此採用5種正則規則去匹配,logstash默認會從上到下按規則去匹配,直到匹配上爲止。(日誌中的多行錯誤信息,匹配不上,logstash會在tags字段添加」_ grokparsefailure」,因此後面輸出的時候會用if條件判斷過濾掉解析失敗的行消息)


注意:5種正則規則的上下順序,下面的規則放在上面會致使可能內容解析不全,好比源數據是:請求交易名|操做員登陸名|操做員編號|ip地址|mac地址|返回結果|異常信息,若是按照「請求交易名|ip地址|mac地址|」規則去匹配,只能識別出3個字段,並且匹配成功,不繼續往下執行,這樣識別的內容就不全。

logstash內置了不少正則匹配規則,用戶能夠直接調用這些規則來解析,例如%{WORD:result} 表示調用WORD規則(即識別字符串規則)來解析並最後賦值給result字段(result字段會自動建立)。
下面以第一條match規則爲例來講明:

match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|%{GREEDYDATA:exception}\|" }

首先行首使用DATESTAMP_CN規則來識別時間,並賦值給logdate字段名;而後.*識別任意字符串(.表明任意一個字符,包括特殊字符,*表明個數是任意個);而後使用WORD規則(即匹配字符串規則,不包含特殊字符)識別到字符串並賦值給opeType字段;後面同理。這些WORD、IP、GREEDYDATA規則都是logstash內部grok-patterns文件已經定義好了的規則。用戶能夠直接拿來使用。

注意:[@metadata]表示logdate這個字段在數據處理過程當中只是一個臨時字段,最後不會真的輸出。避免了使用remove_field手動移除字段。

注意:logstash默認不支持」YYYY-MM-dd HH:mm:ss,SSS」格式的時間匹配,須要本身定義正則表達式到logstash-2.3.1/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns/grok-patterns文件中。grok-patterns文件中追加2行內容:

DATE_CN %{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}
DATESTAMP_CN %{DATE_CN} %{TIME}

注意:logstash的正則表達式採用ruby語言正則表達式,具體語法能夠參考網上。

remove_field => "message"表示解析完成以後刪除原來的 message字段,避免重複。

date插件

match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]

logstash默認的時間字段是@timestamp,若是不設置的話,默認是數據採集時候的時間,這裏咱們將日誌打印的時間(即解析出的logdate字段的內容)設置爲@timestamp內容,方便以後kibana根據時間檢索。

注意:解析出來的@timestamp會比實際時間早8個小時,這是內置utc時間格式問題,kibana頁面展現的時候會根據瀏覽器當前時區自動轉換回來,這裏不做處理。

output

if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {
    stdout {codec => rubydebug}
     
    elasticsearch {
        index => "log4j-tradelog"
        hosts => ["134.7.1.67:9200"]
        manage_template => true
        template_overwrite => true
        template => "/home/elk/myconf/tradelog_template.json"
    }
}

前面提到過,若是grok解析失敗,會在tags字段自動添加_grokparsefailure值若是date解析失敗,會在tags字段自動添加_dateparsefailure值。因此最後的輸出,咱們採用條件過濾掉解析失敗的行內容。最終的每一行內容解析成json,一路存入elasticsearch,另外一路進行標準輸出。

elasticsearch插件

index:要導入的es索引
host:es地址,有多個節點配置多個節點
template:指定elasticsearch的mapping模板文件,若是該索引不存在,logstash會根據這個mapping模板去自動建立索引。

stdout插件

rubydebug標準輸出,便於調試,能夠不使用該插件。

最終解析出結果示例以下:

{
      "@version" => "1",
    "@timestamp" => "2016-05-09T01:44:48.366Z",
          "path" => "/home/elk/e.log",
          "host" => "ccc7",
          "type" => "tradelog",
       "opeType" => "WZQry",
          "name" => "lhcsssz2",
           "oid" => "abzzak",
            "ip" => "192.168.44.105",
           "mac" => "A1345C05-26C1-4253-8845-01CFCA8EC4FD",
        "result" => "Success"
}

error.log採集

日誌實例:

2016-09-29 17:13:24,184|ncid=1100343164|oid=acaatv|loginName=zhenglw1|transId=Withdraw|traceId=N/A-_A-88C4D-043|exceptType=com.intenft.exception.AppRTException|exceptCode=CORESYST_TXN_NATIVE_89042|exceptMsg=對不起!記錄沒有找到

配置文件以下:

input {
    file {
        path => "/home/elk/his/error.log*"
        type => "errorlog"
        start_position => "beginning"
        discover_interval => 5
         
        codec => multiline {
            charset => "GB2312"
            pattern => "^%{DATESTAMP_CN}"
            negate => true
            what => "next"       
        }
         
        sincedb_path => "/home/elk/myconf/sincedb_error.txt"
        sincedb_write_interval => 15
    }    
}
 
filter {
    grok {
        match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]}%{GREEDYDATA:[@metadata][keyvalue]}" }
        remove_field  => "message"
    }    
    date {
        match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]
    }
    kv {
        source => "[@metadata][keyvalue]"
        field_split => "\|"
        value_split => "="
    }
}
 
output {
    if "multiline" in [tags] {
        stdout {codec => rubydebug}
        elasticsearch {
            index => "log4j-errorlog-3"
            hosts => ["168.7.1.67:9200"]
            manage_template => true
            template_overwrite => true
            template => "/home/elk/myconf/errorlog_template.json"
        }
    }    
}

input

8. start_position:設置beginning保證從文件開頭讀取數據。
9. path:填入文件路徑。
10. type:自定義類型爲tradelog,由用戶任意填寫。
11. codec:multiline插件
12. discover_interval:每隔多久去檢查一次被監聽的 path 下是否有新文件,默認值是15秒
13. sincedb_path:設置記錄源文件讀取位置的文件,默認爲文件所在位置的隱藏文件。
14. sincedb_write_interval:每隔15秒記錄一下文件讀取位置

multiline插件

logstash默認讀取一行內容爲一個消息,由於錯誤日誌包含堆棧信息,多行對應一個消息,因此使用該插件合併多行爲一條消息。
pattern:以」YYYY-MM-dd HH:mm:ss,SSS」格式開頭的匹配爲一條消息。
negate:true 表示正向使用該patttern
what:匹配到的日期屬於下一條消息
charset:設置文件編碼

filter

grok插件

匹配日期到logdata字段,匹配剩下的全部字符串到keyvalue臨時字段,」GREEDYDATA」正則表達式爲」.*」

date插件

match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]

logstash默認的時間字段是@timestamp,若是不設置的話,默認是數據採集時候的時間,這裏咱們將日誌打印的時間(即解析出的logdate字段的內容)設置爲@timestamp內容,方便以後kibana根據時間檢索。

注意:解析出來的@timestamp會比實際時間早8個小時,這是內置utc時間格式問題,kibana頁面展現的時候會根據瀏覽器當前時區自動轉換回來,這裏不做處理。

kv插件

source:解析前面grok獲取的keyvalue字段

(好比:|ncid=1100783164|oid=acaatv|loginName=zhew1|transId=Withdraw|traceId=N/A-_A-88C4D-043|exceptType=com.inteft.exception.AppRTException|exceptCode=CORESYST_TXN_NATIVE_89042|exceptMsg=對不起!記錄沒有找到)

field_split:按」|」切分key-value對
value_split:按」=」切分key 和 value,最終切分出來key做爲字段名,value做爲字段值

output

output {
    if "multiline" in [tags] {
        stdout {codec => rubydebug}
        elasticsearch {
            index => "log4j-errorlog-3"
            hosts => ["168.7.1.67:9200"]
            manage_template => true
            template_overwrite => true
            template => "/home/elk/myconf/errorlog_template.json"
        }
    }   
}

該日誌有2種格式的日誌,一種是單行的錯誤信息日誌,一種是多行的包含堆棧信息的日誌,這2種日誌內容重複,那麼只須要解析單行格式的日誌。kv插件解析多行格式的日誌時, tags字段裏沒有」multipline」值(緣由是由於grok解析的時候不能解析換行符),因此能夠經過if條件判斷tags字段是否有」multipline」值,來過濾掉多行格式的日誌。

elasticsearch插件

index:要導入的es索引
host:es地址,有多個節點配置多個節點
template:指定elasticsearch的mapping模板文件,若是該索引不存在,logstash會根據這個mapping模板去自動建立索引。

最終解析的結果示例以下:

{
    "@timestamp" => "2016-09-29T09:14:22.194Z",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "path" => "/home/elk/stst.log",
          "host" => "ci7",
          "type" => "sttlog",
          "ncid" => "1143164",
           "oid" => "acav",
     "loginName" => "zhew1",
       "transId" => "MyQuery",
       "traceId" => "N/A8C4E-047",
    "exceptType" => "com.exception.AppRTException",
    "exceptCode" => "CORESYNATIVE_82243",
     "exceptMsg" => "對不起!根據帳號獲取客戶信息錯誤"
}

總結:

注意:

logstash filter中的每個插件都有add_field,remove_field,add_tag,remove_tag 4個功能。

附錄:

mapping模板文件

tradelog:

{
    "template": "log4j-tradelog*",
    "settings": {
        "index.number_of_shards": 3,
        "number_of_replicas": 0
    },
    "mappings": {
        "tradelog": {
            "_all": {
                "enabled": false
            },
            "properties": {
                "@timestamp": {
                    "type": "date",
                    "format": "strict_date_optional_time||epoch_millis",
                    "doc_values": true
                },
                "@version": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "exception": {
                    "type": "string",
                    "index": "analyzed"
                },
                "path": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "host": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "ip": {
                    "type": "ip",
                    "index": "not_analyzed"
                },
                "logger_name": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "mac": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "name": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "oid": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "opeType": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "priority": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "result": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "type": {
                    "type": "string",
                    "index": "not_analyzed"
                }
            }
        }
    }
}

errorlog:

{
    "template": "log4j-errorlog*",
    "settings": {
        "index.number_of_shards": 3,
        "number_of_replicas": 0
    },
    "mappings": {
        "errorlog": {
            "_all": {
                "enabled": false
            },
            "properties": {
                "host": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "ncid": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "type": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "@version": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "exceptType": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "@timestamp": {
                    "format": "strict_date_optional_time||epoch_millis",
                    "type": "date"
                },
                "exceptCode": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "transId": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "priority": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "oid": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "traceId": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "exceptMsg": {
                    "type": "string",
                    "index": "analyzed"
                },
                "path": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "logger_name": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "loginName": {
                    "type": "string",
                    "index": "not_analyzed"
                }
            }
        }
    }
}
相關文章
相關標籤/搜索