做者 | WenasWeihtml
Logstash是具備實時流水線功能的開源數據收集引擎。Logstash能夠動態統一來自不一樣來源的數據,並將數據標準化到您選擇的目標位置。清除全部數據並使其民主化,以用於各類高級下游分析和可視化用例。java
Logstash 是一個數據流引擎:node
官方介紹:Logstash is an open source data collection engine with real-time pipelining capabilities。簡單來講logstash就是一根具有實時數據傳輸能力的管道,負責將數據信息從管道的輸入端傳輸到管道的輸出端;與此同時這根管道還可讓你根據本身的需求在中間加上濾網,Logstash提供裏不少功能強大的濾網以知足你的各類應用場景。mysql
Logstash 是一個功能強大的工具,可與各類部署集成。 它提供了大量插件,可幫助你解析,豐富,轉換和緩衝來自各類來源的數據。 若是你的數據須要 Beats 中沒有的其餘處理,則須要將 Logstash 添加到部署中。linux
當下最爲流行的數據源:git
Logstash 能夠攝入日誌,文件,指標或者網路真實數據。通過 Logstash 的處理,變爲可使用的 Web
Apps 能夠消耗的數據,也能夠存儲於數據中心,或變爲其它的流式數據:程序員
Logstash 包含3個主要部分: 輸入(inputs),過濾器(filters)和輸出(outputs)github
Logstash的事件(logstash將數據流中等每一條數據稱之爲一個event)處理流水線有三個主要角色完成:inputs –> filters –> outputs:web
tar -zxvf jdk-8u152-linux-x64.tar.gz
mkdir -p /usr/local/java
mv jdk1.8.0_152/ /usr/local/java/
chown -R root:root /usr/local/java/
vi /etc/environment
PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games" export JAVA_HOME=/usr/local/java/jdk1.8.0_152 export JRE_HOME=/usr/local/java/jdk1.8.0_152/jre export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
nano /etc/profile
if [ "$PS1" ]; then if [ "$BASH" ] && [ "$BASH" != "/bin/sh" ]; then # The file bash.bashrc already sets the default PS1. # PS1='\h:\w\$ ' if [ -f /etc/bash.bashrc ]; then . /etc/bash.bashrc fi else if [ "`id -u`" -eq 0 ]; then PS1='# ' else PS1='$ ' fi fi fi export JAVA_HOME=/usr/local/java/jdk1.8.0_152 export JRE_HOME=/usr/local/java/jdk1.8.0_152/jre export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH:$HOME/bin if [ -d /etc/profile.d ]; then for i in /etc/profile.d/*.sh; do if [ -r $i ]; then . $i fi done unset i fi
source /etc/profile
$ java -version java version "1.8.0_152" Java(TM) SE Runtime Environment (build 1.8.0_152-b16) Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
$ sudo mkdir /usr/local/logstash
$ wget -P /usr/local/logstash https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz
$ cd /usr/local/logstash/ $ sudo tar -zxvf logstash-6.2.4.tar.gz
測試: 快速啓動,標準輸入輸出做爲input和output,沒有filter正則表達式
$ cd logstash-6.2.4/ $ ./bin/logstash -e 'input { stdin {} } output { stdout {} }' Sending Logstash's logs to /usr/local/logstash/logstash-6.2.4/logs which is now configured via log4j2.properties [2021-05-27T00:22:28,729][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/local/logstash/logstash-6.2.4/modules/fb_apache/configuration"} [2021-05-27T00:22:28,804][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/local/logstash/logstash-6.2.4/modules/netflow/configuration"} [2021-05-27T00:22:29,827][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified [2021-05-27T00:22:30,979][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.4"} [2021-05-27T00:22:31,821][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600} [2021-05-27T00:22:36,463][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50} [2021-05-27T00:22:36,690][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x55a5abea run>"} The stdin plugin is now waiting for input: [2021-05-27T00:22:36,853][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]} ## 此時命令窗口停留在等待輸入狀態,鍵盤鍵入任意字符 ## hello world ## 下方是Logstash輸出到效果 ## { "@timestamp" => 2021-05-26T16:22:52.527Z, "host" => "*******", "message" => "hello world", "@version" => "1" }
參數 | 說明 | 舉例 |
---|---|---|
-e | 當即執行,使用命令行裏的配置參數啓動實例 | ./bin/logstash -e ‘input {stdin {}} output {stdout {}}’ |
-f | 指定啓動實例的配置文件 | ./bin/logstash -f config/test.conf |
-t | 測試配置文件的正確性 | ./bin/logstash-f config/test.conf -t |
-l | 指定日誌文件名稱 | ./bin/logstash-f config/test.conf -l logs/test.log |
-w | 指定filter線程數量,默認線程數是5 | ./bin/logstash-f config/test.conf -w 8 |
Logstash經過{}來定義區域,區域內能夠定義插件,一個區域內能夠定義多個插件,以下:
input { stdin { } beats { port => 5044 } }
Logstash僅支持少許的數據類型:
Logstash數據流中的數據被稱之爲Event對象,Event以JSON結構構成,Event的屬性被稱之爲字段,若是你像在配置文件中引用這些字段,只須要把字段的名字寫在中括號[]裏就好了,如[type],對於嵌套字段每層字段名稱都寫在[]裏就能夠了,好比:tags;除此以外,對於Logstash的arrag類型支持下標與倒序下表,如:tags[0],tags[-1]。
Logstash支持下面的操做符:
例如:
if EXPRESSION { ... } else if EXPRESSION { ... } else { ... }
Logstash支持引用系統環境變量,環境變量不存在時能夠設置默認值,例如:
export TCP_PORT=12345 input { tcp { port => "${TCP_PORT:54321}" } }
輸入插件包含有如下多種,詳情查看官網文檔-經常使用輸入插件:
文件讀取插件主要用來抓取文件的變化信息,將變化信息封裝成Event進程處理或者傳遞。
配置事例
input file { path => ["/var/log/*.log", "/var/log/message"] type => "system" start_position => "beginning" } }
參數名稱 | 類型 | 默認值 | 描述信息 |
---|---|---|---|
add_field | hash | {} | 用於向Event中添加字段 |
close_older | number | 3600 | 設置文件多久秒內沒有更新就關掉對文件的監聽 |
codec | string | 「plain」 | 輸入數據以後對數據進行解碼 |
delimiter | string | 「\n」 | 文件內容的行分隔符,默認按照行進行Event封裝 |
discover_interval | number | 15 | 間隔多少秒查看一下path匹配對路徑下是否有新文件產生 |
enable_metric | boolean | true | |
exclude | array | 無 | path匹配的文件中指定例外,如:path => 「/var/log/「;exclude =>」.gz」 |
id | string | 無 | 區分兩個相同類型的插件,好比兩個filter,在使用Monitor API監控是能夠區分,建議設置上ID |
ignore_older | number | 無 | 忽略歷史修改,若是設置3600秒,logstash只會發現一小時內被修改過的文件,一小時以前修改的文件的變化不會被讀取,若是再次修改該文件,全部的變化都會被讀取,默認被禁用 |
max_open_files | number | 無 | logstash能夠同時監控的文件個數(同時打開的file_handles個數),若是你須要處理多於這個數量多文件,可使用「close_older」去關閉一些文件 |
path | array | 無 | 必須設置項,用於匹配被監控的文件,如「/var/log/.log」或者「/var/log//.log」,必須使用絕對路徑 |
sincedb_path | string | 無 | 文件讀取記錄,必須指定一個文件而不是目錄,文件中保存沒個被監控的文件等當前inode和byteoffset,默認存放位置「$HOME/.sincedb*」 |
sincedb_write_interval | number | 15 | 間隔多少秒寫一次sincedb文件 |
start_position | 「beginning」,「end」 | 」 end」 | 從文件等開頭仍是結尾讀取文件內容,默認是結尾,若是須要導入文件中的老數據,能夠設置爲「beginning」,該選項只在第一次啓動logstash時有效,若是文件已經存在於sincedb的記錄內,則此配置無效 |
stat_interval | number | 1 | 間隔多少秒檢查一下文件是否被修改,加大此參數將下降系統負載,可是增長了發現新日誌的間隔時間 |
tags | array | 無 | 能夠在Event中增長標籤,以便於在後續的處理流程中使用 |
type | string | Event的type字段,若是採用elasticsearch作store,在默認狀況下將做爲elasticsearch的type |
TCP插件有兩種工做模式,「Client」和「Server」,分別用於發送網絡數據和監聽網絡數據。
配置事例
tcp { port => 41414 }
參數名稱 | 類型 | 默認值 | 描述信息 |
---|---|---|---|
add_field | |||
codec | |||
enable_metric | |||
host | |||
id | |||
mode | 「server」、「client」 | 「server」 | 「server」監聽「client」的鏈接請求,「client」鏈接「server」 |
port | number | 無 | 必須設置項,「server」模式時指定監聽端口,「client」模式指定鏈接端口 |
proxy_protocol | boolean | false | Proxyprotocol support, only v1 is supported at this time |
ssl_cert | |||
ssl_enable | |||
ssl_extra_chain_certs | |||
ssl_key | |||
ssl_key_passphrase | |||
ssl_verify | |||
tags | |||
type |
用於讀取Redis中緩存的數據信息。
配置事例
input { redis { host => "127.0.0.1" port => 6379 data_type => "list" key => "logstash-list" } }
參數名稱 | 類型 | 默認值 | 描述信息 |
---|---|---|---|
add_field | |||
batch_count | number | 125 | 使用redis的batch特性,須要redis2.6.0或者更新的版本 |
codec | |||
data_type | list,channel, pattern_channel | 無 | 必須設置項,根據設置不一樣,訂閱redis使用不一樣的命令,依次是:BLPOP、SUBSCRIBE、PSUBSCRIBE |
db | number | 0 | 指定使用的redis數據庫 |
enable_metric | |||
host | string | 127.0.0.1 | redis服務地址 |
id | |||
key | string | 無 | 必須設置項,reidslist或者channel的key名稱 |
password | string | 無 | redis密碼 |
port | number | 6379 | redis鏈接端口號 |
tags | |||
threads | number | 1 | |
timeout | number | 5 | redis服務鏈接超時時間,單位:秒 |
注意:
data_type 須要注意的是「channel」和「pattern_channel」是廣播類型,相同的數據會同時發送給訂閱了該channel的logstash,也就是說在logstash集羣環境下會出現數據重複,集羣中的每個節點都將收到一樣的數據,可是在單節點狀況下,「pattern_channel」能夠同時定於知足pattern的多個key
用於讀取Kafka中推送的主題數據信息。
配置事例
input { kafka { bootstrap_servers => "kafka-01:9092,kafka-02:9092,kafka-03:9092" topics_pattern => "elk-.*" consumer_threads => 5 decorate_events => true codec => "json" auto_offset_reset => "latest" group_id => "logstash1"##logstash 集羣需相同 } }
參數名稱 | 類型 | 默認值 | 描述信息 |
---|---|---|---|
bootstrap_servers | string | localhost:9092 | Kafka列表,用於創建與集羣的初始鏈接 |
topics_pattern | string | 要訂閱的主題正則表達式模式。使用此配置時,主題配置將被忽略。 | |
consumer_threads | number | 併發線程數,理想狀況下,您應該擁有與分區數量同樣多的線程 | |
decorate_events | string | none | 可接受的值爲:none/basic/extended/false |
codec | codec | plain | 用於輸入數據的編解碼器 |
auto_offset_reset | string | 當Kafka初始偏移量 | |
group_id | String | logstash | 該消費者所屬的組的標識符 |
注意:
豐富的過濾器插件的是 logstash威力如此強大的重要因素,過濾器插件主要處理流經當前Logstash的事件信息,能夠添加字段、移除字段、轉換字段類型,經過正則表達式切分數據等,也能夠根據條件判斷來進行不一樣的數據處理方式,詳情查看官網文檔-經常使用過濾插件
grok 是Logstash中將非結構化數據解析成結構化數據以便於查詢的最好工具,很是適合解析syslog logs,apache log, mysql log,以及一些其餘的web log
例如:
Event的內容爲「[debug] 127.0.0.1 - test log content」,匹配%{IP:client}將得到「client: 127.0.0.1」的結果,前提安裝了IP表達式;若是你在捕獲數據時想進行數據類型轉換可使用%{NUMBER:num:int}這種語法,默認狀況下,全部的返回結果都是string類型,當前Logstash所支持的轉換類型僅有「int」和「float」;
一個稍微完整一點的事例:
配置文件內容:
input { file { path => "/var/log/http.log" } } filter { grok { match => {"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"} } }
輸出結果:
client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043
語法:(?<field_name>the pattern here)
舉例:捕獲10或11和長度的十六進制queue_id可使用表達式(?<queue_id>[0-9A-F]{10,11})
安裝自定義表達式
與預約義表達式相同,你也能夠將自定義的表達式配置到Logstash中,而後就能夠像於定義的表達式同樣使用;如下是操做步驟說明:
二、在文件「extra」中添加表達式,格式:patternName regexp,名稱與表達式之間用空格隔開便可,以下:
# contents of ./patterns/postfix: POSTFIX_QUEUEID [0-9A-F]{10,11}
<1>日誌內容
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
<2>Logstash配置
filter { grok { patterns_dir => ["./patterns"] match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" } } }
<3>運行結果
timestamp: Jan 1 06:25:43 logsource: mailserver14 program: postfix/cleanup pid: 21403 queue_id: BEF25A72965
參數名稱 | 類型 | 默認值 | 描述信息 |
---|---|---|---|
add_field | |||
add_tag | |||
break_on_match | boolean | true | match字段存在多個pattern時,當第一個匹配成功後結束後面的匹配,若是想匹配全部的pattern,將此參數設置爲false |
enable_metric | |||
id | |||
keep_empty_captures | boolean | false | 若是爲true,捕獲失敗的字段獎設置爲空值 |
match | array | {} | 設置pattern數組: match=> {「message」 => [「Duration: %{NUMBER:duration}」,」Speed: %{NUMBER:speed}」]} |
named_captures_only | boolean | true | If true, only store named captures from grok. |
overwrite | array | [] | 覆蓋字段內容: match=> { 「message」 => 「%{SYSLOGBASE} %{DATA:message}」 } overwrite=> [ 「message」 ] |
patterns_dir | array | [] | 指定自定義的pattern文件存放目錄,Logstash在啓動時會讀取文件夾內patterns_files_glob 匹配的全部文件內容 |
patterns_files_glob | string | 「*」 | 用於匹配patterns_dir中的文件 |
periodic_flush | boolean | false | 按期調用filter的flush方法 |
remove_field | array | [] | 從Event中刪除任意字段: remove_field=> [ 「foo_%{somefield}」 ] |
remove_tag | array | [] | 刪除「tags」中的值: remove_tag=> [ 「foo_%{somefield}」 ] |
tag_on_failure | array | [「_grokparsefailure」] | 當沒有匹配成功時,將此array添加到「tags」字段內 |
tag_on_timeout | string | 「_groktimeout」 | 當匹配超時時,將此內容添加到「tags」字段內 |
timeout_millis | number | 30000 | 設置單個match到超時時間,單位:毫秒,若是設置爲0,則不啓用超時設置 |
該插件用於時間字段的格式轉換,好比將「Apr 17 09:32:01」(MMM dd HH:mm:ss)轉換爲「MM-dd HH:mm:ss」。並且一般狀況下,Logstash會爲自動給Event打上時間戳,可是這個時間戳是Event的處理時間(主要是input接收數據的時間),和日誌記錄時間會存在誤差(主要緣由是buffer),咱們可使用此插件用日誌發生時間替換掉默認是時間戳的值。
參數名稱 | 類型 | 默認值 | 描述信息 |
---|---|---|---|
add_field | |||
add_tag | |||
enable_metric | |||
id | |||
locale | |||
match | array | [] | 時間字段匹配,可自定多種格式,直到匹配到或匹配結束 |
periodic_flush | |||
remove_field | |||
remove_tag | |||
tag_on_failure | |||
target | string | 「@timestamp」 | 指定match匹配而且轉換爲date類型的存儲位置(字段),默認覆蓋到「@timestamp」 |
timezone | string | 無 | 指定時間格式化的時區 |
注意:
match的格式:時間字段匹配,可自定多種格式,直到匹配到或匹配結束,格式: [ field,formats…],如:match=>[ 「logdate」,「MMM dd yyyy HH:mm:ss」,「MMM d yyyy HH:mm:ss」,「ISO8601」]
mutate 插件是 Logstash另外一個重要插件。它提供了豐富的基礎類型數據處理能力。能夠重命名,刪除,替換和修改事件中的字段。
參數名稱 | 類型 | 默認值 | 描述信息 |
---|---|---|---|
add_field | |||
add_tag | |||
convert | hash | 無 | 將指定字段轉換爲指定類型,字段內容是數組,則轉換全部數組元素,若是字段內容是hash,則不作任何處理,目前支持的轉換類型包括:integer,float, string, and boolean.例如: convert=> { 「fieldname」 => 「integer」 } |
enable_metric | |||
gsub | array | 無 | 相似replace方法,使用指定內容替換掉目標字符串的現有內容,前提是目標字段必須是字符串,不然不作任何處理,例如:[ 「fieldname」, 「/」, 「「, 「fieldname2」, 「[\?#-]」, 「.」],解釋:使用「」替換掉「fieldname」中的全部「/」,使用「.」替換掉「fieldname2」中的全部「\」「?」、「#」和「-」 |
id | |||
join | hash | 無 | 使用指定的符號將array字段的每一個元素鏈接起來,對非array字段無效。例如: 使用「,」將array字段「fieldname」的每個元素鏈接成一個字符串: join=> { 「fieldname」 => 「,」 } |
lowercase | array | 無 | 將自定的字段值轉換爲小寫 |
merge | hash | 無 | 合併兩個array或者hash,若是是字符串,將自動轉換爲一個單元素數組;將一個array和一個hash合併。例如: 將」added_field」合併到」dest_field」: merge=> { 「dest_field」 => 「added_field」 } |
periodic_flush | |||
remove_field | |||
remove_tag | |||
rename | hash | 無 | 修改一個或者多個字段的名稱。例如: 將」HOSTORIP」更名爲」client_ip」: rename=> { 「HOSTORIP」 => 「client_ip」 } |
replace | hash | 無 | 使用新值完整的替換掉指定字段的原內容,支持變量引用。例如: 使用字段「source_host」的內容拼接上字符串「: My new message」以後的結果替換「message」的值: replace=> { 「message」 => 「%{source_host}: My new message」 } |
split | hash | 無 | 按照自定的分隔符將字符串字段拆分紅array字段,只能做用於string類型的字段。例如: 將「fieldname」的內容按照「,」拆分紅數組: split=> { 「fieldname」 => 「,」 } |
strip | array | 無 | 去掉字段內容兩頭的空白字符。例如: 去掉「field1」和「field2」兩頭的空格: strip=> [「field1」, 「field2」] |
update | hash | 無 | 更新現有字段的內容,例如: 將「sample」字段的內容更新爲「Mynew message」: update=> { 「sample」 => 「My new message」 } |
uppercase | array | 無 | 將字符串轉換爲大寫 |
JSON插件用於解碼JSON格式的字符串,通常是一堆日誌信息中,部分是JSON格式,部分不是的狀況下
json { source => ... }
事例配置,message是JSON格式的字符串:"{\"uid\":3081609001,\"type\":\"signal\"}"
filter { json { source => "message" target => "jsoncontent" } }
輸出結果:
{ "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "{\"uid\":3081609001,\"type\":\"signal\"}", "jsoncontent": { "uid": 3081609001, "type": "signal" } }
若是從事例配置中刪除target
,輸出結果以下:
{ "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "{\"uid\":3081609001,\"type\":\"signal\"}", "uid": 3081609001, "type": "signal" }
參數名稱 | 類型 | 默認值 | 描述信息 |
---|---|---|---|
add_field | |||
add_tag | |||
enable_metric | |||
id | |||
periodic_flush | |||
remove_field | |||
remove_tag | |||
skip_on_invalid_json | boolean | false | 是否跳過驗證不經過的JSON |
source | string | 無 | 必須設置項,指定須要解碼的JSON字符串字段 |
tag_on_failure | |||
target | string | 無 | 解析以後的JSON對象所在的字段名稱,若是沒有,JSON對象的全部字段將掛在根節點下 |
用於查詢Elasticsearch中的事件,可將查詢結果應用於當前事件中
參數名稱 | 類型 | 默認值 | 描述信息 |
---|---|---|---|
add_field | |||
add_tag | |||
ca_file | string | 無 | SSL Certificate Authority file path |
enable_sort | boolean | true | 是否對結果進行排序 |
fields | array | {} | 從老事件中複製字段到新事件中,老事件來源於elasticsearch(用於查詢更新) |
hosts | array | [「localhost:9200」] | elasticsearch服務列表 |
index | string | 「」 | 用逗號分隔的elasticsearch索引列表,若是要操做全部全部使用「_all」或者「」,保存數據到elasticsearch時,若是索引不存在會自動以此建立 |
password | string | 無 | 密碼 |
periodic_flush | |||
query | string | 無 | 查詢elasticsearch的查詢字符串 |
remove_field | |||
remove_tag | |||
result_size | number | 1 | 查詢elasticsearch時,返回結果的數量 |
sort | string | 「@timestamp:desc」 | 逗號分隔的「:」列表,用於查詢結果排序 |
ssl | boolean | false | SSL |
tag_on_failure | |||
user | string | 無 | 用戶名 |
用於將事件信息寫入到Elasticsearch中,官方推薦插件,ELK必備插件
output { elasticsearch { hosts => ["127.0.0.1:9200"] index => "filebeat-%{type}-%{+yyyy.MM.dd}" template_overwrite => true } }
參數名稱 | 類型 | 默認值 | 描述信息 |
---|---|---|---|
absolute_healthcheck_path | boolean | false | 當配置了「healthcheck_path」時,決定elasticsearch健康檢查URL是否按照絕對路徑配置。例如: elasticsearch訪問路徑爲:」http://localhost:9200/es「,「h... 當前參數爲true時的訪問路徑爲:」http://localhost:9200/es/heal... 當前參數爲false時的訪問路徑爲:」http://localhost:9200/health」 |
absolute_sniffing_path | boolean | false | 當配置了「sniffing_path」時,決定elasticsearch的sniffing訪問路徑配置。例如: elasticsearch訪問路徑爲:「http://localhost:9200/es」,「s... 當前參數爲true時的訪問路徑爲:「http://localhost:9200/es/_sni... 當前參數爲false時的訪問路徑爲:「http://localhost:9200/_sniffing」 |
action | string | 「index」 | 對elasticsearch的操做類型,可用的操做類型: index:索引Logstash事件數據到elasticsearch; delete:根據id刪除文檔,id必須指定; delete:根據id刪除文檔,id必須指定; update:根據id更新文檔 |
cacert | string | 無 | .cer或者.pem證書文件路徑,使用證書進行elasticsearch認證 |
codec | |||
doc_as_upsert | boolean | false | 使update啓用upsert模式,即文檔不存在時建立新文檔 |
document_id | string | 無 | elasticsearch中的文檔id,用來覆蓋已經保存到elasticsearch中的文檔 |
document_type | string | 無 | 指定存入elasticsearch中的文檔的type,沒有指定的狀況下會使用Event信息中的「type」字段的值做爲elasticsearch的type |
enable_metric | |||
failure_type_logging_whitelist | array | [] | elasricsearch報錯白名單,白名單的異常信息不會被記入logstash的log中,好比你想忽略掉全部的「document_already_exists_exception」異常 |
flush_size | |||
healthcheck_path | string | 「/」 | elasricsearch檢查狀態檢查路徑 |
hosts | string | [//127.0.0.1] | elasticsearch服務地址列表,若是配置多個將啓用負載均衡 |
id | |||
idle_flush_time | number | 1 | 間隔多長時間將數據輸出到elasticsearch中一次,主要用於較慢的事件 |
index | string | 「logstash-%{+YYYY.MM.dd}」 | 指定elasticsearch存儲數據時的全部名稱,支持變量引用,好比你能夠按天建立索引,方便刪除歷史數據或者查詢制定範圍內的數據 |
keystore | string | 無 | 用於指定密鑰庫路徑,能夠是.jks或者.p12 |
keystore_password | string | 無 | 密鑰庫密碼 |
manage_template | boolean | true | 是否啓用elasticsearch模版,Logstash自帶一個模版,可是隻有名稱匹配「logstash-*」的索引纔會應用該默版 |
parameters | hash | 無 | 添加到elasticsearch URL後面的參數鍵值對 |
parent | string | 「nil」 | 爲文檔子節點指定父節點的id |
password | string | 無 | elasticsearch集羣訪問密碼 |
path | string | 無 | 當設置了elasticsearch代理時用此參數從定向HTTP API,若是「hosts」中已經包含此路徑,則不須要設置 |
pipeline | string | 「nil」 | 設置Event管道 |
pool_max | number | 1000 | elasticsearch最大鏈接數 |
pool_max_per_route | number | 100 | 每一個「endpoint」的最大鏈接數 |
proxy | string | 無 | 代理URL |
resurrect_delay | number | 5 | 檢查掛掉的「endpoint」是否恢復正常的頻率 |
retry_initial_interval | number | 2 | 設置批量重試的時間間隔,重試到 「retry_max_interval」次 |
retry_max_interval | number | 64 | Setmax interval in seconds between bulk retries. |
retry_on_conflict | number | 1 | Thenumber of times Elasticsearch should internally retry an update/upserteddocument |
routing | string | 無 | 指定Event路由 |
script | string | 「」 | 設置「scriptedupdate」模式下的腳本名稱 |
script_lang | string | 「painless」 | 設置腳本語言 |
script_type | 「inline」、「indexed」、 「file」 | [「inline」] | Definethe type of script referenced by 「script」 variable inline :」script」 contains inline script indexed : 「script」 containsthe name of script directly indexed in elasticsearch file : 「script」contains the name of script stored in elasticseach’s config directory |
script_var_name | string | 「event」 | Setvariable name passed to script (scripted update) |
scripted_upsert | boolean | false | ifenabled, script is in charge of creating non-existent document (scriptedupdate) |
sniffing | |||
sniffing_delay | |||
sniffing_path | |||
ssl | |||
ssl_certificate_verification | |||
template | string | 無 | 設置自定義的默版存放路徑 |
template_name | string | 「logstash」 | 設置使用的默版名稱 |
template_overwrite | boolean | false | 是否始終覆蓋現有模版 |
timeout | number | 60 | 網絡超時時間 |
truststore | string | 無 | 「:truststore」或者「:cacert」證書庫路徑 |
truststore_password | string | 無 | 證書庫密碼 |
upsert | string | 「」 | Setupsert content for update mode.s Create a new document with this parameter asjson string if document_id doesn’texists |
user | string | 「」 | elasticsearch用戶名 |
validate_after_inactivity | number | 10000 | 間隔多長時間保持鏈接可用 |
version | string | 無 | 存入elasticsearch的文檔的版本號 |
version_type | 「internal」、「external」、 「external_gt」、 「external_gte」、「force」 | 無 | |
workers | string | 1 | whenwe no longer support the :legacy type This is hacky, but it can only be herne |
用於將Event寫入Redis中進行緩存,一般狀況下Logstash的Filter處理比較吃系統資源,複雜的Filter處理會很是耗時,若是Event產生速度比較快,可使用Redis做爲buffer使用
output { redis { host => "127.0.0.1" port => 6379 data_type => "list" key => "logstash-list" } }
參數名稱 | 類型 | 默認值 | 描述信息 |
---|---|---|---|
batch | boolean | false | 是否啓用redis的batch模式,僅在data_type=」list」時有效 |
batch_events | number | 50 | batch大小,batch達到此大小時執行「RPUSH」 |
batch_timeout | number | 5 | batch超時時間,超過這個時間執行「RPUSH」 |
codec | |||
congestion_interval | number | 1 | 間隔多長時間檢查阻塞,若是設置爲0,則沒個Event檢查一次 |
congestion_threshold | number | 0 | |
data_type | 「list」、「channel」 | 無 | 存儲在redis中的數據類型,若是使用「list」,將採用「RPUSH」操做,若是是「channel」,將採用「PUBLISH」操做 |
db | number | 0 | 使用的redis數據庫編號 |
enable_metric | |||
host | array | [「127.0.0.1」] | redis服務列表,若是配置多個,將隨機選擇一個,若是當前的redis服務不可用,將選擇下一個 |
id | |||
key | string | 無 | Thename of a Redis list or channel. Dynamic names are valid here, forexample logstash-%{type}. |
password | string | 無 | redis服務密碼 |
port | number | 6379 | redis服務監聽端口 |
reconnect_interval | number | 1 | 鏈接失敗時的重連間隔 |
shuffle_hosts | boolean | true | Shufflethe host list during Logstash startup. |
timeout | number | 5 | redis鏈接超時時間 |
workers | number | 1 | whenwe no longer support the :legacy type This is hacky, but it can only be herne |
用於將Event輸出到文件內
output { file { path => ... codec => line { format => "custom format: %{message}"} } }
參數名稱 | 類型 | 默認值 | 描述信息 |
---|---|---|---|
codec | |||
create_if_deleted | boolean | true | 若是目標文件被刪除,則在寫入事件時建立新文件 |
dir_mode | number | -1 | 設置目錄的訪問權限,若是爲「-1」,使用操做系統默認的訪問權限 |
enable_metric | |||
file_mode | number | -1 | 設置文件的訪問權限,若是爲「-1」,使用操做系統默認的訪問權限 |
filename_failure | string | 「_filepath_failures」 | 若是指定的文件路徑無效,這會在目錄內建立這個文件並記錄數據 |
flush_interval | number | 2 | flush間隔 |
gzip | boolean | false | 是否啓用gzip壓縮 |
id | |||
path | string | 無 | 必須設置項,文件輸出路徑,如:path =>」./test-%{+YYYY-MM-dd}.txt」 |
workers | string | 1 | whenwe no longer support the :legacy type This is hacky, but it can only be herne |
用於將Event輸出到Kafka指定的Topic中,官網Kafka詳情配置
output { kafka { bootstrap_servers => "localhost:9092" topic_id => "test" compression_type => "gzip" } }
參數名稱 | 類型 | 默認值 | 描述信息 |
---|---|---|---|
bootstrap_servers | string | Kafka集羣信息,格式爲 host1:port1,host2:port2 | |
topic_id | string | 產生消息的主題 | |
compression_type | String | none | 生產者生成的全部數據的壓縮類型。默認爲無(即無壓縮)。有效值爲 none、gzip、snappy 或 lz4。 |
batch_size | number | 16384 | 配置以字節爲單位控制默認批處理大小 |
buffer_memory | number | 33554432(32MB) | 生產者可用於緩衝等待發送到服務器的記錄的總內存字節數 |
max_request_size | number | 1048576(1MB) | 請求的最大大小 |
flush_interval | number | 2 | flush間隔 |
gzip | boolean | false | 是否啓用gzip壓縮 |
id | |||
path | string | 無 | 必須設置項,文件輸出路徑,如:path =>」./test-%{+YYYY-MM-dd}.txt」 |
workers | string | 1 | whenwe no longer support the :legacy type This is hacky, but it can only be herne |
直接輸入預約義好的 JSON 數據,這樣就能夠省略掉 filter/grok 配置
配置事例
json { }
經常使用配置參數
參數名稱 | 類型 | 默認值 | 描述信息 |
---|---|---|---|
charset | string | 「UTF-8」 | 字符集 |
enable_metric | |||
id |
input { beats { port => 5044 } } output { redis { host => "127.0.0.1" port => 6379 data_type => "list" key => "logstash-list" } }
input { redis { host => "127.0.0.1" port => 6379 data_type => "list" key => "logstash-list" } } filter { if [type] == "application" { grok { match => ["message", "(?m)-(?<systemName>.+?):(?<logTime>(?>\d\d){1,2}-(?:0?[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5][0-9]|60)(?:[:.,][0-9]+)?)) \[(?<level>(\b\w+\b)) *\] (?<thread>(\b\w+\b)) \((?<point>.*?)\) - (?<content>.*)"] } date { match => ["logTime", "yyyy-MM-dd HH:mm:ss,SSS"] } json { source => "message" } date { match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"] } } if [type] == "application_bizz" { json { source => "message" } date { match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"] } } mutate { remove_field => ["@version", "beat", "logTime"] } } output { stdout{ } elasticsearch { hosts => ["127.0.0.1:9200"] index => "filebeat-%{type}-%{+yyyy.MM.dd}" document_type => "%{documentType}" template_overwrite => true } }
架構:logstash採集、處理、轉發到elasticsearch存儲,在kibana進行展現
特色:這種結構由於須要在各個服務器上部署 Logstash,而它比較消耗 CPU 和內存資源,因此比較適合計算資源豐富的服務器,不然容易形成服務器性能降低,甚至可能致使沒法正常工做。
消息模式:Beats 還不支持輸出到消息隊列(新版本除外:5.0版本及以上),因此在消息隊列先後兩端只能是 Logstash 實例。logstash從各個數據源蒐集數據,不通過任何處理轉換僅轉發出到消息隊列(kafka、redis、rabbitMQ等),後logstash從消息隊列取數據進行轉換分析過濾,輸出到elasticsearch,並在kibana進行圖形化展現
架構(Logstash進行日誌解析所在服務器性能各方面必需要足夠好):
模式特色:這種架構適合於日誌規模比較龐大的狀況。但因爲 Logstash 日誌解析節點和 Elasticsearch 的負荷比較重,可將他們配置爲集羣模式,以分擔負荷。引入消息隊列,均衡了網絡傳輸,從而下降了網絡閉塞,尤爲是丟失數據的可能性,但依然存在 Logstash 佔用系統資源過多的問題
工做流程:Filebeat採集—> logstash轉發到kafka—> logstash處理從kafka緩存的數據進行分析—> 輸出到es—> 顯示在kibana
首先從結構對比,咱們會驚人的發現,二者是多麼的類似!Logstash的Shipper、Broker、Indexer分別和Flume的Source、Channel、Sink各自對應!只不過是Logstash集成了,Broker能夠不須要,而Flume須要單獨配置,且缺一不可,但這再一次說明了計算機的設計思想都是通用的!只是實現方式會不一樣而已。
從程序員的角度來講,上文也提到過了,Flume是真的很繁瑣,你須要分別做source、channel、sink的手工配置,並且涉及到複雜的數據採集環境,你可能還要作多個配置,這在上面提過了,反過來講Logstash的配置就很是簡潔清晰,三個部分的屬性都定義好了,程序員本身去選擇就行,就算沒有,也能夠自行開發插件,很是方便。固然了,Flume的插件也不少,但Channel就只有內存和文件這兩種(其實如今不止了,但經常使用的也就兩種)。讀者能夠看得出來,二者其實配置都是很是靈活的,只不過看場景取捨罷了。
其實從做者和歷史背景來看,二者最初的設計目的就不太同樣。Flume自己最初設計的目的是爲了把數據傳入HDFS中(並非爲了採集日誌而設計,這和Logstash有根本的區別),因此理所應當側重於數據的傳輸,程序員要很是清楚整個數據的路由,而且比Logstash還多了一個可靠性策略,上文中的channel就是用於持久化目的,數據除非確認傳輸到下一位置了,不然不會刪除,這一步是經過事務來控制的,這樣的設計使得可靠性很是好。相反,Logstash則明顯側重對數據的預處理,由於日誌的字段須要大量的預處理,爲解析作鋪墊。
回過來看我當初爲何先講Logstash而後講Flume?這裏面有幾個考慮
最後總結下來,咱們能夠這麼理解他們的區別:
Logstash就像是買來的臺式機,主板、電源、硬盤,機箱(Logstash)把裏面的東西所有裝好了,你能夠直接用,固然也能夠本身組裝修改;
Flume就像提供給你一套完整的主板,電源、硬盤,Flume沒有打包,只是像說明書同樣指導你如何組裝,才能運行的起來。