前面的時候,我由於後臺粉絲的一些問題,整理了一篇文章,將ELK三個技術進行詳細的講解,從原理到實踐,全面覆蓋,可是由於篇幅緣由,我分紅了兩篇進行整理,上篇主講EShtml
今天是其餘的個技術:Logstash+Kibana,中間穿插着講解Kafka應用,我的公衆號:Java架構師聯盟,每日更新技術好文node
話很少說,直接上正題git
Logstash是一款輕量級的日誌蒐集處理框架,能夠方便的把分散的、多樣化的日誌蒐集起來,並進行自定義的處理,而後傳輸到指定的位置,好比某個服務器或者文件。github
而在官網,對於Logstash的介紹更是完整,我這裏就展現一下官網的介紹web
輸入:採集各類樣式、大小和來源的數據redis
過濾器:實時解析和轉換數據apache
輸出:選擇你的存儲,導出你的數據json
而在官網的介紹中,最讓我興奮的就是可擴展性,Logstash 採用可插拔框架,擁有 200 多個插件。您能夠將不一樣的輸入選擇、過濾器和輸出選擇混合搭配、精心安排,讓它們在管道中和諧地運行。這也就意味着能夠用本身的方式建立和配置管道,就跟樂高積木同樣,我本身感受太爽了bootstrap
好了,理論的東西過一遍就好vim
ps:不過這也體現出官網在學習的過程當中的重要性,雖然都是英文的,可是,如今能夠翻譯的軟件的太多了,這不是問題
全部的技術,不本身實際操做一下是不能夠的,安裝上本身動手實踐一下,毛爺爺都說:實踐是檢驗真理的惟一標準,不得不誇獎一下Logstash的工程師,真的太人性化了,下載後直接解壓,就能夠了。
並且提供了不少的安裝方式供你選擇,舒服
開始咱們今天的第一個實踐吧,就像咱們剛開始學Java的時候,第一個命令就是helloworld,不知道各位還能不能手寫出來呢?來看一下logstash的第一個運行時怎麼處理的
經過命令行,進入到logstash/bin目錄,執行下面的命令:
input { kafka { type => "accesslogs" codec => "plain" auto_offset_reset => "smallest" group_id => "elas1" topic_id => "accesslogs" zk_connect => "172.16.0.11:2181,172.16.0.12:2181,172.16.0.13:2181" } kafka { type => "gamelogs" auto_offset_reset => "smallest" codec => "plain" group_id => "elas2" topic_id => "gamelogs" zk_connect => "172.16.0.11:2181,172.16.0.12:2181,172.16.0.13:2181" } } filter { if [type] == "accesslogs" { json { source => "message" remove_field => [ "message" ] target => "access" } } if [type] == "gamelogs" { mutate { split => { "message" => " " } add_field => { "event_type" => "%{message[3]}" "current_map" => "%{message[4]}" "current_X" => "%{message[5]}" "current_y" => "%{message[6]}" "user" => "%{message[7]}" "item" => "%{message[8]}" "item_id" => "%{message[9]}" "current_time" => "%{message[12]}" } remove_field => [ "message" ] } } } output { if [type] == "accesslogs" { elasticsearch { index => "accesslogs" codec => "json" hosts => ["172.16.0.14:9200", "172.16.0.15:9200", "172.16.0.16:9200"] } } if [type] == "gamelogs" { elasticsearch { index => "gamelogs" codec => plain { charset => "UTF-16BE" } hosts => ["172.16.0.14:9200", "172.16.0.15:9200", "172.16.0.16:9200"] } } }
能夠看到提示下面信息(這個命令稍後介紹),輸入hello world!
能夠看到logstash爲咱們自動添加了幾個字段:
時間戳:@ timestamp
版本:@ version
輸入的類型:type
主機名:host。
Logstash使用管道方式進行日誌的蒐集處理和輸出。有點相似*NIX系統的管道命令 xxx | ccc | ddd,xxx執行完了會執行ccc,而後執行ddd。
在logstash中,包括了三個階段:
輸入input --> 處理filter(不是必須的) --> 輸出output
每一個階段都有不少的插件配合工做,好比file、elasticsearch、redis等等。
每一個階段也能夠指定多種方式,好比輸出既能夠輸出到elasticsearch中,也能夠指定到stdout在控制檯打印。
因爲這種插件式的組織方式,使得logstash變得易於擴展和定製。
-f:經過這個命令能夠指定Logstash的配置文件,根據配置文件配置logstash
-e:後面跟着字符串,該字符串能夠被當作logstash的配置(若是是「」 則默認使用stdin做爲輸入,stdout做爲輸出)
-l:日誌輸出的地址(默認就是stdout直接在控制檯中輸出)
-t:測試配置文件是否正確,而後退出。
前面介紹過logstash基本上由三部分組成,input、output以及用戶須要才添加的filter,所以標準的配置文件格式以下:
input {...} filter {...} output {...}
在每一個部分中,也能夠指定多個訪問方式,例如我想要指定兩個日誌來源文件,則能夠這樣寫:
input { file { path =>"/var/log/messages" type =>"syslog"} file { path =>"/var/log/apache/access.log" type =>"apache"} }
相似的,若是在filter中添加了多種處理規則,則按照它的順序一一處理,可是有一些插件並非線程安全的。
好比在filter中指定了兩個同樣的的插件,這兩個任務並不能保證準確的按順序執行,所以官方也推薦避免在filter中重複使用插件。
說完這些,簡單的建立一個配置文件的小例子看看:
input { file { #指定監聽的文件路徑,注意必須是絕對路徑 path => "E:/software/logstash-1.5.4/logstash-1.5.4/data/test.log" start_position => beginning } } filter { } output { stdout {} }
日誌大體以下:注意最後有一個空行。
1 hello,this is first line in test.log! 2 hello,my name is xingoo! 3 goodbye.this is last line in test.log! 4
執行命令獲得以下信息:
這個插件能夠從指定的目錄或者文件讀取內容,輸入到管道處理,也算是logstash的核心插件了,大多數的使用場景都會用到這個插件,所以這裏詳細講述下各個參數的含義與使用。
在Logstash中能夠在 input{} 裏面添加file配置,默認的最小化配置以下:
input { file { path => "E:/software/logstash-1.5.4/logstash-1.5.4/data/*" } } filter { } output { stdout {} }
固然也能夠監聽多個目標文件:
input { file { path => ["E:/software/logstash-1.5.4/logstash-1.5.4/data/*","F:/test.txt"] } } filter { } output { stdout {} }
另外,處理path這個必須的項外,file還提供了不少其餘的屬性:
input { file { #監聽文件的路徑 path => ["E:/software/logstash-1.5.4/logstash-1.5.4/data/*","F:/test.txt"] #排除不想監聽的文件 exclude => "1.log" #添加自定義的字段 add_field => {"test"=>"test"} #增長標籤 tags => "tag1" #設置新事件的標誌 delimiter => "\n" #設置多長時間掃描目錄,發現新文件 discover_interval => 15 #設置多長時間檢測文件是否修改 stat_interval => 1 #監聽文件的起始位置,默認是end start_position => beginning #監聽文件讀取信息記錄的位置 sincedb_path => "E:/software/logstash-1.5.4/logstash-1.5.4/test.txt" #設置多長時間會寫入讀取的位置信息 sincedb_write_interval => 15 } } filter { } output { stdout {} }
其中值得注意的是:
1 path
是必須的選項,每個file配置,都至少有一個path
2 exclude
是不想監聽的文件,logstash會自動忽略該文件的監聽。配置的規則與path相似,支持字符串或者數組,可是要求必須是絕對路徑。
3 start_position
是監聽的位置,默認是end,即一個文件若是沒有記錄它的讀取信息,則從文件的末尾開始讀取,也就是說,僅僅讀取新添加的內容。對於一些更新的日誌類型的監聽,一般直接使用end就能夠了;相反,beginning就會從一個文件的頭開始讀取。可是若是記錄過文件的讀取信息,這個配置也就失去做用了。
4 sincedb_path
這個選項配置了默認的讀取文件信息記錄在哪一個文件中,默認是按照文件的inode等信息自動生成。其中記錄了inode、主設備號、次設備號以及讀取的位置。所以,若是一個文件僅僅是重命名,那麼它的inode以及其餘信息就不會改變,所以也不會從新讀取文件的任何信息。相似的,若是複製了一個文件,就至關於建立了一個新的inode,若是監聽的是一個目錄,就會讀取該文件的全部信息。
5 其餘的關於掃描和檢測的時間,按照默認的來就行了,若是頻繁建立新的文件,想要快速監聽,那麼能夠考慮縮短檢測的時間。
//6 add_field #這個技術感受挺六的,可是其實就是增長一個字段,例如: file { add_field => {"test"=>"test"} path => "D:/tools/logstash/path/to/groksample.log" start_position => beginning }
基於Logstash跑通Kafka仍是須要注意不少東西,最重要的就是理解Kafka的原理。
6.1. Logstash工做原理
因爲Kafka採用解耦的設計思想,並不是原始的發佈訂閱,生產者負責產生消息,直接推送給消費者。而是在中間加入持久化層——broker,生產者把數據存放在broker中,消費者從broker中取數據。這樣就帶來了幾個好處:
1 生產者的負載與消費者的負載解耦
2 消費者按照本身的能力fetch數據
3 消費者能夠自定義消費的數量
另外,因爲broker採用了主題topic-->分區的思想,使得某個分區內部的順序能夠保證有序性,可是分區間的數據不保證有序性。這樣,消費者能夠以分區爲單位,自定義讀取的位置——offset。
Kafka採用zookeeper做爲管理,記錄了producer到broker的信息,以及consumer與broker中partition的對應關係。所以,生產者能夠直接把數據傳遞給broker,broker經過zookeeper進行leader-->followers的選舉管理;消費者經過zookeeper保存讀取的位置offset以及讀取的topic的partition分區信息。
因爲上面的架構設計,使得生產者與broker相連;消費者與zookeeper相連。有了這樣的對應關係,就容易部署logstash-->kafka-->logstash的方案了。
接下來,按照下面的步驟就能夠實現logstash與kafka的對接了。
6.2. 啓動kafka
##啓動zookeeper: $zookeeper/bin/zkServer.sh start ##啓動kafka: $kafka/bin/kafka-server-start.sh $kafka/config/server.properties &
6.3. 建立主題
#建立主題: $kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic hello --replication-factor 1 --partitions 1 #查看主題: $kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe
6.4. 測試環境
#執行生產者腳本: $kafka/bin/kafka-console-producer.sh --broker-list 10.0.67.101:9092 --topic hello #執行消費者腳本,查看是否寫入: $kafka/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --from-beginning --topic hello
6.5. 向kafka中輸出數據
input{ stdin{} } output{ kafka{ topic_id => "hello" bootstrap_servers => "192.168.0.4:9092,172.16.0.12:9092" # kafka的地址 batch_size => 5 codec => plain { format => "%{message}" charset => "UTF-8" } } stdout{ codec => rubydebug } }
6.6. 從kafka中讀取數據
logstash配置文件:
input{ kafka { codec => "plain" group_id => "logstash1" auto_offset_reset => "smallest" reset_beginning => true topic_id => "hello" zk_connect => "192.168.0.5:2181" } } output{ stdout{ codec => rubydebug } }
7.1. 過濾插件grok組件
#日誌 55.3.244.1 GET /index.html 15824 0.043 bin/logstash -e ' input { stdin {} } filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } } } output { stdout {codec => rubydebug} }'
7.2. 分割插件split
filter { mutate { split => { "message" => " " } add_field => { "event_type" => "%{message[3]}" "current_map" => "%{message[4]}" "current_X" => "%{message[5]}" "current_y" => "%{message[6]}" "user" => "%{message[7]}" "item" => "%{message[8]}" "item_id" => "%{message[9]}" "current_time" => "%{message[12]}" } remove_field => [ "message" ] } }
Logstash 早期曾經自帶了一個特別簡單的 logstash-web 用來查看 ES 中的數據。其功能太過簡單,因而產生了Kibana。不過是用PHP編寫,後來爲了知足更多的使用需求,懶人推進科技的進步嘛,而且Logstash使用ruby進行編寫,因此從新編寫Kibana,直到如今,Kibana由於重構,致使3,4某些狀況下不兼容,因此出現了一山容二虎的狀況,具體怎麼選擇,能夠根據業務場景進行實際分析
在Kibana衆多的優秀特性中,我我的最喜歡的是這一個特性,我起名叫包容性
由於在官網介紹中,Kibana能夠很是方便地把來自Logstash、ES-Hadoop、Beats或第三方技術的數據整合到Elasticsearch,支持的第三方技術包括Apache Flume、Fluentd等。這也就代表我在平常的開發工做中,對於技術選型和操做的時候,我能夠有更多的選擇,在開發時也能找到相應的開發實例,節省了大量的開發時間
ps:有一次體現了官網的重要性,真的,有時候官網能夠幫你解決大多數的問題,有時間能夠去看一下官網啊,好了,話很少說,看正題
下載安裝包後解壓
編輯文件config/kibana.yml ,配置屬性:
[root@H32 ~]# cd kibana/config/ [root@H32 config]# vim kibana.yml //添加: server.host: "192.168.80.32" elasticsearch.url: "http://172.16.0.14:9200"
先啓動ES,而後再啓動
cd /usr/local/kibana530bin/kibana
注意:
一、kibana必須是在root下運行,不然會報錯,啓動失敗
二、下載解壓安裝包,必定要裝與ES相同的版本
咱們將使用莎士比亞全集做爲咱們的示例數據。要更好的使用 Kibana,你須要爲本身的新索引應用一個映射集(mapping)。咱們用下面這個映射集建立"莎士比亞全集"索引。實際數據的字段比這要多,可是咱們只須要指定下面這些字段的映射就能夠了。注意到咱們設置了對 speaker 和 play_name 不分析。緣由會在稍後講明。
在終端運行下面命令:
curl -XPUT http://localhost:9200/shakespeare -d ' { "mappings" : { "_default_" : { "properties" : { "speaker" : {"type": "string", "index" : "not_analyzed" }, "play_name" : {"type": "string", "index" : "not_analyzed" }, "line_id" : { "type" : "integer" }, "speech_number" : { "type" : "integer" } } } } }
咱們這就建立好了索引。如今須要作的時導入數據。莎士比亞全集的內容咱們已經整理成了 elasticsearch 批量 導入所須要的格式,你能夠經過shakeseare.json下載。
用以下命令導入數據到你本地的 elasticsearch 進程中。
curl -XPUT localhost:9200/_bulk --data-binary @shakespeare.json
打開瀏覽器,訪問已經發布了 Kibana 的本地服務器。
若是你解壓路徑無誤(譯者注:使用 github 源碼的讀者記住發佈目錄應該是 kibana/src/ 裏面),你已經就能夠看到上面這個可愛的歡迎頁面。點擊 Sample Dashboard 連接
好了,如今顯示的就是你的 sample dashboard!若是你是用新的 elasticsearch 進程開始本教程的,你會看到一個百分比佔比很重的餅圖。這裏顯示的是你的索引中,文檔類型的狀況。如你所見,99% 都是 lines,只有少許的 acts 和scenes。
在下面,你會看到一長段 JSON 格式的莎士比亞詩文。
Kibana 容許使用者採用 Lucene Query String 語法搜索 Elasticsearch 中的數據。請求能夠在頁面頂部的請求輸入框中書寫。
在請求框中輸入以下內容。而後查看錶格中的前幾行內容。
friends, romans, countrymen
目前 Kibana 指向的是 Elasticsearch 一個特殊的索引叫 _all。 _all 能夠理解爲所有索引的大集合。目前你只有一個索引, shakespeare,但將來你會有更多其餘方面的索引,你確定不但願 Kibana 在你只想搜《麥克白》裏心愛的句子的時候還要搜索所有內容。
配置索引,點擊右上角的配置按鈕:
在這裏,你能夠設置你的索引爲 shakespeare ,這樣 Kibana 就只會搜索 shakespeare 索引的內容了。
這是由於 ES1.4 加強了權限管理。你須要在 ES 配置文件 elasticsearch.yml 中添加下列配置並重啓服務後才能正常訪問:
http.cors.enabled: true http.cors.allow-origin: "*"
記住 kibana3 頁面也要刷新緩存才行。
此外,若是你能夠很明確本身 kibana 之外沒有其餘 http 訪問,能夠把 kibana 的網址寫在http.cors.allow-origin 參數的值中。好比:
http.cors.allow-origin: "/https?:\/\/kbndomain/"
好了,到這裏就結束了,不知道有沒有收穫呀,有收穫的朋友給我點個贊吧~