Logstash-數據流引擎

Logstash-數據流引擎

Logstash

做者 | WenasWeihtml

一 Logstash

Logstash是具備實時流水線功能的開源數據收集引擎。Logstash能夠動態統一來自不一樣來源的數據,並將數據標準化到您選擇的目標位置。清除全部數據並使其民主化,以用於各類高級下游分析和可視化用例。java

1.1 Logstash簡介

Logstash 是一個數據流引擎:node

  • 它是用於數據物流的開源流式 ETL(Extract-Transform-Load)引擎
  • 在幾分鐘內創建數據流管道
  • 具備水平可擴展及韌性且具備自適應緩衝
  • 不可知的數據源
  • 具備200多個集成和處理器的插件生態系統
  • 使用 Elastic Stack 監視和管理部署

官方介紹:Logstash is an open source data collection engine with real-time pipelining capabilities。簡單來講logstash就是一根具有實時數據傳輸能力的管道,負責將數據信息從管道的輸入端傳輸到管道的輸出端;與此同時這根管道還可讓你根據本身的需求在中間加上濾網,Logstash提供裏不少功能強大的濾網以知足你的各類應用場景。mysql

1.2 數據處理

Logstash 是一個功能強大的工具,可與各類部署集成。 它提供了大量插件,可幫助你解析,豐富,轉換和緩衝來自各類來源的數據。 若是你的數據須要 Beats 中沒有的其餘處理,則須要將 Logstash 添加到部署中。linux

當下最爲流行的數據源:git

數據源

Logstash 能夠攝入日誌,文件,指標或者網路真實數據。通過 Logstash 的處理,變爲可使用的 Web
Apps 能夠消耗的數據,也能夠存儲於數據中心,或變爲其它的流式數據:程序員

  • Logstash 能夠很方便地和 Beats一塊兒合做,這也是被推薦的方法
  • Logstash 也能夠和那些著名的雲廠商的服務一塊兒合做處理它們的數據
  • 它也能夠和最爲一樣的信息消息隊列,好比 redis 或 kafka 一塊兒協做
  • Logstash 也可使用 JDBC 來訪問 RDMS 數據
  • 它也能夠和 IoT 設備一塊兒處理它們的數據
  • Logstash 不只僅能夠把數據傳送到 Elasticsearch,並且它還能夠把數據發送至不少其它的目的地,並做爲它們的輸入源作進一步的處理

二 Logstash系統架構

Logstash 包含3個主要部分: 輸入(inputs),過濾器(filters)和輸出(outputs)github

系統架構

Logstash的事件(logstash將數據流中等每一條數據稱之爲一個event)處理流水線有三個主要角色完成:inputs –> filters –> outputs:web

  • inpust:必須,負責產生事件(Inputs generate events),經常使用:File、syslog、redis、kakfa、beats(如:Filebeats)
  • filters:可選,負責數據處理與轉換(filters modify them),經常使用:grok、mutate、drop、clone、geoip
  • outpus:必須,負責數據輸出(outputs ship them elsewhere),經常使用:elasticsearch、file、graphite、kakfa、statsd

數據流

三 Logstash安裝

3.1 環境清單
  • 操做系統:Linux #56-Ubuntu SMP Tue Jun 4 22:49:08 UTC 2019 x86_64
  • Logstash版本:logstash-6.2.4
  • Jdk版本:1.8.0_152
3.2 Linux安裝JDK
3.2.1 解壓縮並移動到指定目錄(約定的目錄:/usr/local)
(1)解壓縮
tar -zxvf jdk-8u152-linux-x64.tar.gz
(2)建立目錄
mkdir -p /usr/local/java
(3)移動安裝包
mv jdk1.8.0_152/ /usr/local/java/
(4)設置全部者
chown -R root:root /usr/local/java/
3.2.2 配置環境變量
(1)配置系統環境變量
vi /etc/environment
(2)添加以下語句
PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games"
export JAVA_HOME=/usr/local/java/jdk1.8.0_152
export JRE_HOME=/usr/local/java/jdk1.8.0_152/jre
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
(3)配置用戶環境變量
nano /etc/profile
(4)添加以下語句(必定要放中間)
if [ "$PS1" ]; then
  if [ "$BASH" ] && [ "$BASH" != "/bin/sh" ]; then
    # The file bash.bashrc already sets the default PS1.
    # PS1='\h:\w\$ '
    if [ -f /etc/bash.bashrc ]; then
      . /etc/bash.bashrc
    fi
  else
    if [ "`id -u`" -eq 0 ]; then
      PS1='# '
    else
      PS1='$ '
    fi
  fi
fi

export JAVA_HOME=/usr/local/java/jdk1.8.0_152
export JRE_HOME=/usr/local/java/jdk1.8.0_152/jre
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH:$HOME/bin

if [ -d /etc/profile.d ]; then
  for i in /etc/profile.d/*.sh; do
    if [ -r $i ]; then
      . $i
    fi
  done
  unset i
fi
(5)使用戶環境變量生效
source /etc/profile
(6)測試是否安裝成功
$ java -version
java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
3.3 安裝Logstash
3.3.1 建立安裝目錄
$ sudo mkdir /usr/local/logstash
3.3.2 下載Logstash安裝文件
$ wget -P /usr/local/logstash https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz
3.3.2 解壓縮安裝文件
$ cd /usr/local/logstash/
$ sudo tar -zxvf logstash-6.2.4.tar.gz
3.3.3 測試安裝是否成功

測試: 快速啓動,標準輸入輸出做爲input和output,沒有filter正則表達式

$ cd logstash-6.2.4/
$ ./bin/logstash -e 'input { stdin {} } output { stdout {} }'

Sending Logstash's logs to /usr/local/logstash/logstash-6.2.4/logs which is now configured via log4j2.properties
[2021-05-27T00:22:28,729][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/local/logstash/logstash-6.2.4/modules/fb_apache/configuration"}
[2021-05-27T00:22:28,804][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/local/logstash/logstash-6.2.4/modules/netflow/configuration"}
[2021-05-27T00:22:29,827][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-05-27T00:22:30,979][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.4"}
[2021-05-27T00:22:31,821][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2021-05-27T00:22:36,463][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2021-05-27T00:22:36,690][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x55a5abea run>"}
The stdin plugin is now waiting for input:
[2021-05-27T00:22:36,853][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}

## 此時命令窗口停留在等待輸入狀態,鍵盤鍵入任意字符 ##

hello world

## 下方是Logstash輸出到效果 ##

{
    "@timestamp" => 2021-05-26T16:22:52.527Z,
          "host" => "*******",
       "message" => "hello world",
      "@version" => "1"
}

四 Logstash參數與配置

4.1 經常使用啓動參數
參數 說明 舉例
-e 當即執行,使用命令行裏的配置參數啓動實例 ./bin/logstash -e ‘input {stdin {}} output {stdout {}}’
-f 指定啓動實例的配置文件 ./bin/logstash -f config/test.conf
-t 測試配置文件的正確性 ./bin/logstash-f config/test.conf -t
-l 指定日誌文件名稱 ./bin/logstash-f config/test.conf -l logs/test.log
-w 指定filter線程數量,默認線程數是5 ./bin/logstash-f config/test.conf -w 8
4.2 配置文件結構及語法
(1)區段

Logstash經過{}來定義區域,區域內能夠定義插件,一個區域內能夠定義多個插件,以下:

input {
    stdin {
    }
    beats {
        port => 5044
    }
}
(2)數據類型

Logstash僅支持少許的數據類型:

  • Boolean:ssl_enable => true
  • Number:port => 33
  • String:name => 「Hello world」
  • Commonts:# this is a comment
(3)字段引用

Logstash數據流中的數據被稱之爲Event對象,Event以JSON結構構成,Event的屬性被稱之爲字段,若是你像在配置文件中引用這些字段,只須要把字段的名字寫在中括號[]裏就好了,如[type],對於嵌套字段每層字段名稱都寫在[]裏就能夠了,好比:tags;除此以外,對於Logstash的arrag類型支持下標與倒序下表,如:tags[0],tags[-1]。

(4)條件判斷

Logstash支持下面的操做符:

  • equality:==, !=, <, >, <=, >=
  • regexp:=~, !~
  • inclusion:in, not in
  • boolean:and, or, nand, xor
  • unary:!

例如:

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}
(5)環境變量引用

Logstash支持引用系統環境變量,環境變量不存在時能夠設置默認值,例如:

export TCP_PORT=12345

input {
  tcp {
    port => "${TCP_PORT:54321}"
  }
}
4.3 經常使用輸入插件(Input plugin)

輸入插件包含有如下多種,詳情查看官網文檔-經常使用輸入插件:

  • elasticsearch
  • exec
  • file
  • github
  • http
  • jdbc
  • jms
  • jmx
  • kafka
  • log4j
  • rabbitmq
  • redis
  • tcp
  • udp
  • unix
  • websocket
4.3.1 File讀取插件

文件讀取插件主要用來抓取文件的變化信息,將變化信息封裝成Event進程處理或者傳遞。

  • 配置事例

    input
    file {
      path => ["/var/log/*.log", "/var/log/message"]
      type => "system"
      start_position => "beginning"
    }
    }
  • 經常使用參數
參數名稱 類型 默認值 描述信息
add_field hash {} 用於向Event中添加字段
close_older number 3600 設置文件多久秒內沒有更新就關掉對文件的監聽
codec string 「plain」 輸入數據以後對數據進行解碼
delimiter string 「\n」 文件內容的行分隔符,默認按照行進行Event封裝
discover_interval number 15 間隔多少秒查看一下path匹配對路徑下是否有新文件產生
enable_metric boolean true
exclude array path匹配的文件中指定例外,如:path => 「/var/log/「;exclude =>」.gz」
id string 區分兩個相同類型的插件,好比兩個filter,在使用Monitor API監控是能夠區分,建議設置上ID
ignore_older number 忽略歷史修改,若是設置3600秒,logstash只會發現一小時內被修改過的文件,一小時以前修改的文件的變化不會被讀取,若是再次修改該文件,全部的變化都會被讀取,默認被禁用
max_open_files number logstash能夠同時監控的文件個數(同時打開的file_handles個數),若是你須要處理多於這個數量多文件,可使用「close_older」去關閉一些文件
path array 必須設置項,用於匹配被監控的文件,如「/var/log/.log」或者「/var/log//.log」,必須使用絕對路徑
sincedb_path string 文件讀取記錄,必須指定一個文件而不是目錄,文件中保存沒個被監控的文件等當前inode和byteoffset,默認存放位置「$HOME/.sincedb*」
sincedb_write_interval number 15 間隔多少秒寫一次sincedb文件
start_position 「beginning」,「end」 」 end」 從文件等開頭仍是結尾讀取文件內容,默認是結尾,若是須要導入文件中的老數據,能夠設置爲「beginning」,該選項只在第一次啓動logstash時有效,若是文件已經存在於sincedb的記錄內,則此配置無效
stat_interval number 1 間隔多少秒檢查一下文件是否被修改,加大此參數將下降系統負載,可是增長了發現新日誌的間隔時間
tags array 能夠在Event中增長標籤,以便於在後續的處理流程中使用
type string Event的type字段,若是採用elasticsearch作store,在默認狀況下將做爲elasticsearch的type
4.3.2 TCP監聽插件

TCP插件有兩種工做模式,「Client」和「Server」,分別用於發送網絡數據和監聽網絡數據。

  • 配置事例

    tcp {
      port => 41414
    }
  • 經常使用參數(空 => 同上)
參數名稱 類型 默認值 描述信息
add_field
codec
enable_metric
host
id
mode 「server」、「client」 「server」 「server」監聽「client」的鏈接請求,「client」鏈接「server」
port number 必須設置項,「server」模式時指定監聽端口,「client」模式指定鏈接端口
proxy_protocol boolean false Proxyprotocol support, only v1 is supported at this time
ssl_cert
ssl_enable
ssl_extra_chain_certs
ssl_key
ssl_key_passphrase
ssl_verify
tags
type
4.3.3 Redis讀取插件

用於讀取Redis中緩存的數據信息。

  • 配置事例

    input {
    redis {
      host => "127.0.0.1"
      port => 6379
      data_type => "list"
      key => "logstash-list"
    }
    }
  • 經常使用參數(空 => 同上)
參數名稱 類型 默認值 描述信息
add_field
batch_count number 125 使用redis的batch特性,須要redis2.6.0或者更新的版本
codec
data_type list,channel, pattern_channel 必須設置項,根據設置不一樣,訂閱redis使用不一樣的命令,依次是:BLPOP、SUBSCRIBE、PSUBSCRIBE
db number 0 指定使用的redis數據庫
enable_metric
host string 127.0.0.1 redis服務地址
id
key string 必須設置項,reidslist或者channel的key名稱
password string redis密碼
port number 6379 redis鏈接端口號
tags
threads number 1
timeout number 5 redis服務鏈接超時時間,單位:秒

注意:

data_type 須要注意的是「channel」和「pattern_channel」是廣播類型,相同的數據會同時發送給訂閱了該channel的logstash,也就是說在logstash集羣環境下會出現數據重複,集羣中的每個節點都將收到一樣的數據,可是在單節點狀況下,「pattern_channel」能夠同時定於知足pattern的多個key

4.3.4 Kafka讀取插件

用於讀取Kafka中推送的主題數據信息。

  • 配置事例

    input {
    kafka {
      bootstrap_servers => "kafka-01:9092,kafka-02:9092,kafka-03:9092"
      topics_pattern  => "elk-.*"
      consumer_threads => 5
      decorate_events => true
      codec => "json"
      auto_offset_reset => "latest"
      group_id => "logstash1"##logstash 集羣需相同
    }
    }
  • 經常使用參數:
參數名稱 類型 默認值 描述信息
bootstrap_servers string localhost:9092 Kafka列表,用於創建與集羣的初始鏈接
topics_pattern string 要訂閱的主題正則表達式模式。使用此配置時,主題配置將被忽略。
consumer_threads number 併發線程數,理想狀況下,您應該擁有與分區數量同樣多的線程
decorate_events string none 可接受的值爲:none/basic/extended/false
codec codec plain 用於輸入數據的編解碼器
auto_offset_reset string 當Kafka初始偏移量
group_id String logstash 該消費者所屬的組的標識符

注意:

  • auto_offset_reset: earliest-將偏移量自動重置爲最先的偏移量;latest-自動將偏移量重置爲最新偏移量;none-若是未找到消費者組的先前偏移量,則向消費者拋出異常;anything else-向消費者拋出異常。
  • decorate_events: none:未添加元數據,basic:添加了記錄的屬性,extended:記錄的屬性,添加標題,false:不建議使用的別名 none,true:不建議使用的別名 basic
4.4 經常使用過濾插件(Filter plugin)

豐富的過濾器插件的是 logstash威力如此強大的重要因素,過濾器插件主要處理流經當前Logstash的事件信息,能夠添加字段、移除字段、轉換字段類型,經過正則表達式切分數據等,也能夠根據條件判斷來進行不一樣的數據處理方式,詳情查看官網文檔-經常使用過濾插件

4.4.1 grok正則捕獲

grok 是Logstash中將非結構化數據解析成結構化數據以便於查詢的最好工具,很是適合解析syslog logs,apache log, mysql log,以及一些其餘的web log

(1)預約義表達式調用:
  • Logstash提供120個經常使用正則表達式可供安裝使用,安裝以後你能夠經過名稱調用它們,語法以下:%{SYNTAX:SEMANTIC}
  • SYNTAX:表示已經安裝的正則表達式的名稱
  • SEMANTIC:表示從Event中匹配到的內容的名稱

例如:
Event的內容爲「[debug] 127.0.0.1 - test log content」,匹配%{IP:client}將得到「client: 127.0.0.1」的結果,前提安裝了IP表達式;若是你在捕獲數據時想進行數據類型轉換可使用%{NUMBER:num:int}這種語法,默認狀況下,全部的返回結果都是string類型,當前Logstash所支持的轉換類型僅有「int」和「float」;

一個稍微完整一點的事例:

  • 日誌文件http.log內容:55.3.244.1 GET /index.html 15824 0.043
  • 表達式:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
  • 配置文件內容:

    input {
    file {
      path => "/var/log/http.log"
    }
    }
    filter {
    grok {
      match => {"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}
    }
    }
  • 輸出結果:

    client: 55.3.244.1
    method: GET
    request: /index.html
    bytes: 15824
    duration: 0.043
(2)自定義表達式調用

語法:(?<field_name>the pattern here)

舉例:捕獲10或11和長度的十六進制queue_id可使用表達式(?<queue_id>[0-9A-F]{10,11})
安裝自定義表達式

與預約義表達式相同,你也能夠將自定義的表達式配置到Logstash中,而後就能夠像於定義的表達式同樣使用;如下是操做步驟說明:

  • 一、在Logstash根目錄下建立文件夾「patterns」,在「patterns」文件夾中建立文件「extra」(文件名稱無所謂,可本身選擇有意義的文件名稱);
  • 二、在文件「extra」中添加表達式,格式:patternName regexp,名稱與表達式之間用空格隔開便可,以下:

    # contents of ./patterns/postfix:
    POSTFIX_QUEUEID [0-9A-F]{10,11}
  • 三、使用自定義的表達式時須要指定「patterns_dir」變量,變量內容指向表達式文件所在的目錄,舉例以下:

<1>日誌內容

Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

<2>Logstash配置

filter {
  grok {
    patterns_dir => ["./patterns"]
    match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
  }
}

<3>運行結果

timestamp: Jan 1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965
(3)grok經常使用配置參數(空 => 同上)
參數名稱 類型 默認值 描述信息
add_field
add_tag
break_on_match boolean true match字段存在多個pattern時,當第一個匹配成功後結束後面的匹配,若是想匹配全部的pattern,將此參數設置爲false
enable_metric
id
keep_empty_captures boolean false 若是爲true,捕獲失敗的字段獎設置爲空值
match array {} 設置pattern數組: match=> {「message」 => [「Duration: %{NUMBER:duration}」,」Speed: %{NUMBER:speed}」]}
named_captures_only boolean true If true, only store named captures from grok.
overwrite array [] 覆蓋字段內容: match=> { 「message」 => 「%{SYSLOGBASE} %{DATA:message}」 } overwrite=> [ 「message」 ]
patterns_dir array [] 指定自定義的pattern文件存放目錄,Logstash在啓動時會讀取文件夾內patterns_files_glob 匹配的全部文件內容
patterns_files_glob string 「*」 用於匹配patterns_dir中的文件
periodic_flush boolean false 按期調用filter的flush方法
remove_field array [] 從Event中刪除任意字段: remove_field=> [ 「foo_%{somefield}」 ]
remove_tag array [] 刪除「tags」中的值: remove_tag=> [ 「foo_%{somefield}」 ]
tag_on_failure array [「_grokparsefailure」] 當沒有匹配成功時,將此array添加到「tags」字段內
tag_on_timeout string 「_groktimeout」 當匹配超時時,將此內容添加到「tags」字段內
timeout_millis number 30000 設置單個match到超時時間,單位:毫秒,若是設置爲0,則不啓用超時設置
4.4.2 date時間處理插件

該插件用於時間字段的格式轉換,好比將「Apr 17 09:32:01」(MMM dd HH:mm:ss)轉換爲「MM-dd HH:mm:ss」。並且一般狀況下,Logstash會爲自動給Event打上時間戳,可是這個時間戳是Event的處理時間(主要是input接收數據的時間),和日誌記錄時間會存在誤差(主要緣由是buffer),咱們可使用此插件用日誌發生時間替換掉默認是時間戳的值。

經常使用配置參數(空 => 同上)
參數名稱 類型 默認值 描述信息
add_field
add_tag
enable_metric
id
locale
match array [] 時間字段匹配,可自定多種格式,直到匹配到或匹配結束
periodic_flush
remove_field
remove_tag
tag_on_failure
target string 「@timestamp」 指定match匹配而且轉換爲date類型的存儲位置(字段),默認覆蓋到「@timestamp」
timezone string 指定時間格式化的時區

注意:

match的格式:時間字段匹配,可自定多種格式,直到匹配到或匹配結束,格式: [ field,formats…],如:match=>[ 「logdate」,「MMM dd yyyy HH:mm:ss」,「MMM d yyyy HH:mm:ss」,「ISO8601」]

4.4.3 mutate數據修改插件

mutate 插件是 Logstash另外一個重要插件。它提供了豐富的基礎類型數據處理能力。能夠重命名,刪除,替換和修改事件中的字段。

經常使用配置參數(空 => 同上)
參數名稱 類型 默認值 描述信息
add_field
add_tag
convert hash 將指定字段轉換爲指定類型,字段內容是數組,則轉換全部數組元素,若是字段內容是hash,則不作任何處理,目前支持的轉換類型包括:integer,float, string, and boolean.例如: convert=> { 「fieldname」 => 「integer」 }
enable_metric
gsub array 相似replace方法,使用指定內容替換掉目標字符串的現有內容,前提是目標字段必須是字符串,不然不作任何處理,例如:[ 「fieldname」, 「/」, 「「, 「fieldname2」, 「[\?#-]」, 「.」],解釋:使用「」替換掉「fieldname」中的全部「/」,使用「.」替換掉「fieldname2」中的全部「\」「?」、「#」和「-」
id
join hash 使用指定的符號將array字段的每一個元素鏈接起來,對非array字段無效。例如: 使用「,」將array字段「fieldname」的每個元素鏈接成一個字符串: join=> { 「fieldname」 => 「,」 }
lowercase array 將自定的字段值轉換爲小寫
merge hash 合併兩個array或者hash,若是是字符串,將自動轉換爲一個單元素數組;將一個array和一個hash合併。例如: 將」added_field」合併到」dest_field」: merge=> { 「dest_field」 => 「added_field」 }
periodic_flush
remove_field
remove_tag
rename hash 修改一個或者多個字段的名稱。例如: 將」HOSTORIP」更名爲」client_ip」: rename=> { 「HOSTORIP」 => 「client_ip」 }
replace hash 使用新值完整的替換掉指定字段的原內容,支持變量引用。例如: 使用字段「source_host」的內容拼接上字符串「: My new message」以後的結果替換「message」的值: replace=> { 「message」 => 「%{source_host}: My new message」 }
split hash 按照自定的分隔符將字符串字段拆分紅array字段,只能做用於string類型的字段。例如: 將「fieldname」的內容按照「,」拆分紅數組: split=> { 「fieldname」 => 「,」 }
strip array 去掉字段內容兩頭的空白字符。例如: 去掉「field1」和「field2」兩頭的空格: strip=> [「field1」, 「field2」]
update hash 更新現有字段的內容,例如: 將「sample」字段的內容更新爲「Mynew message」: update=> { 「sample」 => 「My new message」 }
uppercase array 將字符串轉換爲大寫
4.4.4 JSON插件

JSON插件用於解碼JSON格式的字符串,通常是一堆日誌信息中,部分是JSON格式,部分不是的狀況下

(1)配置事例
json {
    source => ...
}
  • 事例配置,message是JSON格式的字符串:"{\"uid\":3081609001,\"type\":\"signal\"}"

    filter {
      json {
          source => "message"
          target => "jsoncontent"
      }
    }
  • 輸出結果:

    {
      "@version": "1",
      "@timestamp": "2014-11-18T08:11:33.000Z",
      "host": "web121.mweibo.tc.sinanode.com",
      "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
      "jsoncontent": {
          "uid": 3081609001,
          "type": "signal"
      }
    }
  • 若是從事例配置中刪除target,輸出結果以下:

    {
      "@version": "1",
      "@timestamp": "2014-11-18T08:11:33.000Z",
      "host": "web121.mweibo.tc.sinanode.com",
      "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
      "uid": 3081609001,
      "type": "signal"
    }
(2)經常使用配置參數(空 => 同上)
參數名稱 類型 默認值 描述信息
add_field
add_tag
enable_metric
id
periodic_flush
remove_field
remove_tag
skip_on_invalid_json boolean false 是否跳過驗證不經過的JSON
source string 必須設置項,指定須要解碼的JSON字符串字段
tag_on_failure
target string 解析以後的JSON對象所在的字段名稱,若是沒有,JSON對象的全部字段將掛在根節點下
4.4.5 elasticsearch查詢過濾插件

用於查詢Elasticsearch中的事件,可將查詢結果應用於當前事件中

經常使用配置參數(空 => 同上)
參數名稱 類型 默認值 描述信息
add_field
add_tag
ca_file string SSL Certificate Authority file path
enable_sort boolean true 是否對結果進行排序
fields array {} 從老事件中複製字段到新事件中,老事件來源於elasticsearch(用於查詢更新)
hosts array [「localhost:9200」] elasticsearch服務列表
index string 「」 用逗號分隔的elasticsearch索引列表,若是要操做全部全部使用「_all」或者「」,保存數據到elasticsearch時,若是索引不存在會自動以此建立
password string 密碼
periodic_flush
query string 查詢elasticsearch的查詢字符串
remove_field
remove_tag
result_size number 1 查詢elasticsearch時,返回結果的數量
sort string 「@timestamp:desc」 逗號分隔的「:」列表,用於查詢結果排序
ssl boolean false SSL
tag_on_failure
user string 用戶名
4.5 經常使用輸出插件(Output plugin)
4.5.1 ElasticSearch輸出插件

用於將事件信息寫入到Elasticsearch中,官方推薦插件,ELK必備插件

(1)配置事例
output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "filebeat-%{type}-%{+yyyy.MM.dd}"
        template_overwrite => true
    }
}
(2)經常使用配置參數(空 => 同上)
參數名稱 類型 默認值 描述信息
absolute_healthcheck_path boolean false 當配置了「healthcheck_path」時,決定elasticsearch健康檢查URL是否按照絕對路徑配置。例如: elasticsearch訪問路徑爲:」http://localhost:9200/es「,「h... 當前參數爲true時的訪問路徑爲:」http://localhost:9200/es/heal... 當前參數爲false時的訪問路徑爲:」http://localhost:9200/health」
absolute_sniffing_path boolean false 當配置了「sniffing_path」時,決定elasticsearch的sniffing訪問路徑配置。例如: elasticsearch訪問路徑爲:「http://localhost:9200/es」,「s... 當前參數爲true時的訪問路徑爲:「http://localhost:9200/es/_sni... 當前參數爲false時的訪問路徑爲:「http://localhost:9200/_sniffing」
action string 「index」 對elasticsearch的操做類型,可用的操做類型: index:索引Logstash事件數據到elasticsearch; delete:根據id刪除文檔,id必須指定; delete:根據id刪除文檔,id必須指定; update:根據id更新文檔
cacert string .cer或者.pem證書文件路徑,使用證書進行elasticsearch認證
codec
doc_as_upsert boolean false 使update啓用upsert模式,即文檔不存在時建立新文檔
document_id string elasticsearch中的文檔id,用來覆蓋已經保存到elasticsearch中的文檔
document_type string 指定存入elasticsearch中的文檔的type,沒有指定的狀況下會使用Event信息中的「type」字段的值做爲elasticsearch的type
enable_metric
failure_type_logging_whitelist array [] elasricsearch報錯白名單,白名單的異常信息不會被記入logstash的log中,好比你想忽略掉全部的「document_already_exists_exception」異常
flush_size
healthcheck_path string 「/」 elasricsearch檢查狀態檢查路徑
hosts string [//127.0.0.1] elasticsearch服務地址列表,若是配置多個將啓用負載均衡
id
idle_flush_time number 1 間隔多長時間將數據輸出到elasticsearch中一次,主要用於較慢的事件
index string 「logstash-%{+YYYY.MM.dd}」 指定elasticsearch存儲數據時的全部名稱,支持變量引用,好比你能夠按天建立索引,方便刪除歷史數據或者查詢制定範圍內的數據
keystore string 用於指定密鑰庫路徑,能夠是.jks或者.p12
keystore_password string 密鑰庫密碼
manage_template boolean true 是否啓用elasticsearch模版,Logstash自帶一個模版,可是隻有名稱匹配「logstash-*」的索引纔會應用該默版
parameters hash 添加到elasticsearch URL後面的參數鍵值對
parent string 「nil」 爲文檔子節點指定父節點的id
password string elasticsearch集羣訪問密碼
path string 當設置了elasticsearch代理時用此參數從定向HTTP API,若是「hosts」中已經包含此路徑,則不須要設置
pipeline string 「nil」 設置Event管道
pool_max number 1000 elasticsearch最大鏈接數
pool_max_per_route number 100 每一個「endpoint」的最大鏈接數
proxy string 代理URL
resurrect_delay number 5 檢查掛掉的「endpoint」是否恢復正常的頻率
retry_initial_interval number 2 設置批量重試的時間間隔,重試到 「retry_max_interval」次
retry_max_interval number 64 Setmax interval in seconds between bulk retries.
retry_on_conflict number 1 Thenumber of times Elasticsearch should internally retry an update/upserteddocument
routing string 指定Event路由
script string 「」 設置「scriptedupdate」模式下的腳本名稱
script_lang string 「painless」 設置腳本語言
script_type 「inline」、「indexed」、 「file」 [「inline」] Definethe type of script referenced by 「script」 variable inline :」script」 contains inline script indexed : 「script」 containsthe name of script directly indexed in elasticsearch file : 「script」contains the name of script stored in elasticseach’s config directory
script_var_name string 「event」 Setvariable name passed to script (scripted update)
scripted_upsert boolean false ifenabled, script is in charge of creating non-existent document (scriptedupdate)
sniffing
sniffing_delay
sniffing_path
ssl
ssl_certificate_verification
template string 設置自定義的默版存放路徑
template_name string 「logstash」 設置使用的默版名稱
template_overwrite boolean false 是否始終覆蓋現有模版
timeout number 60 網絡超時時間
truststore string 「:truststore」或者「:cacert」證書庫路徑
truststore_password string 證書庫密碼
upsert string 「」 Setupsert content for update mode.s Create a new document with this parameter asjson string if document_id doesn’texists
user string 「」 elasticsearch用戶名
validate_after_inactivity number 10000 間隔多長時間保持鏈接可用
version string 存入elasticsearch的文檔的版本號
version_type 「internal」、「external」、 「external_gt」、 「external_gte」、「force」
workers string 1 whenwe no longer support the :legacy type This is hacky, but it can only be herne
4.5.2 Redis輸出插件

用於將Event寫入Redis中進行緩存,一般狀況下Logstash的Filter處理比較吃系統資源,複雜的Filter處理會很是耗時,若是Event產生速度比較快,可使用Redis做爲buffer使用

(1)配置事例
output {
    redis {
        host => "127.0.0.1"
        port => 6379
        data_type => "list"
        key => "logstash-list"
    }
}
(2)經常使用配置參數(空 => 同上)
參數名稱 類型 默認值 描述信息
batch boolean false 是否啓用redis的batch模式,僅在data_type=」list」時有效
batch_events number 50 batch大小,batch達到此大小時執行「RPUSH」
batch_timeout number 5 batch超時時間,超過這個時間執行「RPUSH」
codec
congestion_interval number 1 間隔多長時間檢查阻塞,若是設置爲0,則沒個Event檢查一次
congestion_threshold number 0
data_type 「list」、「channel」 存儲在redis中的數據類型,若是使用「list」,將採用「RPUSH」操做,若是是「channel」,將採用「PUBLISH」操做
db number 0 使用的redis數據庫編號
enable_metric
host array [「127.0.0.1」] redis服務列表,若是配置多個,將隨機選擇一個,若是當前的redis服務不可用,將選擇下一個
id
key string Thename of a Redis list or channel. Dynamic names are valid here, forexample logstash-%{type}.
password string redis服務密碼
port number 6379 redis服務監聽端口
reconnect_interval number 1 鏈接失敗時的重連間隔
shuffle_hosts boolean true Shufflethe host list during Logstash startup.
timeout number 5 redis鏈接超時時間
workers number 1 whenwe no longer support the :legacy type This is hacky, but it can only be herne
4.5.3 File輸出插件

用於將Event輸出到文件內

(1)配置事例
output {
    file {
        path => ...
        codec => line { format => "custom format: %{message}"}
    }
}
(2)經常使用配置參數(空 => 同上)
參數名稱 類型 默認值 描述信息
codec
create_if_deleted boolean true 若是目標文件被刪除,則在寫入事件時建立新文件
dir_mode number -1 設置目錄的訪問權限,若是爲「-1」,使用操做系統默認的訪問權限
enable_metric
file_mode number -1 設置文件的訪問權限,若是爲「-1」,使用操做系統默認的訪問權限
filename_failure string 「_filepath_failures」 若是指定的文件路徑無效,這會在目錄內建立這個文件並記錄數據
flush_interval number 2 flush間隔
gzip boolean false 是否啓用gzip壓縮
id
path string 必須設置項,文件輸出路徑,如:path =>」./test-%{+YYYY-MM-dd}.txt」
workers string 1 whenwe no longer support the :legacy type This is hacky, but it can only be herne
4.5.4 Kafka輸出插件

用於將Event輸出到Kafka指定的Topic中,官網Kafka詳情配置

(1)配置事例
output {
    kafka {
        bootstrap_servers => "localhost:9092"
        topic_id => "test"
        compression_type => "gzip"
    }
}
(2)經常使用配置參數(空 => 同上)
參數名稱 類型 默認值 描述信息
bootstrap_servers string Kafka集羣信息,格式爲 host1:port1,host2:port2
topic_id string 產生消息的主題
compression_type String none 生產者生成的全部數據的壓縮類型。默認爲無(即無壓縮)。有效值爲 none、gzip、snappy 或 lz4。
batch_size number 16384 配置以字節爲單位控制默認批處理大小
buffer_memory number 33554432(32MB) 生產者可用於緩衝等待發送到服務器的記錄的總內存字節數
max_request_size number 1048576(1MB) 請求的最大大小
flush_interval number 2 flush間隔
gzip boolean false 是否啓用gzip壓縮
id
path string 必須設置項,文件輸出路徑,如:path =>」./test-%{+YYYY-MM-dd}.txt」
workers string 1 whenwe no longer support the :legacy type This is hacky, but it can only be herne
4.6 經常使用編碼插件(Codec plugin)
4.6.1 JSON編碼插件

直接輸入預約義好的 JSON 數據,這樣就能夠省略掉 filter/grok 配置

  • 配置事例

    json {
    }
  • 經常使用配置參數

    參數名稱 類型 默認值 描述信息
    charset string 「UTF-8」 字符集
    enable_metric
    id

五 Logstash實例

5.1 接收Filebeat事件,輸出到Redis
input {
    beats {
        port => 5044
    }
}

output {
    redis {
        host => "127.0.0.1"
        port => 6379
        data_type => "list"
        key => "logstash-list"
    }
}
5.2 讀取Redis數據,根據「type」判斷,分別處理,輸出到ES
input {
    redis {
        host => "127.0.0.1"
        port => 6379
        data_type => "list"
        key => "logstash-list"
    }
}

filter {
    if [type] == "application" {
        grok {
            match => ["message", "(?m)-(?<systemName>.+?):(?<logTime>(?>\d\d){1,2}-(?:0?[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5][0-9]|60)(?:[:.,][0-9]+)?)) \[(?<level>(\b\w+\b)) *\] (?<thread>(\b\w+\b)) \((?<point>.*?)\) - (?<content>.*)"]
        }
        date {
            match => ["logTime", "yyyy-MM-dd HH:mm:ss,SSS"]
        }
        json {
            source => "message"
        }
        date {
            match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
        }
    }
    if [type] == "application_bizz" {
        json {
            source => "message"
        }
        date {
            match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
        }
    }
    mutate {
        remove_field => ["@version", "beat", "logTime"]
    }
}

output {
    stdout{
    }
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "filebeat-%{type}-%{+yyyy.MM.dd}"
        document_type => "%{documentType}"
        template_overwrite => true
    }
}

六 應用場景

6.1 以logstash做爲日誌搜索器

架構:logstash採集、處理、轉發到elasticsearch存儲,在kibana進行展現

特色:這種結構由於須要在各個服務器上部署 Logstash,而它比較消耗 CPU 和內存資源,因此比較適合計算資源豐富的服務器,不然容易形成服務器性能降低,甚至可能致使沒法正常工做。

6.2 消息模式

消息模式:Beats 還不支持輸出到消息隊列(新版本除外:5.0版本及以上),因此在消息隊列先後兩端只能是 Logstash 實例。logstash從各個數據源蒐集數據,不通過任何處理轉換僅轉發出到消息隊列(kafka、redis、rabbitMQ等),後logstash從消息隊列取數據進行轉換分析過濾,輸出到elasticsearch,並在kibana進行圖形化展現

架構(Logstash進行日誌解析所在服務器性能各方面必需要足夠好):

模式特色:這種架構適合於日誌規模比較龐大的狀況。但因爲 Logstash 日誌解析節點和 Elasticsearch 的負荷比較重,可將他們配置爲集羣模式,以分擔負荷。引入消息隊列,均衡了網絡傳輸,從而下降了網絡閉塞,尤爲是丟失數據的可能性,但依然存在 Logstash 佔用系統資源過多的問題

工做流程:Filebeat採集—> logstash轉發到kafka—> logstash處理從kafka緩存的數據進行分析—> 輸出到es—> 顯示在kibana

6.3 logstash(非filebeat)進行文件採集,輸出到kafka緩存,讀取kafka數據並處理輸出到文件或es
6.4 logstash同步mysql數據庫數據到es(logstash5版本以上已集成jdbc插件,無需下載安裝,直接使用)

七 Logstash和Flume對比

首先從結構對比,咱們會驚人的發現,二者是多麼的類似!Logstash的Shipper、Broker、Indexer分別和Flume的Source、Channel、Sink各自對應!只不過是Logstash集成了,Broker能夠不須要,而Flume須要單獨配置,且缺一不可,但這再一次說明了計算機的設計思想都是通用的!只是實現方式會不一樣而已。

從程序員的角度來講,上文也提到過了,Flume是真的很繁瑣,你須要分別做source、channel、sink的手工配置,並且涉及到複雜的數據採集環境,你可能還要作多個配置,這在上面提過了,反過來講Logstash的配置就很是簡潔清晰,三個部分的屬性都定義好了,程序員本身去選擇就行,就算沒有,也能夠自行開發插件,很是方便。固然了,Flume的插件也不少,但Channel就只有內存和文件這兩種(其實如今不止了,但經常使用的也就兩種)。讀者能夠看得出來,二者其實配置都是很是靈活的,只不過看場景取捨罷了。

其實從做者和歷史背景來看,二者最初的設計目的就不太同樣。Flume自己最初設計的目的是爲了把數據傳入HDFS中(並非爲了採集日誌而設計,這和Logstash有根本的區別),因此理所應當側重於數據的傳輸,程序員要很是清楚整個數據的路由,而且比Logstash還多了一個可靠性策略,上文中的channel就是用於持久化目的,數據除非確認傳輸到下一位置了,不然不會刪除,這一步是經過事務來控制的,這樣的設計使得可靠性很是好。相反,Logstash則明顯側重對數據的預處理,由於日誌的字段須要大量的預處理,爲解析作鋪墊。

回過來看我當初爲何先講Logstash而後講Flume?這裏面有幾個考慮

  • 其一:Logstash其實更有點像通用的模型,因此對新人來講理解起來更簡單,而Flume這樣輕量級的線程,可能有必定的計算機編程基礎理解起來更好;
  • 其二:目前大部分的狀況下,Logstash用的更加多,這個數據我本身沒有統計過,可是根據經驗判斷,Logstash能夠和ELK其餘組件配合使用,開發、應用都會簡單不少,技術成熟,使用場景普遍。相反Flume組件就須要和其餘不少工具配合使用,場景的針對性會比較強,更不用提Flume的配置過於繁瑣複雜了。

最後總結下來,咱們能夠這麼理解他們的區別:

Logstash就像是買來的臺式機,主板、電源、硬盤,機箱(Logstash)把裏面的東西所有裝好了,你能夠直接用,固然也能夠本身組裝修改;

Flume就像提供給你一套完整的主板,電源、硬盤,Flume沒有打包,只是像說明書同樣指導你如何組裝,才能運行的起來。

參考文檔:

相關文章
相關標籤/搜索