Logstash技術入門

Logstash簡介

Logstash是一個接收,處理,轉發日誌的工具。支持系統日誌,webserver日誌,錯誤日誌,應用日誌,總之包括全部能夠拋出來的日誌類型。怎麼樣聽起來挺厲害的吧?php

依賴條件:JAVAcss

Logstash運行僅僅依賴java運行環境(jre)。各位能夠在命令行下運行java -version命令java

啓動和運行Logstash的兩條命令示例

第一步咱們先下載Logstashweb

curl -O https://artifacts.elastic.co/downloads/logstash/logstash-6.4.0.tar.gz

如今你應該有了一個叫logstash-6.4.0.tar.gz的文件了。 咱們把它解壓一下面試

tar zxvf logstash-6.4.0.tar.gz
cd logstash-6.4.0

如今咱們來運行一下:redis

bin/logstash -e 'input { stdin { } } output { stdout {} }'

咱們如今能夠在命令行下輸入一些字符,而後咱們將看到logstash的輸出內容:shell

hello world
2013-11-21T01:22:14.405+0000 0.0.0.0 hello world

 以上例子咱們在運行logstash中,定義了一個叫"stdin"的input還有一個"stdout"的output,不管咱們輸入什麼字符,Logstash都會按照某種格式來返回咱們輸入的字符。這裏注意咱們在命令行中使用了 -e 參數,該參數容許Logstash直接經過命令行接受設置。這點尤爲快速的幫助咱們反覆的測試配置是否正確而不用寫配置文件。數據庫

讓咱們再試個更有意思的例子。首先咱們在命令行下使用CTRL-C命令退出以前運行的Logstash。如今咱們從新運行Logstash使用下面的命令:apache

bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'

咱們再輸入一些字符,此次咱們輸入"goodnight moon":編程

goodnight moon
{
   "message" => 
   "goodnight moon",
   "@timestamp" => "2013-11-20T23:48:05.335Z",
   "@version" => "1",
   "host" => "my-laptop"
}

以上示例經過從新設置了叫"stdout"的output(添加了"codec"參數),咱們就能夠改變Logstash的輸出表現。相似的咱們能夠經過在你的配置文件中添加或者修改inputs、outputs、filters,就可使隨意的格式化日誌數據成爲可能,從而訂製更合理的存儲格式爲查詢提供便利。

使用Elasticsearch存儲日誌

如今,你也許會說:"它看起來還挺高大上的,不過手工輸入字符,並把字符從控制檯回顯出來。實際狀況並不實用"。說的好,那麼接下來咱們將創建Elasticsearch來存儲輸入到Logstash的日誌數據。若是你尚未安裝Elasticsearch,你能夠 下載RPM/DEB包 或者手動下載tar包,經過如下命令:

curl -O https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-6.4.0.tar.gz
tar zxvf elasticsearch-6.4.0.tar.gz
cd elasticsearch-6.4.0/
./bin/elasticsearch
注意

本篇文章實例使用Logstash 6.4.0和Elasticsearch 6.4.0。不一樣的Logstash版本都有對應的建議Elasticsearch版本。請確認你使用的Logstash版本!

更多有關安裝和設置Elasticsearch的信息能夠參考 Elasticsearch官網 。由於咱們主要介紹Logstash的入門使用,Elasticsearch默認的安裝和配置就已經知足咱們要求。

言歸正專,如今Elasticsearch已經運行並監聽9200端口了,經過簡單的設置Logstash就可使用Elasticsearch做爲它的後端。默認的配置對於Logstash和Elasticsearch已經足夠,咱們忽略一些額外的選項來設置elasticsearch做爲output:

bin/logstash -e 'input { stdin { } } output { elasticsearch { host => localhost } }'

隨意的輸入一些字符,Logstash會像以前同樣處理日誌(不過此次咱們將不會看到任何的輸出,由於咱們沒有設置stdout做爲output選項)

you know, for logs

咱們可使用curl命令發送請求來查看ES是否接收到了數據:

curl 'http://localhost:9200/_search?pretty'

返回內容以下:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "logstash-2013.11.21",
      "_type" : "logs",
      "_id" : "2ijaoKqARqGvbMgP3BspJA",
      "_score" : 1.0, "_source" : {"message":"you know, for logs","@timestamp":"2013-11-21T18:45:09.862Z","@version":"1","host":"my-laptop"}
    } ]
  }}

恭喜,至此你已經成功利用Elasticsearch和Logstash來收集日誌數據了。

Elasticsearch 插件(題外話)

這裏介紹另一個對於查詢你的Logstash數據(Elasticsearch中數據 )很是有用的工具叫Elasticsearch-kopf插件。更多的信息請見 Elasticsearch插件 。安裝elasticsearch-kopf,只要在你安裝Elasticsearch的目錄中執行如下命令便可:

bin/plugin -install lmenezes/elasticsearch-kopf

接下來訪問 http://localhost:9200/_plugin/kopf 來瀏覽保存在Elasticsearch中的數據,設置及映射!

多重輸出

做爲一個簡單的例子來設置多重輸出,讓咱們同時設置stdout和elasticsearch做爲output來從新運行一下Logstash,以下:

bin/logstash -e 'input { stdin { } } output { elasticsearch { host => localhost } stdout { } }'

當咱們輸入了一些詞組以後,這些輸入的內容回回顯到咱們的終端,同時還會保存到Elasticsearch!(可使用curl和kopf插件來驗證)。

默認配置 - 按照每日日期創建索引

你將發現Logstash能夠足夠靈巧的在Elasticsearch上創建索引... 天天會按照默認格式是logstash-YYYY.MM.DD來創建索引。在午夜(GMT),Logstash自動按照時間戳更新索引。咱們能夠根據追溯多長時間的數據做爲依據來制定保持多少數據,固然你也能夠把比較老的數據遷移到其餘的地方(從新索引)來方便查詢,此外若是僅僅是簡單的刪除一段時間數據咱們可使用 Elasticsearch Curator 。

接下來

接下來咱們開始瞭解更多高級的配置項。在下面的章節,咱們着重討論logstash一些核心的特性,以及如何和logstash引擎交互的。

事件的生命週期

Inputs,Outputs,Codecs,Filters構成了Logstash的核心配置項。Logstash經過創建一條事件處理的管道,從你的日誌提取出數據保存到Elasticsearch中,爲高效的查詢數據提供基礎。爲了讓你快速的瞭解Logstash提供的多種選項,讓咱們先討論一下最經常使用的一些配置。更多的信息,請參考 Logstash事件管道 。

Inputs

input 及輸入是指日誌數據傳輸到Logstash中。其中常見的配置以下:

  • file:從文件系統中讀取一個文件,很像UNIX命令 "tail -0a"
  • syslog:監聽514端口,按照RFC3164標準解析日誌數據
  • redis:從redis服務器讀取數據,支持channel(發佈訂閱)和list模式。redis通常 在Logstash消費集羣中 做爲"broker"角色,保存events隊列共Logstash消費。
  • lumberjack:使用lumberjack協議來接收數據,目前已經改成 logstash-forwarder。

Filters

Fillters 在Logstash處理鏈中擔任中間處理組件。他們常常被組合起來實現一些特定的行爲來,處理匹配特定規則的事件流。常見的filters以下:

  • grok:解析無規則的文字並轉化爲有結構的格式。Grok 是目前最好的方式來將無結構的數據轉換爲有結構可查詢的數據。有120多種匹配規則,會有一種知足你的須要。
  • mutate:mutate filter 容許改變輸入的文檔,你能夠從命名,刪除,移動或者修改字段在處理事件的過程當中。
  • drop:丟棄一部分events不進行處理,例如:debug events。
  • clone:拷貝 event,這個過程當中也能夠添加或移除字段。
  • geoip:添加地理信息(爲前臺kibana圖形化展現使用)

Outputs

outputs是logstash處理管道的最末端組件。一個event能夠在處理過程當中通過多重輸出,可是一旦全部的outputs都執行結束,這個event也就完成生命週期。一些經常使用的outputs包括:

  • elasticsearch:若是你計劃將高效的保存數據,而且可以方便和簡單的進行查詢...Elasticsearch是一個好的方式。是的,此處有作廣告的嫌疑,呵呵。
  • file:將event數據保存到文件中。
  • graphite:將event數據發送到圖形化組件中,一個很流行的開源存儲圖形化展現的組件。 http://graphite.wikidot.com/ 。
  • statsd:statsd是一個統計服務,好比技術和時間統計,經過udp通信,聚合一個或者多個後臺服務,若是你已經開始使用statsd,該選項對你應該頗有用。

Codecs

codecs 是基於數據流的過濾器,它能夠做爲input,output的一部分配置。Codecs能夠幫助你輕鬆的分割發送過來已經被序列化的數據。流行的codecs包括 json,msgpack,plain(text)。

  • json:使用json格式對數據進行編碼/解碼
  • multiline:將匯多個事件中數據彙總爲一個單一的行。好比:java異常信息和堆棧信息

獲取完整的配置信息,請參考 Logstash文檔 中 "plugin configuration"部分。


更多有趣Logstash內容

使用配置文件

使用-e參數 在命令行中指定配置是很經常使用的方式,不過若是須要配置更多設置則須要很長的內容。這種狀況,咱們首先建立一個簡單的配置文件,而且指定logstash使用這個配置文件。如咱們建立一個文件名是"logstash-simple.conf"的配置文件而且保存在和Logstash相同的目錄中。內容以下:

input { stdin { } }
output {
elasticsearch { host => localhost }
stdout { codec => rubydebug }
}

接下來,執行命令:

bin/logstash -f logstash-simple.conf

咱們看到logstash按照你剛剛建立的配置文件來運行例子,這樣更加的方便。注意,咱們使用-f參數來從文件獲取而代替以前使用-e參數從命令行中獲取配置。以上演示很是簡單的例子,固然解析來咱們繼續寫一些複雜一些的例子。

過濾器

filters是一個行處理機制將提供的爲格式化的數據整理成你須要的數據,讓咱們看看下面的一個例子,叫grok filter的過濾器。

input { stdin { } }

filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}

output {
elasticsearch { host => localhost }
stdout { codec => rubydebug }
}

執行Logstash按照以下參數:

bin/logstash -f logstash-filter.conf

如今粘貼下面一行信息到你的終端(固然Logstash就會處理這個標準的輸入內容):

127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"

你將看到相似以下內容的反饋信息:

{
      "message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"",
    "@timestamp" => "2013-12-11T08:01:45.000Z",
     "@version" => "1",
        "host" => "cadenza",
    "clientip" => "127.0.0.1",
       "ident" => "-",
        "auth" => "-",
    "timestamp" => "11/Dec/2013:00:01:45 -0800",
        "verb" => "GET",
      "request" => "/xampp/status.php",
   "httpversion" => "1.1",
     "response" => "200",
       "bytes" => "3891",
     "referrer" => "\"http://cadenza/xampp/navi.php\"",
       "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\""

正像你看到的那樣,Logstash(使用了grok過濾器)可以將一行的日誌數據(Apache的"combined log"格式)分割設置爲不一樣的數據字段。這一點對於往後解析和查詢咱們本身的日誌數據很是有用。好比:HTTP的返回狀態碼,IP地址相關等等,很是的容易。不多有匹配規則沒有被grok包含,因此若是你正嘗試的解析一些常見的日誌格式,或許已經有人爲了作了這樣的工做。若是查看詳細匹配規則,參考 logstash grok patterns 。

另一個過濾器是date filter。這個過濾器來負責解析出來日誌中的時間戳並將值賦給timestame字段(無論這個數據是何時收集到logstash的)。你也許注意到在這個例子中@timestamp字段是設置成December 11, 2013, 說明logstash在日誌產生以後一段時間進行處理的。這個字段在處理日誌中回添到數據中的,舉例來講... 這個值就是logstash處理event的時間戳。


實用的例子

Apache 日誌(從文件獲取)

如今,讓咱們使用一些很是實用的配置... apache2訪問日誌!咱們將從本地讀取日誌文件,而且經過條件設置處理知足咱們須要的event。首先,咱們建立一個文件名是logstash-apache.conf的配置文件,內容以下(你能夠根據實際狀況修改你的文件名和路徑):

input {
file {
path => "/tmp/access_log"
start_position => beginning
}
}

filter {
if [path] =~ "access" {
mutate { replace => { "type" => "apache_access" } }
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}

output {
elasticsearch {
host => localhost
}

stdout { codec => rubydebug }
}

接下來,咱們按照上面的配置建立一個文件(在例子中是"/tmp/access.log"),能夠將下面日誌信息做爲文件內容(也能夠用你本身的webserver產生的日誌):

71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3"
134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)"
98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"

如今使用-f參數來執行一下上面的例子:

bin/logstash -f logstash-apache.conf

你能夠看到apache的日誌數據已經導入到ES中了。這裏logstash會按照你的配置讀取,處理指定的文件,任何後添加到文件的內容也會被捕獲處理最後保存到ES中。此外,數據中type的字段值會被替換成"apache_access"(這個功能在配置中已經指定)。

這個配置只是讓Logstash監控了apache access_log,可是在實際中每每並不夠用可能還須要監控error_log,只要在上面的配置中改變一行既能夠實現,以下:

input {
file {
path => "/tmp/*_log"...

如今你能夠看到logstash處理了error日誌和access日誌。然而,若是你檢查了你的數據(也許用elasticsearch-kopf ),你將發現access_log日誌被分紅不一樣的字段,可是error_log確沒有這樣。這是由於咱們使用了「grok」filter並僅僅配置匹配combinedapachelog日誌格式,這樣知足條件的日誌就會自動的被分割成不一樣的字段。咱們能夠經過控制日誌按照它本身的某種格式來解析日誌,不是很好的嗎?對吧。

此外,你也許還會發現Logstash不會重複處理文件中已經處理過得events。由於Logstash已經記錄了文件處理的位置,這樣就只處理文件中新加入的行數。漂亮!

條件判斷

咱們利用上一個例子來介紹一下條件判斷的概念。這個概念通常狀況下應該被大多數的Logstash用戶熟悉掌握。你能夠像其餘普通的編程語言同樣來使用if,else if和else語句。讓咱們把每一個event依賴的日誌文件類型都標記出來(access_log,error_log其餘以log結尾的日誌文件)。

input {
file {
path => "/tmp/*_log"
}
}


filter {
if [path] =~ "access" {
mutate { replace => { type => "apache_access" } }
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}

} else if [path] =~ "error" {
mutate { replace => { type => "apache_error" } }
} else {
mutate { replace => { type => "random_logs" } }
}
}


output {
elasticsearch { host => localhost }
stdout { codec => rubydebug }

}

我想你已經注意到了,咱們使用"type"字段來標記每一個event,可是咱們實際上沒有解析"error"和」random"類型的日誌... 而實際狀況下可能會有不少不少類型的錯誤日誌,如何解析就做爲練習留給各位讀者吧,你能夠依賴已經存在的日誌。

Syslog

Ok,如今咱們繼續瞭解一個很實用的例子:syslog。Syslog對於Logstash是一個很長用的配置,而且它有很好的表現(協議格式符合RFC3164)。Syslog其實是UNIX的一個網絡日誌標準,由客戶端發送日誌數據到本地文件或者日誌服務器。在這個例子中,你根本不用創建syslog實例;咱們經過命令行就能夠實現一個syslog服務,經過這個例子你將會看到發生什麼。

首先,讓咱們建立一個簡單的配置文件來實現logstash+syslog,文件名是 logstash-syslog.conf

input {
tcp {
port => 5000
type => syslog
}

udp {
port => 5000
type => syslog
}
}


filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:[%{POSINT:syslog_pid}])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}

syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}


output {
elasticsearch { host => localhost }
stdout { codec => rubydebug }
}

執行logstash:

bin/logstash -f logstash-syslog.conf

一般,須要一個客戶端連接到Logstash服務器上的5000端口而後發送日誌數據。在這個簡單的演示中咱們簡單的使用telnet連接到logstash服務器發送日誌數據(與以前例子中咱們在命令行標準輸入狀態下發送日誌數據相似)。首先咱們打開一個新的shell窗口,而後輸入下面的命令:

telnet localhost 5000

你能夠複製粘貼下面的樣例信息(固然也可使用其餘字符,不過這樣可能會被grok filter不能正確的解析):

Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' deniedDec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2\>/var/log/cacti/poller-error.log)
Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.

以後你能夠在你以前運行Logstash的窗口中看到輸出結果,信息被處理和解析!

{

         "message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",

        "@timestamp" => "2013-12-23T22:30:01.000Z",

        "@version" => "1",

          "type" => "syslog",

          "host" => "0:0:0:0:0:0:0:1:52617",

    "syslog_timestamp" => "Dec 23 14:30:01",

     "syslog_hostname" => "louis",

      "syslog_program" => "CRON",

        "syslog_pid" => "619",

      "syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",

       "received_at" => "2013-12-23 22:49:22 UTC",

       "received_from" => "0:0:0:0:0:0:0:1:52617",

  "syslog_severity_code" => 5,

  "syslog_facility_code" => 1,

     "syslog_facility" => "user-level",

     "syslog_severity" => "notice"

}

看到這裏你已經成爲一個合格的Logstash用戶了。你將能夠輕鬆的配置,運行Logstash,還能夠發送event給Logstash,可是這個過程隨着使用還會有不少值得深挖的地方。

參考:Logstash官方網站文檔,多數細節來源於官方文檔描述,小編加以翻譯與我的理解整理而成。

歡迎你們關注個人微信公衆號【民工哥技術之路】,最新整理的 2TB 技術乾貨:包括架構師實戰教程、大數據、Docker容器、系統運維、數據庫、redis、MogoDB、電子書、Java基礎課程、Java實戰項目、ELK Stack、機器學習、BAT面試精講視頻等。只需在「 民工哥技術之路」微信公衆號對話框回覆關鍵字:1024便可獲取所有資料。

相關文章
相關標籤/搜索