Logstash 就是一個開源的數據流工具,它會作三件事:html
例如:java
方便省事。mysql
假設你須要從 kafka 中消費數據,而後寫入 elasticsearch ,若是本身編碼,你得去對接 kafka 和 elasticsearch 的 API 吧,若是你用 Logstash ,這部分就不用本身去實現了,由於 Logstash 已經爲你封裝了對應的 plugin
插件,你只須要寫一個配置文件形如:git
input { kafka { # kafka consumer 配置 } } filter { # 數據處理配置 } output { elasticsearch { # elasticsearch 輸出配置 } }
而後運行 logstash 就能夠了。github
Logstash 提供了兩百多個封裝好的 plugin
插件,這些插件被分爲三類:web
input plugin
: 從哪裏拉取數據filter plugin
: 數據如何處理output plugin
: 數據寫入何處使用 logstash 你只要編寫一個配置文件,在配置文件中挑選組合這些 plugin
插件,就能夠輕鬆實現數據從輸入源到輸出源的實時流動。正則表達式
請參數:官方文檔redis
假設你已經安裝好了 logstash ,而且可執行文件的路徑已經加入到了 PATH 環境變量中。sql
下面開始咱們的第一個示例,編寫 pipeline.conf
文件,內容爲:mongodb
input { stdin { } } filter { } output { stdout { } }
這個配置文件的含義是:
input
輸入爲 stdin
(標準輸入)filter
爲空(也就是不進行數據的處理)output
輸出爲 stdout
(標準輸出)執行命令:
logstash -f pipeline.conf
等待 logstash 啓動完畢,輸入 hello world 而後回車, 你就會看到如下輸出內容:
{ "message" => "hello world", "@version" => "1", "@timestamp" => 2020-11-01T08:25:10.987Z, "host" => "local" }
咱們輸入的內容已經存在於 message
字段中了。
當你輸入其餘內容後也會看到相似的輸出。
至此,咱們的第一個示例已經完成,正如配置文件中所定義的,Logstash 從 stdin 標準輸入讀取數據,不對源數據作任何處理,而後輸出到 stdout 標準輸出。
event
: 數據在 logstash 中被包裝成 event
事件的形式從 input 到 filter 再到 output 流轉。@timestamp
: 特殊字段,標記 event 發生的時間。@version
: 特殊字段,標記 event 的版本號。message
: 源數據內容。@metadata
: 元數據,key/value 的形式,是否有數據得看具體插件,例如 kafka 的 input 插件會在 @metadata
裏記錄 topic、consumer_group、partition、offset 等一些元數據。tags
: 記錄 tag 的字符串數組。在配置文件中,能夠經過 [field]
的形式引用字段內容,若是在字符串中,則能夠經過 %{[field]}
的方式進行引用。
示例:
input { kafka { # kafka 配置 } } filter { # 引用 log_level 字段的內容進行判斷 if [log_level] == "debug" { } } output { elasticsearch { # %{+yyyy.MM.dd} 來源於 @timestamp index => "log-%{+yyyy.MM.dd}" document_type => "_doc" document_id => "%{[@metadata][kafka][key]}" hosts => ["127.0.0.1:9200"] } }
用好 Logstash 的第一步就是熟悉 plugin 插件,只有熟悉了這些插件你才能快速高效的創建數據管道。
Input plugin
Input
插件定義了數據源,即 logstash 從哪裏拉取數據。
beats
: 從 Elastic Beats 框架中接收數據。示例:
input { beats { port => 5044 } }
dead_letter_queue
: 從 Logstash 本身的 dead letter queue 中拉取數據,目前 dead letter queue 只支持記錄 output 爲 elasticsearch 時寫入 400 或 404 的數據。示例:
input { dead_letter_queue { path => "/var/logstash/data/dead_letter_queue" start_timestamp => "2017-04-04T23:40:37" } }
elasticsearch
: 從 elasticsearch 中讀取 search query 的結果。示例:
input { elasticsearch { hosts => "localhost" query => '{ "query": { "match": { "statuscode": 200 } } }' } }
exec
: 按期執行一個 shell 命令,而後捕獲其輸出。示例:
input { exec { command => "ls" interval => 30 } }
file
: 從文件中流式讀取內容。示例:
input { file { path => ["/var/log/*.log", "/var/log/message"] start_position => "beginning" } }
generator
: 生成隨機數據。示例:
input { generator { count => 3 lines => [ "line 1", "line 2", "line 3" ] } }
github
: 從 github webhooks 中讀取數據。graphite
: 接受 graphite 的 metrics 指標數據。heartbeat
: 生成心跳信息。這樣作的通常目的是測試 Logstash 的性能和可用性。http
: Logstash 接受 http 請求做爲數據。http_poller
: Logstash 發起 http 請求,讀取響應數據。示例:
input { http_poller { urls => { test1 => "http://localhost:9200" test2 => { method => get user => "AzureDiamond" password => "hunter2" url => "http://localhost:9200/_cluster/health" headers => { Accept => "application/json" } } } request_timeout => 60 schedule => { cron => "* * * * * UTC"} codec => "json" metadata_target => "http_poller_metadata" } }
imap
: 從 IMAP 服務器讀取郵件。jdbc
: 經過 JDBC 接口導入數據庫中的數據。示例:
input { jdbc { jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb" jdbc_user => "mysql" parameters => { "favorite_artist" => "Beethoven" } schedule => "* * * * *" statement => "SELECT * from songs where artist = :favorite_artist" } }
kafka
: 消費 kafka 中的消息。示例:
input { kafka { bootstrap_servers => "127.0.0.1:9092" group_id => "consumer_group" topics => ["kafka_topic"] enable_auto_commit => true auto_commit_interval_ms => 5000 auto_offset_reset => "latest" decorate_events => true isolation_level => "read_uncommitted" max_poll_records => 1000 } }
rabbitmq
: 從 RabbitMQ 隊列中拉取數據。redis
: 從 redis 中讀取數據。stdin
: 從標準輸入讀取數據。syslog
: 讀取 syslog 數據。tcp
: 經過 TCP socket 讀取數據。udp
: 經過 udp 讀取數據。unix
: 經過 UNIX socket 讀取數據。websocket
: 經過 websocket 協議 讀取數據。Output plugin
Output
插件定義了數據的輸出地,即 logstash 將數據寫入何處。
csv
: 將數據寫入 csv 文件。elasticsearch
: 寫入 Elasticsearch 。email
: 發送 email 郵件。exec
: 執行命令。file
: 寫入磁盤文件。graphite
: 寫入 Graphite 。http
: 發送 http 請求。influxdb
: 寫入 InfluxDB 。kafka
: 寫入 Kafka 。mongodb
: 寫入 MongoDB 。opentsdb
: 寫入 OpenTSDB 。rabbitmq
: 寫入 RabbitMQ 。redis
: 使用 RPUSH 的方式寫入到 Redis 隊列。sink
: 將數據丟棄,不寫入任何地方。syslog
: 將數據發送到 syslog 服務端。tcp
: 發送 TCP socket。udp
: 發送 UDP 。webhdfs
: 經過 webhdfs REST API 寫入 HDFS 。websocket
: 推送 websocket 消息 。Filter plugin
Filter
插件定義對數據進行如何處理。
aggregate
: 聚合數據。alter
: 修改數據。bytes
: 將存儲大小如 "123 MB" 或 "5.6gb" 的字符串表示形式解析爲以字節爲單位的數值。cidr
: 檢查 IP 地址是否在指定範圍內。示例:
filter { cidr { add_tag => [ "testnet" ] address => [ "%{src_ip}", "%{dst_ip}" ] network => [ "192.0.2.0/24" ] } }
cipher
: 對數據進行加密或解密。clone
: 複製 event 事件。csv
: 解析 CSV 格式的數據。date
: 解析字段中的日期數據。示例,匹配輸入的 timestamp 字段,而後替換 @timestamp :
filter { date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss ZZ"] target => "@timestamp" } }
dissect
: 使用 %{}
的形式拆分字符串並提取出特定內容,比較經常使用,具體語法見 dissect 文檔。drop
: 丟棄這個 event 。示例:
filter { if [loglevel] == "debug" { drop { } } }
elapsed
: 經過記錄開始和結束時間跟蹤 event 的耗時。elasticsearch
: 在 elasticsearch 中進行搜索,並將數據複製到當前 event 中。environment
: 將環境變量中的數據存儲到 @metadata 字段中。extractnumbers
: 提取字符串中找到的全部數字。fingerprint
: 根據一個或多個字段的內容建立哈希值,並存儲到新的字段中。geoip
: 使用綁定的 GeoLite2 數據庫添加有關 IP 地址的地理位置的信息,這個插件很是有用,你能夠根據 IP 地址獲得對應的國家、省份、城市、經緯度等地理位置數據。示例,經過 clent_ip 字段獲取對應的地理位置信息:
filter { geoip { cache_size => 1000 default_database_type => "City" source => "clent_ip" target => "geo" tag_on_failure => ["_geoip_city_fail"] add_field => { "geo_country_name" => "%{[geo][country_name]}" "geo_region_name" => "%{[geo][region_name]}" "geo_city_name" => "%{[geo][city_name]}" "geo_location" => "%{[geo][latitude]},%{[geo][longitude]}" } remove_field => ["geo"] } }
grok
: 經過正則表達式去處理字符串,比較經常使用,具體語法見 grok 文檔。http
: 與外部 web services/REST APIs 集成。i18n
: 從字段中刪除特殊字符。java_uuid
: 生成 UUID 。jdbc_static
: 從遠程數據庫中讀取數據,而後豐富 event 。jdbc_streaming
: 執行 SQL 查詢而後將結果存儲到指定字段。json
: 解析 json 字符串,生成 field 和 value。示例:
filter { json { skip_on_invalid_json => true source => "message" } }
若是輸入的 message 字段是 json 字符串如 "{"a": 1, "b": 2}"
, 那麼解析後就會增長兩個字段,字段名分別是 a 和 b 。
kv
: 解析 key=value 形式的數據。memcached
: 與外部 memcached 集成。metrics
: logstash 在內存中去聚合指標數據。mutate
: 對字段進行一些常規更改。示例:
filter { mutate { split => ["hostname", "."] add_field => { "shortHostname" => "%{hostname[0]}" } } mutate { rename => ["shortHostname", "hostname"] } }
prune
: 經過黑白名單的方式刪除多餘的字段。示例:
filter { prune { blacklist_names => [ "method", "(referrer|status)", "${some}_field" ] } }
ruby
: 執行 ruby 代碼。示例,解析 http://example.com/abc?q=haha
形式字符串中的 query 參數 q 的值 :
filter { ruby { code => " require 'cgi' req = event.get('request_uri').split('?') query = '' if req.length > 1 query = req[1] qh = CGI::parse(query) event.set('search_q', qh['q'][0]) end " } }
在 ruby 代碼中,字段的獲取和設置經過 event.get()
和 event.set()
方法進行操做。
sleep
: 休眠指定時間。split
: 拆分字段。throttle
: 限流,限制 event 數量。translate
: 根據指定的字典文件將數據進行對應轉換。示例:
filter { translate { field => "[http_status]" destination => "[http_status_description]" dictionary => { "100" => "Continue" "101" => "Switching Protocols" "200" => "OK" "500" => "Server Error" } fallback => "I'm a teapot" } }
truncate
: 將字段內容超出長度的部分裁剪掉。urldecode
: 對 urlencoded 的內容進行解碼。useragent
: 解析 user-agent 的內容獲得諸如設備、操做系統、版本等信息。示例:
filter { # ua_device : 設備 # ua_name : 瀏覽器 # ua_os : 操做系統 useragent { lru_cache_size => 1000 source => "user_agent" target => "ua" add_field => { "ua_device" => "%{[ua][device]}" "ua_name" => "%{[ua][name]}" "ua_os" => "%{[ua][os_name]}" } remove_field => ["ua"] } }
uuid
: 生成 UUID 。xml
: 解析 XML 格式的數據。Logstash 的插件除了本文提到的這些以外還有不少,想要詳細的瞭解每一個插件如何使用仍是要去查閱官方文檔。
得益於 Logstash 的插件體系,你只須要編寫一個配置文件,聲明使用哪些插件,就能夠很輕鬆的構建數據管道。