【Logstash】node
[root@localhost ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.2.tar.gzlinux
[root@localhost ~]# tar zxvf logstash-6.3.2.tar.gz -C /usr/local/web
[root@localhost ~]# mv /usr/local/logstash-6.3.2/ /usr/local/logstashapache
[root@localhost ~]# mkdir /usr/local/logstash/config/etc/json
[root@localhost ~]# vim /usr/local/logstash/config/etc/test01.conf bootstrap
測試:小試牛刀,咱們先將kafka隊列消息輸出到終端,主要爲了測試filebeat生產web日誌。推送到kafka隊列,logstash端可否消費,很明顯,下圖所示,消費正常!vim
input { kafka { bootstrap_servers => "192.168.37.134:9092,192.168.37.135:9092,192.168.37.136:9092" topics => ["webapache"] codec => "json" } } output { stdout { codec => rubydebug } }
input接收源變成kafka,經過bootstrap_server和topics兩個選項指定了接收源kafka集羣以及主題屬性;由於咱們的logstash從kafka獲取的的日誌數據格式爲json,因此須要 在inout字段中加上codec=>"json"解析緩存
輸出端是stdout,表示標準輸出(從終端輸出),這裏的codex是個插件,表示格式,放在stout中表示輸出的格式,rubydebug是專門測試的格式,通常在用來在終端輸出json格式信息ruby
[root@localhost ~]#/usr/local/logstash/bin/logstash -f config/etc/test01.conf服務器
[root@localhost etc]# vim /usr/local/logstash/config/etc/httpd_log.conf #咱們配置將kafka消息存入到ES集羣中,
input { kafka { bootstrap_servers => "192.168.37.134:9092,192.168.37.135:9092,192.168.37.136:9092" topics => ["webapache"] codec => "json" } } output { elasticsearch { hosts => ["192.168.37.134:9200","192.168.37.135:9200","192.168.37.136:9200"] index => "webapachelog-%{+YYYY-MM-dd}" } }
root@localhost etc]# nohup /usr/local/logstash/bin/logstash -f httpd_log.conf &
詳解:上述能夠看到,
input接收源變成kafka,經過bootstrap_server和topics兩個選項指定了接收源kafka集羣以及主題屬性;由於咱們的logstash從kafka獲取的的日誌數據格式爲json,因此須要 在inout字段中加上codec=>"json"解析
在看輸出端,output輸出類型是elasticseach,經過hosts指定Elasticsearch集羣ip,將kafka消息隊列的數據存入elasticsearch中,最後經過index定義索引名稱;
[Kibana]
[root@localhost ~]# tar zxvf kibana-6.3.2-linux-x86_64.tar.gz -C /usr/local/
[root@localhost ~]# mv /usr/local/kibana-6.3.2-linux-x86_64/ /usr/local/kibana
[root@localhost ~]# cd /usr/local/kibana/config/
[root@localhost config]# egrep -v "#|^$" kibana.yml
server.port: 5601 #kibana默認端口 server.host: "192.168.37.136" #kibana綁定的IP地址,寫本地IP便可 elasticsearch.url: "http://192.168.37.134:9200" #kibana訪問Elasticsearch地址,能夠任意ES集羣節點IP,推薦寫Client node角色IP
kibana.index: ".kibana" #存儲kibana數據信息的索引
這裏只須要填寫「webapachelog-*」便可,會自動檢測ES索引對應的文件並抓取映射,前提是filebeat檢測的日誌消息通過kafka隊列消息存入 ES中;不然會失敗
選擇「@timestamp」進行時間排序
Kibana界面展現:
#Discover:主要用來進行日誌檢索,查詢數據
#Visualize:數據可視化,咱們能夠建立各類惟獨的可視化圖表,例如:面積圖,折線圖,餅圖 等
#Dashboard:儀盤表功能,儀盤表就是可視化圖表的組合,經過將各類可視化圖表組合到一塊兒,總體上了解數據和日誌的各類狀態
#Timeblion:時間畫像,能夠建立時間序列可視化圖形
#Dev Tools:調試工具控制檯,簡單的理解就是kibana提供與ES交互的平臺
#Management:管理界面,能夠在此建立索引模式,調整kibana設置等操做
總結:整個流程呢,我簡單的說一下(大白話說明)
1.首先是由filebeat檢索監控日誌,將監控的日誌路徑推送到kafka消息隊列中生成topics消息,這裏filebeat至關於生產者
2.kafka消息隊列將日誌消息存儲起來,這裏結合zookeeper集羣,並容許logstash拉取
3.Logstash充當消費者,向kafka集羣拉取日誌數據,並轉發存入Elasticsearch;
4.Kibana爲webUI展現頁面,與Elasticsearch進行交互,將日誌數據可視化的展示出來
根據上圖架構,從總體分爲兩個部分:即日誌收集,日誌檢索;細分以下描述
1.首先是生產日誌,filebeat收集業務服務器上的日誌數據,咱們只須要將其安裝在須要收集日誌服務器上便可,隨後將日誌數據實時推送到Kafka集羣中,kafka集羣會對日誌數據進行緩衝和存儲,簡稱緩存,哈哈,這裏的filebeat至關於kafka集羣中的producer(生產者)
2.filebeat推送到Kafka集羣以後,logstash會主動去kafka集羣中拉取數據,由於這樣consumer消費者能夠自主的控制消費速率和方式,從而減小消費過程當中出錯的概率;其實logstash拉取數據就是分爲input,filter和output,中間接受不規則的數據,過濾,分析並轉換成格式化數據最後輸出,
3.Logstash將格式化的數據中轉到Elasticsearch中進行存儲索引,全部的數據都會存儲到Elasticsearch集羣中
4.Kibana將elasticsearch中的數據在web GUI界面進行可視化展現