inputs:輸入html
codecs:解碼web
filters:過濾正則表達式
outputs:輸出redis
logstash中支持少許的數據類型sql
bool 類型:debug => true
string類型: host => 「hostname」
number類型: port => 5044
array類型: match => [「datetime」 , 「unitime」]
hash類型: options => { key1 => 「value1」, key2 => 「value2」 }數據庫
logstash中字段引用json
要在 Logstash 配置中使用字段的值,只須要把字段的名字寫在中括號 [] 裏就好了,這就叫字段引用。還需注意字段層次。若是引用的是一個頂級字段,能夠省略[],直接指定字段名。要引用嵌套的字段,須要指定完整的路徑。bootstrap
例如:下面配置中有5個頂級字段(agent、ip、request、response、ua),3個嵌套字段(status、bytes、os)windows
{ "agent": "Mozilla/5.0 (compatible; MSIE 9.0)", "ip": "192.168.24.44", "request": "/index.html" "response": { "status": 200, "bytes": 52353 }, "ua": { "os": "<a href="http://www.ttlsa.com/windows/" title="windows"target="_blank">Windows</a> 7" } }
引用os字段,需指定[ua][os]。引用頂級字段如request,直接指定request便可。安全
條件判斷
表達式直接語法格式
● ==(等於), !=(不等於), <(小於), >(大於), <=(小於等於), >=(大於等於)
● =~(匹配正則), !~(不匹配正則)
● in(包含), not in(不包含)
● and(與), or(或), nand(非與), xor(非或)
● ()(複合表達式), !()(對複合表達式結果取反)
stdin標準輸入
參數 | input type | required | default value |
add_field | hash | NO | { } |
codec | codec | NO | "line" |
tags | array | NO | |
type | string | NO | 自定義 |
file文件輸入
參數 | input type | required | default value |
codec | codec | NO | 默認plain,可經過該參數設置編碼方式 |
discover_interval | number | NO | 隔多久檢查一次path下是否有新文件,默認15秒 |
exclude | array | NO | 排除不想監聽的文件 |
sincedb_path | string | NO | 記錄logstash讀取目標文件位置的文件,可自定義指定 |
stat_interval | number | NO | 隔多久檢查一次被監聽是否有更新,默認1秒 |
start_position | string | NO | 從什麼位置開始讀取文件,默認從結束位置,相似tial -f命令,配置爲"beginning"就會從開頭讀取文件 |
path | array | YES | 處理文件路徑,能夠配置多個 |
tags | array | NO | 在數據處理過程當中,由具體插件來添加或刪除的標記 |
type | string | NO | 自定義數據類型 |
實例:收集用戶操做日誌
配置文件
input { file { path => [ "/var/log/history.log" ] type => "history_log" start_position => "beginning" } } output { stdout{} }
TCP/UDP輸入
TCP
參數 | input type | required | default value |
add_field | hash | NO | { } |
codec | codec | NO | plain |
data_timeout | number | NO | 1 |
host | NO | 0.0.0.0 | |
mode | NO | "server","client"其中之一,默認是server | |
port | number | YES | 端口號 |
ssl_cacert | NO | ||
ssl_cert | NO | ||
ssl_enable | NO | ||
ssk_key | NO | ||
ssl_key_passphrase | NO | ||
ssl_verify | NO | ||
tags | array | NO | |
type | string | NO |
UDP
參數 | input type | required | default value |
add_field | hash | 否 | { } |
host | string | NO | 0.0.0.0 |
port | number | YES | 端口號,監聽數據包 |
tags | array | NO | |
type | string | NO | |
workers | number | NO | 處理線程數默認2 |
實例:接收rsyslog日誌
配置文件
input{ udp{ port => 1519 type => sentinel } tcp{ port => 1519 type => sentinel } } output{ elasticsearch{ hosts => ["10.118.213.223:9200"] index => "all-log" } stdout{ codec => rubydebug } #測試用,可在控制檯查看接收到的數據 }
輸入
輸出:
介紹:codec:對logstash的輸入或者輸出進行編碼、解碼的插件。
位置:input{ }和output{ }裏面的任何插件裏面。
codec插件:plain
介紹:plain是一個空的解析器,它可讓用戶本身指定格式。
codec插件:json
介紹:若是輸入到logstash的內容是json格式,能夠在input裏面加入codec=>json來進行解析。若是想讓logstash輸出爲json格式,能夠在output字段加入codec=>json。
語法格式:codec=>json
codec插件:json_line
介紹:若是你的json文件比較長,須要換行,能夠用json_line
codec插件:rubydebug
介紹:rubydebug將採用Ruby Awsome Print庫來解析日誌。
實例:輸入json格式,經過rubydebug輸出鍵值對。
配置文件logstash.conf
input{ stdin{ codec => json } } output{ stdout{ codec => rubydebug } }
啓動logstash:./bin/logstash -f logstash.conf
輸入:
{"bookname":"sfpay sentinel log","price":120}
輸出:
{ "@timestamp" => 2018-05-25T10:33:57.149Z, "bookname" => "sfpay sentinel log", "host" => "0.0.0.0", "@version" => "1", "price" => 120 }
codec插件:multiline
介紹:輸入爲多行內容時使用。
filter插件json
介紹:若是數據格式是json,能夠經過該filter把數據解析成你想要的數據結構
參數 | input type | required | default value |
add_field | hash | NO | 添加屬性,默認{ } |
add_tag | array | NO | 添加標識,默認{ } |
remove_field | array | NO | 刪除屬性,默認{ } |
remove_tag | array | NO | 刪除標識,默認{ } |
source | string | YES | 指定來源數據 |
target | string | NO | 定義將要解析的目標字段 |
實例:將message解析到content上面
配置文件
input{ stdin{ } } filter{ json{ source => "message" target => "content" } } output{ stdout{ codec => rubydebug } }
啓動logstash:./bin/logstash -f logstash.conf
輸入:
{"bookname":"sfpay sentinel log","price":120,"user":{"age":32,"name":"sean","comp":"sfpay"}}
輸出:
{ "message" => "{\"bookname\":\"sfpay sentinel log\",\"price\":120,\"user\":{\"age\":32,\"name\":\"sean\",\"comp\":\"sfpay\"}}", "@timestamp" => 2018-05-25T12:01:30.454Z, "content" => { "user" => { "comp" => "sfpay", "age" => 32, "name" => "sean" }, "bookname" => "sfpay sentinel log", "price" => 120 }, "host" => "0.0.0.0", "@version" => "1" }
filter插件grok
介紹:grok是目前logstash中最好的一種解析非結構化數據的攻擊,grok能夠過濾日誌中你想要的字段,測試grok匹配規則的網站:grokdebug.herokuapp.com
參數:
filter { grok { match=>... #可選項 寫匹配規則 } }
Grok是logstash最重要的插件,能夠將非結構化的數據解析爲結構化數據輸出。grok是經過正則匹配的方式進行解析的,以下例子:咱們輸入到logstash的數據爲「55.3.244.1|GET|15824」分別表示客戶端ip,請求方法、請求時間,咱們要如何從裏面解析出咱們須要的數據來呢?
filter{ grok{ match => {"message" => "^(?<client>\d+\.\d+\.\d+\.\d+)\|(?<method>\w+)\|(?<times>\d+)$" } remove_field => ["message"] } }
輸出結果
{ "times" => "15824", "type" => "sentinel", "@timestamp" => 2018-05-26T09:13:22.912Z, "host" => "192.168.1.103", "method" => "GET", "client" => "55.3.244.1", "@version" => "1" }
Grok 支持把預約義的 grok 表達式 寫入到文件中。
下面是從官方文件中摘抄的最簡單可是足夠說明用法的示例
USERNAME [a-zA-Z0-9._-]+ USER %{USERNAME}
第一行,用普通的正則表達式來定義一個 grok 表達式;
第二行,經過打印賦值格式,用前面定義好的 grok 表達式來定義另外一個 grok 表達式。
grok 表達式的打印複製格式的完整語法是下面這樣的:
%{PATTERN_NAME:capture_name:data_type} #data_type 目前只支持兩個值:int 和 float
好比輸入日誌
2015-05-07-16:03:04|10.4.29.158|120.131.74.116|WEB|11299073|http://quxue.renren.com/shareApp?isappinstalled=0&userId=11299073&from=groupmessage|/shareApp|null|Mozilla/5.0 (iPhone; CPU iPhone OS 8_2 like Mac OS X) AppleWebKit/600.1.4 (KHTML, like Gecko) Mobile/12D508 MicroMessenger/6.1.5 NetType/WIFI|duringTime|98||
進行匹配:
filter { grok { match => { "message" => "%{DATA:timestamp}\|%{IP:serverIp}\|%{IP:clientIp}\|%{DATA:logSource}\|%{DATA:userId}\|%{DATA:reqUrl}\|%{DATA:reqUri}\|%{DATA:refer}\|%{DATA:device}\|%{DATA:textDuring}\|%{DATA:duringTime:int}\|\|"} } }
上面中屢次出現的DATA其實就是 (.*?) 正則表達式。
grok實例
input { file { path => ["/var/log/messages_test", "/var/log/maillog_test"] type => "syslog" } file { path => "/root/custom.log" type => "custom" } file { path => "/root/access.log" type => "web" } } filter { if [type] == "custom" { grok { patterns_dir => "./patterns" match => { "message" => '"%{CUSTOM_DATE:timestamp}" "%{CUSTOM_DATA:msg}" "%{CUSTOM_DATA:msg1}" "%{CUSTOM_DATA:msg2}" "%{CUSTOM_DATA:msg3}" "%{CUSTOM_DATA:msg4}" "%{CUSTOM_DATA:msg5}"' } } date { locale => "en_US" timezone => "Asia/Shanghai" match => [ "timestamp", "yyyy-MM-dd HH:mm:ss"] target => "@timestamp" } ruby { code => "event['@timestamp'] = event['@timestamp'].getlocal" } } if [type] == "syslog" { grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" } add_field => [ "received_at", "%{@timestamp}" ] add_field => [ "received_from", "%{host}" ] } date { locale => "en_US" timezone => "Asia/Shanghai" match => [ "timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ] target => "@timestamp" } ruby { code => "event['@timestamp'] = event['@timestamp'].getlocal" } } if [type] == "web" { grok { match => { "message" => "%{NGINXLOG}" } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } } output { tcp { # codec => line { # charset => "UTF-8" # } host => "192.168.8.21" mode => "client" port => 44444 } stdout { codec => rubydebug } }
filter插件kv
介紹:經過指定分隔符截取key和value
介紹:output插件功能就是讓logstash處理完數據以後輸出到指定目標。
file插件
配置參數:
path => "/root/access_result" #結果輸出到文件裏
path => "/root/%{+YYYY.MM.dd}-%{host}-access" #結果輸出到文件裏,以時間命名文件
message_format => "%{ip}" #只輸出filter過濾出來的指定字段
elasticsearch插件
介紹:經過logstash解析過的數據輸出到elasticsearch中
例子:
elasticsearch { # 導出到es,最經常使用的插件 codec => "json" # 導出格式爲json hosts => ["127.0.0.1:9200"] # ES地址+端口 index => "logstash-slow-%{+YYYY.MM.dd}" # 導出到index內,可使用時間變量 user => "admin" password => "xxxxxx" # ES若是有安全認證就使用帳號密碼驗證,無安全認證就不須要 }
redis插件
介紹:能夠把數據輸出到redis中
實例:
redis{ # 輸出到redis的插件,下面選項根據需求使用 batch => true # 設爲false,一次rpush,發一條數據,true爲發送一批 batch_events => 50 # 一次rpush發送多少數據 batch_timeout => 5 # 一次rpush消耗多少時間 codec => plain # 對輸出數據進行codec,避免使用logstash的separate filter congestion_interval => 1 # 多長時間進項一次擁塞檢查 congestion_threshold => 5 # 限制一個list中能夠存在多少個item,當數量足夠時,就會阻塞直到有其餘消費者消費list中的數據 data_type => list # 使用list仍是publish db => 0 # 使用redis的那個數據庫,默認爲0號 host => ["127.0.0.1:6379"] # redis 的地址和端口,會覆蓋全局端口 key => xxx # list或channel的名字 password => xxx # redis的密碼,默認不使用 port => 6379 # 全局端口,默認6379,若是host已指定,本條失效 reconnect_interval => 1 # 失敗重連的間隔,默認爲1s timeout => 5 # 鏈接超時的時間 workers => 1 # 工做進程 }
-w # 指定線程,默認是 cpu 核數
-f # 指定配置文件
-t # 測試配置文件是否正常
-b # 執行 filter 模塊以前最大能積累的日誌,數值越大性能越好,同時越佔內存
需求:經過syslog1519端口接收數據庫審計日誌,在logstash進行格式化,而後輸出到kafka中,一樣經過logstash將數據從kafka中讀取出來寫入到elasticsearch中。
架構圖:
從syslog接收數據寫入kafka中logstash配置:
input{ udp{ port => 1519 type => sentinel_audit workers => 4 } } filter{ grok{ match => {"message" => "dst=(?<dip>\d+\.\d+\.\d+\.\d+)\sdpt=(?<dport>\d+)\sduser=(?<dun>.*?)\ssrc=(?<sip>\d+\.\d+\.\d+\.\d+)\sspt=(?<sport>\d+)\sproto=TCP\srt=(?<etime>.*?),\scat=Audit\s(?<title>.*?)\scs1Label=Policy\scs2=(?<sgroup>.*?)\scs2Label=.*?cs5=(?<eid>.*?)\scs5Label=EventId\scs6=(?<etype>.*?)\scs6Label=EventType.*?cs10=(?<sapp>.*?)\scs10Label=SourceApplication\scs11=(?<osuser>.*?)\scs11Label=OSUser\scs12=.*?cs12Label=HostName\scs13=(?<dbname>.*?)\scs13Label=Database.*?cs16=(?<sql>.*?)\scs16Label=ParsedQuery.*?cs19=(?<rsize>\d+)\scs19Label"} } grok{ match =>{"sql" => "(?<sqltype>\w+)\s.+"} } } output{ kafka{ codec => json bootstrap_servers => "10.118.213.175:9092,10.118.213.219:9092" topic_id => "original_log" } stdout{ codec => rubydebug } }
其中時間字段@timestamp由於時區問題,比當前時間晚八小時,可是在kibana插件中會自行處理,後面會用到該插件,若是在這裏修改了時間問題,使用kibana插件時還要再反轉回來。因此就沒有修改時間問題。
其中也有幾個問題須要弄明白
1.寫入在新版本的logstash output kafka插件裏面已經沒有了workers屬性配置,如何配置寫入kafka線程數、如何控制向kafka中每一個分區寫數據?
參考資料:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html
從kafka中讀取數據寫入elasticsearch中logstash:
input{ kafka{ codec => json bootstrap_servers => "10.118.213.175:9092,10.118.213.219:9092" client_id => "logstash_index1" consumer_threads => 4 group_id => "logstash_index" topics =>["original_log"] type => "sentinel_audit" } } output{ elasticsearch{ codec => json hosts => ["10.118.213.223:9200","10.118.213.224:9200"] index => "db_audit" } stdout{ codec => rubydebug } }
讀取消息時使用多個logstash從kafka中消費同一個主題,每一個logstash配置消費若干分區,且這些logstash要配置相同group_id.(一個分區只能被一個線程消費,但一個消費線程能夠消費多個分區的數據)
參考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
官網上面有詳細的參數配置說明。