強大的功能,豐富的插件,讓logstash在數據處理的行列中出類拔萃html
一般日誌數據除了要入ES提供實時展現和簡單統計外,還須要寫入大數據集羣來提供更爲深刻的邏輯處理,前邊幾篇ELK的文章介紹過利用logstash將kafka的數據寫入到elasticsearch集羣,這篇文章將會介紹如何經過logstash將數據寫入HDFSnode
本文全部演示均基於logstash 6.6.2版本nginx
logstash默認不支持數據直接寫入HDFS,官方推薦的output插件是webhdfs
,webhdfs使用HDFS提供的API將數據寫入HDFS集羣web
插件安裝比較簡單,直接使用內置命令便可json
# cd /home/opt/tools/logstash-6.6.2 # ./bin/logstash-plugin install logstash-output-webhdfs
HDFS集羣內經過主機名進行通訊因此logstash所在的主機須要配置hadoop集羣的hosts信息bootstrap
# cat /etc/hosts 192.168.107.154 master01 192.168.107.155 slave01 192.168.107.156 slave02 192.168.107.157 slave03
若是不配置host信息,可能會報下邊的錯api
[WARN ][logstash.outputs.webhdfs ] Failed to flush outgoing items
kafka裏邊的源日誌格式能夠參考這片文章:ELK日誌系統之使用Rsyslog快速方便的收集Nginx日誌ruby
logstash的配置以下:架構
# cat config/indexer_rsyslog_nginx.conf input { kafka { bootstrap_servers => "10.82.9.202:9092,10.82.9.203:9092,10.82.9.204:9092" topics => ["rsyslog_nginx"] codec => "json" } } filter { date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "time_local" } ruby { code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))" } ruby { code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))" } } output { webhdfs { host => "master01" port => 50070 user => "hadmin" path => "/logs/nginx/%{index.date}/%{index.hour}.log" codec => "json" } stdout { codec => rubydebug } }
logstash配置文件分爲三部分:input、filter、output運維
input指定源在哪裏,咱們是從kafka取數據,這裏就寫kafka集羣的配置信息,配置解釋:
filter能夠對input輸入的內容進行過濾或處理,例如格式化,添加字段,刪除字段等等
output指定處理過的日誌輸出到哪裏,能夠是ES或者是HDFS等等,能夠同時配置多個,webhdfs主要配置解釋:
webhdfs還有一些其餘的參數例如compression
,flush_size
,standby_host
,standby_port
等可查看官方文檔瞭解詳細用法
# bin/logstash -f config/indexer_rsyslog_nginx.conf
由於logstash配置中開了stdout
輸出,因此能在控制檯看到格式化的數據,以下:
{ "server_addr" => "172.18.90.17", "http_user_agent" => "Mozilla/5.0 (iPhone; CPU iPhone OS 10_2 like Mac OS X) AppleWebKit/602.3.12 (KHTML, like Gecko) Mobile/14C92 Safari/601.1 wechatdevtools/1.02.1902010 MicroMessenger/6.7.3 Language/zh_CN webview/ token/e7b92168159736c30401a55589317d8c", "remote_addr" => "172.18.101.0", "status" => 200, "http_referer" => "https://ops-coffee.cn/wx02935bb29080a7b4/devtools/page-frame.html", "upstream_response_time" => "0.056", "host" => "ops-coffee.cn", "request_uri" => "/api/community/v2/news/list", "request_time" => 0.059, "upstream_status" => "200", "@version" => "1", "http_x_forwarded_for" => "192.168.106.100", "time_local" => 2019-03-18T11:03:45.000Z, "body_bytes_sent" => 12431, "@timestamp" => 2019-03-18T11:03:45.984Z, "index.date" => "20190318", "index.hour" => "19", "request_method" => "POST", "upstream_addr" => "127.0.0.1:8181" }
查看hdfs發現數據已經按照定義好的路徑正常寫入
$ hadoop fs -ls /logs/nginx/20190318/19.log -rw-r--r-- 3 hadmin supergroup 7776 2019-03-18 19:07 /logs/nginx/20190318/19.log
至此kafka到hdfs數據轉儲完成
logstash在處理數據時會自動生成一個字段@timestamp
,默認狀況下這個字段存儲的是logstash收到消息的時間,使用的是UTC時區,會跟國內的時間差8小時
咱們output到ES或者HDFS時一般會使用相似於rsyslog-nginx-%{+YYYY.MM.dd}
這樣的變量來動態的設置index或者文件名,方便後續的檢索,這裏的變量YYYY
使用的就是@timestamp
中的時間,由於時區的問題生成的index或者文件名就差8小時不是很準確,這個問題在ELK架構中由於所有都是用的UTC時間且最終kibana展現時會自動轉換咱們無需關心,但這裏要生成文件就須要認真對待下了
這裏採用的方案是解析日誌中的時間字段time_local
,而後根據日誌中的時間字段添加兩個新字段index.date
和index.hour
來分別標識日期和小時,在output的時候使用這兩個新加的字段作變量來生成文件
logstash filter配置以下:
filter { # 匹配原始日誌中的time_local字段並設置爲時間字段 # time_local字段爲本地時間字段,沒有8小時的時間差 date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "time_local" } # 添加一個index.date字段,值設置爲time_local的日期 ruby { code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))" } # 添加一個index.hour字段,值設置爲time_local的小時 ruby { code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))" } }
output的path中配置以下
path => "/logs/nginx/%{index.date}/%{index.hour}.log"
在沒有指定codec的狀況下,logstash會給每一條日誌添加時間和host字段,例如:
源日誌格式爲
ops-coffee.cn | 192.168.105.91 | 19/Mar/2019:14:28:07 +0800 | GET / HTTP/1.1 | 304 | 0 | - | 0.000
通過logstash處理後多了時間和host字段
2019-03-19T06:28:07.510Z %{host} ops-coffee.cn | 192.168.105.91 | 19/Mar/2019:14:28:07 +0800 | GET / HTTP/1.1 | 304 | 0 | - | 0.000
若是不須要咱們能夠指定最終的format只取message,解決方法爲在output中添加以下配置:
codec => line { format => "%{message}" }
文章未完,所有內容請關注公衆號【運維咖啡吧】或我的網站https://ops-coffee.cn查看,運維咖啡吧專一於原創精品內容分享,感謝您的支持