Logstash插件介紹

1.Logstash插件分類

inputs:輸入html

codecs:解碼web

filters:過濾正則表達式

outputs:輸出redis

logstash中支持少許的數據類型sql

bool 類型:debug => true 
string類型: host => 「hostname」 
number類型: port => 5044 
array類型: match => [「datetime」 , 「unitime」] 
hash類型: options => { key1 => 「value1」, key2 => 「value2」 }數據庫

logstash中字段引用json

要在 Logstash 配置中使用字段的值,只須要把字段的名字寫在中括號 [] 裏就好了,這就叫字段引用。還需注意字段層次。若是引用的是一個頂級字段,能夠省略[],直接指定字段名。要引用嵌套的字段,須要指定完整的路徑。bootstrap

例如:下面配置中有5個頂級字段(agent、ip、request、response、ua),3個嵌套字段(status、bytes、os)windows

{
  "agent": "Mozilla/5.0 (compatible; MSIE 9.0)",
  "ip": "192.168.24.44",
  "request": "/index.html"
  "response": {
    "status": 200,
    "bytes": 52353
  },
  "ua": {
    "os": "<a href="http://www.ttlsa.com/windows/" title="windows"target="_blank">Windows</a> 7"
  }
}

引用os字段,需指定[ua][os]。引用頂級字段如request,直接指定request便可。安全

條件判斷

表達式直接語法格式

● ==(等於), !=(不等於), <(小於), >(大於), <=(小於等於), >=(大於等於) 
● =~(匹配正則), !~(不匹配正則) 
● in(包含), not in(不包含) 
● and(與), or(或), nand(非與), xor(非或) 
● ()(複合表達式), !()(對複合表達式結果取反)

 

2.Logstash的input插件

stdin標準輸入

參數 input type required default value
add_field hash NO { }
codec codec NO "line"
tags array NO  
type string NO 自定義

file文件輸入

參數 input type required default value
codec codec NO 默認plain,可經過該參數設置編碼方式
discover_interval number NO 隔多久檢查一次path下是否有新文件,默認15秒
exclude array NO 排除不想監聽的文件
sincedb_path string NO 記錄logstash讀取目標文件位置的文件,可自定義指定
stat_interval number NO 隔多久檢查一次被監聽是否有更新,默認1秒
start_position string NO 從什麼位置開始讀取文件,默認從結束位置,相似tial -f命令,配置爲"beginning"就會從開頭讀取文件
path array YES 處理文件路徑,能夠配置多個
tags array NO 在數據處理過程當中,由具體插件來添加或刪除的標記
type string NO 自定義數據類型

實例:收集用戶操做日誌

配置文件

input {
	file {
		path => [ "/var/log/history.log" ]
		type => "history_log"
        start_position => "beginning"
  	}
}
output {
    stdout{}
}

TCP/UDP輸入

TCP

參數 input type required default value
add_field hash NO { }
codec codec NO plain
data_timeout number NO 1
host   NO 0.0.0.0
mode   NO "server","client"其中之一,默認是server
port number YES 端口號
ssl_cacert   NO  
ssl_cert   NO  
ssl_enable   NO  
ssk_key   NO  
ssl_key_passphrase   NO  
ssl_verify   NO  
tags array NO  
type string NO  

UDP

參數 input type required default value
add_field hash { }
host string NO 0.0.0.0
port number YES 端口號,監聽數據包
tags array NO  
type string NO  
workers number NO 處理線程數默認2

實例:接收rsyslog日誌

配置文件

input{
        udp{
                port => 1519
                type => sentinel
        }
        tcp{
                port => 1519
                type => sentinel
        }
}
output{
        elasticsearch{
                hosts => ["10.118.213.223:9200"]
                index => "all-log"
        }

        stdout{ codec => rubydebug }   #測試用,可在控制檯查看接收到的數據

}
  • 輸入

    • 設置監聽TCP/UDP的1519端口,類型是rsyslog。
    • 這樣配置後logstash服務啓動時將會做爲1個rsyslog服務器接收來自其餘rsyslog的日誌。
  • 輸出:

    • 將日誌輸出到elasticsearch,
    • 同時,將收到的日誌打印到標準輸出。

3.Logstash的codec插件

介紹:codec:對logstash的輸入或者輸出進行編碼、解碼的插件。

位置:input{ }和output{ }裏面的任何插件裏面。

codec插件:plain

介紹:plain是一個空的解析器,它可讓用戶本身指定格式。

codec插件:json

介紹:若是輸入到logstash的內容是json格式,能夠在input裏面加入codec=>json來進行解析。若是想讓logstash輸出爲json格式,能夠在output字段加入codec=>json。

語法格式:codec=>json

codec插件:json_line

介紹:若是你的json文件比較長,須要換行,能夠用json_line

codec插件:rubydebug

介紹:rubydebug將採用Ruby Awsome Print庫來解析日誌。

實例:輸入json格式,經過rubydebug輸出鍵值對。

配置文件logstash.conf

input{
        stdin{
                codec => json
        }
}
output{
        stdout{
                codec => rubydebug
        }
}

啓動logstash:./bin/logstash -f logstash.conf

輸入:

{"bookname":"sfpay sentinel log","price":120}

輸出:

{
    "@timestamp" => 2018-05-25T10:33:57.149Z,
      "bookname" => "sfpay sentinel log",
          "host" => "0.0.0.0",
      "@version" => "1",
         "price" => 120
}

codec插件:multiline

介紹:輸入爲多行內容時使用。

4.Logstash的filter插件

filter插件json

介紹:若是數據格式是json,能夠經過該filter把數據解析成你想要的數據結構

參數 input type required default value
add_field hash NO 添加屬性,默認{ }
add_tag array NO 添加標識,默認{ }
remove_field array NO 刪除屬性,默認{ }
remove_tag array NO 刪除標識,默認{ }
source string YES 指定來源數據
target string NO 定義將要解析的目標字段

實例:將message解析到content上面

配置文件

input{
        stdin{
        }
}

filter{
        json{
                source => "message"
                target => "content"
        }
}

output{
        stdout{
                codec => rubydebug
        }
}

啓動logstash:./bin/logstash -f logstash.conf

 

輸入:

{"bookname":"sfpay sentinel log","price":120,"user":{"age":32,"name":"sean","comp":"sfpay"}}

輸出:

{
       "message" => "{\"bookname\":\"sfpay sentinel log\",\"price\":120,\"user\":{\"age\":32,\"name\":\"sean\",\"comp\":\"sfpay\"}}",
    "@timestamp" => 2018-05-25T12:01:30.454Z,
       "content" => {
            "user" => {
            "comp" => "sfpay",
             "age" => 32,
            "name" => "sean"
        },
        "bookname" => "sfpay sentinel log",
           "price" => 120
    },
          "host" => "0.0.0.0",
      "@version" => "1"
}

filter插件grok

介紹:grok是目前logstash中最好的一種解析非結構化數據的攻擊,grok能夠過濾日誌中你想要的字段,測試grok匹配規則的網站:grokdebug.herokuapp.com

參數:

filter {
    grok {
        match=>...        #可選項        寫匹配規則
    }
}

 Grok是logstash最重要的插件,能夠將非結構化的數據解析爲結構化數據輸出。grok是經過正則匹配的方式進行解析的,以下例子:咱們輸入到logstash的數據爲「55.3.244.1|GET|15824」分別表示客戶端ip,請求方法、請求時間,咱們要如何從裏面解析出咱們須要的數據來呢?

filter{
        grok{
                match => {"message" => "^(?<client>\d+\.\d+\.\d+\.\d+)\|(?<method>\w+)\|(?<times>\d+)$"  }
                remove_field => ["message"]
        }
}

輸出結果

{
         "times" => "15824",
          "type" => "sentinel",
    "@timestamp" => 2018-05-26T09:13:22.912Z,
          "host" => "192.168.1.103",
        "method" => "GET",
        "client" => "55.3.244.1",
      "@version" => "1"
}

Grok 支持把預約義的 grok 表達式 寫入到文件中。

下面是從官方文件中摘抄的最簡單可是足夠說明用法的示例

USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}

第一行,用普通的正則表達式來定義一個 grok 表達式;

第二行,經過打印賦值格式,用前面定義好的 grok 表達式來定義另外一個 grok 表達式。

grok 表達式的打印複製格式的完整語法是下面這樣的:

%{PATTERN_NAME:capture_name:data_type}   #data_type 目前只支持兩個值:int 和 float

好比輸入日誌

2015-05-07-16:03:04|10.4.29.158|120.131.74.116|WEB|11299073|http://quxue.renren.com/shareApp?isappinstalled=0&userId=11299073&from=groupmessage|/shareApp|null|Mozilla/5.0 (iPhone; CPU iPhone OS 8_2 like Mac OS X) AppleWebKit/600.1.4 (KHTML, like Gecko) Mobile/12D508 MicroMessenger/6.1.5 NetType/WIFI|duringTime|98||

進行匹配:

filter {

  grok {
    match => { "message" => "%{DATA:timestamp}\|%{IP:serverIp}\|%{IP:clientIp}\|%{DATA:logSource}\|%{DATA:userId}\|%{DATA:reqUrl}\|%{DATA:reqUri}\|%{DATA:refer}\|%{DATA:device}\|%{DATA:textDuring}\|%{DATA:duringTime:int}\|\|"}
  }

}

上面中屢次出現的DATA其實就是 (.*?) 正則表達式。

grok實例

input {
	file {
	  path => ["/var/log/messages_test", "/var/log/maillog_test"]
	  type => "syslog"
	}

	file {
	  path => "/root/custom.log"
	  type => "custom"
	}

	file {
	  path => "/root/access.log"
	  type => "web"
	}

}

filter {
	if [type] == "custom" {
	    grok {
	      patterns_dir => "./patterns"
	      match => { "message" => '"%{CUSTOM_DATE:timestamp}" "%{CUSTOM_DATA:msg}" "%{CUSTOM_DATA:msg1}" "%{CUSTOM_DATA:msg2}" "%{CUSTOM_DATA:msg3}" "%{CUSTOM_DATA:msg4}" "%{CUSTOM_DATA:msg5}"' }
	    }
	    date {
	        locale => "en_US"
	        timezone => "Asia/Shanghai"
	        match => [ "timestamp", "yyyy-MM-dd HH:mm:ss"]
	        target => "@timestamp"
	    }

	    ruby {
	        code => "event['@timestamp'] = event['@timestamp'].getlocal"
	    }
	}

	if [type] == "syslog" {
	    grok {
	      match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
	      add_field => [ "received_at", "%{@timestamp}" ]
	      add_field => [ "received_from", "%{host}" ]
	    }
	    date {
	        locale => "en_US"
	        timezone => "Asia/Shanghai"
	        match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
	        target => "@timestamp"
	    }

	    ruby {
	        code => "event['@timestamp'] = event['@timestamp'].getlocal"
	    }
  	}

	if [type] == "web" {
	    grok {
	    	match => { "message" => "%{NGINXLOG}" }
	    }

	    date {
	  	match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
	    }
	}
}

output {
  tcp {
    #  codec => line {
    #    charset => "UTF-8"
    #  }
    host => "192.168.8.21"
    mode => "client"
    port => 44444
  }
  stdout { codec => rubydebug }
}

filter插件kv

介紹:經過指定分隔符截取key和value

5.Logstash的output插件

介紹:output插件功能就是讓logstash處理完數據以後輸出到指定目標。

file插件

配置參數:

path => "/root/access_result"                 #結果輸出到文件裏
path => "/root/%{+YYYY.MM.dd}-%{host}-access" #結果輸出到文件裏,以時間命名文件
message_format => "%{ip}"                      #只輸出filter過濾出來的指定字段

elasticsearch插件

介紹:經過logstash解析過的數據輸出到elasticsearch中

例子:

elasticsearch {  # 導出到es,最經常使用的插件
        codec => "json"
        # 導出格式爲json
        hosts => ["127.0.0.1:9200"]
        # ES地址+端口
        index => "logstash-slow-%{+YYYY.MM.dd}"
        # 導出到index內,可使用時間變量
        user => "admin"
        password => "xxxxxx"
        # ES若是有安全認證就使用帳號密碼驗證,無安全認證就不須要
    }

redis插件

介紹:能夠把數據輸出到redis中

實例:

redis{  # 輸出到redis的插件,下面選項根據需求使用
         batch => true
         # 設爲false,一次rpush,發一條數據,true爲發送一批
         batch_events => 50
         # 一次rpush發送多少數據
         batch_timeout => 5
         # 一次rpush消耗多少時間
         codec => plain
         # 對輸出數據進行codec,避免使用logstash的separate filter
         congestion_interval => 1
         # 多長時間進項一次擁塞檢查
         congestion_threshold => 5
         # 限制一個list中能夠存在多少個item,當數量足夠時,就會阻塞直到有其餘消費者消費list中的數據
         data_type => list
         # 使用list仍是publish
         db => 0
         # 使用redis的那個數據庫,默認爲0號
         host => ["127.0.0.1:6379"]
         # redis 的地址和端口,會覆蓋全局端口
         key => xxx
         # list或channel的名字
         password => xxx
         # redis的密碼,默認不使用
         port => 6379
         # 全局端口,默認6379,若是host已指定,本條失效
         reconnect_interval => 1
         # 失敗重連的間隔,默認爲1s
         timeout => 5
         # 鏈接超時的時間
         workers => 1
         # 工做進程
     }

6.Logstash啓動參數

-w # 指定線程,默認是 cpu 核數 
-f # 指定配置文件 
-t # 測試配置文件是否正常 
-b # 執行 filter 模塊以前最大能積累的日誌,數值越大性能越好,同時越佔內存

7.項目實戰

需求:經過syslog1519端口接收數據庫審計日誌,在logstash進行格式化,而後輸出到kafka中,一樣經過logstash將數據從kafka中讀取出來寫入到elasticsearch中。

架構圖:

從syslog接收數據寫入kafka中logstash配置:

input{
        udp{
                port => 1519
                type => sentinel_audit
                workers => 4
        }
}

filter{
        grok{
                match => {"message" => "dst=(?<dip>\d+\.\d+\.\d+\.\d+)\sdpt=(?<dport>\d+)\sduser=(?<dun>.*?)\ssrc=(?<sip>\d+\.\d+\.\d+\.\d+)\sspt=(?<sport>\d+)\sproto=TCP\srt=(?<etime>.*?),\scat=Audit\s(?<title>.*?)\scs1Label=Policy\scs2=(?<sgroup>.*?)\scs2Label=.*?cs5=(?<eid>.*?)\scs5Label=EventId\scs6=(?<etype>.*?)\scs6Label=EventType.*?cs10=(?<sapp>.*?)\scs10Label=SourceApplication\scs11=(?<osuser>.*?)\scs11Label=OSUser\scs12=.*?cs12Label=HostName\scs13=(?<dbname>.*?)\scs13Label=Database.*?cs16=(?<sql>.*?)\scs16Label=ParsedQuery.*?cs19=(?<rsize>\d+)\scs19Label"}
        }

        grok{
                match =>{"sql" => "(?<sqltype>\w+)\s.+"}
        }
}


output{
        kafka{
                codec => json
                bootstrap_servers => "10.118.213.175:9092,10.118.213.219:9092"
                topic_id => "original_log"
        }

        stdout{ codec => rubydebug }

}

其中時間字段@timestamp由於時區問題,比當前時間晚八小時,可是在kibana插件中會自行處理,後面會用到該插件,若是在這裏修改了時間問題,使用kibana插件時還要再反轉回來。因此就沒有修改時間問題。

其中也有幾個問題須要弄明白

1.寫入在新版本的logstash output kafka插件裏面已經沒有了workers屬性配置,如何配置寫入kafka線程數、如何控制向kafka中每一個分區寫數據?

參考資料:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html

從kafka中讀取數據寫入elasticsearch中logstash:

input{
        kafka{
                codec => json
                bootstrap_servers => "10.118.213.175:9092,10.118.213.219:9092"
                client_id => "logstash_index1"
                consumer_threads => 4
                group_id => "logstash_index"
                topics =>["original_log"]
                type => "sentinel_audit"
        }
}

output{
        elasticsearch{
                codec => json
                hosts => ["10.118.213.223:9200","10.118.213.224:9200"]
                index => "db_audit"
        }
        stdout{ codec => rubydebug }
}

讀取消息時使用多個logstash從kafka中消費同一個主題,每一個logstash配置消費若干分區,且這些logstash要配置相同group_id.(一個分區只能被一個線程消費,但一個消費線程能夠消費多個分區的數據)

 

參考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

官網上面有詳細的參數配置說明。

相關文章
相關標籤/搜索